Skip to content

Commit d77f450

Browse files
committed
[core] Apply corrections to handle statuses correctly with pre/post process
1 parent fc0d06c commit d77f450

8 files changed

Lines changed: 301 additions & 193 deletions

File tree

meshroom/core/desc/node.py

Lines changed: 2 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -61,19 +61,6 @@ class MrNodeType(enum.Enum):
6161
BACKDROP = enum.auto()
6262

6363

64-
def add_method_flag(methodName: str):
65-
""" Decorator used to add a flag to a method. A flag is simply a variable set to True.
66-
It will be used in this context to track whether a method is overloaded or not.
67-
68-
Args:
69-
methodName (str): Name of the annotation to set
70-
"""
71-
def wrapper(method):
72-
method.__annotations__[methodName] = True
73-
return method
74-
return wrapper
75-
76-
7764
class InternalAttributesFactory:
7865
BASIC = [
7966
StringParam(
@@ -267,7 +254,6 @@ def postUpdate(cls, node):
267254
"""
268255
pass
269256

270-
@add_method_flag("disabled_preprocess")
271257
def preprocess(self, node):
272258
""" Gets invoked just before the processChunk method for the node.
273259
@@ -279,9 +265,8 @@ def preprocess(self, node):
279265
@property
280266
def _hasPreprocess(self):
281267
""" Returns True if the class has a preprocess """
282-
return not self.preprocess.__annotations__.get("disabled_preprocess", False)
268+
return type(self).preprocess is not BaseNode.preprocess
283269

284-
@add_method_flag("disabled_postprocess")
285270
def postprocess(self, node):
286271
""" Gets invoked after the processChunk method for the node.
287272
@@ -293,7 +278,7 @@ def postprocess(self, node):
293278
@property
294279
def _hasPostprocess(self):
295280
""" Returns True if the class has a postprocess """
296-
return not self.postprocess.__annotations__.get("disabled_postprocess", False)
281+
return type(self).postprocess is not BaseNode.postprocess
297282

298283
def process(self, node):
299284
raise NotImplementedError(f'No process implementation on node: "{node.name}"')

meshroom/core/graph.py

Lines changed: 0 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -1775,31 +1775,24 @@ def executeGraph(graph, toNodes=None, forceCompute=False, forceStatus=False):
17751775
node.initStatusOnCompute(forceCompute)
17761776

17771777
for n, node in enumerate(nodes):
1778-
print(f"(executeGraph) {n} {node}")
17791778
try:
17801779
# If the node is in compatibility mode, it cannot be computed
17811780
if node.isCompatibilityNode:
17821781
logging.warning(f"{node.name} is in Compatibility Mode and cannot be computed: {node.issueDetails}.")
17831782
continue
17841783

1785-
print("A")
17861784
node.preprocess(forceCompute)
17871785
if not node._chunksCreated:
1788-
print("B create chunks")
17891786
node.createChunks()
17901787
multiChunks = len(node.chunks) > 1
1791-
print("C")
17921788
for c, chunk in enumerate(node.chunks):
1793-
print("D", c, chunk)
17941789
if multiChunks:
17951790
print('\n[{node}/{nbNodes}]({chunk}/{nbChunks}) {nodeName}'.format(
17961791
node=n+1, nbNodes=len(nodes),
17971792
chunk=c+1, nbChunks=len(node.chunks), nodeName=node.nodeType))
17981793
else:
17991794
print(f'\n[{n + 1}/{len(nodes)}] {node.nodeType}')
1800-
print("E")
18011795
chunk.process(forceCompute)
1802-
print("F")
18031796
node.postprocess(forceCompute)
18041797
except Exception as exc:
18051798
logging.error(f"Error on node computation: {exc}")

meshroom/core/node.py

Lines changed: 66 additions & 46 deletions
Original file line numberDiff line numberDiff line change
@@ -16,7 +16,7 @@
1616
import uuid
1717
from collections import namedtuple, OrderedDict
1818
from enum import Enum, IntEnum, auto
19-
from typing import Callable, Optional, List, Dict
19+
from typing import Callable, Optional, List, Dict, Union
2020

2121
import meshroom
2222
from meshroom.common import Signal, Variant, Property, BaseObject, Slot, ListModel, DictModel
@@ -474,7 +474,7 @@ def __init__(self, node, range, parent=None):
474474
self.node.internalFolderChanged.connect(self.nodeFolderChanged)
475475

476476
def __repr__(self):
477-
return f"<NodeChunk {hex(id(self))}>"
477+
return f"<NodeChunk {self.name} ({self.getStatusName()}) {hex(id(self))}>"
478478

479479
@property
480480
def index(self):
@@ -1316,11 +1316,11 @@ def ram(self):
13161316
return self.nodeDesc.resolvedRam(self)
13171317

13181318
def hasStatus(self, status: Status):
1319+
if self.isInputNode:
1320+
return status == Status.INPUT
13191321
if not self._chunks or not self._chunksCreated:
1320-
if self.isInputNode:
1321-
return status == Status.INPUT
13221322
return status == Status.NONE
1323-
for chunk in self._chunks:
1323+
for chunk in self._allChunks:
13241324
if chunk.status.status != status:
13251325
return False
13261326
return True
@@ -1361,19 +1361,19 @@ def clearData(self):
13611361
@Slot(result=str)
13621362
def getStartDateTime(self):
13631363
""" Return the date (str) of the first running chunk """
1364-
dateTime = [chunk._status.startDateTime for chunk in self._chunks if chunk._status.status
1364+
dateTime = [chunk._status.startDateTime for chunk in self._allChunks if chunk._status.status
13651365
not in (Status.NONE, Status.SUBMITTED) and chunk._status.startDateTime != ""]
13661366
return min(dateTime) if len(dateTime) != 0 else ""
13671367

13681368
def isAlreadySubmitted(self):
13691369
if self._chunksCreated:
1370-
return any(c.isAlreadySubmitted() for c in self._chunks)
1370+
return any(c.isAlreadySubmitted() for c in self._allChunks)
13711371
else:
13721372
return self._nodeStatus.status in (Status.SUBMITTED, Status.RUNNING)
13731373

13741374
def isAlreadySubmittedOrFinished(self):
13751375
if self._chunksCreated:
1376-
return all(c.isAlreadySubmittedOrFinished() for c in self._chunks)
1376+
return all(c.isAlreadySubmittedOrFinished() for c in self._allChunks)
13771377
else:
13781378
return self._nodeStatus.status in (Status.SUBMITTED, Status.RUNNING, Status.SUCCESS)
13791379

@@ -1387,30 +1387,30 @@ def isSubmittedOrRunning(self):
13871387
return False
13881388
if not self.isAlreadySubmittedOrFinished():
13891389
return False
1390-
for chunk in self._chunks:
1390+
for chunk in self._allChunks:
13911391
if chunk.isRunning():
13921392
return True
13931393
return False
13941394

13951395
@Slot(result=bool)
13961396
def isRunning(self):
13971397
""" Return True if at least one chunk of this Node is running, False otherwise. """
1398-
return any(chunk.isRunning() for chunk in self._chunks)
1398+
return any(chunk.isRunning() for chunk in self._allChunks)
13991399

14001400
@Slot(result=bool)
14011401
def isFinishedOrRunning(self):
14021402
"""
14031403
Return True if all chunks of this Node is either finished or running, False
14041404
otherwise.
14051405
"""
1406-
if not self._chunks:
1406+
if not self._allChunks:
14071407
return False
1408-
return all(chunk.isFinishedOrRunning() for chunk in self._chunks)
1408+
return all(chunk.isFinishedOrRunning() for chunk in self._allChunks)
14091409

14101410
@Slot(result=bool)
14111411
def isPartiallyFinished(self):
14121412
""" Return True is at least one chunk of this Node is finished, False otherwise. """
1413-
return any(chunk.isFinished() for chunk in self._chunks)
1413+
return any(chunk.isFinished() for chunk in self._allChunks)
14141414

14151415
def isExtern(self):
14161416
"""
@@ -1429,7 +1429,7 @@ def isExtern(self):
14291429
elif self._nodeStatus.execMode == ExecMode.LOCAL and self._nodeStatus.status in (Status.SUBMITTED, Status.RUNNING):
14301430
return meshroom.core.sessionUid != self._nodeStatus.submitterSessionUid
14311431
return False
1432-
return any(chunk.isExtern() for chunk in self._chunks)
1432+
return any(chunk.isExtern() for chunk in self._allChunks)
14331433

14341434
@Slot()
14351435
def clearSubmittedChunks(self):
@@ -1441,28 +1441,30 @@ def clearSubmittedChunks(self):
14411441
This must be used with caution. This could lead to inconsistent node status
14421442
if the graph is still being computed.
14431443
"""
1444-
if self._chunksCreated:
1445-
for chunk in self._chunks:
1446-
if chunk.isAlreadySubmitted():
1447-
chunk.upgradeStatusTo(Status.NONE, ExecMode.NONE)
1448-
else:
1449-
if self.isAlreadySubmitted():
1450-
self.upgradeStatusTo(Status.NONE, ExecMode.NONE)
1444+
chunks: List[Union[BaseNode, NodeChunk]] = self._allChunks
1445+
if not self._chunksCreated:
1446+
chunks.append(self)
1447+
for chunk in chunks:
1448+
if chunk.isAlreadySubmitted():
1449+
chunk.upgradeStatusTo(Status.NONE, ExecMode.NONE)
14511450
self.globalStatusChanged.emit()
14521451

14531452
def clearLocallySubmittedChunks(self):
14541453
""" Reset all locally submitted chunks to Status.NONE. """
1455-
if self._chunksCreated:
1456-
for chunk in self._chunks:
1457-
if chunk.isAlreadySubmitted() and not chunk.isExtern():
1458-
chunk.upgradeStatusTo(Status.NONE, ExecMode.NONE)
1459-
else:
1460-
if self.isAlreadySubmitted() and not self.isExtern():
1461-
self.upgradeStatusTo(Status.NONE, ExecMode.NONE)
1454+
chunks: List[Union[BaseNode, NodeChunk]] = self._allChunks
1455+
if not self._chunksCreated:
1456+
chunks.append(self)
1457+
for chunk in chunks:
1458+
if chunk.isAlreadySubmitted() and not chunk.isExtern():
1459+
chunk.upgradeStatusTo(Status.NONE, ExecMode.NONE)
14621460
self.globalStatusChanged.emit()
14631461

14641462
def upgradeStatusTo(self, newStatus, execMode=None):
14651463
""" Upgrade node to the given status and save it on disk. """
1464+
if self.nodeDesc._hasPreprocess:
1465+
self._preprocessChunk.upgradeStatusTo(newStatus)
1466+
if self.nodeDesc._hasPostprocess:
1467+
self._postprocessChunk.upgradeStatusTo(newStatus)
14661468
if self._chunksCreated:
14671469
for chunk in self._chunks:
14681470
chunk.upgradeStatusTo(newStatus)
@@ -1479,7 +1481,7 @@ def upgradeStatusTo(self, newStatus, execMode=None):
14791481
self.globalStatusChanged.emit()
14801482

14811483
def updateStatisticsFromCache(self):
1482-
for chunk in self._chunks:
1484+
for chunk in self._allChunks:
14831485
chunk.updateStatisticsFromCache()
14841486

14851487
def _resetChunks(self):
@@ -1677,6 +1679,10 @@ def updateStatusFromCache(self):
16771679
logging.warning(f"Could not create chunks from cache: {e}")
16781680
return
16791681
s = self.globalStatus
1682+
if self.nodeDesc._hasPreprocess:
1683+
self._preprocessChunk.updateStatusFromCache()
1684+
if self.nodeDesc._hasPostprocess:
1685+
self._postprocessChunk.updateStatusFromCache()
16801686
if self._chunksCreated:
16811687
for chunk in self._chunks:
16821688
chunk.updateStatusFromCache()
@@ -1711,7 +1717,7 @@ def initStatusOnSubmit(self, forceCompute=False):
17111717
hasChunkToLaunch = False
17121718
if not self._chunksCreated:
17131719
hasChunkToLaunch = True
1714-
for chunk in self._chunks:
1720+
for chunk in self._allChunks:
17151721
if forceCompute or chunk._status.status != Status.SUCCESS:
17161722
hasChunkToLaunch = True
17171723
chunk._status.setNode(self)
@@ -1733,7 +1739,7 @@ def initStatusOnCompute(self, forceCompute=False):
17331739
hasChunkToLaunch = False
17341740
if not self._chunksCreated:
17351741
hasChunkToLaunch = True
1736-
for chunk in self._chunks:
1742+
for chunk in self._allChunks:
17371743
if forceCompute or (chunk._status.status not in (Status.RUNNING, Status.SUCCESS)):
17381744
hasChunkToLaunch = True
17391745
chunk._status.setNode(self)
@@ -1875,6 +1881,10 @@ def endSequence(self):
18751881

18761882
def stopComputation(self):
18771883
""" Stop the computation of this node. """
1884+
if self.nodeDesc._hasPreprocess:
1885+
self._preprocessChunk.stopProcess()
1886+
if self.nodeDesc._hasPostprocess:
1887+
self._postprocessChunk.stopProcess()
18781888
if self._chunks:
18791889
for chunk in self._chunks.values():
18801890
chunk.stopProcess()
@@ -1897,12 +1907,12 @@ def getGlobalStatus(self):
18971907
if not self._chunksCreated:
18981908
# Get status from nodeStatus
18991909
return self._nodeStatus.status
1900-
if not self._chunks:
1910+
if not self._allChunks:
19011911
return Status.NONE
1902-
if len(self._chunks) == 1:
1903-
return self._chunks[0]._status.status
1904-
1905-
chunksStatus = [chunk._status.status for chunk in self._chunks]
1912+
if len(self._allChunks) == 1:
1913+
return self._allChunks[0]._status.status
1914+
1915+
chunksStatus = [chunk._status.status for chunk in self._allChunks]
19061916

19071917
anyOf = (Status.ERROR, Status.STOPPED, Status.KILLED,
19081918
Status.RUNNING, Status.SUBMITTED)
@@ -1919,11 +1929,11 @@ def getGlobalStatus(self):
19191929

19201930
@Slot(result=ChunkStatusData)
19211931
def getFusedStatus(self):
1922-
if not self._chunks:
1932+
if not self._allChunks:
19231933
return ChunkStatusData()
19241934
fusedStatus = ChunkStatusData()
1925-
fusedStatus.fromDict(self._chunks[0]._status.toDict())
1926-
for chunk in self._chunks[1:]:
1935+
fusedStatus.fromDict(self._allChunks[0]._status.toDict())
1936+
for chunk in self._allChunks[1:]:
19271937
fusedStatus.merge(chunk._status)
19281938
fusedStatus.status = self.getGlobalStatus()
19291939
return fusedStatus
@@ -1949,8 +1959,8 @@ def _isBackdropNode(self) -> bool:
19491959
def globalExecMode(self):
19501960
if not self._chunksCreated:
19511961
return self._nodeStatus.execMode.name
1952-
if len(self._chunks):
1953-
return self._chunks.at(0).getExecModeName()
1962+
if len(self._allChunks):
1963+
return self._allChunks[0].getExecModeName()
19541964
else:
19551965
return ExecMode.NONE
19561966

@@ -1965,7 +1975,17 @@ def _getJobName(self):
19651975

19661976
def getChunks(self) -> list[NodeChunk]:
19671977
return self._chunks
1968-
1978+
1979+
@property
1980+
def _allChunks(self) -> list[NodeChunk]:
1981+
chunks = []
1982+
if self.nodeDesc._hasPreprocess:
1983+
chunks.append(self._preprocessChunk)
1984+
chunks.extend([c for c in self._chunks])
1985+
if self.nodeDesc._hasPostprocess:
1986+
chunks.append(self._postprocessChunk)
1987+
return chunks
1988+
19691989
def getAllChunks(self):
19701990
allChunks = []
19711991
if self.nodeDesc._hasPreprocess:
@@ -2088,19 +2108,19 @@ def updateDuplicates(self, nodesPerUid):
20882108

20892109
def initFromThisSession(self) -> bool:
20902110
""" Check if the node was submitted from the current session """
2091-
if not self._chunksCreated or not self._chunks:
2111+
if not self._chunksCreated or not self._allChunks:
20922112
return meshroom.core.sessionUid == self._nodeStatus.submitterSessionUid
2093-
for chunk in self._chunks:
2113+
for chunk in self._allChunks:
20942114
# Technically the check on chunk._status.computeSessionUid is useless
20952115
if meshroom.core.sessionUid not in (chunk._status.computeSessionUid, self._nodeStatus.submitterSessionUid):
20962116
return False
20972117
return True
20982118

20992119
def isMainNode(self) -> bool:
21002120
""" In case of a node with duplicates, we check that the node is the one driving the computation. """
2101-
if len(self._chunks) == 0:
2121+
if len(self._allChunks) == 0:
21022122
return True
2103-
firstChunk = self._chunks.at(0)
2123+
firstChunk = self._allChunks[0]
21042124
if not firstChunk.statusNodeName:
21052125
# If nothing is declared, anyone could become the main (if there are duplicates).
21062126
return True

0 commit comments

Comments
 (0)