Skip to content

Commit 1643737

Browse files
committed
v0.2.0: multiprocessing support for 10k+ host polling
Add --workers flag to distribute hosts across multiple processes, each with its own asyncio event loop and SNMP engine pool. Single-process mode (--workers 1) remains the default and is unchanged from v0.1.0. See CHANGELOG.md for the full list of changes.
1 parent bebe8c3 commit 1643737

8 files changed

Lines changed: 762 additions & 43 deletions

File tree

CHANGELOG.md

Lines changed: 17 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,23 @@ All notable changes to this project will be documented in this file.
44

55
The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.1.0/).
66

7+
## [0.2.0] - 2026-04-06
8+
9+
### Added
10+
- **Multiprocessing support**`--workers N` distributes hosts across N worker processes, each with its own asyncio event loop and SNMP engine pool. Enables polling 10,000+ hosts in parallel.
11+
- `poll_host()` coroutine — returns result dict instead of writing to file, used by worker processes
12+
- `_partition_hosts()` — splits host records into roughly-equal chunks for worker distribution
13+
- `_worker_process()` — child process entry point with independent engine pool and event loop
14+
- `_run_multiprocess()` — supervisor that spawns workers, collects results via `multiprocessing.Queue`, writes output centrally (no file contention)
15+
- Scalability benchmark suite (`tests/integration/test_scalability.py`) — 50 containers, compares engine pool strategies, measures concurrency speedup
16+
- Unit tests for `_partition_hosts` (5 tests) and `poll_host` (4 tests)
17+
- CLI tests for `--workers` flag (4 tests)
18+
19+
### Changed
20+
- `--engine-pool-size` now applies per worker process (was global)
21+
- `_build_result()` extracted from `get_async` to share result-building logic between single-process and multiprocessing paths
22+
- `main()` branches on `--workers`: 1 = existing single-process path (unchanged), >1 = multiprocessing
23+
724
## [0.1.0] - 2026-04-06
825

926
### Added

README.md

Lines changed: 17 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -29,6 +29,8 @@ snmp-poller -s <snmp_auth.yml> -l <hosts.csv> -o <oids.yml>
2929
| `-o` | yes | OID definitions per group (YAML) |
3030
| `-f` | no | JSON output file (default: `/var/log/snmp_poll/snmp_poll.log`) |
3131
| `--log-dir` | no | Application log directory (default: `/var/log/snmp_poll`) |
32+
| `--engine-pool-size` | no | SNMP engines per worker (default: 5) |
33+
| `--workers` | no | Worker processes for parallel polling (default: 1) |
3234

3335
## Configuration
3436

@@ -73,6 +75,21 @@ network_switches:
7375

7476
Any number of groups and any number of OIDs per group are supported.
7577

78+
## Scaling to 10,000+ hosts
79+
80+
By default, snmp-poller runs in a single process. For large-scale
81+
polling, use `--workers` to distribute hosts across multiple processes,
82+
each with its own asyncio event loop and engine pool:
83+
84+
```
85+
# 10 processes × 100 engines = handles ~10,000 hosts
86+
snmp-poller -s auth.yml -l hosts.csv -o oids.yml \
87+
--workers 10 --engine-pool-size 100
88+
```
89+
90+
Each worker polls its hosts concurrently. Results are collected
91+
centrally by the supervisor process — no file contention.
92+
7693
## Output
7794

7895
Each polled host produces a JSON record sent to syslog and written to the output file:

pyproject.toml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,7 @@ build-backend = "setuptools.build_meta"
44

55
[project]
66
name = "snmp-poller"
7-
version = "0.1.0"
7+
version = "0.2.0"
88
description = "Concurrent SNMPv3 polling application using asyncio"
99
readme = "README.md"
1010
license = "MIT"

src/snmp_poller/cli.py

Lines changed: 30 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -66,6 +66,22 @@ def cli_params():
6666
help='directory for application logs '
6767
f'(default: {DEFAULT_LOG_DIR})\n')
6868

69+
parser.add_argument('--engine-pool-size',
70+
dest='engine_pool_size',
71+
type=int,
72+
default=5,
73+
metavar='<N>',
74+
help='number of SNMP engines per worker '
75+
'(default: 5)\n')
76+
77+
parser.add_argument('--workers',
78+
dest='workers',
79+
type=int,
80+
default=1,
81+
metavar='<N>',
82+
help='number of worker processes '
83+
'(default: 1, single-process)\n')
84+
6985
args = parser.parse_args()
7086

7187
# Validate each input file: must exist, non-empty, correct extension.
@@ -86,10 +102,19 @@ def cli_params():
86102
# Create log directory if it doesn't exist yet.
87103
os.makedirs(args.log_dir, exist_ok=True)
88104

105+
if args.engine_pool_size < 1:
106+
parser.error('--engine-pool-size must be >= 1')
107+
if args.workers < 1:
108+
parser.error('--workers must be >= 1')
109+
89110
return {
90-
'hosts_file': args.hosts_file,
91-
'snmp_params': args.snmp_params,
92-
'oids': args.oids,
93-
'output_file': args.output_file,
94-
'logging_path': path.join(args.log_dir, 'snmp_poll_app.log'),
111+
'hosts_file': args.hosts_file,
112+
'snmp_params': args.snmp_params,
113+
'oids': args.oids,
114+
'output_file': args.output_file,
115+
'engine_pool_size': args.engine_pool_size,
116+
'workers': args.workers,
117+
'logging_path': path.join(
118+
args.log_dir, 'snmp_poll_app.log',
119+
),
95120
}

src/snmp_poller/poller.py

Lines changed: 203 additions & 36 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@
22

33
import asyncio
44
import json
5+
import multiprocessing
56
import syslog
67

78
from pysnmp.hlapi.asyncio import getCmd
@@ -49,13 +50,35 @@ def build_snmp_request(host, group, oids, snmp_init, snmp_params):
4950
return response, grp_oids
5051

5152

53+
def _build_result(host, group, grp_oids, var_binds):
54+
'''Build the JSON-serializable result dict from SNMP response.'''
55+
result = {
56+
"device": host,
57+
"snmp_data_grp": group,
58+
"poller_instance": "snmp_poll_nix",
59+
}
60+
61+
# Map each OID response to its config-defined name/type.
62+
# var_binds[i] is (oid, value); [1] gets the value.
63+
for i, entry in enumerate(grp_oids):
64+
if i < len(var_binds):
65+
caster = TYPE_CASTERS.get(
66+
entry.get('type', 'str'), str,
67+
)
68+
result[entry['name']] = caster(
69+
var_binds[i][1]
70+
)
71+
72+
return result
73+
74+
5275
async def get_async(host, group, oids, snmp_init, snmp_params,
5376
output_file, logger):
5477
'''
5578
Poll a single host via SNMPv3 and write the result as JSON.
5679
57-
Each call is a coroutine — multiple hosts are polled
58-
concurrently via asyncio.gather() in main().
80+
Used in single-process mode. Each call is a coroutine — multiple
81+
hosts are polled concurrently via asyncio.gather() in main().
5982
6083
See: https://pysnmp.readthedocs.io/en/latest/index.html
6184
'''
@@ -90,23 +113,7 @@ async def get_async(host, group, oids, snmp_init, snmp_params,
90113
)
91114
return
92115

93-
result = {
94-
"device": host,
95-
"snmp_data_grp": group,
96-
"poller_instance": "snmp_poll_nix",
97-
}
98-
99-
# Map each OID response to its config-defined name/type.
100-
# var_binds[i] is (oid, value); [1] gets the value.
101-
for i, entry in enumerate(grp_oids):
102-
if i < len(var_binds):
103-
caster = TYPE_CASTERS.get(
104-
entry.get('type', 'str'), str,
105-
)
106-
result[entry['name']] = caster(
107-
var_binds[i][1]
108-
)
109-
116+
result = _build_result(host, group, grp_oids, var_binds)
110117
output = json.dumps(result, indent=2)
111118
syslog.syslog(output)
112119
with open(output_file, 'a') as f:
@@ -116,6 +123,152 @@ async def get_async(host, group, oids, snmp_init, snmp_params,
116123
logger.critical(f'{host}: {err}')
117124

118125

126+
async def poll_host(host, group, oids, snmp_init, snmp_params,
127+
logger):
128+
'''
129+
Poll a single host and return the result dict, or None on error.
130+
131+
Used in multiprocessing mode — results are collected and
132+
written centrally by the supervisor process.
133+
'''
134+
try:
135+
response, grp_oids = build_snmp_request(
136+
host, group, oids, snmp_init, snmp_params,
137+
)
138+
if response is None:
139+
logger.critical(
140+
f'{host}: unhandled device group "{group}", '
141+
f'wrong/missing info from CSV'
142+
)
143+
return None
144+
145+
err_indication, err_status, err_index, var_binds = (
146+
await response
147+
)
148+
149+
if err_indication:
150+
logger.critical(f'{host}: {err_indication}')
151+
return None
152+
if err_status:
153+
err_at = (
154+
var_binds[int(err_index) - 1]
155+
if err_index else '?'
156+
)
157+
logger.critical(
158+
f'{err_status.prettyPrint()} at {err_at}'
159+
)
160+
return None
161+
162+
return _build_result(host, group, grp_oids, var_binds)
163+
164+
except Exception as err:
165+
logger.critical(f'{host}: {err}')
166+
return None
167+
168+
169+
def _partition_hosts(records, num_workers):
170+
'''Split {host: group} dict into num_workers roughly-equal chunks.'''
171+
items = list(records.items())
172+
chunks = []
173+
chunk_size = len(items) // num_workers
174+
remainder = len(items) % num_workers
175+
start = 0
176+
177+
for i in range(num_workers):
178+
end = start + chunk_size + (1 if i < remainder else 0)
179+
chunks.append(dict(items[start:end]))
180+
start = end
181+
182+
return chunks
183+
184+
185+
def _worker_process(host_chunk, snmp_params, oids,
186+
engine_pool_size, logging_path,
187+
result_queue):
188+
'''
189+
Entry point for each worker process.
190+
Creates its own engine pool and event loop, polls assigned
191+
hosts, pushes result dicts onto result_queue.
192+
'''
193+
logger = pysnmp_logging(logging_path)
194+
195+
engine_pool = [
196+
PySnmpInit(
197+
snmp_params['userName'],
198+
snmp_params['authKey'],
199+
snmp_params['privKey'],
200+
)
201+
for _ in range(engine_pool_size)
202+
]
203+
204+
async def _run():
205+
tasks = [
206+
poll_host(
207+
host, group, oids,
208+
engine_pool[i % engine_pool_size],
209+
snmp_params, logger,
210+
)
211+
for i, (host, group) in enumerate(
212+
host_chunk.items()
213+
)
214+
]
215+
results = await asyncio.gather(*tasks)
216+
for r in results:
217+
if r is not None:
218+
result_queue.put(r)
219+
220+
asyncio.run(_run())
221+
# Sentinel: tells supervisor this worker is done.
222+
result_queue.put(None)
223+
224+
225+
def _run_multiprocess(records, snmp_params, oids,
226+
engine_pool_size, output_file,
227+
logging_path, num_workers):
228+
'''
229+
Distribute hosts across worker processes, collect results
230+
centrally, and write output from the supervisor process.
231+
'''
232+
chunks = _partition_hosts(records, num_workers)
233+
result_queue = multiprocessing.Queue()
234+
235+
processes = []
236+
for chunk in chunks:
237+
if not chunk:
238+
continue
239+
p = multiprocessing.Process(
240+
target=_worker_process,
241+
args=(
242+
chunk, snmp_params, oids,
243+
engine_pool_size, logging_path,
244+
result_queue,
245+
),
246+
)
247+
p.start()
248+
processes.append(p)
249+
250+
active_workers = len(processes)
251+
252+
# Drain results and write centrally — single file handle,
253+
# no contention between processes.
254+
syslog.openlog(facility=syslog.LOG_LOCAL1)
255+
workers_done = 0
256+
with open(output_file, 'a') as f:
257+
while workers_done < active_workers:
258+
item = result_queue.get()
259+
if item is None:
260+
workers_done += 1
261+
continue
262+
output = json.dumps(item, indent=2)
263+
syslog.syslog(output)
264+
f.write(output + '\n')
265+
266+
for p in processes:
267+
p.join()
268+
269+
syslog.closelog()
270+
271+
119272
def main():
120273
'''Entry point — load config, init SNMP, poll all hosts.'''
121274
paths = cli_params()
@@ -125,23 +278,37 @@ def main():
125278
oids = yml_loader(paths['oids'])
126279
logger = pysnmp_logging(paths['logging_path'])
127280

128-
snmp_init = PySnmpInit(
129-
snmp_params['userName'],
130-
snmp_params['authKey'],
131-
snmp_params['privKey'],
132-
)
281+
pool_size = paths['engine_pool_size']
282+
num_workers = paths['workers']
283+
output_file = paths['output_file']
133284

134-
syslog.openlog(facility=syslog.LOG_LOCAL1)
285+
if num_workers <= 1:
286+
# Single-process mode — existing behavior.
287+
engine_pool = [
288+
PySnmpInit(
289+
snmp_params['userName'],
290+
snmp_params['authKey'],
291+
snmp_params['privKey'],
292+
)
293+
for _ in range(pool_size)
294+
]
135295

136-
# Build one coroutine per host, then poll all concurrently.
137-
output_file = paths['output_file']
138-
tasks = [
139-
get_async(
140-
host, group, oids, snmp_init,
141-
snmp_params, output_file, logger,
142-
)
143-
for host, group in records.items()
144-
]
145-
asyncio.run(asyncio.gather(*tasks))
296+
syslog.openlog(facility=syslog.LOG_LOCAL1)
146297

147-
syslog.closelog()
298+
tasks = [
299+
get_async(
300+
host, group, oids,
301+
engine_pool[i % pool_size],
302+
snmp_params, output_file, logger,
303+
)
304+
for i, (host, group) in enumerate(records.items())
305+
]
306+
asyncio.run(asyncio.gather(*tasks))
307+
308+
syslog.closelog()
309+
else:
310+
# Multiprocessing mode — distribute across workers.
311+
_run_multiprocess(
312+
records, snmp_params, oids, pool_size,
313+
output_file, paths['logging_path'], num_workers,
314+
)

0 commit comments

Comments
 (0)