Advanced
Performance tuning and advanced configuration options.
Agent Configuration
from snmpkit.agent import Agent
agent = Agent(
agent_id="my-agent", # Identifies your agent to snmpd
socket_path="/var/agentx/master", # AgentX socket path
timeout=5, # Connection timeout in seconds
parallel_encoding=False, # Enable Rust parallel encoding
worker_threads=0, # Python thread pool size
queue_size=0, # Request queue size (0 = unbounded)
)Parallel Encoding
For agents with many OIDs (1000+), enable Rust’s parallel encoding using rayon:
agent = Agent(parallel_encoding=True)This parallelizes PDU encoding across CPU cores. Benchmarks show 2-4x improvement for large response payloads.
Parallel encoding has overhead for small responses. Only enable it if you’re serving 1000+ OIDs or have large table responses.
Request Queue
The queue_size parameter controls the internal request queue:
agent = Agent(queue_size=1000) # Limit to 1000 pending requests| Value | Behavior |
|---|---|
0 (default) | Unbounded queue - never drops requests |
> 0 | Fixed size - rejects new requests when full |
Use a bounded queue to prevent memory exhaustion under extreme load. When the queue is full, new requests receive an error response.
For most use cases, the default unbounded queue is fine. Only set a limit if you’re concerned about memory under sustained overload.
Worker Threads
For CPU-bound update callbacks, use a thread pool:
agent = Agent(worker_threads=4)This is useful when your update() methods do heavy computation. With Python 3.14+ free-threading, these threads run truly in parallel.
When to Use Worker Threads
| Scenario | Recommendation |
|---|---|
| I/O-bound updates (database, HTTP) | Don’t use (async is sufficient) |
| CPU-bound computation | Use worker_threads=N |
| Mixed workloads | Test both configurations |
Update Frequency
Control how often each updater refreshes its data:
# Fast updates for real-time metrics
agent.register("1.3.6.1.4.1.12345.1", RealtimeUpdater(), freq=1)
# Slow updates for stable data
agent.register("1.3.6.1.4.1.12345.2", ConfigUpdater(), freq=60)Very fast update frequencies (< 1 second) can cause high CPU usage. Consider whether you really need sub-second data freshness.
Registration Priority
When multiple agents register overlapping OIDs, priority determines which agent handles requests:
# Lower number = higher priority (1-255)
agent.register("1.3.6.1.4.1.12345", MyUpdater(), priority=100)Default priority is 127. Use lower values to override other agents.
Dynamic Registration
Register and unregister OIDs at runtime:
class DynamicAgent:
def __init__(self):
self.agent = Agent()
self.updaters = {}
def add_device(self, device_id: str, updater: Updater):
oid = f"1.3.6.1.4.1.12345.{device_id}"
self.agent.register(oid, updater)
self.updaters[device_id] = oid
def remove_device(self, device_id: str):
if device_id in self.updaters:
self.agent.unregister(self.updaters[device_id])
del self.updaters[device_id]Graceful Shutdown
Handle shutdown signals properly:
import signal
import snmpkit
from snmpkit.agent import Agent, Updater
class MyUpdater(Updater):
async def update(self):
self.set_INTEGER("1.0", 42)
async def main():
agent = Agent()
agent.register("1.3.6.1.4.1.12345", MyUpdater())
# Handle SIGTERM/SIGINT
loop = asyncio.get_event_loop()
for sig in (signal.SIGTERM, signal.SIGINT):
loop.add_signal_handler(sig, lambda: asyncio.create_task(agent.stop()))
try:
await agent.start()
except asyncio.CancelledError:
pass
snmpkit.run(main())Logging
Configure logging to debug issues:
import logging
# Enable debug logging for snmpkit
logging.basicConfig(level=logging.DEBUG)
logging.getLogger("snmpkit").setLevel(logging.DEBUG)Log levels:
DEBUG- PDU details, registration eventsINFO- Connection events, errorsWARNING- Recoverable errorsERROR- Critical failures
Connection Handling
The agent automatically reconnects if the connection to snmpd is lost:
INFO:snmpkit.agent:Reconnecting...
WARNING:snmpkit.agent:Reconnect failed: Connection refused, retrying in 2s
INFO:snmpkit.agent:Reconnecting...
INFO:snmpkit.agent:Connected to /var/agentx/masterNo code changes needed - just ensure snmpd is running.
Sync vs Async Entry Points
Async (recommended)
import snmpkit
from snmpkit.agent import Agent, Updater
async def main():
agent = Agent()
agent.register("1.3.6.1.4.1.12345", MyUpdater())
await agent.start()
# Uses uvloop automatically
snmpkit.run(main())The async approach is recommended as it gives you control over the event loop and allows running other async tasks alongside the agent.
Performance Recommendations
| Scenario | Configuration |
|---|---|
| < 100 OIDs, light I/O | Default settings |
| 100-1000 OIDs | Default settings |
| 1000-10000 OIDs | parallel_encoding=True |
| CPU-heavy update callbacks | worker_threads=N (N = CPU cores) |
| High OID count + CPU-heavy | Both options |
Custom Socket Path
For non-standard snmpd configurations:
# TCP socket
agent = Agent(socket_path="tcp:localhost:705")
# Custom Unix socket
agent = Agent(socket_path="/run/snmp/agentx.sock")Next Steps
- Examples - Real-world patterns
- Performance - Benchmark details