Advanced

Performance tuning and advanced configuration options.

Agent Configuration

from snmpkit.agent import Agent

agent = Agent(
    agent_id="my-agent",               # Identifies your agent to snmpd
    socket_path="/var/agentx/master",  # AgentX socket path
    timeout=5,                         # Connection timeout in seconds
    parallel_encoding=False,           # Enable Rust parallel encoding
    worker_threads=0,                  # Python thread pool size
    queue_size=0,                      # Request queue size (0 = unbounded)
)

Parallel Encoding

For agents with many OIDs (1000+), enable Rust’s parallel encoding using rayon:

agent = Agent(parallel_encoding=True)

This parallelizes PDU encoding across CPU cores. Benchmarks show 2-4x improvement for large response payloads.

Parallel encoding has overhead for small responses. Only enable it if you’re serving 1000+ OIDs or have large table responses.

Request Queue

The queue_size parameter controls the internal request queue:

agent = Agent(queue_size=1000)  # Limit to 1000 pending requests

| Value | Behavior |
| --- | --- |
| 0 (default) | Unbounded queue - never drops requests |
| > 0 | Fixed size - rejects new requests when full |

Use a bounded queue to prevent memory exhaustion under extreme load. When the queue is full, new requests receive an error response.

For most use cases, the default unbounded queue is fine. Only set a limit if you’re concerned about memory under sustained overload.
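The two behaviors in the table can be illustrated with the standard library's asyncio.Queue, which happens to follow the same convention (maxsize=0 means unbounded). This is only a sketch of the semantics: this page does not show how snmpkit implements its internal queue, and the enqueue and demo helpers and the request strings are hypothetical.

```python
import asyncio


def enqueue(queue: asyncio.Queue, request: str) -> bool:
    """Try to queue a request; return False if a bounded queue is full."""
    try:
        queue.put_nowait(request)
        return True
    except asyncio.QueueFull:
        # A bounded queue rejects the request instead of growing.
        return False


def demo(queue_size: int, incoming: int) -> tuple[int, int]:
    """Offer `incoming` requests to a queue and return (accepted, rejected)."""
    queue: asyncio.Queue = asyncio.Queue(maxsize=queue_size)
    accepted = sum(enqueue(queue, f"request-{i}") for i in range(incoming))
    return accepted, incoming - accepted
```

With queue_size=0 every request is accepted and memory grows with the backlog; with queue_size=1000, anything beyond the first 1000 pending requests is turned away.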

Worker Threads

For CPU-bound update callbacks, use a thread pool:

agent = Agent(worker_threads=4)

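This is useful when your update() methods do heavy computation. On a free-threaded build of Python (officially supported from Python 3.14), these threads can run truly in parallel. On a standard build with the GIL, pure-Python computation in worker threads is still serialized, although it no longer blocks the event loop.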

When to Use Worker Threads

| Scenario | Recommendation |
| --- | --- |
| I/O-bound updates (database, HTTP) | Don’t use (async is sufficient) |
| CPU-bound computation | Use worker_threads=N |
| Mixed workloads | Test both configurations |
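To see why a thread pool helps CPU-bound work, here is a standard-library sketch of the general pattern: blocking computation is handed to a ThreadPoolExecutor so the event loop stays responsive. It is not snmpkit's internal implementation; heavy_computation, gil_enabled, and compute_all are hypothetical names. sys._is_gil_enabled() exists only on Python 3.13 and later, so the sketch falls back to True on older versions.

```python
import asyncio
import sys
from concurrent.futures import ThreadPoolExecutor


def heavy_computation(n: int) -> int:
    """Stand-in for CPU-bound work done inside an update() method."""
    return sum(i * i for i in range(n))


def gil_enabled() -> bool:
    """Report whether the GIL is active (assumed True before Python 3.13)."""
    check = getattr(sys, "_is_gil_enabled", None)
    return True if check is None else check()


async def compute_all(inputs: list[int], workers: int) -> list[int]:
    """Run heavy_computation for each input on a pool of worker threads."""
    loop = asyncio.get_running_loop()
    with ThreadPoolExecutor(max_workers=workers) as pool:
        futures = [loop.run_in_executor(pool, heavy_computation, n) for n in inputs]
        return await asyncio.gather(*futures)
```

If gil_enabled() returns True, the threads take turns rather than running in parallel, which is worth knowing before you size worker_threads to the number of CPU cores.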

Update Frequency

Control how often each updater refreshes its data:

# Fast updates for real-time metrics
agent.register("1.3.6.1.4.1.12345.1", RealtimeUpdater(), freq=1)

# Slow updates for stable data
agent.register("1.3.6.1.4.1.12345.2", ConfigUpdater(), freq=60)

Very short update intervals (a freq below 1 second) can cause high CPU usage, because update() then runs more than once per second for that updater. Consider whether you really need sub-second data freshness.
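A rough way to judge the cost of a given freq is to compare it with how long one update() call takes. The arithmetic below assumes freq is the refresh interval in seconds, as the examples above suggest; both helper names are hypothetical.

```python
def updates_per_hour(freq: float) -> float:
    """Number of update() calls per hour for a refresh interval in seconds."""
    if freq <= 0:
        raise ValueError("freq must be positive")
    return 3600 / freq


def core_utilization(freq: float, update_cost: float) -> float:
    """Fraction of one CPU core spent in update(), given its cost in seconds."""
    if freq <= 0:
        raise ValueError("freq must be positive")
    return update_cost / freq
```

For example, freq=60 means 60 calls per hour and freq=1 means 3600. An update that takes 50 ms and runs with freq=0.1 keeps one core about half busy, since core_utilization(0.1, 0.05) is 0.5.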

Registration Priority

When multiple agents register overlapping OIDs, priority determines which agent handles requests:

# Lower number = higher priority (1-255)
agent.register("1.3.6.1.4.1.12345", MyUpdater(), priority=100)

Default priority is 127. Use lower values to override other agents.
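The decision is made by the AgentX master agent (snmpd), not by snmpkit. Under RFC 2741, the master first prefers the most specific registered subtree and only then, among identical subtrees, the smaller priority value. A minimal sketch of that selection rule, using hypothetical Registration records and session names:

```python
from dataclasses import dataclass
from typing import Optional


@dataclass(frozen=True)
class Registration:
    session: str               # hypothetical name of the registering subagent
    subtree: tuple[int, ...]   # registered OID prefix
    priority: int = 127        # AgentX default priority


def parse_oid(oid: str) -> tuple[int, ...]:
    """Convert a dotted OID string into a tuple of sub-identifiers."""
    return tuple(int(part) for part in oid.strip(".").split("."))


def select_owner(oid: str, registrations: list[Registration]) -> Optional[Registration]:
    """Pick the registration that would handle a request for `oid`."""
    target = parse_oid(oid)
    covering = [r for r in registrations if target[: len(r.subtree)] == r.subtree]
    if not covering:
        return None
    # Most specific subtree wins; ties go to the lower priority number.
    return min(covering, key=lambda r: (-len(r.subtree), r.priority))
```

Note that a lower priority number only wins against a registration for the same subtree. A more specific registration from another agent still takes precedence for the OIDs it covers.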

Dynamic Registration

Register and unregister OIDs at runtime:

from snmpkit.agent import Agent, Updater


class DynamicAgent:
    def __init__(self):
        self.agent = Agent()
        self.updaters = {}  # device_id -> registered OID

    def add_device(self, device_id: str, updater: Updater):
        oid = f"1.3.6.1.4.1.12345.{device_id}"
        self.agent.register(oid, updater)
        self.updaters[device_id] = oid

    def remove_device(self, device_id: str):
        if device_id in self.updaters:
            self.agent.unregister(self.updaters[device_id])
            del self.updaters[device_id]
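The example above interpolates device_id directly into the OID. Because every OID sub-identifier must be a non-negative integer (at most 2^32 - 1 in SNMP), it can help to validate the ID before registering. The device_oid helper below is a hypothetical addition, not part of snmpkit:

```python
MAX_SUBID = 2**32 - 1  # SNMP sub-identifiers are unsigned 32-bit integers


def device_oid(device_id: str, base: str = "1.3.6.1.4.1.12345") -> str:
    """Build the OID for a device, rejecting IDs that are not valid sub-identifiers."""
    if not (device_id.isascii() and device_id.isdigit()):
        raise ValueError(f"device_id must be a decimal number, got {device_id!r}")
    subid = int(device_id)
    if subid > MAX_SUBID:
        raise ValueError(f"device_id {subid} exceeds {MAX_SUBID}")
    # int() also normalizes leading zeros, so "007" and "7" map to the same OID.
    return f"{base}.{subid}"
```

Calling device_oid(device_id) inside add_device would turn an ID such as "eth0" into an immediate ValueError instead of a malformed registration.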

Graceful Shutdown

Handle shutdown signals properly:

import asyncio
import signal

import snmpkit
from snmpkit.agent import Agent, Updater


class MyUpdater(Updater):
    async def update(self):
        self.set_INTEGER("1.0", 42)


async def main():
    agent = Agent()
    agent.register("1.3.6.1.4.1.12345", MyUpdater())

    # Stop the agent cleanly on SIGTERM/SIGINT
    loop = asyncio.get_running_loop()
    for sig in (signal.SIGTERM, signal.SIGINT):
        loop.add_signal_handler(sig, lambda: asyncio.create_task(agent.stop()))

    try:
        await agent.start()
    except asyncio.CancelledError:
        pass


snmpkit.run(main())

Logging

Configure logging to debug issues:

import logging

# Enable debug logging for snmpkit
logging.basicConfig(level=logging.DEBUG)
logging.getLogger("snmpkit").setLevel(logging.DEBUG)

Log levels:

  • DEBUG - PDU details, registration events
  • INFO - Connection events, errors
  • WARNING - Recoverable errors
  • ERROR - Critical failures
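Setting the root logger to DEBUG also turns on debug output for every other library in the process. A narrower sketch, assuming the "snmpkit" logger name used in the example above, keeps the root logger at WARNING and raises only the snmpkit logger. The configure_logging function name is hypothetical.

```python
import logging


def configure_logging(snmpkit_level: int = logging.DEBUG) -> logging.Logger:
    """Keep the root logger quiet and enable detailed output for snmpkit only."""
    # basicConfig is a no-op if the root logger already has handlers.
    logging.basicConfig(
        level=logging.WARNING,
        format="%(levelname)s:%(name)s:%(message)s",
    )
    logger = logging.getLogger("snmpkit")
    logger.setLevel(snmpkit_level)
    return logger
```

Child loggers such as "snmpkit.agent" inherit the level set on "snmpkit", so one call covers the whole package.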

Connection Handling

The agent automatically reconnects if the connection to snmpd is lost:

INFO:snmpkit.agent:Reconnecting...
WARNING:snmpkit.agent:Reconnect failed: Connection refused, retrying in 2s
INFO:snmpkit.agent:Reconnecting...
INFO:snmpkit.agent:Connected to /var/agentx/master

No code changes needed - just ensure snmpd is running.
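If reconnects keep failing, a quick check that something is actually listening on the AgentX socket can save time. The helper below is a hypothetical diagnostic built only on the standard library and is not part of snmpkit. It assumes a Unix socket path such as the default shown in Agent Configuration.

```python
import socket


def agentx_socket_ready(path: str = "/var/agentx/master", timeout: float = 1.0) -> bool:
    """Return True if something accepts connections on the Unix socket at `path`."""
    with socket.socket(socket.AF_UNIX, socket.SOCK_STREAM) as sock:
        sock.settimeout(timeout)
        try:
            sock.connect(path)
        except OSError:
            # Missing socket file, permission denied, or nothing listening.
            return False
        return True
```

A False result usually means snmpd is not running, AgentX support is not enabled in its configuration, or the current user lacks permission on the socket.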

Sync vs Async Entry Points

import snmpkit
from snmpkit.agent import Agent, Updater


class MyUpdater(Updater):
    async def update(self):
        self.set_INTEGER("1.0", 42)


async def main():
    agent = Agent()
    agent.register("1.3.6.1.4.1.12345", MyUpdater())
    await agent.start()


# Uses uvloop automatically
snmpkit.run(main())

The async approach is recommended as it gives you control over the event loop and allows running other async tasks alongside the agent.
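As an illustration of running another task alongside the agent, the sketch below uses a stand-in coroutine in place of await agent.start(), so it runs without snmpd. The names agent_stub and heartbeat are hypothetical; in real code the stub would be replaced by the agent call.

```python
import asyncio


async def agent_stub(events: list[str], run_for: float) -> None:
    """Stand-in for `await agent.start()`; returns after `run_for` seconds."""
    events.append("agent started")
    await asyncio.sleep(run_for)
    events.append("agent stopped")


async def heartbeat(events: list[str], interval: float) -> None:
    """Example side task that runs until it is cancelled."""
    while True:
        await asyncio.sleep(interval)
        events.append("heartbeat")


async def main(run_for: float = 0.05, interval: float = 0.01) -> list[str]:
    events: list[str] = []
    side_task = asyncio.create_task(heartbeat(events, interval))
    try:
        await agent_stub(events, run_for)
    finally:
        # Cancel the side task once the agent has finished.
        side_task.cancel()
        await asyncio.gather(side_task, return_exceptions=True)
    return events
```

Keeping a reference to the task and cancelling it in a finally block ensures the side task does not outlive the agent.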

Performance Recommendations

| Scenario | Configuration |
| --- | --- |
| < 100 OIDs, light I/O | Default settings |
| 100-1000 OIDs | Default settings |
| 1000-10000 OIDs | parallel_encoding=True |
| CPU-heavy update callbacks | worker_threads=N (N = CPU cores) |
| High OID count + CPU-heavy | Both options |
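The table can be turned into a small helper that builds keyword arguments for Agent. This is a sketch: recommended_settings is a hypothetical function, and only the parameter names parallel_encoding and worker_threads come from this page. The table does not cover more than 10000 OIDs, so the sketch assumes the same setting applies there.

```python
import os


def recommended_settings(oid_count: int, cpu_heavy_updates: bool) -> dict:
    """Map a workload description to Agent keyword arguments."""
    settings: dict = {}
    if oid_count >= 1000:
        # Large responses benefit from parallel PDU encoding.
        settings["parallel_encoding"] = True
    if cpu_heavy_updates:
        # One worker thread per CPU core; fall back to 1 if the count is unknown.
        settings["worker_threads"] = os.cpu_count() or 1
    return settings
```

The result can be passed straight through, for example Agent(**recommended_settings(5000, cpu_heavy_updates=True)). Treat it as a starting point and measure before settling on a configuration.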

Custom Socket Path

For non-standard snmpd configurations:

# TCP socket
agent = Agent(socket_path="tcp:localhost:705")

# Custom Unix socket
agent = Agent(socket_path="/run/snmp/agentx.sock")
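The two forms above differ only in the tcp: prefix. The sketch below shows one way such a string could be split into its parts, for example to validate a value read from a config file. It is a hypothetical helper, and snmpkit's own parsing may differ.

```python
from typing import NamedTuple, Optional


class AgentXEndpoint(NamedTuple):
    transport: str              # "tcp" or "unix"
    address: str                # host name for TCP, filesystem path for Unix
    port: Optional[int] = None  # only set for TCP


def parse_socket_path(socket_path: str) -> AgentXEndpoint:
    """Split a socket_path string into transport, address, and port."""
    if socket_path.startswith("tcp:"):
        host, sep, port = socket_path[len("tcp:"):].rpartition(":")
        if not sep or not host or not port.isdigit():
            raise ValueError(f"expected tcp:<host>:<port>, got {socket_path!r}")
        return AgentXEndpoint("tcp", host, int(port))
    # Anything without the tcp: prefix is treated as a Unix socket path.
    return AgentXEndpoint("unix", socket_path)
```

Port 705 in the TCP example is the port registered for AgentX.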
