Commit 02c55cb

[core] Start updating taskmanager and submitter for new chunk process

1 parent 3049a6d

File tree

9 files changed: +397 −302 lines changed


bin/meshroom_createChunks

Lines changed: 19 additions & 25 deletions
@@ -1,9 +1,16 @@
 #!/usr/bin/env python
+
+"""
+This script wraps the processing of a node on the farm.
+It handles chunk creation and creates the jobs for these chunks.
+If the submitter cannot create chunks, the chunks are processed serially
+in the current process.
+"""
+
 import argparse
 import logging
 import os
 import sys
-
 try:
     import meshroom
 except Exception:
@@ -25,6 +32,9 @@ from meshroom.core.node import Status
 parser = argparse.ArgumentParser(description='Execute a Graph of processes.')
 parser.add_argument('graphFile', metavar='GRAPHFILE.mg', type=str,
                     help='Filepath to a graph file.')
+
+parser.add_argument('--submitter', type=str, required=True,
+                    help='Name of the submitter used to create the job.')
 parser.add_argument('--node', metavar='NODE_NAME', type=str, required=True,
                     help='Process the node. It will generate an error if the dependencies are not already computed.')
 parser.add_argument('--inCurrentEnv', help='Execute process in current env without creating a dedicated runtime environment.',
@@ -70,47 +80,31 @@ submittedStatuses = [Status.RUNNING]
 
 
 # Find job
-submitter, job = None, None
+submitter = None
 # It's required if we want to spool chunks on different machines
-jobInfos = node._nodeStatus.jobInfos
-submitterName, jid = jobInfos.get("submitterName"), jobInfos.get("jid")
 for subName, sub in submitters.items():
-    if not sub._options.includes(SubmitterOptionsEnum.RETRIEVE):
-        continue
-    if submitterName == subName:
+    if args.submitter == subName:
         submitter = sub
-        job = jobManager.retreiveJob(sub, jid)
         break
 
 if not (hasattr(node, '_chunksCreated') and node._chunksCreated):
     # Create node chunks
     # Once created we don't have to do it again even if we relaunch the job
     node._createChunks()
-    # TODO
-    # Notify that the chunk is submitted
-    # for chunk in node._chunks:
-    #     chunk._status.initExternSubmit()
+    # Set the chunks statuses
+    node.initStatusOnSubmit()
 
 # Get chunks to process in the current process
 chunksToProcess = []
-if job:
-    if not job.submitterOptions.includes(SubmitterOptionsEnum.EDIT_TASKS):
+if submitter:
+    if not submitter._options.includes(SubmitterOptionsEnum.EDIT_TASKS):
         chunksToProcess = node.chunks
 else:
     # Cannot retrieve job -> execute process serially
     chunksToProcess = node.chunks
 
 logging.info("[MeshroomCreateChunks] Chunks to process here :", chunksToProcess)
 
-# Identify if the current process
-# is a running chunk or if we need to spool the chunk job
-#
-
-# - create map from job/nodes
-# - if we are on a task process
-# - do like before
-pass
-
 if not args.forceStatus and not args.forceCompute:
     for chunk in chunksToProcess:
         if chunk.status.status in submittedStatuses:
@@ -135,8 +129,8 @@ if chunksToProcess:
     node.restoreLogger()
 else:
     logging.info(f"[MeshroomCreateChunks] -> create job to process chunks {node.chunks}")
-    job.addChunkTask(node, graphFile=args.graphFile, cache=args.cache,
-                     forceStatus=args.forceStatus, forceCompute=args.forceCompute)
+    submitter.createChunkTask(node, graphFile=args.graphFile, cache=args.cache,
+                              forceStatus=args.forceStatus, forceCompute=args.forceCompute)
 
 # Restore the log level
 logging.getLogger().setLevel(meshroom.logStringToPython[args.verbose])
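For context, the wrapper is now driven by an explicit submitter name, e.g. python meshroom_createChunks graph.mg --node Meshing_1 --submitter SimpleFarm (the graph, node, and submitter names here are illustrative, not from the commit). Below is a minimal runnable sketch of the new resolution logic, with a simplified stand-in for Meshroom's submitter objects (the real ones expose _options and SubmitterOptionsEnum):

# Simplified stand-in; real Meshroom submitters have a richer interface.
class FakeSubmitter:
    def __init__(self, name, canEditTasks):
        self.name = name
        self.canEditTasks = canEditTasks

submitters = {
    "SimpleFarm": FakeSubmitter("SimpleFarm", canEditTasks=True),
    "LocalFarm": FakeSubmitter("LocalFarm", canEditTasks=False),
}

def resolveSubmitter(requestedName):
    """Mirror of the loop in the diff: match by the --submitter name, else None."""
    for subName, sub in submitters.items():
        if requestedName == subName:
            return sub
    return None

submitter = resolveSubmitter("SimpleFarm")
# Chunks are spooled as farm tasks only if the submitter can edit tasks;
# otherwise the script falls back to processing all chunks serially, in-process.
processSerially = submitter is None or not submitter.canEditTasks
print(submitter.name if submitter else None, processSerially)  # SimpleFarm False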

meshroom/core/graph.py

Lines changed: 0 additions & 1 deletion
@@ -1081,7 +1081,6 @@ def dfsToProcess(self, startNodes=None):
         visited nodes and edges that are not already computed (node.status != SUCCESS).
         The order is defined by the visit and finishVertex event.
         """
-        print(f"[Graph] (dfsToProcess) startNodes={startNodes}")
         nodes = []
         edges = []
         visitor = Visitor(reverse=False, dependenciesOnly=True)

meshroom/core/node.py

Lines changed: 29 additions & 53 deletions
@@ -40,7 +40,6 @@ def renameWritingToFinalPath(writingFilepath: str, filepath: str) -> str:
         pass
     os.rename(writingFilepath, filepath)
 
-
 class Status(Enum):
     """
     """
@@ -75,24 +74,20 @@ def __init__(self, nodeName='', nodeType='', packageName='', packageVersion='',
         self.nodeType: str = nodeType
         self.packageName: str = packageName
         self.packageVersion: str = packageVersion
-        self.mrNodeType: MrNodeType = mrNodeType
-        self.chunks: NodeChunkSetup = None
+        self.mrNodeType: str = mrNodeType
+        self.reset()
+
+    def reset(self):
+        self.resetChunkInfos()
         self.resetDynamicValues()
 
+    def resetChunkInfos(self):
+        self.chunks: NodeChunkSetup = None
+
     def resetDynamicValues(self):
-        self.statusInheritedFromChunks: bool = False
         self.status: Status = Status.NONE
         self.execMode: ExecMode = ExecMode.NONE
         self.jobInfos: dict = {}
-        # TODO : move all possible infos from ChunkStatusData to this place
-
-    def reset(self):
-        self.nodeName: str = ""
-        self.nodeType: str = ""
-        self.packageName: str = ""
-        self.packageVersion: str = ""
-        self.mrNodeType: str = MrNodeType.NONE
-        self.resetDynamicValues()
 
     def setNode(self, node):
         """ Set the node information from one node instance. """
@@ -113,6 +108,13 @@ def initExternSubmit(self):
         self.resetDynamicValues()
         self.status = Status.SUBMITTED
         self.execMode = ExecMode.EXTERN
+
+    def setJob(self, jid, submitterName):
+        """ Set job infos on the node so that chunks can be spooled on different machines. """
+        self.jobInfos = {
+            "jid": str(jid),
+            "submitterName": str(submitterName),
+        }
 
     def initLocalSubmit(self):
         """
@@ -127,15 +129,8 @@ def initEndCompute(self):
         pass  # TODO
 
     def setComputationStatusToInheritChunks(self):
-        self.statusInheritedFromChunks = True
         self.status: Status = Status.NONE
 
-    def setJob(self, jid, submitterName):
-        self.jobInfos = {
-            "jid": str(jid),
-            "submitterName": str(submitterName),
-        }
-
     def setNodeType(self, node):
         """
         Set the node type and package information from the given node.
@@ -149,38 +144,39 @@ def setNodeType(self, node):
     def toDict(self):
         keys = list(self.__slots__) or []
         d = {key:getattr(self, key) for key in keys}
-        chunks = [-1, -1, -1]
+        chunks = None
         if self.chunks:
             chunks = list(self.chunks)
         d["chunks"] = chunks
         return d
 
-    def fromDict(self, d):
+    def updateFromDict(self, d):
         self.reset()
         self.mrNodeType = d.pop("mrNodeType", MrNodeType.NONE)
         if not isinstance(self.mrNodeType, MrNodeType):
             self.mrNodeType = MrNodeType[self.mrNodeType]
-        self.chunks = NodeChunkSetup(*d.pop("chunks", [-1, -1, -1]))
+        if "chunks" in d:
+            chunks = d.pop("chunks")
+            if chunks:
+                self.chunks = NodeChunkSetup(*chunks)
         if "status" in d:
             statusName = d.pop("status")
             self.status: Status = Status[statusName]
         if "execMode" in d:
             execModeName = d.pop("execMode")
-            self.execMode = Status[execModeName]
+            self.execMode = ExecMode[execModeName]
         for _key, _value in d.items():
             if _key in self.__slots__:
                 setattr(self, _key, _value)
 
     def loadFromCache(self, statusFile):
-        print(f"[NodeStatusData](loadFromCache) {statusFile}")
         self.reset()
         try:
             with open(statusFile) as jsonFile:
                 statusData = json.load(jsonFile)
-            print(f"statusData :\n{statusData}")
-            self.fromDict(statusData)
+            self.updateFromDict(statusData)
         except Exception as e:
-            logging.warning(f"[NodeStatusData] (loadFromCache) {self.nodeName}: Error while loading status file {statusFile}: {e}")
+            logging.warning(f"(loadFromCache) {self.nodeName}: Error while loading status file {statusFile}: {e}")
             self.reset()
 
     @property
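A note on the serialization change above: toDict now writes None instead of the [-1, -1, -1] placeholder when no chunks exist, and updateFromDict tolerates a missing or empty "chunks" entry. A minimal runnable sketch of that round trip, using simplified stand-ins (NodeChunkSetup's real fields may differ):

import json
from collections import namedtuple

# Assumed shape of NodeChunkSetup; the real class likely differs.
NodeChunkSetup = namedtuple("NodeChunkSetup", ["size", "start", "end"])

class MiniStatus:
    """Toy model of the toDict/updateFromDict round trip."""
    def __init__(self):
        self.chunks = None

    def toDict(self):
        # None when chunks were never created, a plain list otherwise.
        return {"chunks": list(self.chunks) if self.chunks else None}

    def updateFromDict(self, d):
        chunks = d.pop("chunks", None)
        if chunks:  # tolerates both a missing key and an explicit None
            self.chunks = NodeChunkSetup(*chunks)

s = MiniStatus()
s.chunks = NodeChunkSetup(3, 0, 2)
payload = json.dumps(s.toDict())  # what would land in the status file
restored = MiniStatus()
restored.updateFromDict(json.loads(payload))
assert restored.chunks == NodeChunkSetup(3, 0, 2)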
@@ -534,7 +530,6 @@ def updateStatusFromCache(self):
         """
         Update chunk status based on status file content/existence.
         """
-        print(f"updateStatusFromCache {self}")
         # TODO : If this is a placeholder chunk
         # Then we shouldn't do anything here
 
@@ -651,7 +646,6 @@ def isFinished(self):
         return self._status.status == Status.SUCCESS
 
     def process(self, forceCompute=False, inCurrentEnv=False):
-        print(f"[NodeChunk] (process) {self}")
         if not forceCompute and self._status.status == Status.SUCCESS:
             logging.info(f"Node chunk already computed: {self.name}")
             return
@@ -698,7 +692,6 @@ def process(self, forceCompute=False, inCurrentEnv=False):
             self.statThread.join()
             self.statistics = stats.Statistics()
             del runningProcesses[self.name]
-        print(f"[NodeChunk] (process) -> done")
 
 
     def _processInIsolatedEnvironment(self):
@@ -1529,9 +1522,6 @@ def updateNodeStatusFromCache(self):
         # TODO : integrate statusFileLastModTime ?
         Returns True if a change on the chunk setup has been detected
         """
-        if self.label == "E":
-            chunksInfos = [f"<{hex(id(c._status))}, {c._status.status.name}>" for c in self._chunks]
-            print(f"[BaseNode] (updateNodeStatusFromCache) E ({self}) ({chunksInfos})")
         chunksRangeHasChanged = False
         if os.path.exists(self.nodeStatusFile):
             oldChunkSetup = self._nodeStatus.chunks
@@ -1548,7 +1538,6 @@ def updateStatusFromCache(self):
         """
         Update node status based on status file content/existence.
         """
-        print(f"updateStatusFromCache {self}")
         # Update nodeStatus from cache
         chunkChanged = self.updateNodeStatusFromCache()
         # Create chunks if we found info on them on the node cache
@@ -1580,13 +1569,11 @@ def saveNodeStatusFile(self):
         renameWritingToFinalPath(statusFilepathWriting, statusFilepath)
 
     def setJobId(self, jid, submitterName):
-        print("[BaseNode] (setJobId)", self.label, "->", jid, f"({submitterName})")
         self._nodeStatus.setJob(jid, submitterName)
         self.saveNodeStatusFile()
 
     def initStatusOnSubmit(self, forceCompute=False):
         """ Prepare chunks status when the node is in a graph that was submitted """
-        print(f"[BaseNode] (initStatusOnSubmit) {self.label} ({self})")
         if not self._chunksCreated:
             self._nodeStatus.setNode(self) # ???
             self._nodeStatus.initExternSubmit()
@@ -1600,7 +1587,6 @@ def initStatusOnSubmit(self, forceCompute=False):
             chunk.upgradeStatusFile()
 
     def beginSequence(self, forceCompute=False):
-        print(f"[BaseNode] (beginSequence) {self.label} ({self})")
         if not self._chunksCreated:
             self._nodeStatus.setNode(self) # ???
             self._nodeStatus.initLocalSubmit()
@@ -1721,7 +1707,9 @@ def endSequence(self):
 
     def stopComputation(self):
         """ Stop the computation of this node. """
+        print(f"[BaseNode] (stopComputation) {self}")
         for chunk in self._chunks.values():
+            print(f"[BaseNode] (stopComputation) stop chunk {chunk}")
             chunk.stopProcess()
 
     def getGlobalStatus(self):
@@ -1807,7 +1795,6 @@ def getLocked(self):
         return self._locked
 
     def setLocked(self, lock):
-        print(f"(setLocked) {self.label} ({self}) -> {lock}")
         if self._locked == lock:
             return
         self._locked = lock
@@ -1816,17 +1803,14 @@ def setLocked(self, lock):
     @Slot()
     def updateDuplicatesStatusAndLocked(self):
         """ Update status of duplicate nodes without any latency and update locked. """
-        print(f"[BaseNode] (updateDuplicatesStatusAndLocked) {self}")
         if self.isMainNode():
             for node in self._duplicates:
                 node.updateStatusFromCache()
 
         self.updateLocked()
 
     def updateLocked(self):
-        print(f"(updateLocked) {self.label} ({self})")
         currentStatus = self.getGlobalStatus()
-        print(f"  currentStatus: {currentStatus}")
 
         lockedStatus = (Status.RUNNING, Status.SUBMITTED)
 
@@ -1850,7 +1834,6 @@ def updateLocked(self):
         if not self._locked and currentStatus == Status.SUCCESS:
             return
 
-        print(f"  a")
         if currentStatus == Status.SUCCESS:
             # At this moment, the node is necessarily locked because of previous if statement
             inputNodes = self.getInputNodes(recursive=True, dependenciesOnly=True)
@@ -1869,14 +1852,12 @@ def updateLocked(self):
                 node.setLocked(False)
             return
         elif currentStatus in lockedStatus and self.isMainNode():
-            print(f"  b")
             self.setLocked(True)
             inputNodes = self.getInputNodes(recursive=True, dependenciesOnly=True)
             for node in inputNodes:
                 node.setLocked(True)
             return
 
-        print(f"  c")
         self.setLocked(False)
 
     def updateDuplicates(self, nodesPerUid):
@@ -2178,15 +2159,13 @@ def _resetChunks(self):
         """
         if isinstance(self.nodeDesc, desc.InputNode):
             return
-        print("[Node] (_resetChunks)", self.label)
         # Disconnect signals
         for chunk in self._chunks:
-            print(f"[Node] (_resetChunks) -> remove {chunk}")
             chunk.statusChanged.disconnect(self.globalStatusChanged)
         # Empty list
         self._chunks.setObjectList([])
         # Clear cache
-
+        self._nodeStatus.resetChunkInfos()
         # Recreate list with reset values (1 chunk or the static size)
         if not self.isParallelized:
             self._chunksCreated = True
@@ -2198,8 +2177,6 @@ def _resetChunks(self):
             self._chunksCreated = False
             self.setSize(0)
         # Create chunks when possible
-        print(f"[Node] (_resetChunks) <{self.label}|{self._uid[:5] + ('..' if len(self._uid) > 5 else '')}> ({self.size})", end="")
-        print(f" parallel={1 if self.isParallelized else 0} chunksCreated={1 if self._chunksCreated else 0}", end="")
         if self._chunksCreated and self.isParallelized:
             try:
                 ranges = self.nodeDesc.parallelization.getRanges(self)
@@ -2216,7 +2193,6 @@ def _resetChunks(self):
                 self._chunks[0].statusChanged.connect(self.globalStatusChanged)
             else:
                 self._chunks.setObjectList([])
-        print(f" -> ranges={[_c.range for _c in self._chunks]}")
         self.chunksCreatedChanged.emit()
         self.chunksChanged.emit()
         self.globalStatusChanged.emit()
@@ -2251,14 +2227,14 @@ def __createChunks(self, ranges):
 
     def _createChunksFromCache(self):
         """Create chunks when a node cache exists"""
-        print("[Node] (_createChunksFromCache)", self.label, end="")
+        # print("[Node] (_createChunksFromCache)", self.label, end="")
         try:
             # Get size from cache
             size = self._nodeStatus.nbChunks
             self.setSize(size)
             ranges = self._nodeStatus.getChunkRanges()
             self.__createChunks(ranges)
-            print(" -> ranges =", [_c.range for _c in self._chunks])
+            # print(" -> ranges =", [_c.range for _c in self._chunks])
         except Exception as e:
             logging.error(f"Failed to create chunks for {self.name}")
             self._chunks.clear()
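One more illustration of the relocated setJob helper: it persists the job id and submitter name as strings in jobInfos, which is what allows chunks to be spooled on different machines (per the comment in bin/meshroom_createChunks). A tiny runnable sketch, with the field names taken from the diff and everything else illustrative:

import json

class MiniNodeStatus:
    """Illustrative subset of NodeStatusData's job bookkeeping."""
    def __init__(self):
        self.jobInfos = {}

    def setJob(self, jid, submitterName):
        # Stored as strings, matching the diff's setJob implementation.
        self.jobInfos = {"jid": str(jid), "submitterName": str(submitterName)}

status = MiniNodeStatus()
status.setJob(12345, "SimpleFarm")
# Round-trips cleanly through the JSON status file on disk.
assert json.loads(json.dumps(status.jobInfos)) == {"jid": "12345", "submitterName": "SimpleFarm"}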
