Examples
Real-world patterns for building SNMP agents.
System Monitoring
Monitor CPU, memory, and disk usage:
system_monitor.py
import snmpkit
from snmpkit.agent import Agent, Updater
try:
import psutil
except ImportError:
raise ImportError("Install psutil: pip install psutil")
class SystemUpdater(Updater):
"""Exposes system metrics via SNMP."""
async def update(self):
# CPU usage (1.0)
self.set_GAUGE32("1.0", int(psutil.cpu_percent()))
# Memory usage (2.x)
mem = psutil.virtual_memory()
self.set_GAUGE32("2.1.0", int(mem.total / 1024 / 1024)) # Total MB
self.set_GAUGE32("2.2.0", int(mem.used / 1024 / 1024)) # Used MB
self.set_GAUGE32("2.3.0", int(mem.percent)) # Percent
# Disk usage (3.x)
disk = psutil.disk_usage("/")
self.set_GAUGE32("3.1.0", int(disk.total / 1024 / 1024 / 1024)) # Total GB
self.set_GAUGE32("3.2.0", int(disk.used / 1024 / 1024 / 1024)) # Used GB
self.set_GAUGE32("3.3.0", int(disk.percent)) # Percent
# Load average (4.x) - Unix only
try:
load1, load5, load15 = psutil.getloadavg()
self.set_GAUGE32("4.1.0", int(load1 * 100)) # 1-min * 100
self.set_GAUGE32("4.2.0", int(load5 * 100)) # 5-min * 100
self.set_GAUGE32("4.3.0", int(load15 * 100)) # 15-min * 100
except AttributeError:
pass # Windows doesn't have load average
async def main():
agent = Agent(agent_id="system-monitor")
agent.register("1.3.6.1.4.1.12345.1", SystemUpdater(), freq=5)
await agent.start()
snmpkit.run(main())Query:
snmpwalk -v2c -c public localhost 1.3.6.1.4.1.12345.1Database Metrics
Expose database statistics:
PostgreSQL
postgres_monitor.py
import snmpkit
from snmpkit.agent import Agent, Updater
import asyncpg
class PostgresUpdater(Updater):
def __init__(self, dsn: str):
super().__init__()
self.dsn = dsn
self.pool = None
async def update(self):
if self.pool is None:
self.pool = await asyncpg.create_pool(self.dsn)
async with self.pool.acquire() as conn:
# Connection count
count = await conn.fetchval(
"SELECT count(*) FROM pg_stat_activity"
)
self.set_GAUGE32("1.0", count)
# Database size (MB)
size = await conn.fetchval(
"SELECT pg_database_size(current_database()) / 1024 / 1024"
)
self.set_GAUGE32("2.0", int(size))
# Transaction stats
stats = await conn.fetchrow(
"SELECT xact_commit, xact_rollback FROM pg_stat_database "
"WHERE datname = current_database()"
)
self.set_COUNTER64("3.0", stats["xact_commit"])
self.set_COUNTER64("4.0", stats["xact_rollback"])
async def main():
agent = Agent(agent_id="postgres-monitor")
agent.register(
"1.3.6.1.4.1.12345.2",
PostgresUpdater("postgresql://localhost/mydb"),
freq=10,
)
await agent.start()
snmpkit.run(main())HTTP API Integration
Expose metrics from an external API:
api_monitor.py
import snmpkit
from snmpkit.agent import Agent, Updater
import httpx
class APIUpdater(Updater):
def __init__(self, base_url: str, api_key: str | None = None):
super().__init__()
self.base_url = base_url
self.headers = {"Authorization": f"Bearer {api_key}"} if api_key else {}
self.client = None
async def update(self):
if self.client is None:
self.client = httpx.AsyncClient(headers=self.headers)
try:
resp = await self.client.get(f"{self.base_url}/stats", timeout=5.0)
resp.raise_for_status()
data = resp.json()
self.set_GAUGE32("1.0", data.get("active_users", 0))
self.set_GAUGE32("2.0", data.get("requests_per_minute", 0))
self.set_COUNTER64("3.0", data.get("total_requests", 0))
self.set_GAUGE32("4.0", data.get("error_rate_percent", 0))
except httpx.HTTPError as e:
# Keep previous values on error
pass
async def main():
agent = Agent(agent_id="api-monitor")
agent.register(
"1.3.6.1.4.1.12345.3",
APIUpdater("https://api.example.com", api_key="secret"),
freq=30,
)
await agent.start()
snmpkit.run(main())Table Data
Expose tabular data (e.g., network interfaces, processes):
table_agent.py
import snmpkit
from snmpkit.agent import Agent, Updater
import psutil
class InterfaceTableUpdater(Updater):
"""Exposes network interface statistics as an SNMP table."""
async def update(self):
self.clear()
interfaces = psutil.net_if_stats()
io_counters = psutil.net_io_counters(pernic=True)
for idx, (name, stats) in enumerate(interfaces.items(), start=1):
# ifIndex.idx
self.set_INTEGER(f"1.{idx}", idx)
# ifName.idx
self.set_OCTETSTRING(f"2.{idx}", name)
# ifOperStatus.idx (1=up, 2=down)
self.set_INTEGER(f"3.{idx}", 1 if stats.isup else 2)
# ifSpeed.idx (bits per second)
self.set_GAUGE32(f"4.{idx}", stats.speed * 1_000_000)
# ifInOctets.idx, ifOutOctets.idx
if name in io_counters:
io = io_counters[name]
self.set_COUNTER64(f"5.{idx}", io.bytes_recv)
self.set_COUNTER64(f"6.{idx}", io.bytes_sent)
async def main():
agent = Agent(agent_id="interface-table")
agent.register("1.3.6.1.4.1.12345.4", InterfaceTableUpdater(), freq=5)
await agent.start()
snmpkit.run(main())Query the table:
# Walk entire table
snmpwalk -v2c -c public localhost 1.3.6.1.4.1.12345.4
# Get specific column (interface names)
snmpwalk -v2c -c public localhost 1.3.6.1.4.1.12345.4.2Multi-Tenant with Contexts
Serve different data based on SNMP context:
multitenant_agent.py
import snmpkit
from snmpkit.agent import Agent, Updater
class TenantUpdater(Updater):
def __init__(self, tenant_id: str, data: dict):
super().__init__()
self.tenant_id = tenant_id
self.data = data
async def update(self):
self.set_OCTETSTRING("1.0", self.tenant_id)
self.set_INTEGER("2.0", self.data.get("user_count", 0))
self.set_GAUGE32("3.0", self.data.get("storage_mb", 0))
async def main():
agent = Agent(agent_id="multitenant")
# Same OID, different contexts
tenants = {
"acme": {"user_count": 150, "storage_mb": 5000},
"globex": {"user_count": 75, "storage_mb": 2500},
"initech": {"user_count": 300, "storage_mb": 10000},
}
for tenant_id, data in tenants.items():
agent.register(
"1.3.6.1.4.1.12345.5",
TenantUpdater(tenant_id, data),
context=tenant_id,
)
await agent.start()
snmpkit.run(main())Query with context:
# Query specific tenant
snmpwalk -v2c -c public -n acme localhost 1.3.6.1.4.1.12345.5
snmpwalk -v2c -c public -n globex localhost 1.3.6.1.4.1.12345.5Writable Configuration
Allow SNMP SET to modify configuration:
config_agent.py
import snmpkit
from snmpkit.agent import Agent, Updater, SetHandler
# Configuration store
config = {
"hostname": "myserver",
"log_level": 3, # 1=error, 2=warn, 3=info, 4=debug
"max_connections": 100,
}
class ConfigUpdater(Updater):
async def update(self):
self.set_OCTETSTRING("1.0", config["hostname"])
self.set_INTEGER("2.0", config["log_level"])
self.set_INTEGER("3.0", config["max_connections"])
class ConfigSetHandler(SetHandler):
async def test(self, oid, value):
if oid == "1.0": # hostname
if len(str(value)) > 64:
raise ValueError("Hostname too long (max 64)")
elif oid == "2.0": # log_level
if int(value) not in (1, 2, 3, 4):
raise ValueError("Log level must be 1-4")
elif oid == "3.0": # max_connections
if not (1 <= int(value) <= 10000):
raise ValueError("Max connections must be 1-10000")
async def commit(self, oid, value):
if oid == "1.0":
config["hostname"] = str(value)
elif oid == "2.0":
config["log_level"] = int(value)
elif oid == "3.0":
config["max_connections"] = int(value)
print(f"Config updated: {config}")
async def main():
agent = Agent(agent_id="config-agent")
agent.register("1.3.6.1.4.1.12345.6", ConfigUpdater())
agent.register_set("1.3.6.1.4.1.12345.6", ConfigSetHandler())
await agent.start()
snmpkit.run(main())Set values:
# Change hostname
snmpset -v2c -c private localhost 1.3.6.1.4.1.12345.6.1.0 s "newhost"
# Change log level
snmpset -v2c -c private localhost 1.3.6.1.4.1.12345.6.2.0 i 4Alert Thresholds with Traps
Send traps when thresholds are exceeded:
alert_agent.py
import snmpkit
from snmpkit.agent import Agent, Updater
from snmpkit.core import Oid, Value, VarBind
import psutil
class AlertUpdater(Updater):
def __init__(self, cpu_threshold: int = 90, mem_threshold: int = 90):
super().__init__()
self.cpu_threshold = cpu_threshold
self.mem_threshold = mem_threshold
self._cpu_alert_sent = False
self._mem_alert_sent = False
async def update(self):
cpu = int(psutil.cpu_percent())
mem = int(psutil.virtual_memory().percent)
self.set_GAUGE32("1.0", cpu)
self.set_GAUGE32("2.0", mem)
# CPU alert
if cpu > self.cpu_threshold and not self._cpu_alert_sent:
await self.send_trap(
"1.3.6.1.4.1.12345.0.1", # cpuHighTrap
VarBind(Oid("1.3.6.1.4.1.12345.7.1.0"), Value.Gauge32(cpu)),
)
self._cpu_alert_sent = True
elif cpu <= self.cpu_threshold:
self._cpu_alert_sent = False
# Memory alert
if mem > self.mem_threshold and not self._mem_alert_sent:
await self.send_trap(
"1.3.6.1.4.1.12345.0.2", # memHighTrap
VarBind(Oid("1.3.6.1.4.1.12345.7.2.0"), Value.Gauge32(mem)),
)
self._mem_alert_sent = True
elif mem <= self.mem_threshold:
self._mem_alert_sent = False
async def main():
agent = Agent(agent_id="alert-agent")
agent.register(
"1.3.6.1.4.1.12345.7",
AlertUpdater(cpu_threshold=80, mem_threshold=85),
freq=5,
)
await agent.start()
snmpkit.run(main())Next Steps
- Performance - Benchmark details
- Advanced - Configuration options
Last updated on