11#!/usr/bin/env python
2+
3+ # core/node.py
4+
25import sys
36import atexit
47import copy
@@ -62,6 +65,7 @@ class ExecMode(Enum):
6265
6366
6467class ChunkIndex (IntEnum ):
68+ NONE = - 3
6569 PREPROCESS = - 2
6670 POSTPROCESS = - 1
6771 # Standard chunks are indexed from 0
@@ -404,25 +408,22 @@ def makeProgressBar(self, end, message=''):
404408
405409 f .close ()
406410
407- with open (self .logFile ) as f :
411+ with open (self .logFile , "r" ) as f :
408412 content = f .read ()
409413 self .progressBarPosition = content .rfind ('\n ' )
410414
411- f .close ()
412-
413415 def updateProgressBar (self , value ):
414416 assert self .progressBar
415417 assert value <= self .progressEnd
416418
417419 tics = round ((value / self .progressEnd )* 51 )
418420
419- with open (self .logFile , 'r+' ) as f :
421+ with open (self .logFile , "r+" ) as f :
420422 text = f .read ()
421423 for i in range (tics - self .currentProgressTics ):
422424 text = text [:self .progressBarPosition ]+ '*' + text [self .progressBarPosition :]
423425 f .seek (0 )
424426 f .write (text )
425- f .close ()
426427
427428 self .currentProgressTics = tics
428429
@@ -657,13 +658,6 @@ def isFinished(self):
657658 return self ._status .status == Status .SUCCESS
658659
659660 def process (self , forceCompute = False , inCurrentEnv = False ):
660- if self .isPreprocess :
661- return self .node .nodeDesc .preprocess (self .node )
662- if self .isPostprocess :
663- return self .node .nodeDesc .postprocess (self .node )
664- return self ._process (forceCompute , inCurrentEnv )
665-
666- def _process (self , forceCompute = False , inCurrentEnv = False ):
667661 if not forceCompute and self ._status .status == Status .SUCCESS :
668662 logging .info (f"Node chunk already computed: { self .name } " )
669663 return
@@ -685,8 +679,12 @@ def _process(self, forceCompute=False, inCurrentEnv=False):
685679 self .statThread .start ()
686680
687681 try :
688- logging .info (f"[Process chunk] Start processing..." )
689- self .node .nodeDesc .processChunk (self )
682+ if self .isPreprocess :
683+ self .node .nodeDesc .preprocess (self .node )
684+ elif self .isPostprocess :
685+ self .node .nodeDesc .postprocess (self .node )
686+ else :
687+ self .node .nodeDesc .processChunk (self )
690688 # NOTE: this assumes saving the output attributes for each chunk
691689 self .node .saveOutputAttr ()
692690 executionStatus = Status .SUCCESS
@@ -712,7 +710,6 @@ def _process(self, forceCompute=False, inCurrentEnv=False):
712710 self .statistics = stats .Statistics ()
713711 del runningProcesses [self .name ]
714712
715-
716713 def _processInIsolatedEnvironment (self ):
717714 """
718715 Process this node chunk in the isolated environment defined in the environment
@@ -1704,42 +1701,47 @@ def initStatusOnCompute(self, forceCompute=False):
17041701 chunkPlaceholder ._status .status = self ._nodeStatus .status
17051702 self ._chunkPlaceholder .setObjectList ([chunkPlaceholder ])
17061703 self .chunksChanged .emit ()
1704+
1705+ def getChunkName (self , iteration : int ):
1706+ if iteration >= 0 :
1707+ return str (self .chunks [iteration ].index )
1708+ elif iteration == ChunkIndex .PREPROCESS :
1709+ return "preprocess"
1710+ elif iteration == ChunkIndex .POSTPROCESS :
1711+ return "postprocess"
1712+ return "0"
17071713
17081714 def processIteration (self , iteration ):
17091715 self ._chunks [iteration ].process ()
17101716
1711- def preprocess (self ):
1717+ def preprocess (self , forceCompute = False , inCurrentEnv = False ):
17121718 """ Prepare the node processing """
1713- print ( "(preprocess)" )
1719+ self . prepareLogger ( ChunkIndex . PREPROCESS )
17141720 if self .nodeDesc ._hasPreprocess :
1715- print ("-> has preprocess" )
1716- # self.nodeDesc.preprocess(self)
1717- self ._preprocessChunk .process ()
1721+ self ._preprocessChunk .process (forceCompute , inCurrentEnv )
1722+ self .restoreLogger ()
17181723
17191724 def process (self , forceCompute = False , inCurrentEnv = False ):
1720- print ("(process)" )
17211725 for chunk in self ._chunks :
1722- print ("-> chunk" , chunk )
17231726 chunk .process (forceCompute , inCurrentEnv )
17241727
1725- def postprocess (self ):
1728+ def postprocess (self , forceCompute = False , inCurrentEnv = False ):
17261729 """
17271730 Invoke the post process on Client Node to execute after the processing on the
17281731 node is completed
17291732 """
1730- print ( "(postprocess)" )
1733+ self . prepareLogger ( ChunkIndex . POSTPROCESS )
17311734 if self .nodeDesc ._hasPostprocess :
1732- print ("-> has postprocess" )
1733- # self.nodeDesc.postprocess(self)
1734- self ._postprocessChunk .process ()
1735+ self ._postprocessChunk .process (forceCompute , inCurrentEnv )
1736+ self .restoreLogger ()
17351737
17361738 def getLogHandlers (self ):
17371739 return self ._handlers
17381740
1739- def prepareLogger (self , iteration = - 1 ):
1741+ def prepareLogger (self , iteration = ChunkIndex . NONE ):
17401742 # Get file handler path
1741- chunkIndex = self .chunks [ iteration ]. index if iteration != - 1 else 0
1742- logFileName = f"{ chunkIndex } .log"
1743+ chunkName = self .getChunkName ( iteration )
1744+ logFileName = f"{ chunkName } .log"
17431745 logFile = os .path .join (self .internalFolder , logFileName )
17441746 # Setup logger
17451747 rootLogger = logging .getLogger ()
@@ -2319,10 +2321,14 @@ def _resetChunks(self):
23192321 """ Set chunks on the node.
23202322 # TODO : Maybe don't delete chunks if we will recreate them as before ?
23212323 """
2322- if self .isInputNode :
2324+ if not self .isComputableType :
23232325 self ._chunksCreated = True
23242326 return
23252327 # Disconnect signals
2328+ if self ._preprocessChunk :
2329+ self ._preprocessChunk .statusChanged .disconnect (self .globalStatusChanged )
2330+ if self ._postprocessChunk :
2331+ self ._postprocessChunk .statusChanged .disconnect (self .globalStatusChanged )
23262332 for chunk in self ._chunks :
23272333 chunk .statusChanged .disconnect (self .globalStatusChanged )
23282334 # Empty list
@@ -2358,7 +2364,18 @@ def _resetChunks(self):
23582364 self ._chunksCreated = False
23592365 self .setSize (0 )
23602366 self ._chunkPlaceholder .setObjectList ([NodeChunk (self , desc .computation .Range ())])
2361-
2367+ # Pre/post process
2368+ if self .nodeDesc ._hasPreprocess :
2369+ self ._preprocessChunk = NodeChunk (self , desc .Range (ChunkIndex .PREPROCESS ))
2370+ self ._preprocessChunk .statusChanged .connect (self .globalStatusChanged )
2371+ else :
2372+ self ._preprocessChunk = None
2373+ if self .nodeDesc ._hasPostprocess :
2374+ self ._postprocessChunk = NodeChunk (self , desc .Range (ChunkIndex .POSTPROCESS ))
2375+ self ._postprocessChunk .statusChanged .connect (self .globalStatusChanged )
2376+ else :
2377+ self ._postprocessChunk = None
2378+
23622379 # Create chunks when possible
23632380 self .chunksCreatedChanged .emit ()
23642381 self .chunksChanged .emit ()
0 commit comments