
Advanced

Performance tuning and advanced patterns.

Bulk Operation Tuning

GETBULK Parameters

    async with Manager("192.168.1.1") as mgr:
        # max_repetitions: how many rows per request
        results = await mgr.get_bulk(
            "1.3.6.1.2.1.2.2.1.1",
            max_repetitions=50,  # Default: 10
            non_repeaters=0,
        )

Bulk Walk Tuning

    # Large tables: bigger batches, fewer round-trips
    async for oid, value in mgr.bulk_walk("1.3.6.1.2.1.2.2", bulk_size=50):
        pass

    # Small tables or slow devices: smaller batches
    async for oid, value in mgr.bulk_walk("1.3.6.1.2.1.2.2", bulk_size=10):
        pass

The optimal bulk_size depends on the device. Most devices handle 25-50 rows per request; some older devices may only handle 10. MTU limits also apply: roughly 1400 bytes of OID data fit in one packet.
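
As a rough illustration of the MTU limit, the sketch below estimates how many rows fit in one response. The helper name estimate_bulk_size and the per-row byte sizes are assumptions made for this example, not part of snmpkit; real sizes depend on OID length and value type, so treat the result as a starting point to tune from.

```python
def estimate_bulk_size(
    avg_varbind_bytes: int,
    payload_budget: int = 1400,  # approximate bytes of OID data per packet
    device_limit: int = 50,      # assumed upper bound the device handles well
) -> int:
    """Estimate a bulk_size that keeps one response within payload_budget."""
    if avg_varbind_bytes <= 0:
        raise ValueError("avg_varbind_bytes must be positive")
    fits = payload_budget // avg_varbind_bytes
    # Never go below 1 row, never above what the device is assumed to handle
    return max(1, min(fits, device_limit))


# Short integer columns (~28 bytes per row) hit the device limit first;
# long string columns (~100 bytes per row) hit the packet budget first.
print(estimate_bulk_size(28))
print(estimate_bulk_size(100))
```

If the estimate is well below the device limit, a smaller bulk_size avoids oversized responses; otherwise the device limit is the binding constraint.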

Concurrent Polling

poll_many

Poll multiple targets concurrently with bounded concurrency:

    from snmpkit.manager import PollTarget, PollResult, poll_many

    targets = [
        PollTarget(host="10.0.0.1"),
        PollTarget(host="10.0.0.2", community="private"),
        PollTarget(host="10.0.0.3", port=1161, timeout=10.0),
    ]

    oids = ["1.3.6.1.2.1.1.1.0", "1.3.6.1.2.1.1.3.0"]

    async for result in poll_many(targets, oids, concurrency=50):
        if result.error:
            print(f"{result.target}: {result.error}")
        else:
            print(f"{result.target}: {result.oid} = {result.value}")

PollTarget

    @dataclass
    class PollTarget:
        host: str
        port: int = 161
        community: str = "public"
        version: int = 2
        # v3 fields
        user: str | None = None
        auth_protocol: str | None = None
        auth_password: str | None = None
        priv_protocol: str | None = None
        priv_password: str | None = None
        timeout: float = 5.0
        retries: int = 3

PollResult

    @dataclass
    class PollResult:
        target: str           # "host:port"
        oid: str              # OID that was queried
        value: Value | None   # Value if successful
        error: str | None     # Error message if failed

Errors on one target don’t affect others — each target is isolated.
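
Because each result carries either a value or an error, results can be sorted into successes and failures per target after polling. The following is a minimal sketch of that bookkeeping. FakeResult is a stand-in that mirrors the PollResult fields so the example runs without a network, and split_results is a hypothetical helper, not an snmpkit function.

```python
from collections import defaultdict
from dataclasses import dataclass
from typing import Any, Dict, Iterable, List, Optional, Tuple


@dataclass
class FakeResult:
    """Stand-in with the same fields as PollResult (assumption for this sketch)."""
    target: str
    oid: str
    value: Any = None
    error: Optional[str] = None


def split_results(
    results: Iterable[FakeResult],
) -> Tuple[Dict[str, Dict[str, Any]], Dict[str, List[str]]]:
    """Group values by target and OID, and error messages by target."""
    values: Dict[str, Dict[str, Any]] = defaultdict(dict)
    errors: Dict[str, List[str]] = defaultdict(list)
    for r in results:
        if r.error:
            errors[r.target].append(r.error)
        else:
            values[r.target][r.oid] = r.value
    return dict(values), dict(errors)


sample = [
    FakeResult("10.0.0.1:161", "1.3.6.1.2.1.1.1.0", value="Linux"),
    FakeResult("10.0.0.2:161", "1.3.6.1.2.1.1.1.0", error="timeout"),
]
values, errors = split_results(sample)
```

With real results, the list could be collected first, for example with an async list comprehension over poll_many, and then passed to the helper.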

Manual Concurrent Polling

For more control, use asyncio.gather directly:

    import asyncio

    async def poll_devices(hosts: list[str], max_concurrent: int = 50):
        # The semaphore caps how many devices are polled at the same time
        semaphore = asyncio.Semaphore(max_concurrent)

        async def poll_one(host: str):
            async with semaphore:
                async with Manager(host, timeout=2.0) as mgr:
                    return await mgr.get("1.3.6.1.2.1.1.1.0")

        # return_exceptions=True returns failures as values instead of raising
        return await asyncio.gather(*[poll_one(h) for h in hosts], return_exceptions=True)
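
With return_exceptions=True, the list returned by asyncio.gather mixes values and exception objects, in the same order as the input. The sketch below shows one way to separate them. fake_get is a stand-in for a Manager.get call (an assumption so the example runs offline), and poll_all is a hypothetical helper name.

```python
import asyncio
from typing import Dict, List, Tuple


async def fake_get(host: str) -> str:
    """Stand-in for a Manager.get call; fails for one host to show error handling."""
    await asyncio.sleep(0)
    if host == "10.0.0.2":
        raise TimeoutError(f"{host} timed out")
    return f"sysDescr of {host}"


async def poll_all(hosts: List[str]) -> Tuple[Dict[str, str], Dict[str, BaseException]]:
    results = await asyncio.gather(
        *(fake_get(h) for h in hosts), return_exceptions=True
    )
    ok: Dict[str, str] = {}
    failed: Dict[str, BaseException] = {}
    # gather preserves input order, so results line up with hosts
    for host, result in zip(hosts, results):
        if isinstance(result, BaseException):
            failed[host] = result
        else:
            ok[host] = result
    return ok, failed


ok, failed = asyncio.run(poll_all(["10.0.0.1", "10.0.0.2"]))
```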

Logging

Enable Debug Logging

    import logging

    logging.basicConfig(level=logging.DEBUG)
    logging.getLogger("snmpkit").setLevel(logging.DEBUG)

Loggers used:

  • snmpkit.manager — Manager operations, connection events
  • snmpkit.trap_receiver — TrapReceiver events, Inform ACKs
  • snmpkit.poll — poll_many progress

Log Levels

| Level | Information |
|---|---|
| DEBUG | PDU encode/decode, timing, Inform ACKs |
| INFO | Connection events, TrapReceiver start/stop |
| WARNING | Retries, recoverable errors |
| ERROR | Failed requests, UDP errors |
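
Since the loggers listed above are separate, their levels can be set independently using the standard logging module. The sketch below keeps the root logger quiet while turning up detail for Manager operations only. The chosen levels and the format string are illustrative assumptions, not recommended defaults.

```python
import logging

# Root logger: warnings and errors only, with timestamps and logger names
logging.basicConfig(
    level=logging.WARNING,
    format="%(asctime)s %(name)s %(levelname)s %(message)s",
)

# Full detail for Manager operations
logging.getLogger("snmpkit.manager").setLevel(logging.DEBUG)

# Progress messages from poll_many, without the DEBUG noise
logging.getLogger("snmpkit.poll").setLevel(logging.INFO)
```

Loggers without an explicit level, such as snmpkit.trap_receiver here, inherit the level of their nearest configured ancestor.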

Async Patterns

Graceful Shutdown

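    import asyncio
    import signal

    async def main():
        mgr = Manager("192.168.1.1")
        await mgr.connect()

        shutdown_event = asyncio.Event()

        # Inside a coroutine, get_running_loop() is the supported way to get the loop
        loop = asyncio.get_running_loop()
        for sig in (signal.SIGTERM, signal.SIGINT):
            # add_signal_handler is only available on Unix event loops
            loop.add_signal_handler(sig, shutdown_event.set)

        try:
            while not shutdown_event.is_set():
                value = await mgr.get("1.3.6.1.2.1.1.3.0")
                print(f"Uptime: {value}")
                # Wait up to 60 s between polls, but wake immediately on shutdown
                try:
                    await asyncio.wait_for(shutdown_event.wait(), timeout=60)
                except asyncio.TimeoutError:
                    pass
        finally:
            await mgr.close()

    asyncio.run(main())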

Streaming Results

    async def stream_walk(host: str, root_oid: str):
        async with Manager(host) as mgr:
            # root_oid is kept separate from the loop variable to avoid shadowing
            async for oid, value in mgr.bulk_walk(root_oid):
                yield (oid, value)

    # Use as an async generator (from inside a coroutine)
    async for oid, value in stream_walk("192.168.1.1", "1.3.6.1.2.1.2.2"):
        process(oid, value)

Memory Optimization

Streaming Large Walks

    # Bad: loads entire table into memory
    results = []
    async for oid, value in mgr.bulk_walk("1.3.6.1.2.1.4.22"):
        results.append((oid, value))

    # Good: process as you go
    async for oid, value in mgr.bulk_walk("1.3.6.1.2.1.4.22"):
        await write_to_database(oid, value)
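
Writing one row at a time can be slow when the sink prefers batches. A middle ground is to group the stream into fixed-size chunks, so memory stays bounded by the chunk size. The sketch below shows that pattern. in_batches is a hypothetical helper, and fake_walk stands in for mgr.bulk_walk so the example runs offline.

```python
import asyncio
from typing import AsyncIterator, List, Tuple, TypeVar

T = TypeVar("T")


async def in_batches(source: AsyncIterator[T], size: int) -> AsyncIterator[List[T]]:
    """Yield lists of up to `size` items from an async iterator."""
    if size < 1:
        raise ValueError("size must be at least 1")
    batch: List[T] = []
    async for item in source:
        batch.append(item)
        if len(batch) >= size:
            yield batch
            batch = []
    if batch:
        # Flush the final, possibly shorter, batch
        yield batch


async def fake_walk() -> AsyncIterator[Tuple[str, int]]:
    """Stand-in for mgr.bulk_walk(...): yields five (oid, value) pairs."""
    for i in range(5):
        yield (f"1.3.6.1.2.1.4.22.{i}", i)


async def demo() -> List[int]:
    sizes = []
    async for batch in in_batches(fake_walk(), size=2):
        # A real program might pass the batch to a bulk database write here
        sizes.append(len(batch))
    return sizes


batch_sizes = asyncio.run(demo())
```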

Thread Safety

Manager instances are not thread-safe. Use one per task:

    # Bad: shared manager across threads
    mgr = Manager("192.168.1.1")

    # Good: create manager in each task
    async def worker(host: str):
        async with Manager(host) as mgr:
            return await mgr.get(...)

Performance Recommendations

| Scenario | Configuration |
|---|---|
| Single device, few OIDs | Default settings |
| Single device, large table | bulk_size=50 |
| Many devices (100-1000) | poll_many() with concurrency=50 |
| Many devices (1000+) | poll_many() with rate limiting |
| Slow network | Increase timeout, reduce bulk_size |
| Unreliable network | Increase retries |
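
The table does not say how rate limiting is applied, so the following is only one possible approach: spacing request starts evenly with a small asyncio-based limiter. RateLimiter and poll_paced are hypothetical names for this sketch, not snmpkit APIs, and poll_one stands for whatever coroutine performs the actual request.

```python
import asyncio
import time
from typing import Awaitable, Callable, List


class RateLimiter:
    """Space acquisitions evenly so at most `rate` start per second."""

    def __init__(self, rate: float) -> None:
        if rate <= 0:
            raise ValueError("rate must be positive")
        self._interval = 1.0 / rate
        self._next_slot = 0.0
        self._lock = asyncio.Lock()

    async def wait(self) -> None:
        # The lock serializes callers so each one claims its own time slot
        async with self._lock:
            now = time.monotonic()
            delay = self._next_slot - now
            if delay > 0:
                await asyncio.sleep(delay)
                now = time.monotonic()
            self._next_slot = max(now, self._next_slot) + self._interval


async def poll_paced(
    hosts: List[str],
    poll_one: Callable[[str], Awaitable[str]],
    rate: float,
) -> list:
    limiter = RateLimiter(rate)  # created inside the running event loop

    async def paced(host: str):
        await limiter.wait()
        return await poll_one(host)

    # Requests still overlap once started; only their start times are paced
    return await asyncio.gather(*(paced(h) for h in hosts), return_exceptions=True)
```

A limiter like this caps the request start rate, while a semaphore caps how many requests are in flight; the two can be combined when both limits matter.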
