Skip to content

Commit 95bfbcf

Browse files
committed
[code] submitter : fix issues in dynamic chunks & submitting
1 parent 15df47f commit 95bfbcf

File tree

9 files changed

+620
-277
lines changed

9 files changed

+620
-277
lines changed

bin/meshroom_compute

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -91,10 +91,11 @@ if args.node:
9191
for chunk in chunks:
9292
if chunk.status.status in submittedStatuses:
9393
# Particular case for the local isolated, the node status is set to RUNNING by the submitter directly.
94-
# We ensure that no other instance has started to compute, by checking that the sessionUid is empty.
95-
if chunk.node.getMrNodeType() == meshroom.core.MrNodeType.NODE and not chunk.status.sessionUid and chunk.status.submitterSessionUid:
94+
# We ensure that no other instance has started to compute, by checking that the computeSessionUid is empty.
95+
if chunk.node.getMrNodeType() == meshroom.core.MrNodeType.NODE and \
96+
not chunk.status.computeSessionUid and node._nodeStatus.submitterSessionUid:
9697
continue
97-
print(f'Warning: Node is already submitted with status "{chunk.status.status.name}". See file: "{chunk.statusFile}". ExecMode: {chunk.status.execMode.name}, SessionUid: {chunk.status.sessionUid}, submitterSessionUid: {chunk.status.submitterSessionUid}')
98+
print(f'Warning: Node is already submitted with status "{chunk.status.status.name}". See file: "{chunk.statusFile}". ExecMode: {chunk.status.execMode.name}, computeSessionUid: {chunk.status.computeSessionUid}, submitterSessionUid: {node._nodeStatus.submitterSessionUid}')
9899
# sys.exit(-1)
99100

100101
if args.extern:

bin/meshroom_createChunks

Lines changed: 18 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -78,6 +78,10 @@ graph.update()
7878
node = graph.findNode(args.node)
7979
submittedStatuses = [Status.RUNNING]
8080

81+
if node._nodeStatus.status in (Status.STOPPED, Status.KILLED):
82+
logging.error("Node status is STOPPED or KILLED.")
83+
sys.exit(666)
84+
8185

8286
# Find job
8387
submitter = None
@@ -87,12 +91,17 @@ for subName, sub in submitters.items():
8791
submitter = sub
8892
break
8993

90-
if not (hasattr(node, '_chunksCreated') and node._chunksCreated):
94+
if not node._chunksCreated:
9195
# Create node chunks
9296
# Once created we don't have to do it again even if we relaunch the job
93-
node._createChunks()
97+
node.createChunks()
9498
# Set the chunks statuses
95-
node.initStatusOnSubmit()
99+
for chunk in node._chunks:
100+
if args.forceCompute or chunk._status.status != Status.SUCCESS:
101+
hasChunkToLaunch = True
102+
chunk._status.setNode(node)
103+
chunk._status.initExternSubmit()
104+
chunk.upgradeStatusFile()
96105

97106
# Get chunks to process in the current process
98107
chunksToProcess = []
@@ -103,20 +112,21 @@ else:
103112
# Cannot retrieve job -> execute process serially
104113
chunksToProcess = node.chunks
105114

106-
logging.info("[MeshroomCreateChunks] Chunks to process here :", chunksToProcess)
115+
logging.info(f"[MeshroomCreateChunks] Chunks to process here : {chunksToProcess}")
107116

108117
if not args.forceStatus and not args.forceCompute:
109118
for chunk in chunksToProcess:
110119
if chunk.status.status in submittedStatuses:
111120
# Particular case for the local isolated, the node status is set to RUNNING by the submitter directly.
112-
# We ensure that no other instance has started to compute, by checking that the sessionUid is empty.
113-
if chunk.node.getMrNodeType() == meshroom.core.MrNodeType.NODE and not chunk.status.sessionUid and chunk.status.submitterSessionUid:
121+
# We ensure that no other instance has started to compute, by checking that the sessicomputeSessionUidonUid is empty.
122+
if chunk.node.getMrNodeType() == meshroom.core.MrNodeType.NODE and \
123+
not chunk.status.computeSessionUid and node._nodeStatus.submitterSessionUid:
114124
continue
115125
logging.warning(
116126
f"[MeshroomCreateChunks] Node is already submitted with status " \
117127
f"\"{chunk.status.status.name}\". See file: \"{chunk.statusFile}\". " \
118-
f"ExecMode: {chunk.status.execMode.name}, SessionUid: {chunk.status.sessionUid}, " \
119-
f"submitterSessionUid: {chunk.status.submitterSessionUid}")
128+
f"ExecMode: {chunk.status.execMode.name}, computeSessionUid: {chunk.status.computeSessionUid}, " \
129+
f"submitterSessionUid: {node._nodeStatus.submitterSessionUid}")
120130
# sys.exit(-1)
121131

122132
if chunksToProcess:

meshroom/core/desc/node.py

Lines changed: 8 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -23,17 +23,21 @@
2323

2424
# Handle cleanup
2525
class ExitCleanup:
26+
"""
27+
Make sure we kill child subprocesses when the main process exits receive SIGTERM.
28+
"""
29+
2630
def __init__(self):
2731
self._subprocesses = []
2832
signal.signal(signal.SIGTERM, self.exit)
29-
33+
3034
def addSubprocess(self, process):
31-
print(f"[ExitCleanup] (addSubprocess) register subprocess {process}")
35+
logging.debug(f"[ExitCleanup] Register subprocess {process}")
3236
self._subprocesses.append(process)
33-
37+
3438
def exit(self, signum, frame):
3539
for proc in self._subprocesses:
36-
print(f"[ExitCleanup] (exit) kill subprocess {proc}")
40+
logging.debug(f"[ExitCleanup] Kill subprocess {proc}")
3741
try:
3842
if proc.is_running():
3943
proc.terminate()
@@ -307,8 +311,6 @@ def processChunkInEnvironment(self, chunk):
307311

308312
if len(chunk.node.getChunks()) > 1:
309313
meshroomComputeCmd += f" --iteration {chunk.range.iteration}"
310-
311-
print(f"(processChunkInEnvironment) meshroomComputeCmd={meshroomComputeCmd}")
312314

313315
runtimeEnv = chunk.node.nodeDesc.plugin.runtimeEnv
314316
cmdPrefix = chunk.node.nodeDesc.plugin.commandPrefix

meshroom/core/graph.py

Lines changed: 10 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@
2121
from meshroom.core.node import BaseNode, Status, Node, CompatibilityNode
2222
from meshroom.core.nodeFactory import nodeFactory
2323
from meshroom.core.mtyping import PathLike
24+
from meshroom.core.submitter import BaseSubmittedJob, jobManager
2425

2526
# Replace default encoder to support Enums
2627

@@ -1669,7 +1670,7 @@ def executeGraph(graph, toNodes=None, forceCompute=False, forceStatus=False):
16691670
graph.save()
16701671

16711672
for node in nodes:
1672-
node.beginSequence(forceCompute)
1673+
node.initStatusOnCompute(forceCompute)
16731674

16741675
for n, node in enumerate(nodes):
16751676
try:
@@ -1715,11 +1716,18 @@ def submitGraph(graph, submitter, toNodes=None, submitLabel="{projectName}"):
17151716
raise RuntimeError("Unknown Submitter: '{submitter}'. Available submitters are: '{allSubmitters}'.".format(
17161717
submitter=submitter, allSubmitters=str(meshroom.core.submitters.keys())))
17171718

1719+
for node in nodesToProcess:
1720+
node.initStatusOnSubmit()
1721+
17181722
try:
17191723
res = sub.submit(nodesToProcess, edgesToProcess, graph.filepath, submitLabel=submitLabel)
17201724
if res:
1725+
if isinstance(res, BaseSubmittedJob):
1726+
jobManager.addJob(res, nodesToProcess)
1727+
else:
17211728
for node in nodesToProcess:
1722-
node.initStatusOnSubmit() # update node status
1729+
# TODO : Notify the node that there was an issue on submit
1730+
pass
17231731
except Exception as e:
17241732
logging.error(f"Error on submit : {e}")
17251733

0 commit comments

Comments
 (0)