11"""
22DIRAC wrapper to execute python and system commands, optionally enforcing
33a timeout.
4- 3 FUNCTIONS are provided:
4+ 3 functions are provided:
55
66 - shellCall( iTimeOut, cmdSeq, callbackFunction = None, env = None ):
77 it uses subprocess.Popen class with "shell = True".
2626 should be used to wrap third party python functions
2727
2828"""
29+
2930import os
3031import selectors
3132import signal
@@ -161,7 +162,6 @@ def __init__(self, timeout=False, bufferLimit=52428800):
161162
162163 self .child = None
163164 self .childPID = 0
164- self .childKilled = False
165165 self .callback = None
166166 self .bufferList = []
167167 self .cmdSeq = []
@@ -193,7 +193,7 @@ def __readFromFD(self, fd, baseLength=0):
193193 f"First and last data in buffer: \n { dataString [:100 ]} \n ....\n { dataString [- 100 :]} " ,
194194 )
195195 retDict = S_ERROR (
196- "Reached maximum allowed length (%d bytes) " " for called function return value" % self .bufferLimit
196+ "Reached maximum allowed length (%d bytes) for called function return value" % self .bufferLimit
197197 )
198198 retDict ["Value" ] = dataString
199199 return retDict
@@ -241,60 +241,71 @@ def __selectFD(self, readSeq, timeout=False):
241241 events = sel .select (timeout = timeout or self .timeout or None )
242242 return [key .fileobj for key , event in events if event & selectors .EVENT_READ ]
243243
244- def __killPid (self , pid , sig = 9 ):
245- """send signal :sig: to process :pid:
246-
247- :param int pid: process id
248- :param int sig: signal to send, default 9 (SIGKILL)
249- """
244+ def __terminateProcess (self , process ):
245+ """Tries to terminate a process with SIGTERM. Returns a (gone, alive) tuple"""
246+ self .log .verbose (f"Sending SIGTERM signal to PID { process .pid } " )
250247 try :
251- os .kill (pid , sig )
252- except Exception as x :
253- if str (x ) != "[Errno 3] No such process" :
254- self .log .exception ("Exception while killing timed out process" )
255- raise x
256-
257- def __poll (self , pid ):
258- """wait for :pid:"""
248+ process .terminate ()
249+ except psutil .NoSuchProcess :
250+ return ([], [])
251+ return psutil .wait_procs ([process ], timeout = 60 )
252+
253+ def __poll (self , process ):
254+ """Non-blocking check of whether `process` is still alive.
255+ Returns:
256+ - (0, 0) if process is still running (like os.waitpid(pid, os.WNOHANG))
257+ - (pid, exitcode) if process has terminated
258+ - None if process info cannot be retrieved
259+ """
259260 try :
260- return os .waitpid (pid , os .WNOHANG )
261- except os .error :
262- if self .childKilled :
263- return False
261+ exitcode = process .wait (timeout = 0 )
262+ return (process .pid , exitcode ) # exited
263+ except psutil .TimeoutExpired :
264+ return (0 , 0 ) # still running
265+ except psutil .NoSuchProcess :
264266 return None
265267
266268 def killChild (self , recursive = True ):
267- """kill child process
268-
269- :param boolean recursive: flag to kill all descendants
269+ """Kills a process tree (including children when `recursive` is true) with signal SIGTERM. If that fails, escalates to SIGKILL.
270+ Returns a (gone, alive) tuple.
270271 """
271- pgid = os .getpgid (self .childPID )
272- if pgid != os .getpgrp ():
273- try :
274- # Child is in its own group: kill the group
275- os .killpg (pgid , signal .SIGTERM )
276- except OSError :
277- # Process is already dead
278- pass
279- else :
280- # No separate group: walk the tree
281- parent = psutil .Process (self .childPID )
282- procs = parent .children (recursive = recursive )
283- procs .append (parent )
284- for p in procs :
272+
273+ self .log .info (f"Killing childPID { self .childPID } " )
274+
275+ gone , alive = [], []
276+ try :
277+ child_process = psutil .Process (self .childPID )
278+ except psutil .NoSuchProcess :
279+ self .log .warn (f"Child PID { self .childPID } no longer exists" )
280+ return (gone , alive )
281+
282+ if recursive :
283+ # all descendants of the child process (children, grandchildren, ...)
284+ children = child_process .children (recursive = True )
285+ self .log .info (f"Sending kill signal to { len (children )} children PIDs" )
286+ for p in children :
285287 try :
286288 p .terminate ()
287289 except psutil .NoSuchProcess :
288- pass
289- _gone , alive = psutil .wait_procs (procs , timeout = 10 )
290- # Escalate any survivors
290+ continue
291+ g , a = psutil .wait_procs (children , timeout = 60 )
292+ gone .extend (g )
293+ alive .extend (a )
294+
295+ # now killing the child_process
296+ g , a = self .__terminateProcess (child_process )
297+ gone .extend (g )
298+ alive .extend (a )
299+
300+ # if there's something still alive, use SIGKILL
301+ if alive :
291302 for p in alive :
292303 try :
293304 p .kill ()
294305 except psutil .NoSuchProcess :
295306 pass
296307
297- self . childKilled = True
308+ return psutil . wait_procs ( alive , timeout = 60 )
298309
299310 def pythonCall (self , function , * stArgs , ** stKeyArgs ):
300311 """call python function :function: with :stArgs: and :stKeyArgs:"""
@@ -309,8 +320,6 @@ def pythonCall(self, function, *stArgs, **stKeyArgs):
309320 if pid == 0 :
310321 os .close (readFD )
311322 self .__executePythonFunction (function , writeFD , * stArgs , ** stKeyArgs )
312- # FIXME: the close it is done at __executePythonFunction, do we need it here?
313- os .close (writeFD )
314323 else :
315324 os .close (writeFD )
316325 readSeq = self .__selectFD ([readFD ])
@@ -319,14 +328,13 @@ def pythonCall(self, function, *stArgs, **stKeyArgs):
319328 try :
320329 if len (readSeq ) == 0 :
321330 self .log .debug ("Timeout limit reached for pythonCall" , function .__name__ )
322- self .__killPid (pid )
323-
324- # HACK to avoid python bug
325- # self.wait()
326- retries = 10000
327- while os .waitpid (pid , 0 ) == - 1 and retries > 0 :
328- time .sleep (0.001 )
329- retries -= 1
331+ gone , alive = self .__terminateProcess (psutil .Process (pid ))
332+ if alive :
333+ for p in alive :
334+ try :
335+ p .kill ()
336+ except psutil .NoSuchProcess :
337+ continue
330338
331339 return S_ERROR ('%d seconds timeout for "%s" call' % (self .timeout , function .__name__ ))
332340 elif readSeq [0 ] == readFD :
@@ -400,7 +408,7 @@ def __readFromFile(self, fd, baseLength):
400408 if len (dataString ) + baseLength > self .bufferLimit :
401409 self .log .error ("Maximum output buffer length reached" )
402410 retDict = S_ERROR (
403- "Reached maximum allowed length (%d bytes) for called " " function return value" % self .bufferLimit
411+ "Reached maximum allowed length (%d bytes) for called function return value" % self .bufferLimit
404412 )
405413 retDict ["Value" ] = dataString
406414 return retDict
@@ -446,6 +454,7 @@ def systemCall(
446454 start_new_session = start_new_session ,
447455 )
448456 self .childPID = self .child .pid
457+ child_process = psutil .Process (self .childPID )
449458 except OSError as v :
450459 retDict = S_ERROR (repr (v ))
451460 retDict ["Value" ] = (- 1 , "" , str (v ))
@@ -464,9 +473,9 @@ def systemCall(
464473 self .bufferList = [["" , 0 ], ["" , 0 ]]
465474 initialTime = time .time ()
466475
467- exitStatus = self .__poll (self . child . pid )
476+ exitStatus = self .__poll (child_process )
468477
469- while (0 , 0 ) == exitStatus or exitStatus is None :
478+ while (0 , 0 ) == exitStatus : # This means that the process is still alive
470479 retDict = self .__readFromCommand ()
471480 if not retDict ["OK" ]:
472481 return retDict
@@ -478,14 +487,14 @@ def systemCall(
478487 1 , "Timeout (%d seconds) for '%s' call" % (self .timeout , cmdSeq )
479488 )
480489 time .sleep (0.01 )
481- exitStatus = self .__poll (self . child . pid )
490+ exitStatus = self .__poll (child_process )
482491
483492 self .__readFromCommand ()
484493
485494 if exitStatus :
486495 exitStatus = exitStatus [1 ]
487496
488- if exitStatus >= 256 :
497+ if exitStatus and exitStatus >= 256 :
489498 exitStatus = int (exitStatus / 256 )
490499 return S_OK ((exitStatus , self .bufferList [0 ][0 ], self .bufferList [1 ][0 ]))
491500 finally :
0 commit comments