Skip to Content
ManagerExamples

Examples

Real-world patterns for SNMP network management.

Device Discovery

Scan a subnet to find SNMP-enabled devices:

discovery.py
import asyncio from snmpkit.manager import Manager from snmpkit.manager.exceptions import TimeoutError async def discover_device(ip: str) -> dict | None: try: async with Manager(ip, timeout=1.0, retries=1) as mgr: descr = await mgr.get("1.3.6.1.2.1.1.1.0") name = await mgr.get("1.3.6.1.2.1.1.5.0") return {"ip": ip, "name": str(name), "description": str(descr)} except TimeoutError: return None async def scan_subnet(network: str, start: int = 1, end: int = 254): base = ".".join(network.split(".")[:3]) tasks = [discover_device(f"{base}.{i}") for i in range(start, end + 1)] results = await asyncio.gather(*tasks) return [r for r in results if r is not None] async def main(): devices = await scan_subnet("192.168.1.0") for device in devices: print(f"{device['ip']}: {device['name']} - {device['description'][:50]}") asyncio.run(main())

Use short timeouts and minimal retries for discovery to avoid long waits on unresponsive IPs.

Interface Monitoring

Monitor interface traffic on a network device:

interface_monitor.py
import asyncio from dataclasses import dataclass from snmpkit.manager import Manager IF_MIB = "1.3.6.1.2.1.2.2.1" @dataclass class Interface: index: int name: str status: int in_octets: int out_octets: int async def get_interfaces(mgr: Manager) -> list[Interface]: interfaces = [] async for oid, value in mgr.bulk_walk(f"{IF_MIB}.2"): idx = int(oid.split(".")[-1]) interfaces.append(Interface( index=idx, name=str(value), status=0, in_octets=0, out_octets=0, )) for iface in interfaces: iface.status = int(await mgr.get(f"{IF_MIB}.8.{iface.index}")) iface.in_octets = int(await mgr.get(f"{IF_MIB}.10.{iface.index}")) iface.out_octets = int(await mgr.get(f"{IF_MIB}.16.{iface.index}")) return interfaces async def main(): async with Manager("192.168.1.1") as mgr: interfaces = await get_interfaces(mgr) for iface in interfaces: status = "up" if iface.status == 1 else "down" print(f"{iface.name}: {status}, in={iface.in_octets}, out={iface.out_octets}") asyncio.run(main())

Polling Multiple Devices

Poll system metrics from multiple devices concurrently:

multi_poll.py
import asyncio from dataclasses import dataclass from snmpkit.manager import Manager from snmpkit.manager.exceptions import TimeoutError SYSTEM_OIDS = { "name": "1.3.6.1.2.1.1.5.0", "uptime": "1.3.6.1.2.1.1.3.0", "contact": "1.3.6.1.2.1.1.4.0", "location": "1.3.6.1.2.1.1.6.0", } @dataclass class DeviceStatus: host: str name: str | None = None uptime: int | None = None contact: str | None = None location: str | None = None error: str | None = None async def poll_device(host: str, community: str = "public") -> DeviceStatus: status = DeviceStatus(host=host) try: async with Manager(host, community=community, timeout=3.0) as mgr: values = await mgr.get_many(*SYSTEM_OIDS.values()) status.name = str(values[0]) status.uptime = int(values[1]) status.contact = str(values[2]) status.location = str(values[3]) except TimeoutError: status.error = "timeout" except Exception as e: status.error = str(e) return status async def poll_all(hosts: list[str], community: str = "public"): tasks = [poll_device(host, community) for host in hosts] return await asyncio.gather(*tasks) async def main(): hosts = ["192.168.1.1", "192.168.1.2", "192.168.1.10", "192.168.1.254"] results = await poll_all(hosts) for status in results: if status.error: print(f"{status.host}: ERROR - {status.error}") else: uptime_days = status.uptime // (100 * 60 * 60 * 24) print(f"{status.host}: {status.name} (up {uptime_days} days)") asyncio.run(main())

Rate-Limited Polling

Control concurrency with semaphores to avoid overwhelming the network:

rate_limited.py
import asyncio from snmpkit.manager import Manager async def poll_with_limit( hosts: list[str], oid: str, max_concurrent: int = 50, ): semaphore = asyncio.Semaphore(max_concurrent) async def poll_one(host: str): async with semaphore: try: async with Manager(host, timeout=2.0, retries=1) as mgr: value = await mgr.get(oid) return (host, value, None) except Exception as e: return (host, None, str(e)) tasks = [poll_one(host) for host in hosts] return await asyncio.gather(*tasks) async def main(): hosts = [f"192.168.1.{i}" for i in range(1, 255)] results = await poll_with_limit(hosts, "1.3.6.1.2.1.1.1.0", max_concurrent=25) success = sum(1 for _, v, e in results if v is not None) print(f"Discovered {success} devices out of {len(hosts)} scanned") asyncio.run(main())

IF-MIB Table Query

Efficiently query the full interface table:

if_table.py
import asyncio from snmpkit.manager import Manager IF_COLUMNS = { "ifIndex": "1", "ifDescr": "2", "ifType": "3", "ifMtu": "4", "ifSpeed": "5", "ifPhysAddress": "6", "ifAdminStatus": "7", "ifOperStatus": "8", "ifInOctets": "10", "ifInErrors": "14", "ifOutOctets": "16", "ifOutErrors": "20", } async def get_if_table(mgr: Manager) -> list[dict]: table = {} base = "1.3.6.1.2.1.2.2.1" for col_name, col_id in IF_COLUMNS.items(): async for oid, value in mgr.bulk_walk(f"{base}.{col_id}", bulk_size=25): idx = oid.split(".")[-1] if idx not in table: table[idx] = {"ifIndex": int(idx)} table[idx][col_name] = value return list(table.values()) async def main(): async with Manager("192.168.1.1") as mgr: interfaces = await get_if_table(mgr) for iface in interfaces: name = iface.get("ifDescr", "unknown") status = "up" if iface.get("ifOperStatus") == 1 else "down" speed = int(iface.get("ifSpeed", 0)) // 1_000_000 in_mb = int(iface.get("ifInOctets", 0)) / 1024 / 1024 out_mb = int(iface.get("ifOutOctets", 0)) / 1024 / 1024 print(f"{name}: {status} @ {speed}Mbps (in={in_mb:.1f}MB, out={out_mb:.1f}MB)") asyncio.run(main())

Traffic Delta Monitoring

Calculate traffic rates over time:

traffic_delta.py
import asyncio from dataclasses import dataclass from snmpkit.manager import Manager @dataclass class TrafficSample: timestamp: float in_octets: int out_octets: int async def monitor_interface(host: str, if_index: int, interval: float = 5.0): base = f"1.3.6.1.2.1.2.2.1" in_oid = f"{base}.10.{if_index}" out_oid = f"{base}.16.{if_index}" prev: TrafficSample | None = None async with Manager(host) as mgr: while True: now = asyncio.get_event_loop().time() values = await mgr.get_many(in_oid, out_oid) current = TrafficSample(now, int(values[0]), int(values[1])) if prev is not None: dt = current.timestamp - prev.timestamp in_rate = (current.in_octets - prev.in_octets) * 8 / dt out_rate = (current.out_octets - prev.out_octets) * 8 / dt print(f"In: {in_rate/1e6:.2f} Mbps, Out: {out_rate/1e6:.2f} Mbps") prev = current await asyncio.sleep(interval) asyncio.run(monitor_interface("192.168.1.1", if_index=2))

Counter values can wrap. For production use, handle 32-bit counter overflow (every ~34GB at 10Gbps) or use 64-bit counters (ifHCInOctets).

SNMP Trap Receiver

trap_receiver.py
import asyncio from snmpkit.manager import TrapReceiver async def handle_trap(trap): print(f"Trap from {trap.source}:") print(f" Enterprise: {trap.enterprise}") print(f" Uptime: {trap.uptime}") for varbind in trap.varbinds: print(f" {varbind.oid} = {varbind.value}") async def main(): receiver = TrapReceiver(port=162) receiver.on_trap(handle_trap) print("Listening for traps on port 162...") await receiver.start() asyncio.run(main())

Error Recovery Pattern

Robust polling with error handling and backoff:

robust_poll.py
import asyncio from snmpkit.manager import Manager from snmpkit.manager.exceptions import TimeoutError, SnmpError async def poll_with_recovery( host: str, oids: list[str], interval: float = 60.0, max_failures: int = 3, ): failures = 0 backoff = 1.0 while True: try: async with Manager(host, timeout=5.0, retries=2) as mgr: values = await mgr.get_many(*oids) failures = 0 backoff = 1.0 yield {oid: value for oid, value in zip(oids, values)} except TimeoutError: failures += 1 if failures >= max_failures: yield {"error": "device unreachable", "host": host} backoff = min(backoff * 2, 300.0) await asyncio.sleep(backoff) continue except SnmpError as e: yield {"error": str(e), "host": host} await asyncio.sleep(interval) async def main(): oids = ["1.3.6.1.2.1.1.3.0", "1.3.6.1.2.1.1.5.0"] async for result in poll_with_recovery("192.168.1.1", oids, interval=30.0): if "error" in result: print(f"Error: {result['error']}") else: print(f"Poll successful: {result}") asyncio.run(main())

Metrics to Prometheus

Export SNMP metrics for Prometheus scraping:

prometheus_exporter.py
import asyncio from prometheus_client import start_http_server, Gauge from snmpkit.manager import Manager if_in_octets = Gauge("snmp_if_in_octets", "Interface input octets", ["host", "interface"]) if_out_octets = Gauge("snmp_if_out_octets", "Interface output octets", ["host", "interface"]) if_status = Gauge("snmp_if_status", "Interface operational status", ["host", "interface"]) async def collect_metrics(host: str, interval: float = 30.0): base = "1.3.6.1.2.1.2.2.1" async with Manager(host) as mgr: while True: names = {} async for oid, value in mgr.bulk_walk(f"{base}.2"): idx = oid.split(".")[-1] names[idx] = str(value) for idx, name in names.items(): status = int(await mgr.get(f"{base}.8.{idx}")) in_oct = int(await mgr.get(f"{base}.10.{idx}")) out_oct = int(await mgr.get(f"{base}.16.{idx}")) if_status.labels(host=host, interface=name).set(status) if_in_octets.labels(host=host, interface=name).set(in_oct) if_out_octets.labels(host=host, interface=name).set(out_oct) await asyncio.sleep(interval) async def main(): start_http_server(9116) print("Prometheus metrics on :9116/metrics") hosts = ["192.168.1.1", "192.168.1.2"] tasks = [collect_metrics(host) for host in hosts] await asyncio.gather(*tasks) asyncio.run(main())

Next Steps

Last updated on