@@ -65,6 +65,7 @@ class ExecMode(Enum):
6565
6666
6767class ChunkIndex (IntEnum ):
68+ NONE = - 3
6869 PREPROCESS = - 2
6970 POSTPROCESS = - 1
7071 # Standard chunks are indexed from 0
@@ -660,8 +661,6 @@ def isFinished(self):
660661 return self ._status .status == Status .SUCCESS
661662
662663 def process (self , forceCompute = False , inCurrentEnv = False ):
663- print (f"(NodeChunk) process { self } -> pre={ self .isPreprocess } , post={ self .isPostprocess } " )
664-
665664 if not forceCompute and self ._status .status == Status .SUCCESS :
666665 logging .info (f"Node chunk already computed: { self .name } " )
667666 return
@@ -670,30 +669,19 @@ def process(self, forceCompute=False, inCurrentEnv=False):
670669 # This only happens once, when the node has the SUBMITTED status.
671670 # The sub-process will go through this method again, but the node status will
672671 # have been set to RUNNING.
673- print (f"(_process) A" )
674672 if not inCurrentEnv and self .node .getMrNodeType () == MrNodeType .NODE :
675- print (f"(_process) B" )
676673 self ._processInIsolatedEnvironment ()
677674 return
678675
679- print (f"(_process) C" )
680676 runningProcesses [self .name ] = self
681- print (f"(_process) D" )
682677 self ._status .setNode (self .node )
683- print (f"(_process) E" )
684678 self ._status .initStartCompute ()
685- print (f"(_process) F" )
686679 self .upgradeStatusFile ()
687- print (f"(_process) G" )
688680 executionStatus = None
689- print (f"(_process) H" )
690681 self .statThread = stats .StatisticsThread (self )
691- print (f"(_process) I" )
692682 self .statThread .start ()
693683
694- print (f"(_process) J" )
695684 try :
696- print (f"(_process) K" )
697685 if self .isPreprocess :
698686 self .node .nodeDesc .preprocess (self .node )
699687 elif self .isPostprocess :
@@ -704,17 +692,14 @@ def process(self, forceCompute=False, inCurrentEnv=False):
704692 self .node .saveOutputAttr ()
705693 executionStatus = Status .SUCCESS
706694 except Exception :
707- print (f"(_process) L" )
708695 self .updateStatusFromCache () # check if the status has been updated by another process
709696 if self ._status .status != Status .STOPPED :
710697 executionStatus = Status .ERROR
711698 raise
712699 except (KeyboardInterrupt , SystemError , GeneratorExit ):
713- print (f"(_process) M" )
714700 executionStatus = Status .STOPPED
715701 raise
716702 finally :
717- print (f"(_process) N" )
718703 self ._status .setNode (self .node )
719704 self ._status .initEndCompute ()
720705 self .upgradeStatusFile ()
@@ -728,8 +713,6 @@ def process(self, forceCompute=False, inCurrentEnv=False):
728713 self .statistics = stats .Statistics ()
729714 del runningProcesses [self .name ]
730715
731- print (f"(_process) O" )
732-
733716 def _processInIsolatedEnvironment (self ):
734717 """
735718 Process this node chunk in the isolated environment defined in the environment
@@ -1721,42 +1704,45 @@ def initStatusOnCompute(self, forceCompute=False):
17211704 chunkPlaceholder ._status .status = self ._nodeStatus .status
17221705 self ._chunkPlaceholder .setObjectList ([chunkPlaceholder ])
17231706 self .chunksChanged .emit ()
1707+
1708+ def getChunkName (self , iteration : int ):
1709+ if iteration >= 0 :
1710+ return str (self .chunks [iteration ].index )
1711+ elif iteration == ChunkIndex .PREPROCESS :
1712+ return "preprocess"
1713+ elif iteration == ChunkIndex .POSTPROCESS :
1714+ return "postprocess"
1715+ return "0"
17241716
17251717 def processIteration (self , iteration ):
17261718 self ._chunks [iteration ].process ()
17271719
1728- def preprocess (self ):
1720+ def preprocess (self , forceCompute = False , inCurrentEnv = False ):
17291721 """ Prepare the node processing """
1730- print ("(preprocess)" )
17311722 if self .nodeDesc ._hasPreprocess :
1732- print ("-> has preprocess" )
17331723 # self.nodeDesc.preprocess(self)
1734- self ._preprocessChunk .process ()
1724+ self ._preprocessChunk .process (forceCompute , inCurrentEnv )
17351725
17361726 def process (self , forceCompute = False , inCurrentEnv = False ):
1737- print ("(process)" )
17381727 for chunk in self ._chunks :
1739- print ("-> chunk" , chunk )
17401728 chunk .process (forceCompute , inCurrentEnv )
17411729
1742- def postprocess (self ):
1730+ def postprocess (self , forceCompute = False , inCurrentEnv = False ):
17431731 """
17441732 Invoke the post process on Client Node to execute after the processing on the
17451733 node is completed
17461734 """
1747- print ("(postprocess)" )
17481735 if self .nodeDesc ._hasPostprocess :
1749- print ("-> has postprocess" )
17501736 # self.nodeDesc.postprocess(self)
1751- self ._postprocessChunk .process ()
1737+ self ._postprocessChunk .process (forceCompute , inCurrentEnv )
17521738
17531739 def getLogHandlers (self ):
17541740 return self ._handlers
17551741
17561742 def prepareLogger (self , iteration = - 1 ):
17571743 # Get file handler path
1758- chunkIndex = self .chunks [ iteration ]. index if iteration != - 1 else 0
1759- logFileName = f"{ chunkIndex } .log"
1744+ chunkName = self .getChunkName ( iteration )
1745+ logFileName = f"{ chunkName } .log"
17601746 logFile = os .path .join (self .internalFolder , logFileName )
17611747 # Setup logger
17621748 rootLogger = logging .getLogger ()
@@ -2336,10 +2322,14 @@ def _resetChunks(self):
23362322 """ Set chunks on the node.
23372323 # TODO : Maybe don't delete chunks if we will recreate them as before ?
23382324 """
2339- if self .isInputNode :
2325+ if not self .isComputableType :
23402326 self ._chunksCreated = True
23412327 return
23422328 # Disconnect signals
2329+ if self ._preprocessChunk :
2330+ self ._preprocessChunk .statusChanged .disconnect (self .globalStatusChanged )
2331+ if self ._postprocessChunk :
2332+ self ._postprocessChunk .statusChanged .disconnect (self .globalStatusChanged )
23432333 for chunk in self ._chunks :
23442334 chunk .statusChanged .disconnect (self .globalStatusChanged )
23452335 # Empty list
@@ -2375,7 +2365,18 @@ def _resetChunks(self):
23752365 self ._chunksCreated = False
23762366 self .setSize (0 )
23772367 self ._chunkPlaceholder .setObjectList ([NodeChunk (self , desc .computation .Range ())])
2378-
2368+ # Pre/post process
2369+ if self .nodeDesc ._hasPreprocess :
2370+ self ._preprocessChunk = NodeChunk (self , desc .Range (ChunkIndex .PREPROCESS ))
2371+ self ._preprocessChunk .statusChanged .connect (self .globalStatusChanged )
2372+ else :
2373+ self ._preprocessChunk = None
2374+ if self .nodeDesc ._hasPostprocess :
2375+ self ._postprocessChunk = NodeChunk (self , desc .Range (ChunkIndex .POSTPROCESS ))
2376+ self ._postprocessChunk .statusChanged .connect (self .globalStatusChanged )
2377+ else :
2378+ self ._postprocessChunk = None
2379+
23792380 # Create chunks when possible
23802381 self .chunksCreatedChanged .emit ()
23812382 self .chunksChanged .emit ()
0 commit comments