@@ -567,9 +567,9 @@ def upgradeStatusFile(self):
567567 Upgrade node status file based on the current status.
568568 """
569569 self .saveStatusFile ()
570- self .statusChanged .emit ()
571570 # We want to make sure the nodeStatus is up to date too
572571 self .node .upgradeStatusFile ()
572+ self .statusChanged .emit ()
573573
574574 def upgradeStatusTo (self , newStatus , execMode = None ):
575575 if newStatus .value < self ._status .status .value :
@@ -798,6 +798,7 @@ def __init__(self, nodeType: str, position: Position = None, parent: BaseObject
798798 self .dirty : bool = True # whether this node's outputs must be re-evaluated on next Graph update
799799 self ._chunks : list [NodeChunk ] = ListModel (parent = self )
800800 self ._chunksCreated = False # Only initialize chunks on compute
801+ self ._chunkPlaceholder : list [NodeChunk ] = ListModel (parent = self ) # Placeholder chunk for nodes with dynamic ones
801802 self ._uid : str = uid
802803 self ._expVars : dict = {}
803804 self ._size : int = 0
@@ -1363,6 +1364,11 @@ def upgradeStatusTo(self, newStatus, execMode=None):
13631364 self ._nodeStatus .execMode = execMode
13641365 self ._nodeStatus .status = newStatus
13651366 self .upgradeStatusFile ()
1367+ chunkPlaceholder = NodeChunk (self , desc .computation .Range ())
1368+ chunkPlaceholder ._status .execMode = self ._nodeStatus .execMode
1369+ chunkPlaceholder ._status .status = self ._nodeStatus .status
1370+ self .chunkPlaceholder .setObjectList ([chunkPlaceholder ])
1371+ self .chunksChanged .emit ()
13661372 self .globalStatusChanged .emit ()
13671373
13681374 def updateStatisticsFromCache (self ):
@@ -1530,9 +1536,7 @@ def updateNodeStatusFromCache(self):
15301536 return chunksRangeHasChanged
15311537
15321538 def updateStatusFromCache (self ):
1533- """
1534- Update node status based on status file content/existence.
1535- """
1539+ """ Update node status based on status file content/existence. """
15361540 # Update nodeStatus from cache
15371541 chunkChanged = self .updateNodeStatusFromCache ()
15381542 # Create chunks if we found info on them on the node cache
@@ -1541,12 +1545,18 @@ def updateStatusFromCache(self):
15411545 try :
15421546 self .createChunksFromCache ()
15431547 except Exception as e :
1544- logging .warning (f"Could not create chunks from cache : { e } " )
1548+ logging .warning (f"Could not create chunks from cache: { e } " )
15451549 return
15461550 s = self .globalStatus
15471551 if self ._chunksCreated :
15481552 for chunk in self ._chunks :
15491553 chunk .updateStatusFromCache ()
1554+ else :
1555+ # Restore placeholder chunk if needed
1556+ chunkPlaceholder = NodeChunk (self , desc .computation .Range ())
1557+ chunkPlaceholder ._status .execMode = self ._nodeStatus .execMode
1558+ chunkPlaceholder ._status .status = self ._nodeStatus .status
1559+ self ._chunkPlaceholder .setObjectList ([chunkPlaceholder ])
15501560 # logging.debug(f"updateStatusFromCache: {self.name}, status: {s} => {self.globalStatus}")
15511561 self .updateOutputAttr ()
15521562
@@ -1583,6 +1593,12 @@ def initStatusOnSubmit(self, forceCompute=False):
15831593 self ._nodeStatus .initExternSubmit ()
15841594 self .upgradeStatusFile ()
15851595 self .globalStatusChanged .emit ()
1596+ if self ._nodeStatus .execMode == ExecMode .EXTERN and self ._nodeStatus .status in (Status .RUNNING , Status .SUBMITTED ):
1597+ chunkPlaceholder = NodeChunk (self , desc .computation .Range ())
1598+ chunkPlaceholder ._status .execMode = self ._nodeStatus .execMode
1599+ chunkPlaceholder ._status .status = self ._nodeStatus .status
1600+ self ._chunkPlaceholder .setObjectList ([chunkPlaceholder ])
1601+ self .chunksChanged .emit ()
15861602
15871603 def initStatusOnCompute (self , forceCompute = False ):
15881604 hasChunkToLaunch = False
@@ -1599,6 +1615,12 @@ def initStatusOnCompute(self, forceCompute=False):
15991615 self ._nodeStatus .initLocalSubmit ()
16001616 self .upgradeStatusFile ()
16011617 self .globalStatusChanged .emit ()
1618+ if self ._nodeStatus .execMode == ExecMode .LOCAL and self ._nodeStatus .status in (Status .RUNNING , Status .SUBMITTED ):
1619+ chunkPlaceholder = NodeChunk (self , desc .computation .Range ())
1620+ chunkPlaceholder ._status .execMode = self ._nodeStatus .execMode
1621+ chunkPlaceholder ._status .status = self ._nodeStatus .status
1622+ self ._chunkPlaceholder .setObjectList ([chunkPlaceholder ])
1623+ self .chunksChanged .emit ()
16021624
16031625 def processIteration (self , iteration ):
16041626 self ._chunks [iteration ].process ()
@@ -2059,6 +2081,7 @@ def _hasDisplayableShape(self):
20592081 chunksCreated = Property (bool , lambda self : self ._chunksCreated , notify = chunksCreatedChanged )
20602082 chunksChanged = Signal ()
20612083 chunks = Property (Variant , getChunks , notify = chunksChanged )
2084+ chunkPlaceholder = Property (Variant , lambda self : self ._chunkPlaceholder , notify = chunksChanged )
20622085 nbParallelizationBlocks = Property (int , lambda self : len (self ._chunks ) if self ._chunksCreated else 0 , notify = chunksChanged )
20632086 sizeChanged = Signal ()
20642087 size = Property (int , getSize , notify = sizeChanged )
@@ -2197,7 +2220,7 @@ def toDict(self):
21972220 }
21982221
21992222 def _resetChunks (self ):
2200- """ Set chunks on the node
2223+ """ Set chunks on the node.
22012224 # TODO : Maybe don't delete chunks if we will recreate them as before ?
22022225 """
22032226 if isinstance (self .nodeDesc , desc .InputNode ):
@@ -2208,6 +2231,7 @@ def _resetChunks(self):
22082231 chunk .statusChanged .disconnect (self .globalStatusChanged )
22092232 # Empty list
22102233 self ._chunks .setObjectList ([])
2234+ self ._chunkPlaceholder .setObjectList ([])
22112235 # Recreate list with reset values (1 chunk or the static size)
22122236 if not self .isParallelized :
22132237 if not self .nodeDesc .size :
@@ -2235,7 +2259,8 @@ def _resetChunks(self):
22352259 else :
22362260 self ._chunksCreated = False
22372261 self .setSize (0 )
2238- self ._chunks .setObjectList ([])
2262+ self ._chunkPlaceholder .setObjectList ([NodeChunk (self , desc .computation .Range ())])
2263+
22392264 # Create chunks when possible
22402265 self .chunksCreatedChanged .emit ()
22412266 self .chunksChanged .emit ()
@@ -2271,7 +2296,7 @@ def __createChunks(self, ranges):
22712296 self .chunksCreatedChanged .emit ()
22722297
22732298 def createChunksFromCache (self ):
2274- """Create chunks when a node cache exists"""
2299+ """ Create chunks when a node cache exists. """
22752300 try :
22762301 # Get size from cache
22772302 size = self ._nodeStatus .nbChunks
@@ -2285,7 +2310,7 @@ def createChunksFromCache(self):
22852310 raise e
22862311
22872312 def createChunks (self ):
2288- """Create chunks when computation is about to start"""
2313+ """ Create chunks when computation is about to start. """
22892314 if self ._chunksCreated :
22902315 return
22912316 if isinstance (self .nodeDesc , desc .InputNode ):
0 commit comments