33import asyncio
44import json
55import multiprocessing
6+ import signal
67import syslog
78
89from pysnmp .hlapi .v3arch .asyncio import get_cmd
@@ -225,10 +226,11 @@ async def _run():
225226
226227def _run_multiprocess (records , snmp_params , oids ,
227228 engine_pool_size , output_file ,
228- logging_path , num_workers ):
229+ logging_path , num_workers , logger ):
229230 '''
230231 Distribute hosts across worker processes, collect results
231232 centrally, and write output from the supervisor process.
233+ Handles SIGINT/SIGTERM to terminate workers cleanly.
232234 '''
233235 chunks = _partition_hosts (records , num_workers )
234236 result_queue = multiprocessing .Queue ()
@@ -249,25 +251,56 @@ def _run_multiprocess(records, snmp_params, oids,
249251 processes .append (p )
250252
251253 active_workers = len (processes )
254+ shutting_down = False
255+
256+ def _shutdown (signum , frame ):
257+ nonlocal shutting_down
258+ if shutting_down :
259+ return
260+ shutting_down = True
261+ sig_name = signal .Signals (signum ).name
262+ logger .info (f'Received { sig_name } , stopping workers...' )
263+ for p in processes :
264+ if p .is_alive ():
265+ p .terminate ()
266+
267+ signal .signal (signal .SIGINT , _shutdown )
268+ signal .signal (signal .SIGTERM , _shutdown )
252269
253270 # Drain results and write centrally — single file handle,
254271 # no contention between processes.
255272 syslog .openlog (facility = syslog .LOG_LOCAL1 )
256273 workers_done = 0
257- with open (output_file , 'a' ) as f :
258- while workers_done < active_workers :
259- item = result_queue .get ()
260- if item is None :
261- workers_done += 1
262- continue
263- output = json .dumps (item , indent = 2 )
264- syslog .syslog (output )
265- f .write (output + '\n ' )
266-
267- for p in processes :
268- p .join ()
274+ try :
275+ with open (output_file , 'a' ) as f :
276+ while workers_done < active_workers :
277+ try :
278+ item = result_queue .get (timeout = 1 )
279+ except Exception :
280+ # Check if workers died or we're shutting down.
281+ if shutting_down :
282+ break
283+ alive = [p for p in processes if p .is_alive ()]
284+ if not alive :
285+ break
286+ continue
287+ if item is None :
288+ workers_done += 1
289+ continue
290+ output = json .dumps (item , indent = 2 )
291+ syslog .syslog (output )
292+ f .write (output + '\n ' )
293+ finally :
294+ # Ensure all workers are stopped and joined.
295+ for p in processes :
296+ if p .is_alive ():
297+ p .terminate ()
298+ for p in processes :
299+ p .join (timeout = 5 )
300+ syslog .closelog ()
269301
270- syslog .closelog ()
302+ if shutting_down :
303+ logger .info ('Shutdown complete.' )
271304
272305
273306def main ():
@@ -284,7 +317,7 @@ def main():
284317 output_file = paths ['output_file' ]
285318
286319 if num_workers <= 1 :
287- # Single-process mode — existing behavior .
320+ # Single-process mode.
288321 engine_pool = [
289322 PySnmpInit (
290323 snmp_params ['userName' ],
@@ -302,14 +335,21 @@ def main():
302335 engine_pool [i % pool_size ],
303336 snmp_params , output_file , logger ,
304337 )
305- for i , (host , group ) in enumerate (records .items ())
338+ for i , (host , group ) in enumerate (
339+ records .items ()
340+ )
306341 ]
307- asyncio .run (asyncio .gather (* tasks ))
308342
309- syslog .closelog ()
343+ try :
344+ asyncio .run (asyncio .gather (* tasks ))
345+ except KeyboardInterrupt :
346+ logger .info ('Interrupted, shutting down.' )
347+ finally :
348+ syslog .closelog ()
310349 else :
311350 # Multiprocessing mode — distribute across workers.
312351 _run_multiprocess (
313352 records , snmp_params , oids , pool_size ,
314- output_file , paths ['logging_path' ], num_workers ,
353+ output_file , paths ['logging_path' ],
354+ num_workers , logger ,
315355 )
0 commit comments