Skip to Content
AgentExamples

Examples

Real-world patterns for building SNMP agents.

System Monitoring

Monitor CPU, memory, and disk usage:

system_monitor.py
"""Expose host CPU, memory, disk and load-average metrics via an SNMP agent."""

import snmpkit
from snmpkit.agent import Agent, Updater

try:
    import psutil
except ImportError as exc:
    raise ImportError("Install psutil: pip install psutil") from exc

_BYTES_PER_MB = 1024 * 1024
_BYTES_PER_GB = 1024 * _BYTES_PER_MB


class SystemUpdater(Updater):
    """Exposes system metrics via SNMP."""

    def __init__(self):
        super().__init__()
        # psutil documents that the first non-blocking cpu_percent() call
        # returns a meaningless 0.0. Prime the internal counter here so the
        # first update() reports a real measurement instead of that value.
        psutil.cpu_percent()

    async def update(self):
        """Refresh every gauge under this updater's subtree."""
        # CPU usage (1.0): percent since the previous cpu_percent() call
        self.set_GAUGE32("1.0", int(psutil.cpu_percent()))

        # Memory usage (2.x)
        mem = psutil.virtual_memory()
        self.set_GAUGE32("2.1.0", mem.total // _BYTES_PER_MB)  # Total MB
        self.set_GAUGE32("2.2.0", mem.used // _BYTES_PER_MB)  # Used MB
        self.set_GAUGE32("2.3.0", int(mem.percent))  # Percent

        # Disk usage (3.x) of the root filesystem
        disk = psutil.disk_usage("/")
        self.set_GAUGE32("3.1.0", disk.total // _BYTES_PER_GB)  # Total GB
        self.set_GAUGE32("3.2.0", disk.used // _BYTES_PER_GB)  # Used GB
        self.set_GAUGE32("3.3.0", int(disk.percent))  # Percent

        # Load average (4.x), scaled by 100 because gauges are integers
        try:
            load1, load5, load15 = psutil.getloadavg()
        except AttributeError:
            # This psutil build/platform does not provide getloadavg();
            # leave the 4.x OIDs unset.
            return
        self.set_GAUGE32("4.1.0", int(load1 * 100))  # 1-min * 100
        self.set_GAUGE32("4.2.0", int(load5 * 100))  # 5-min * 100
        self.set_GAUGE32("4.3.0", int(load15 * 100))  # 15-min * 100


async def main():
    """Register the system updater (refreshed with freq=5) and start the agent."""
    agent = Agent(agent_id="system-monitor")
    agent.register("1.3.6.1.4.1.12345.1", SystemUpdater(), freq=5)
    await agent.start()


snmpkit.run(main())

Query:

snmpwalk -v2c -c public localhost 1.3.6.1.4.1.12345.1

Database Metrics

Expose database statistics:

postgres_monitor.py
"""Expose PostgreSQL connection, size and transaction statistics via SNMP."""

import snmpkit
from snmpkit.agent import Agent, Updater
import asyncpg


class PostgresUpdater(Updater):
    """Publishes statistics of the database identified by ``dsn``."""

    def __init__(self, dsn: str):
        super().__init__()
        self.dsn = dsn
        self.pool = None  # created lazily on the first update()

    async def update(self):
        """Query the server and refresh OIDs 1.0 through 4.0."""
        if self.pool is None:
            # update() uses one connection at a time. asyncpg's default pool
            # opens 10 connections, which would inflate the pg_stat_activity
            # count reported below, so keep the pool at a single connection.
            self.pool = await asyncpg.create_pool(
                self.dsn, min_size=1, max_size=1
            )

        async with self.pool.acquire() as conn:
            # Connection count (includes this monitor's own connection)
            count = await conn.fetchval(
                "SELECT count(*) FROM pg_stat_activity"
            )
            self.set_GAUGE32("1.0", count)

            # Database size (MB)
            size = await conn.fetchval(
                "SELECT pg_database_size(current_database()) / 1024 / 1024"
            )
            self.set_GAUGE32("2.0", int(size))

            # Transaction stats
            stats = await conn.fetchrow(
                "SELECT xact_commit, xact_rollback FROM pg_stat_database "
                "WHERE datname = current_database()"
            )
            # fetchrow() returns None when no row matches; keep the previous
            # counters in that case instead of failing on a subscript.
            if stats is not None:
                self.set_COUNTER64("3.0", stats["xact_commit"])
                self.set_COUNTER64("4.0", stats["xact_rollback"])


async def main():
    """Register the PostgreSQL updater (freq=10) and start the agent."""
    agent = Agent(agent_id="postgres-monitor")
    agent.register(
        "1.3.6.1.4.1.12345.2",
        PostgresUpdater("postgresql://localhost/mydb"),
        freq=10,
    )
    await agent.start()


snmpkit.run(main())

HTTP API Integration

Expose metrics from an external API:

api_monitor.py
"""Expose metrics fetched from an external HTTP JSON API via SNMP."""

import logging

import snmpkit
from snmpkit.agent import Agent, Updater
import httpx

logger = logging.getLogger(__name__)


class APIUpdater(Updater):
    """Mirrors numeric fields of ``<base_url>/stats`` into SNMP values."""

    def __init__(self, base_url: str, api_key: str | None = None):
        super().__init__()
        self.base_url = base_url
        # Send a bearer token only when an API key was supplied.
        self.headers = {"Authorization": f"Bearer {api_key}"} if api_key else {}
        self.client = None  # created lazily on the first update()

    async def update(self):
        """Fetch the stats document and refresh OIDs 1.0 through 4.0.

        On any fetch or parse failure nothing is published, so the values
        from the previous successful update stay in place.
        """
        if self.client is None:
            self.client = httpx.AsyncClient(headers=self.headers)

        try:
            resp = await self.client.get(f"{self.base_url}/stats", timeout=5.0)
            resp.raise_for_status()
            data = resp.json()  # raises ValueError on a non-JSON body
            if not isinstance(data, dict):
                raise ValueError("expected a JSON object")
            # Parse everything before publishing anything, so a bad field
            # cannot leave the subtree half-updated.
            active_users = int(data.get("active_users", 0))
            requests_per_minute = int(data.get("requests_per_minute", 0))
            total_requests = int(data.get("total_requests", 0))
            error_rate_percent = int(data.get("error_rate_percent", 0))
        except (httpx.HTTPError, ValueError, TypeError) as exc:
            # Keep previous values on error, but leave a trace of the failure.
            logger.warning(
                "Stats refresh from %s failed: %s", self.base_url, exc
            )
            return

        self.set_GAUGE32("1.0", active_users)
        self.set_GAUGE32("2.0", requests_per_minute)
        self.set_COUNTER64("3.0", total_requests)
        self.set_GAUGE32("4.0", error_rate_percent)


async def main():
    """Register the API updater (freq=30) and start the agent."""
    agent = Agent(agent_id="api-monitor")
    agent.register(
        "1.3.6.1.4.1.12345.3",
        APIUpdater("https://api.example.com", api_key="secret"),
        freq=30,
    )
    await agent.start()


snmpkit.run(main())

Table Data

Expose tabular data (e.g., network interfaces, processes):

table_agent.py
"""Expose network interface statistics as an SNMP table."""

import snmpkit
from snmpkit.agent import Agent, Updater
import psutil

# Largest value a Gauge32 can carry (2**32 - 1).
GAUGE32_MAX = 4_294_967_295


class InterfaceTableUpdater(Updater):
    """Exposes network interface statistics as an SNMP table."""

    async def update(self):
        """Rebuild the whole table from the current psutil snapshot."""
        # Drop the previous rows first so that interfaces which disappeared
        # do not linger in the table.
        self.clear()

        interfaces = psutil.net_if_stats()
        io_counters = psutil.net_io_counters(pernic=True)

        for idx, (name, stats) in enumerate(interfaces.items(), start=1):
            # ifIndex.idx
            self.set_INTEGER(f"1.{idx}", idx)

            # ifName.idx
            self.set_OCTETSTRING(f"2.{idx}", name)

            # ifOperStatus.idx (1=up, 2=down)
            self.set_INTEGER(f"3.{idx}", 1 if stats.isup else 2)

            # ifSpeed.idx (bits per second). psutil reports the speed in
            # megabits, so links of 10 Gbit/s and faster exceed the Gauge32
            # range; report the maximum in that case, as IF-MIB prescribes.
            speed_bps = min(stats.speed * 1_000_000, GAUGE32_MAX)
            self.set_GAUGE32(f"4.{idx}", speed_bps)

            # ifInOctets.idx, ifOutOctets.idx (only when counters exist)
            io = io_counters.get(name)
            if io is not None:
                self.set_COUNTER64(f"5.{idx}", io.bytes_recv)
                self.set_COUNTER64(f"6.{idx}", io.bytes_sent)


async def main():
    """Register the interface table updater (freq=5) and start the agent."""
    agent = Agent(agent_id="interface-table")
    agent.register("1.3.6.1.4.1.12345.4", InterfaceTableUpdater(), freq=5)
    await agent.start()


snmpkit.run(main())

Query the table:

# Walk entire table
snmpwalk -v2c -c public localhost 1.3.6.1.4.1.12345.4

# Get specific column (interface names)
snmpwalk -v2c -c public localhost 1.3.6.1.4.1.12345.4.2

Multi-Tenant with Contexts

Serve different data based on SNMP context:

multitenant_agent.py
"""Serve per-tenant values from one OID subtree using SNMP contexts."""

import snmpkit
from snmpkit.agent import Agent, Updater


class TenantUpdater(Updater):
    """Publishes the identifier and usage figures of a single tenant."""

    def __init__(self, tenant_id: str, data: dict):
        super().__init__()
        self.tenant_id = tenant_id
        self.data = data

    async def update(self):
        """Publish tenant id (1.0), user count (2.0) and storage in MB (3.0)."""
        users = self.data.get("user_count", 0)
        storage = self.data.get("storage_mb", 0)

        self.set_OCTETSTRING("1.0", self.tenant_id)
        self.set_INTEGER("2.0", users)
        self.set_GAUGE32("3.0", storage)


async def main():
    """Register one updater per tenant under the same OID, then start."""
    agent = Agent(agent_id="multitenant")

    # Same OID, different contexts
    base_oid = "1.3.6.1.4.1.12345.5"
    tenants = {
        "acme": {"user_count": 150, "storage_mb": 5000},
        "globex": {"user_count": 75, "storage_mb": 2500},
        "initech": {"user_count": 300, "storage_mb": 10000},
    }

    for name in tenants:
        updater = TenantUpdater(name, tenants[name])
        agent.register(base_oid, updater, context=name)

    await agent.start()


snmpkit.run(main())

Query with context:

# Query specific tenant
snmpwalk -v2c -c public -n acme localhost 1.3.6.1.4.1.12345.5
snmpwalk -v2c -c public -n globex localhost 1.3.6.1.4.1.12345.5

Writable Configuration

Allow SNMP SET to modify configuration:

config_agent.py
"""Expose a small configuration store that can be modified via SNMP SET."""

import snmpkit
from snmpkit.agent import Agent, Updater, SetHandler

# Configuration store
config = {
    "hostname": "myserver",
    "log_level": 3,  # 1=error, 2=warn, 3=info, 4=debug
    "max_connections": 100,
}


class ConfigUpdater(Updater):
    """Publishes the current contents of ``config``."""

    async def update(self):
        self.set_OCTETSTRING("1.0", config["hostname"])
        self.set_INTEGER("2.0", config["log_level"])
        self.set_INTEGER("3.0", config["max_connections"])


class ConfigSetHandler(SetHandler):
    """Validates and applies SET requests against ``config``."""

    async def test(self, oid, value):
        """Validate a proposed value.

        Raises:
            ValueError: if the value is out of range or not numeric where a
                number is required, or if ``oid`` is not one of the
                writable entries.
        """
        if oid == "1.0":  # hostname
            if len(str(value)) > 64:
                raise ValueError("Hostname too long (max 64)")
        elif oid == "2.0":  # log_level
            if int(value) not in (1, 2, 3, 4):
                raise ValueError("Log level must be 1-4")
        elif oid == "3.0":  # max_connections
            if not (1 <= int(value) <= 10000):
                raise ValueError("Max connections must be 1-10000")
        else:
            # Reject instead of accepting silently: commit() has nothing to
            # store for an unknown OID, so the SET would appear to succeed
            # while changing nothing.
            raise ValueError(f"OID {oid} is not writable")

    async def commit(self, oid, value):
        """Store a value in ``config``; unknown OIDs are ignored."""
        if oid == "1.0":
            config["hostname"] = str(value)
        elif oid == "2.0":
            config["log_level"] = int(value)
        elif oid == "3.0":
            config["max_connections"] = int(value)
        else:
            # Nothing was changed, so do not report an update.
            return
        print(f"Config updated: {config}")


async def main():
    """Register the read side and the SET handler on the same subtree."""
    agent = Agent(agent_id="config-agent")
    agent.register("1.3.6.1.4.1.12345.6", ConfigUpdater())
    agent.register_set("1.3.6.1.4.1.12345.6", ConfigSetHandler())
    await agent.start()


snmpkit.run(main())

Set values:

# Change hostname
snmpset -v2c -c private localhost 1.3.6.1.4.1.12345.6.1.0 s "newhost"

# Change log level
snmpset -v2c -c private localhost 1.3.6.1.4.1.12345.6.2.0 i 4

Alert Thresholds with Traps

Send traps when thresholds are exceeded:

alert_agent.py
"""Publish CPU and memory usage and send a trap when a threshold is crossed."""

import snmpkit
from snmpkit.agent import Agent, Updater
from snmpkit.core import Oid, Value, VarBind
import psutil


class AlertUpdater(Updater):
    """Publishes CPU/memory percentages and raises one trap per excursion."""

    def __init__(self, cpu_threshold: int = 90, mem_threshold: int = 90):
        super().__init__()
        self.cpu_threshold = cpu_threshold
        self.mem_threshold = mem_threshold
        # Latches: True while the metric is above its threshold and the trap
        # for the current excursion has already been sent.
        self._cpu_alert_sent = False
        self._mem_alert_sent = False

    async def _evaluate_threshold(
        self, reading, limit, already_sent, trap_oid, value_oid
    ):
        """Send the trap if needed and return the new latch state."""
        if reading <= limit:
            # Back at or below the threshold: re-arm the alert.
            return False
        if not already_sent:
            await self.send_trap(
                trap_oid,
                VarBind(Oid(value_oid), Value.Gauge32(reading)),
            )
        return True

    async def update(self):
        """Refresh gauges 1.0 (CPU) and 2.0 (memory), then check thresholds."""
        cpu_now = int(psutil.cpu_percent())
        mem_now = int(psutil.virtual_memory().percent)

        self.set_GAUGE32("1.0", cpu_now)
        self.set_GAUGE32("2.0", mem_now)

        # CPU alert
        self._cpu_alert_sent = await self._evaluate_threshold(
            cpu_now,
            self.cpu_threshold,
            self._cpu_alert_sent,
            "1.3.6.1.4.1.12345.0.1",  # cpuHighTrap
            "1.3.6.1.4.1.12345.7.1.0",
        )

        # Memory alert
        self._mem_alert_sent = await self._evaluate_threshold(
            mem_now,
            self.mem_threshold,
            self._mem_alert_sent,
            "1.3.6.1.4.1.12345.0.2",  # memHighTrap
            "1.3.6.1.4.1.12345.7.2.0",
        )


async def main():
    """Register the alerting updater (freq=5) and start the agent."""
    agent = Agent(agent_id="alert-agent")
    updater = AlertUpdater(cpu_threshold=80, mem_threshold=85)
    agent.register("1.3.6.1.4.1.12345.7", updater, freq=5)
    await agent.start()


snmpkit.run(main())

Next Steps

Last updated on