@@ -760,16 +760,22 @@ def interruptJob(self, node: Node):
760760 if chunk ._status .status in (Status .SUBMITTED , Status .RUNNING ):
761761 chunk .updateStatusFromCache ()
762762 chunk .upgradeStatusTo (Status .STOPPED )
763- for node in self ._graph .dfsOnFinish (None )[0 ]:
764- if not node ._chunksCreated and node ._nodeStatus .status in (Status .SUBMITTED , Status .RUNNING ):
765- node .upgradeStatusTo (Status .STOPPED )
763+ for _node in self ._graph .dfsOnFinish (None )[0 ]:
764+ if jobManager .getNodeJob (_node ) == job and not _node ._chunksCreated and \
765+ _node ._nodeStatus .status in (Status .SUBMITTED , Status .RUNNING ):
766+ _node .upgradeStatusTo (Status .STOPPED )
766767 self .parent ().showMessage (f"Interrupted the job for node { node } " )
767768 elif not node .isExtern ():
768- self ._taskManager .clear ()
769- for chunk in node ._chunks :
770- if chunk ._status .status == Status .RUNNING and not chunk .isExtern ():
771- chunk .stopProcess ()
772- self .parent ().showMessage (f"Cleared the task manager" )
769+ for chunk in self ._sortedDFSChunks :
770+ if not chunk .isExtern () and chunk ._status .status in (Status .SUBMITTED , Status .RUNNING ):
771+ chunk .updateStatusFromCache ()
772+ chunk .upgradeStatusTo (Status .STOPPED )
773+ for node in self ._graph .dfsOnFinish (None )[0 ]:
774+ if not node .isExtern () and not node ._chunksCreated and \
775+ node ._nodeStatus .status in (Status .SUBMITTED , Status .RUNNING ):
776+ node .upgradeStatusTo (Status .STOPPED )
777+ self .stopExecution ()
778+ self .parent ().showMessage (f"Stopped the local job process" )
773779 else :
774780 self .parent ().showMessage (f"Could not retrieve job for node { node } " , "error" )
775781
0 commit comments