Examples
Real-world patterns for SNMP network management.
Device Discovery
Scan a subnet to find SNMP-enabled devices:
discovery.py
import asyncio
from snmpkit.manager import Manager
from snmpkit.manager.exceptions import TimeoutError
async def discover_device(ip: str) -> dict | None:
try:
async with Manager(ip, timeout=1.0, retries=1) as mgr:
descr = await mgr.get("1.3.6.1.2.1.1.1.0")
name = await mgr.get("1.3.6.1.2.1.1.5.0")
return {"ip": ip, "name": str(name), "description": str(descr)}
except TimeoutError:
return None
async def scan_subnet(network: str, start: int = 1, end: int = 254):
base = ".".join(network.split(".")[:3])
tasks = [discover_device(f"{base}.{i}") for i in range(start, end + 1)]
results = await asyncio.gather(*tasks)
return [r for r in results if r is not None]
async def main():
devices = await scan_subnet("192.168.1.0")
for device in devices:
print(f"{device['ip']}: {device['name']} - {device['description'][:50]}")
asyncio.run(main())Use short timeouts and minimal retries for discovery to avoid long waits on unresponsive IPs.
Interface Monitoring
Monitor interface traffic on a network device:
interface_monitor.py
import asyncio
from dataclasses import dataclass
from snmpkit.manager import Manager
IF_MIB = "1.3.6.1.2.1.2.2.1"
@dataclass
class Interface:
index: int
name: str
status: int
in_octets: int
out_octets: int
async def get_interfaces(mgr: Manager) -> list[Interface]:
interfaces = []
async for oid, value in mgr.bulk_walk(f"{IF_MIB}.2"):
idx = int(oid.split(".")[-1])
interfaces.append(Interface(
index=idx,
name=str(value),
status=0,
in_octets=0,
out_octets=0,
))
for iface in interfaces:
iface.status = int(await mgr.get(f"{IF_MIB}.8.{iface.index}"))
iface.in_octets = int(await mgr.get(f"{IF_MIB}.10.{iface.index}"))
iface.out_octets = int(await mgr.get(f"{IF_MIB}.16.{iface.index}"))
return interfaces
async def main():
async with Manager("192.168.1.1") as mgr:
interfaces = await get_interfaces(mgr)
for iface in interfaces:
status = "up" if iface.status == 1 else "down"
print(f"{iface.name}: {status}, in={iface.in_octets}, out={iface.out_octets}")
asyncio.run(main())Polling Multiple Devices
Poll system metrics from multiple devices concurrently:
multi_poll.py
import asyncio
from dataclasses import dataclass
from snmpkit.manager import Manager
from snmpkit.manager.exceptions import TimeoutError
SYSTEM_OIDS = {
"name": "1.3.6.1.2.1.1.5.0",
"uptime": "1.3.6.1.2.1.1.3.0",
"contact": "1.3.6.1.2.1.1.4.0",
"location": "1.3.6.1.2.1.1.6.0",
}
@dataclass
class DeviceStatus:
host: str
name: str | None = None
uptime: int | None = None
contact: str | None = None
location: str | None = None
error: str | None = None
async def poll_device(host: str, community: str = "public") -> DeviceStatus:
status = DeviceStatus(host=host)
try:
async with Manager(host, community=community, timeout=3.0) as mgr:
values = await mgr.get_many(*SYSTEM_OIDS.values())
status.name = str(values[0])
status.uptime = int(values[1])
status.contact = str(values[2])
status.location = str(values[3])
except TimeoutError:
status.error = "timeout"
except Exception as e:
status.error = str(e)
return status
async def poll_all(hosts: list[str], community: str = "public"):
tasks = [poll_device(host, community) for host in hosts]
return await asyncio.gather(*tasks)
async def main():
hosts = ["192.168.1.1", "192.168.1.2", "192.168.1.10", "192.168.1.254"]
results = await poll_all(hosts)
for status in results:
if status.error:
print(f"{status.host}: ERROR - {status.error}")
else:
uptime_days = status.uptime // (100 * 60 * 60 * 24)
print(f"{status.host}: {status.name} (up {uptime_days} days)")
asyncio.run(main())Rate-Limited Polling
Control concurrency with semaphores to avoid overwhelming the network:
rate_limited.py
import asyncio
from snmpkit.manager import Manager
async def poll_with_limit(
hosts: list[str],
oid: str,
max_concurrent: int = 50,
):
semaphore = asyncio.Semaphore(max_concurrent)
async def poll_one(host: str):
async with semaphore:
try:
async with Manager(host, timeout=2.0, retries=1) as mgr:
value = await mgr.get(oid)
return (host, value, None)
except Exception as e:
return (host, None, str(e))
tasks = [poll_one(host) for host in hosts]
return await asyncio.gather(*tasks)
async def main():
hosts = [f"192.168.1.{i}" for i in range(1, 255)]
results = await poll_with_limit(hosts, "1.3.6.1.2.1.1.1.0", max_concurrent=25)
success = sum(1 for _, v, e in results if v is not None)
print(f"Discovered {success} devices out of {len(hosts)} scanned")
asyncio.run(main())IF-MIB Table Query
Efficiently query the full interface table:
if_table.py
import asyncio
from snmpkit.manager import Manager
IF_COLUMNS = {
"ifIndex": "1",
"ifDescr": "2",
"ifType": "3",
"ifMtu": "4",
"ifSpeed": "5",
"ifPhysAddress": "6",
"ifAdminStatus": "7",
"ifOperStatus": "8",
"ifInOctets": "10",
"ifInErrors": "14",
"ifOutOctets": "16",
"ifOutErrors": "20",
}
async def get_if_table(mgr: Manager) -> list[dict]:
table = {}
base = "1.3.6.1.2.1.2.2.1"
for col_name, col_id in IF_COLUMNS.items():
async for oid, value in mgr.bulk_walk(f"{base}.{col_id}", bulk_size=25):
idx = oid.split(".")[-1]
if idx not in table:
table[idx] = {"ifIndex": int(idx)}
table[idx][col_name] = value
return list(table.values())
async def main():
async with Manager("192.168.1.1") as mgr:
interfaces = await get_if_table(mgr)
for iface in interfaces:
name = iface.get("ifDescr", "unknown")
status = "up" if iface.get("ifOperStatus") == 1 else "down"
speed = int(iface.get("ifSpeed", 0)) // 1_000_000
in_mb = int(iface.get("ifInOctets", 0)) / 1024 / 1024
out_mb = int(iface.get("ifOutOctets", 0)) / 1024 / 1024
print(f"{name}: {status} @ {speed}Mbps (in={in_mb:.1f}MB, out={out_mb:.1f}MB)")
asyncio.run(main())Traffic Delta Monitoring
Calculate traffic rates over time:
traffic_delta.py
import asyncio
from dataclasses import dataclass
from snmpkit.manager import Manager
@dataclass
class TrafficSample:
timestamp: float
in_octets: int
out_octets: int
async def monitor_interface(host: str, if_index: int, interval: float = 5.0):
base = f"1.3.6.1.2.1.2.2.1"
in_oid = f"{base}.10.{if_index}"
out_oid = f"{base}.16.{if_index}"
prev: TrafficSample | None = None
async with Manager(host) as mgr:
while True:
now = asyncio.get_event_loop().time()
values = await mgr.get_many(in_oid, out_oid)
current = TrafficSample(now, int(values[0]), int(values[1]))
if prev is not None:
dt = current.timestamp - prev.timestamp
in_rate = (current.in_octets - prev.in_octets) * 8 / dt
out_rate = (current.out_octets - prev.out_octets) * 8 / dt
print(f"In: {in_rate/1e6:.2f} Mbps, Out: {out_rate/1e6:.2f} Mbps")
prev = current
await asyncio.sleep(interval)
asyncio.run(monitor_interface("192.168.1.1", if_index=2))Counter values can wrap. For production use, handle 32-bit counter overflow (every ~34GB at 10Gbps) or use 64-bit counters (ifHCInOctets).
SNMP Trap Receiver
Basic
trap_receiver.py
import asyncio
from snmpkit.manager import TrapReceiver
async def handle_trap(trap):
print(f"Trap from {trap.source}:")
print(f" Enterprise: {trap.enterprise}")
print(f" Uptime: {trap.uptime}")
for varbind in trap.varbinds:
print(f" {varbind.oid} = {varbind.value}")
async def main():
receiver = TrapReceiver(port=162)
receiver.on_trap(handle_trap)
print("Listening for traps on port 162...")
await receiver.start()
asyncio.run(main())Error Recovery Pattern
Robust polling with error handling and backoff:
robust_poll.py
import asyncio
from snmpkit.manager import Manager
from snmpkit.manager.exceptions import TimeoutError, SnmpError
async def poll_with_recovery(
host: str,
oids: list[str],
interval: float = 60.0,
max_failures: int = 3,
):
failures = 0
backoff = 1.0
while True:
try:
async with Manager(host, timeout=5.0, retries=2) as mgr:
values = await mgr.get_many(*oids)
failures = 0
backoff = 1.0
yield {oid: value for oid, value in zip(oids, values)}
except TimeoutError:
failures += 1
if failures >= max_failures:
yield {"error": "device unreachable", "host": host}
backoff = min(backoff * 2, 300.0)
await asyncio.sleep(backoff)
continue
except SnmpError as e:
yield {"error": str(e), "host": host}
await asyncio.sleep(interval)
async def main():
oids = ["1.3.6.1.2.1.1.3.0", "1.3.6.1.2.1.1.5.0"]
async for result in poll_with_recovery("192.168.1.1", oids, interval=30.0):
if "error" in result:
print(f"Error: {result['error']}")
else:
print(f"Poll successful: {result}")
asyncio.run(main())Metrics to Prometheus
Export SNMP metrics for Prometheus scraping:
prometheus_exporter.py
import asyncio
from prometheus_client import start_http_server, Gauge
from snmpkit.manager import Manager
if_in_octets = Gauge("snmp_if_in_octets", "Interface input octets", ["host", "interface"])
if_out_octets = Gauge("snmp_if_out_octets", "Interface output octets", ["host", "interface"])
if_status = Gauge("snmp_if_status", "Interface operational status", ["host", "interface"])
async def collect_metrics(host: str, interval: float = 30.0):
base = "1.3.6.1.2.1.2.2.1"
async with Manager(host) as mgr:
while True:
names = {}
async for oid, value in mgr.bulk_walk(f"{base}.2"):
idx = oid.split(".")[-1]
names[idx] = str(value)
for idx, name in names.items():
status = int(await mgr.get(f"{base}.8.{idx}"))
in_oct = int(await mgr.get(f"{base}.10.{idx}"))
out_oct = int(await mgr.get(f"{base}.16.{idx}"))
if_status.labels(host=host, interface=name).set(status)
if_in_octets.labels(host=host, interface=name).set(in_oct)
if_out_octets.labels(host=host, interface=name).set(out_oct)
await asyncio.sleep(interval)
async def main():
start_http_server(9116)
print("Prometheus metrics on :9116/metrics")
hosts = ["192.168.1.1", "192.168.1.2"]
tasks = [collect_metrics(host) for host in hosts]
await asyncio.gather(*tasks)
asyncio.run(main())Next Steps
- Operations — Full API reference
- Performance — Benchmark details
Last updated on