11#!/usr/bin/env python
2+
3+ # core/node.py
4+
25import sys
36import atexit
47import copy
@@ -657,13 +660,8 @@ def isFinished(self):
657660 return self ._status .status == Status .SUCCESS
658661
659662 def process (self , forceCompute = False , inCurrentEnv = False ):
660- if self .isPreprocess :
661- return self .node .nodeDesc .preprocess (self .node )
662- if self .isPostprocess :
663- return self .node .nodeDesc .postprocess (self .node )
664- return self ._process (forceCompute , inCurrentEnv )
663+ print (f"(NodeChunk) process { self } -> pre={ self .isPreprocess } , post={ self .isPostprocess } " )
665664
666- def _process (self , forceCompute = False , inCurrentEnv = False ):
667665 if not forceCompute and self ._status .status == Status .SUCCESS :
668666 logging .info (f"Node chunk already computed: { self .name } " )
669667 return
@@ -672,33 +670,51 @@ def _process(self, forceCompute=False, inCurrentEnv=False):
672670 # This only happens once, when the node has the SUBMITTED status.
673671 # The sub-process will go through this method again, but the node status will
674672 # have been set to RUNNING.
673+ print (f"(_process) A" )
675674 if not inCurrentEnv and self .node .getMrNodeType () == MrNodeType .NODE :
675+ print (f"(_process) B" )
676676 self ._processInIsolatedEnvironment ()
677677 return
678678
679+ print (f"(_process) C" )
679680 runningProcesses [self .name ] = self
681+ print (f"(_process) D" )
680682 self ._status .setNode (self .node )
683+ print (f"(_process) E" )
681684 self ._status .initStartCompute ()
685+ print (f"(_process) F" )
682686 self .upgradeStatusFile ()
687+ print (f"(_process) G" )
683688 executionStatus = None
689+ print (f"(_process) H" )
684690 self .statThread = stats .StatisticsThread (self )
691+ print (f"(_process) I" )
685692 self .statThread .start ()
686693
694+ print (f"(_process) J" )
687695 try :
688- logging .info (f"[Process chunk] Start processing..." )
689- self .node .nodeDesc .processChunk (self )
696+ print (f"(_process) K" )
697+ if self .isPreprocess :
698+ self .node .nodeDesc .preprocess (self .node )
699+ elif self .isPostprocess :
700+ self .node .nodeDesc .postprocess (self .node )
701+ else :
702+ self .node .nodeDesc .processChunk (self )
690703 # NOTE: this assumes saving the output attributes for each chunk
691704 self .node .saveOutputAttr ()
692705 executionStatus = Status .SUCCESS
693706 except Exception :
707+ print (f"(_process) L" )
694708 self .updateStatusFromCache () # check if the status has been updated by another process
695709 if self ._status .status != Status .STOPPED :
696710 executionStatus = Status .ERROR
697711 raise
698712 except (KeyboardInterrupt , SystemError , GeneratorExit ):
713+ print (f"(_process) M" )
699714 executionStatus = Status .STOPPED
700715 raise
701716 finally :
717+ print (f"(_process) N" )
702718 self ._status .setNode (self .node )
703719 self ._status .initEndCompute ()
704720 self .upgradeStatusFile ()
@@ -712,6 +728,7 @@ def _process(self, forceCompute=False, inCurrentEnv=False):
712728 self .statistics = stats .Statistics ()
713729 del runningProcesses [self .name ]
714730
731+ print (f"(_process) O" )
715732
716733 def _processInIsolatedEnvironment (self ):
717734 """
0 commit comments