Skip to content

Commit 3914422

Browse files
committed
[core] Apply corrections to handle statuses correctly with pre/post process
1 parent 0e989d9 commit 3914422

8 files changed

Lines changed: 301 additions & 162 deletions

File tree

meshroom/core/desc/node.py

Lines changed: 2 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -61,19 +61,6 @@ class MrNodeType(enum.Enum):
6161
BACKDROP = enum.auto()
6262

6363

64-
def add_method_flag(methodName: str):
65-
""" Decorator used to add a flag to a method. A flag is simply a variable set to True.
66-
It will be used in this context to track whether a method is overloaded or not.
67-
68-
Args:
69-
methodName (str): Name of the annotation to set
70-
"""
71-
def wrapper(method):
72-
method.__annotations__[methodName] = True
73-
return method
74-
return wrapper
75-
76-
7764
class InternalAttributesFactory:
7865
BASIC = [
7966
StringParam(
@@ -285,7 +272,6 @@ def postUpdate(cls, node):
285272
"""
286273
pass
287274

288-
@add_method_flag("disabled_preprocess")
289275
def preprocess(self, node):
290276
""" Gets invoked just before the processChunk method for the node.
291277
@@ -297,9 +283,8 @@ def preprocess(self, node):
297283
@property
298284
def _hasPreprocess(self):
299285
""" Returns True if the class has a preprocess """
300-
return not self.preprocess.__annotations__.get("disabled_preprocess", False)
286+
return type(self).preprocess is not BaseNode.preprocess
301287

302-
@add_method_flag("disabled_postprocess")
303288
def postprocess(self, node):
304289
""" Gets invoked after the processChunk method for the node.
305290
@@ -311,7 +296,7 @@ def postprocess(self, node):
311296
@property
312297
def _hasPostprocess(self):
313298
""" Returns True if the class has a postprocess """
314-
return not self.postprocess.__annotations__.get("disabled_postprocess", False)
299+
return type(self).postprocess is not BaseNode.postprocess
315300

316301
def process(self, node):
317302
raise NotImplementedError(f'No process implementation on node: "{node.name}"')

meshroom/core/graph.py

Lines changed: 0 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -1775,31 +1775,24 @@ def executeGraph(graph, toNodes=None, forceCompute=False, forceStatus=False):
17751775
node.initStatusOnCompute(forceCompute)
17761776

17771777
for n, node in enumerate(nodes):
1778-
print(f"(executeGraph) {n} {node}")
17791778
try:
17801779
# If the node is in compatibility mode, it cannot be computed
17811780
if node.isCompatibilityNode:
17821781
logging.warning(f"{node.name} is in Compatibility Mode and cannot be computed: {node.issueDetails}.")
17831782
continue
17841783

1785-
print("A")
17861784
node.preprocess(forceCompute)
17871785
if not node._chunksCreated:
1788-
print("B create chunks")
17891786
node.createChunks()
17901787
multiChunks = len(node.chunks) > 1
1791-
print("C")
17921788
for c, chunk in enumerate(node.chunks):
1793-
print("D", c, chunk)
17941789
if multiChunks:
17951790
print('\n[{node}/{nbNodes}]({chunk}/{nbChunks}) {nodeName}'.format(
17961791
node=n+1, nbNodes=len(nodes),
17971792
chunk=c+1, nbChunks=len(node.chunks), nodeName=node.nodeType))
17981793
else:
17991794
print(f'\n[{n + 1}/{len(nodes)}] {node.nodeType}')
1800-
print("E")
18011795
chunk.process(forceCompute)
1802-
print("F")
18031796
node.postprocess(forceCompute)
18041797
except Exception as exc:
18051798
logging.error(f"Error on node computation: {exc}")

meshroom/core/node.py

Lines changed: 66 additions & 46 deletions
Original file line numberDiff line numberDiff line change
@@ -16,7 +16,7 @@
1616
import uuid
1717
from collections import namedtuple, OrderedDict
1818
from enum import Enum, IntEnum, auto
19-
from typing import Callable, Optional, List, Dict
19+
from typing import Callable, Optional, List, Dict, Union
2020

2121
import meshroom
2222
from meshroom.common import Signal, Variant, Property, BaseObject, Slot, ListModel, DictModel
@@ -481,7 +481,7 @@ def __init__(self, node, range, parent=None):
481481
self.node.internalFolderChanged.connect(self.nodeFolderChanged)
482482

483483
def __repr__(self):
484-
return f"<NodeChunk {hex(id(self))}>"
484+
return f"<NodeChunk {self.name} ({self.getStatusName()}) {hex(id(self))}>"
485485

486486
@property
487487
def index(self):
@@ -1323,11 +1323,11 @@ def ram(self):
13231323
return self.nodeDesc.resolvedRam(self)
13241324

13251325
def hasStatus(self, status: Status):
1326+
if self.isInputNode:
1327+
return status == Status.INPUT
13261328
if not self._chunks or not self._chunksCreated:
1327-
if self.isInputNode:
1328-
return status == Status.INPUT
13291329
return status == Status.NONE
1330-
for chunk in self._chunks:
1330+
for chunk in self._allChunks:
13311331
if chunk.status.status != status:
13321332
return False
13331333
return True
@@ -1368,19 +1368,19 @@ def clearData(self):
13681368
@Slot(result=str)
13691369
def getStartDateTime(self):
13701370
""" Return the date (str) of the first running chunk """
1371-
dateTime = [chunk._status.startDateTime for chunk in self._chunks if chunk._status.status
1371+
dateTime = [chunk._status.startDateTime for chunk in self._allChunks if chunk._status.status
13721372
not in (Status.NONE, Status.SUBMITTED) and chunk._status.startDateTime != ""]
13731373
return min(dateTime) if len(dateTime) != 0 else ""
13741374

13751375
def isAlreadySubmitted(self):
13761376
if self._chunksCreated:
1377-
return any(c.isAlreadySubmitted() for c in self._chunks)
1377+
return any(c.isAlreadySubmitted() for c in self._allChunks)
13781378
else:
13791379
return self._nodeStatus.status in (Status.SUBMITTED, Status.RUNNING)
13801380

13811381
def isAlreadySubmittedOrFinished(self):
13821382
if self._chunksCreated:
1383-
return all(c.isAlreadySubmittedOrFinished() for c in self._chunks)
1383+
return all(c.isAlreadySubmittedOrFinished() for c in self._allChunks)
13841384
else:
13851385
return self._nodeStatus.status in (Status.SUBMITTED, Status.RUNNING, Status.SUCCESS)
13861386

@@ -1394,30 +1394,30 @@ def isSubmittedOrRunning(self):
13941394
return False
13951395
if not self.isAlreadySubmittedOrFinished():
13961396
return False
1397-
for chunk in self._chunks:
1397+
for chunk in self._allChunks:
13981398
if chunk.isRunning():
13991399
return True
14001400
return False
14011401

14021402
@Slot(result=bool)
14031403
def isRunning(self):
14041404
""" Return True if at least one chunk of this Node is running, False otherwise. """
1405-
return any(chunk.isRunning() for chunk in self._chunks)
1405+
return any(chunk.isRunning() for chunk in self._allChunks)
14061406

14071407
@Slot(result=bool)
14081408
def isFinishedOrRunning(self):
14091409
"""
14101410
Return True if all chunks of this Node is either finished or running, False
14111411
otherwise.
14121412
"""
1413-
if not self._chunks:
1413+
if not self._allChunks:
14141414
return False
1415-
return all(chunk.isFinishedOrRunning() for chunk in self._chunks)
1415+
return all(chunk.isFinishedOrRunning() for chunk in self._allChunks)
14161416

14171417
@Slot(result=bool)
14181418
def isPartiallyFinished(self):
14191419
""" Return True is at least one chunk of this Node is finished, False otherwise. """
1420-
return any(chunk.isFinished() for chunk in self._chunks)
1420+
return any(chunk.isFinished() for chunk in self._allChunks)
14211421

14221422
def isExtern(self):
14231423
"""
@@ -1436,7 +1436,7 @@ def isExtern(self):
14361436
elif self._nodeStatus.execMode == ExecMode.LOCAL and self._nodeStatus.status in (Status.SUBMITTED, Status.RUNNING):
14371437
return meshroom.core.sessionUid != self._nodeStatus.submitterSessionUid
14381438
return False
1439-
return any(chunk.isExtern() for chunk in self._chunks)
1439+
return any(chunk.isExtern() for chunk in self._allChunks)
14401440

14411441
@Slot()
14421442
def clearSubmittedChunks(self):
@@ -1448,28 +1448,30 @@ def clearSubmittedChunks(self):
14481448
This must be used with caution. This could lead to inconsistent node status
14491449
if the graph is still being computed.
14501450
"""
1451-
if self._chunksCreated:
1452-
for chunk in self._chunks:
1453-
if chunk.isAlreadySubmitted():
1454-
chunk.upgradeStatusTo(Status.NONE, ExecMode.NONE)
1455-
else:
1456-
if self.isAlreadySubmitted():
1457-
self.upgradeStatusTo(Status.NONE, ExecMode.NONE)
1451+
chunks: List[Union[BaseNode, NodeChunk]] = self._allChunks
1452+
if not self._chunksCreated:
1453+
chunks.append(self)
1454+
for chunk in chunks:
1455+
if chunk.isAlreadySubmitted():
1456+
chunk.upgradeStatusTo(Status.NONE, ExecMode.NONE)
14581457
self.globalStatusChanged.emit()
14591458

14601459
def clearLocallySubmittedChunks(self):
14611460
""" Reset all locally submitted chunks to Status.NONE. """
1462-
if self._chunksCreated:
1463-
for chunk in self._chunks:
1464-
if chunk.isAlreadySubmitted() and not chunk.isExtern():
1465-
chunk.upgradeStatusTo(Status.NONE, ExecMode.NONE)
1466-
else:
1467-
if self.isAlreadySubmitted() and not self.isExtern():
1468-
self.upgradeStatusTo(Status.NONE, ExecMode.NONE)
1461+
chunks: List[Union[BaseNode, NodeChunk]] = self._allChunks
1462+
if not self._chunksCreated:
1463+
chunks.append(self)
1464+
for chunk in chunks:
1465+
if chunk.isAlreadySubmitted() and not chunk.isExtern():
1466+
chunk.upgradeStatusTo(Status.NONE, ExecMode.NONE)
14691467
self.globalStatusChanged.emit()
14701468

14711469
def upgradeStatusTo(self, newStatus, execMode=None):
14721470
""" Upgrade node to the given status and save it on disk. """
1471+
if self.nodeDesc._hasPreprocess:
1472+
self._preprocessChunk.upgradeStatusTo(newStatus)
1473+
if self.nodeDesc._hasPostprocess:
1474+
self._postprocessChunk.upgradeStatusTo(newStatus)
14731475
if self._chunksCreated:
14741476
for chunk in self._chunks:
14751477
chunk.upgradeStatusTo(newStatus)
@@ -1486,7 +1488,7 @@ def upgradeStatusTo(self, newStatus, execMode=None):
14861488
self.globalStatusChanged.emit()
14871489

14881490
def updateStatisticsFromCache(self):
1489-
for chunk in self._chunks:
1491+
for chunk in self._allChunks:
14901492
chunk.updateStatisticsFromCache()
14911493

14921494
def _resetChunks(self):
@@ -1672,6 +1674,10 @@ def updateStatusFromCache(self):
16721674
logging.warning(f"Could not create chunks from cache: {e}")
16731675
return
16741676
s = self.globalStatus
1677+
if self.nodeDesc._hasPreprocess:
1678+
self._preprocessChunk.updateStatusFromCache()
1679+
if self.nodeDesc._hasPostprocess:
1680+
self._postprocessChunk.updateStatusFromCache()
16751681
if self._chunksCreated:
16761682
for chunk in self._chunks:
16771683
chunk.updateStatusFromCache()
@@ -1717,7 +1723,7 @@ def initStatusOnSubmit(self, forceCompute=False):
17171723
# `updateStatusFromCache()` will NOT recreate chunks from stale cache,
17181724
# allowing `node.createChunks()` to evaluate fresh chunk parameters.
17191725
self._nodeStatus.resetChunkInfo()
1720-
for chunk in self._chunks:
1726+
for chunk in self._allChunks:
17211727
if forceCompute or chunk._status.status != Status.SUCCESS:
17221728
hasChunkToLaunch = True
17231729
chunk._status.setNode(self)
@@ -1742,7 +1748,7 @@ def initStatusOnCompute(self, forceCompute=False):
17421748
# Same rationale as initStatusOnSubmit: clear stale chunk info
17431749
# so that the nodeStatus file does not contain outdated chunk setup.
17441750
self._nodeStatus.resetChunkInfo()
1745-
for chunk in self._chunks:
1751+
for chunk in self._allChunks:
17461752
if forceCompute or (chunk._status.status not in (Status.RUNNING, Status.SUCCESS)):
17471753
hasChunkToLaunch = True
17481754
chunk._status.setNode(self)
@@ -1884,6 +1890,10 @@ def endSequence(self):
18841890

18851891
def stopComputation(self):
18861892
""" Stop the computation of this node. """
1893+
if self.nodeDesc._hasPreprocess:
1894+
self._preprocessChunk.stopProcess()
1895+
if self.nodeDesc._hasPostprocess:
1896+
self._postprocessChunk.stopProcess()
18871897
if self._chunks:
18881898
for chunk in self._chunks.values():
18891899
chunk.stopProcess()
@@ -1906,12 +1916,12 @@ def getGlobalStatus(self):
19061916
if not self._chunksCreated:
19071917
# Get status from nodeStatus
19081918
return self._nodeStatus.status
1909-
if not self._chunks:
1919+
if not self._allChunks:
19101920
return Status.NONE
1911-
if len(self._chunks) == 1:
1912-
return self._chunks[0]._status.status
1913-
1914-
chunksStatus = [chunk._status.status for chunk in self._chunks]
1921+
if len(self._allChunks) == 1:
1922+
return self._allChunks[0]._status.status
1923+
1924+
chunksStatus = [chunk._status.status for chunk in self._allChunks]
19151925

19161926
anyOf = (Status.ERROR, Status.STOPPED, Status.KILLED,
19171927
Status.RUNNING, Status.SUBMITTED)
@@ -1928,11 +1938,11 @@ def getGlobalStatus(self):
19281938

19291939
@Slot(result=ChunkStatusData)
19301940
def getFusedStatus(self):
1931-
if not self._chunks:
1941+
if not self._allChunks:
19321942
return ChunkStatusData()
19331943
fusedStatus = ChunkStatusData()
1934-
fusedStatus.fromDict(self._chunks[0]._status.toDict())
1935-
for chunk in self._chunks[1:]:
1944+
fusedStatus.fromDict(self._allChunks[0]._status.toDict())
1945+
for chunk in self._allChunks[1:]:
19361946
fusedStatus.merge(chunk._status)
19371947
fusedStatus.status = self.getGlobalStatus()
19381948
return fusedStatus
@@ -1958,8 +1968,8 @@ def _isBackdropNode(self) -> bool:
19581968
def globalExecMode(self):
19591969
if not self._chunksCreated:
19601970
return self._nodeStatus.execMode.name
1961-
if len(self._chunks):
1962-
return self._chunks.at(0).getExecModeName()
1971+
if len(self._allChunks):
1972+
return self._allChunks[0].getExecModeName()
19631973
else:
19641974
return ExecMode.NONE
19651975

@@ -1974,7 +1984,17 @@ def _getJobName(self):
19741984

19751985
def getChunks(self) -> list[NodeChunk]:
19761986
return self._chunks
1977-
1987+
1988+
@property
1989+
def _allChunks(self) -> list[NodeChunk]:
1990+
chunks = []
1991+
if self.nodeDesc._hasPreprocess:
1992+
chunks.append(self._preprocessChunk)
1993+
chunks.extend([c for c in self._chunks])
1994+
if self.nodeDesc._hasPostprocess:
1995+
chunks.append(self._postprocessChunk)
1996+
return chunks
1997+
19781998
def getAllChunks(self):
19791999
allChunks = []
19802000
if self.nodeDesc._hasPreprocess:
@@ -2097,19 +2117,19 @@ def updateDuplicates(self, nodesPerUid):
20972117

20982118
def initFromThisSession(self) -> bool:
20992119
""" Check if the node was submitted from the current session """
2100-
if not self._chunksCreated or not self._chunks:
2120+
if not self._chunksCreated or not self._allChunks:
21012121
return meshroom.core.sessionUid == self._nodeStatus.submitterSessionUid
2102-
for chunk in self._chunks:
2122+
for chunk in self._allChunks:
21032123
# Technically the check on chunk._status.computeSessionUid is useless
21042124
if meshroom.core.sessionUid not in (chunk._status.computeSessionUid, self._nodeStatus.submitterSessionUid):
21052125
return False
21062126
return True
21072127

21082128
def isMainNode(self) -> bool:
21092129
""" In case of a node with duplicates, we check that the node is the one driving the computation. """
2110-
if len(self._chunks) == 0:
2130+
if len(self._allChunks) == 0:
21112131
return True
2112-
firstChunk = self._chunks.at(0)
2132+
firstChunk = self._allChunks[0]
21132133
if not firstChunk.statusNodeName:
21142134
# If nothing is declared, anyone could become the main (if there are duplicates).
21152135
return True

0 commit comments

Comments
 (0)