Skip to content

Commit 4e018ee

Browse files
committed
[core] Apply corrections to handle statuses correctly with pre/post process
1 parent c340ca9 commit 4e018ee

8 files changed

Lines changed: 301 additions & 193 deletions

File tree

meshroom/core/desc/node.py

Lines changed: 2 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -61,19 +61,6 @@ class MrNodeType(enum.Enum):
6161
BACKDROP = enum.auto()
6262

6363

64-
def add_method_flag(methodName: str):
65-
""" Decorator used to add a flag to a method. A flag is simply a variable set to True.
66-
It will be used in this context to track whether a method is overloaded or not.
67-
68-
Args:
69-
methodName (str): Name of the annotation to set
70-
"""
71-
def wrapper(method):
72-
method.__annotations__[methodName] = True
73-
return method
74-
return wrapper
75-
76-
7764
class InternalAttributesFactory:
7865
BASIC = [
7966
StringParam(
@@ -266,7 +253,6 @@ def postUpdate(cls, node):
266253
"""
267254
pass
268255

269-
@add_method_flag("disabled_preprocess")
270256
def preprocess(self, node):
271257
""" Gets invoked just before the processChunk method for the node.
272258
@@ -278,9 +264,8 @@ def preprocess(self, node):
278264
@property
279265
def _hasPreprocess(self):
280266
""" Returns True if the class has a preprocess """
281-
return not self.preprocess.__annotations__.get("disabled_preprocess", False)
267+
return type(self).preprocess is not BaseNode.preprocess
282268

283-
@add_method_flag("disabled_postprocess")
284269
def postprocess(self, node):
285270
""" Gets invoked after the processChunk method for the node.
286271
@@ -292,7 +277,7 @@ def postprocess(self, node):
292277
@property
293278
def _hasPostprocess(self):
294279
""" Returns True if the class has a postprocess """
295-
return not self.postprocess.__annotations__.get("disabled_postprocess", False)
280+
return type(self).postprocess is not BaseNode.postprocess
296281

297282
def process(self, node):
298283
raise NotImplementedError(f'No process implementation on node: "{node.name}"')

meshroom/core/graph.py

Lines changed: 0 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -1775,31 +1775,24 @@ def executeGraph(graph, toNodes=None, forceCompute=False, forceStatus=False):
17751775
node.initStatusOnCompute(forceCompute)
17761776

17771777
for n, node in enumerate(nodes):
1778-
print(f"(executeGraph) {n} {node}")
17791778
try:
17801779
# If the node is in compatibility mode, it cannot be computed
17811780
if node.isCompatibilityNode:
17821781
logging.warning(f"{node.name} is in Compatibility Mode and cannot be computed: {node.issueDetails}.")
17831782
continue
17841783

1785-
print("A")
17861784
node.preprocess(forceCompute)
17871785
if not node._chunksCreated:
1788-
print("B create chunks")
17891786
node.createChunks()
17901787
multiChunks = len(node.chunks) > 1
1791-
print("C")
17921788
for c, chunk in enumerate(node.chunks):
1793-
print("D", c, chunk)
17941789
if multiChunks:
17951790
print('\n[{node}/{nbNodes}]({chunk}/{nbChunks}) {nodeName}'.format(
17961791
node=n+1, nbNodes=len(nodes),
17971792
chunk=c+1, nbChunks=len(node.chunks), nodeName=node.nodeType))
17981793
else:
17991794
print(f'\n[{n + 1}/{len(nodes)}] {node.nodeType}')
1800-
print("E")
18011795
chunk.process(forceCompute)
1802-
print("F")
18031796
node.postprocess(forceCompute)
18041797
except Exception as exc:
18051798
logging.error(f"Error on node computation: {exc}")

meshroom/core/node.py

Lines changed: 66 additions & 46 deletions
Original file line numberDiff line numberDiff line change
@@ -16,7 +16,7 @@
1616
import uuid
1717
from collections import namedtuple, OrderedDict
1818
from enum import Enum, IntEnum, auto
19-
from typing import Callable, Optional, List, Dict
19+
from typing import Callable, Optional, List, Dict, Union
2020

2121
import meshroom
2222
from meshroom.common import Signal, Variant, Property, BaseObject, Slot, ListModel, DictModel
@@ -474,7 +474,7 @@ def __init__(self, node, range, parent=None):
474474
self.node.internalFolderChanged.connect(self.nodeFolderChanged)
475475

476476
def __repr__(self):
477-
return f"<NodeChunk {hex(id(self))}>"
477+
return f"<NodeChunk {self.name} ({self.getStatusName()}) {hex(id(self))}>"
478478

479479
@property
480480
def index(self):
@@ -1315,11 +1315,11 @@ def ram(self):
13151315
return self.nodeDesc.resolvedRam(self)
13161316

13171317
def hasStatus(self, status: Status):
1318+
if self.isInputNode:
1319+
return status == Status.INPUT
13181320
if not self._chunks or not self._chunksCreated:
1319-
if self.isInputNode:
1320-
return status == Status.INPUT
13211321
return status == Status.NONE
1322-
for chunk in self._chunks:
1322+
for chunk in self._allChunks:
13231323
if chunk.status.status != status:
13241324
return False
13251325
return True
@@ -1360,19 +1360,19 @@ def clearData(self):
13601360
@Slot(result=str)
13611361
def getStartDateTime(self):
13621362
""" Return the date (str) of the first running chunk """
1363-
dateTime = [chunk._status.startDateTime for chunk in self._chunks if chunk._status.status
1363+
dateTime = [chunk._status.startDateTime for chunk in self._allChunks if chunk._status.status
13641364
not in (Status.NONE, Status.SUBMITTED) and chunk._status.startDateTime != ""]
13651365
return min(dateTime) if len(dateTime) != 0 else ""
13661366

13671367
def isAlreadySubmitted(self):
13681368
if self._chunksCreated:
1369-
return any(c.isAlreadySubmitted() for c in self._chunks)
1369+
return any(c.isAlreadySubmitted() for c in self._allChunks)
13701370
else:
13711371
return self._nodeStatus.status in (Status.SUBMITTED, Status.RUNNING)
13721372

13731373
def isAlreadySubmittedOrFinished(self):
13741374
if self._chunksCreated:
1375-
return all(c.isAlreadySubmittedOrFinished() for c in self._chunks)
1375+
return all(c.isAlreadySubmittedOrFinished() for c in self._allChunks)
13761376
else:
13771377
return self._nodeStatus.status in (Status.SUBMITTED, Status.RUNNING, Status.SUCCESS)
13781378

@@ -1386,30 +1386,30 @@ def isSubmittedOrRunning(self):
13861386
return False
13871387
if not self.isAlreadySubmittedOrFinished():
13881388
return False
1389-
for chunk in self._chunks:
1389+
for chunk in self._allChunks:
13901390
if chunk.isRunning():
13911391
return True
13921392
return False
13931393

13941394
@Slot(result=bool)
13951395
def isRunning(self):
13961396
""" Return True if at least one chunk of this Node is running, False otherwise. """
1397-
return any(chunk.isRunning() for chunk in self._chunks)
1397+
return any(chunk.isRunning() for chunk in self._allChunks)
13981398

13991399
@Slot(result=bool)
14001400
def isFinishedOrRunning(self):
14011401
"""
14021402
Return True if all chunks of this Node is either finished or running, False
14031403
otherwise.
14041404
"""
1405-
if not self._chunks:
1405+
if not self._allChunks:
14061406
return False
1407-
return all(chunk.isFinishedOrRunning() for chunk in self._chunks)
1407+
return all(chunk.isFinishedOrRunning() for chunk in self._allChunks)
14081408

14091409
@Slot(result=bool)
14101410
def isPartiallyFinished(self):
14111411
""" Return True is at least one chunk of this Node is finished, False otherwise. """
1412-
return any(chunk.isFinished() for chunk in self._chunks)
1412+
return any(chunk.isFinished() for chunk in self._allChunks)
14131413

14141414
def isExtern(self):
14151415
"""
@@ -1428,7 +1428,7 @@ def isExtern(self):
14281428
elif self._nodeStatus.execMode == ExecMode.LOCAL and self._nodeStatus.status in (Status.SUBMITTED, Status.RUNNING):
14291429
return meshroom.core.sessionUid != self._nodeStatus.submitterSessionUid
14301430
return False
1431-
return any(chunk.isExtern() for chunk in self._chunks)
1431+
return any(chunk.isExtern() for chunk in self._allChunks)
14321432

14331433
@Slot()
14341434
def clearSubmittedChunks(self):
@@ -1440,28 +1440,30 @@ def clearSubmittedChunks(self):
14401440
This must be used with caution. This could lead to inconsistent node status
14411441
if the graph is still being computed.
14421442
"""
1443-
if self._chunksCreated:
1444-
for chunk in self._chunks:
1445-
if chunk.isAlreadySubmitted():
1446-
chunk.upgradeStatusTo(Status.NONE, ExecMode.NONE)
1447-
else:
1448-
if self.isAlreadySubmitted():
1449-
self.upgradeStatusTo(Status.NONE, ExecMode.NONE)
1443+
chunks: List[Union[BaseNode, NodeChunk]] = self._allChunks
1444+
if not self._chunksCreated:
1445+
chunks.append(self)
1446+
for chunk in chunks:
1447+
if chunk.isAlreadySubmitted():
1448+
chunk.upgradeStatusTo(Status.NONE, ExecMode.NONE)
14501449
self.globalStatusChanged.emit()
14511450

14521451
def clearLocallySubmittedChunks(self):
14531452
""" Reset all locally submitted chunks to Status.NONE. """
1454-
if self._chunksCreated:
1455-
for chunk in self._chunks:
1456-
if chunk.isAlreadySubmitted() and not chunk.isExtern():
1457-
chunk.upgradeStatusTo(Status.NONE, ExecMode.NONE)
1458-
else:
1459-
if self.isAlreadySubmitted() and not self.isExtern():
1460-
self.upgradeStatusTo(Status.NONE, ExecMode.NONE)
1453+
chunks: List[Union[BaseNode, NodeChunk]] = self._allChunks
1454+
if not self._chunksCreated:
1455+
chunks.append(self)
1456+
for chunk in chunks:
1457+
if chunk.isAlreadySubmitted() and not chunk.isExtern():
1458+
chunk.upgradeStatusTo(Status.NONE, ExecMode.NONE)
14611459
self.globalStatusChanged.emit()
14621460

14631461
def upgradeStatusTo(self, newStatus, execMode=None):
14641462
""" Upgrade node to the given status and save it on disk. """
1463+
if self.nodeDesc._hasPreprocess:
1464+
self._preprocessChunk.upgradeStatusTo(newStatus)
1465+
if self.nodeDesc._hasPostprocess:
1466+
self._postprocessChunk.upgradeStatusTo(newStatus)
14651467
if self._chunksCreated:
14661468
for chunk in self._chunks:
14671469
chunk.upgradeStatusTo(newStatus)
@@ -1478,7 +1480,7 @@ def upgradeStatusTo(self, newStatus, execMode=None):
14781480
self.globalStatusChanged.emit()
14791481

14801482
def updateStatisticsFromCache(self):
1481-
for chunk in self._chunks:
1483+
for chunk in self._allChunks:
14821484
chunk.updateStatisticsFromCache()
14831485

14841486
def _resetChunks(self):
@@ -1653,6 +1655,10 @@ def updateStatusFromCache(self):
16531655
logging.warning(f"Could not create chunks from cache: {e}")
16541656
return
16551657
s = self.globalStatus
1658+
if self.nodeDesc._hasPreprocess:
1659+
self._preprocessChunk.updateStatusFromCache()
1660+
if self.nodeDesc._hasPostprocess:
1661+
self._postprocessChunk.updateStatusFromCache()
16561662
if self._chunksCreated:
16571663
for chunk in self._chunks:
16581664
chunk.updateStatusFromCache()
@@ -1687,7 +1693,7 @@ def initStatusOnSubmit(self, forceCompute=False):
16871693
hasChunkToLaunch = False
16881694
if not self._chunksCreated:
16891695
hasChunkToLaunch = True
1690-
for chunk in self._chunks:
1696+
for chunk in self._allChunks:
16911697
if forceCompute or chunk._status.status != Status.SUCCESS:
16921698
hasChunkToLaunch = True
16931699
chunk._status.setNode(self)
@@ -1709,7 +1715,7 @@ def initStatusOnCompute(self, forceCompute=False):
17091715
hasChunkToLaunch = False
17101716
if not self._chunksCreated:
17111717
hasChunkToLaunch = True
1712-
for chunk in self._chunks:
1718+
for chunk in self._allChunks:
17131719
if forceCompute or (chunk._status.status not in (Status.RUNNING, Status.SUCCESS)):
17141720
hasChunkToLaunch = True
17151721
chunk._status.setNode(self)
@@ -1851,6 +1857,10 @@ def endSequence(self):
18511857

18521858
def stopComputation(self):
18531859
""" Stop the computation of this node. """
1860+
if self.nodeDesc._hasPreprocess:
1861+
self._preprocessChunk.stopProcess()
1862+
if self.nodeDesc._hasPostprocess:
1863+
self._postprocessChunk.stopProcess()
18541864
if self._chunks:
18551865
for chunk in self._chunks.values():
18561866
chunk.stopProcess()
@@ -1873,12 +1883,12 @@ def getGlobalStatus(self):
18731883
if not self._chunksCreated:
18741884
# Get status from nodeStatus
18751885
return self._nodeStatus.status
1876-
if not self._chunks:
1886+
if not self._allChunks:
18771887
return Status.NONE
1878-
if len(self._chunks) == 1:
1879-
return self._chunks[0]._status.status
1880-
1881-
chunksStatus = [chunk._status.status for chunk in self._chunks]
1888+
if len(self._allChunks) == 1:
1889+
return self._allChunks[0]._status.status
1890+
1891+
chunksStatus = [chunk._status.status for chunk in self._allChunks]
18821892

18831893
anyOf = (Status.ERROR, Status.STOPPED, Status.KILLED,
18841894
Status.RUNNING, Status.SUBMITTED)
@@ -1895,11 +1905,11 @@ def getGlobalStatus(self):
18951905

18961906
@Slot(result=ChunkStatusData)
18971907
def getFusedStatus(self):
1898-
if not self._chunks:
1908+
if not self._allChunks:
18991909
return ChunkStatusData()
19001910
fusedStatus = ChunkStatusData()
1901-
fusedStatus.fromDict(self._chunks[0]._status.toDict())
1902-
for chunk in self._chunks[1:]:
1911+
fusedStatus.fromDict(self._allChunks[0]._status.toDict())
1912+
for chunk in self._allChunks[1:]:
19031913
fusedStatus.merge(chunk._status)
19041914
fusedStatus.status = self.getGlobalStatus()
19051915
return fusedStatus
@@ -1925,8 +1935,8 @@ def _isBackdropNode(self) -> bool:
19251935
def globalExecMode(self):
19261936
if not self._chunksCreated:
19271937
return self._nodeStatus.execMode.name
1928-
if len(self._chunks):
1929-
return self._chunks.at(0).getExecModeName()
1938+
if len(self._allChunks):
1939+
return self._allChunks[0].getExecModeName()
19301940
else:
19311941
return ExecMode.NONE
19321942

@@ -1941,7 +1951,17 @@ def _getJobName(self):
19411951

19421952
def getChunks(self) -> list[NodeChunk]:
19431953
return self._chunks
1944-
1954+
1955+
@property
1956+
def _allChunks(self) -> list[NodeChunk]:
1957+
chunks = []
1958+
if self.nodeDesc._hasPreprocess:
1959+
chunks.append(self._preprocessChunk)
1960+
chunks.extend([c for c in self._chunks])
1961+
if self.nodeDesc._hasPostprocess:
1962+
chunks.append(self._postprocessChunk)
1963+
return chunks
1964+
19451965
def getAllChunks(self):
19461966
allChunks = []
19471967
if self.nodeDesc._hasPreprocess:
@@ -2064,19 +2084,19 @@ def updateDuplicates(self, nodesPerUid):
20642084

20652085
def initFromThisSession(self) -> bool:
20662086
""" Check if the node was submitted from the current session """
2067-
if not self._chunksCreated or not self._chunks:
2087+
if not self._chunksCreated or not self._allChunks:
20682088
return meshroom.core.sessionUid == self._nodeStatus.submitterSessionUid
2069-
for chunk in self._chunks:
2089+
for chunk in self._allChunks:
20702090
# Technically the check on chunk._status.computeSessionUid is useless
20712091
if meshroom.core.sessionUid not in (chunk._status.computeSessionUid, self._nodeStatus.submitterSessionUid):
20722092
return False
20732093
return True
20742094

20752095
def isMainNode(self) -> bool:
20762096
""" In case of a node with duplicates, we check that the node is the one driving the computation. """
2077-
if len(self._chunks) == 0:
2097+
if len(self._allChunks) == 0:
20782098
return True
2079-
firstChunk = self._chunks.at(0)
2099+
firstChunk = self._allChunks[0]
20802100
if not firstChunk.statusNodeName:
20812101
# If nothing is declared, anyone could become the main (if there are duplicates).
20822102
return True

0 commit comments

Comments
 (0)