Skip to content

Commit 14b795d

Browse files
committed
[submitter] Update local farm to work with preprocess/postprocess chunks
1 parent 4e018ee commit 14b795d

5 files changed

Lines changed: 56 additions & 33 deletions

File tree

bin/meshroom_compute

Lines changed: 7 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -109,7 +109,7 @@ if args.node:
109109
# If not running as "extern", the SUBMITTED status should generate a warning.
110110
submittedStatuses.append(Status.SUBMITTED)
111111

112-
if not node._chunksCreated:
112+
if not node._chunksCreated and args.iteration >= 0:
113113
print(f"Error: Node {node} has been submitted before chunks have been created." \
114114
f"See file: \"{node.nodeStatusFile}\".")
115115
sys.exit(-1)
@@ -118,20 +118,21 @@ if args.node:
118118
print(f"InputNode: No computation to do.")
119119
sys.exit(0)
120120

121-
if args.iteration == ChunkIndex.NONE:
122-
chunks = node.chunks
123-
elif args.preprocess:
121+
if args.preprocess:
124122
chunks = [node._preprocessChunk]
125123
elif args.postprocess:
126124
chunks = [node._postprocessChunk]
125+
elif args.iteration == ChunkIndex.NONE: # Warning : default value
126+
chunks = node.chunks
127127
else:
128128
chunks = [node.chunks[args.iteration]]
129-
129+
130130
if not args.forceStatus and not args.forceCompute:
131131
for chunk in chunks:
132132
if chunk.status.status in submittedStatuses:
133133
# Particular case for the local isolated, the node status is set to RUNNING by the submitter directly.
134-
# We ensure that no other instance has started to compute, by checking that the computeSessionUid is empty.
134+
# We ensure that no other instance has start
135+
# ed to compute, by checking that the computeSessionUid is empty.
135136
if chunk.node.getMrNodeType() == meshroom.core.MrNodeType.NODE and \
136137
not chunk.status.computeSessionUid and node._nodeStatus.submitterSessionUid:
137138
continue

localfarm/localFarmBackend.py

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -358,6 +358,7 @@ def startTask(self, task: Task):
358358
with open(task.logFile, "w") as log:
359359
log.write(f"# ========== Starting task {task.tid} at {task.started_at.isoformat()}"
360360
f" (command=\"{task.command}\") ==========\n")
361+
log.write(f"# metadata: {task.metadata}\n")
361362
log.write(f"# process_env:\n")
362363
log.write(f"# Additional env variables:\n")
363364
for _k, _v in additional_env.items():

meshroom/core/submitter.py

Lines changed: 3 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -299,14 +299,12 @@ def display(self, task:OrderedTask=None, level=0):
299299
def iterOnTasks(self, current:OrderedTask=None):
300300
if current is None:
301301
current = self.rootTask
302-
items = [current]
302+
yield current
303303
for task in current.dependencies:
304-
items.extend(self.iterOnTasks(task))
305-
return items
304+
yield from self.iterOnTasks(task)
306305

307306
def __iter__(self):
308-
for item in self.iterOnTasks():
309-
yield item
307+
yield from self.iterOnTasks()
310308

311309

312310
class BaseSubmittedJob:

meshroom/submitters/localFarmSubmitter.py

Lines changed: 31 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -78,7 +78,7 @@ def getRequestPackages(packagesDelimiter="=="):
7878
return list(reqPackages)
7979

8080

81-
def rezWrapCommand(cmd, useCurrentContext=False, useRequestedContext=True, otherRezPkg: list[str] = None):
81+
def rezWrapCommand(cmd, useCurrentContext=False, useRequestedContext=True, otherRezPkg: list[str] = None, additionalEnv: dict=None):
8282
""" Wrap command to be runned using rez.
8383
:param cmd: command to run
8484
:type cmd: bool
@@ -108,7 +108,11 @@ def rezWrapCommand(cmd, useCurrentContext=False, useRequestedContext=True, other
108108
rezBin = os.path.join(os.environ["REZ_PACKAGES_ROOT"], "bin/rez")
109109
elif shutil.which("rez"):
110110
rezBin = shutil.which("rez")
111-
return f"{rezBin} env {packagesStr} -- {cmd}"
111+
addEnvCmd = ""
112+
if additionalEnv:
113+
envVars = " ".join([f'{k}="{v}"' for k, v in additionalEnv.items()])
114+
addEnvCmd = f"env {envVars} "
115+
return f"{rezBin} env {packagesStr} -- {addEnvCmd}{cmd}"
112116
return cmd
113117

114118

@@ -236,33 +240,44 @@ def getChunks(chunkParams) -> list[Chunk]:
236240
it = [Chunk(i, item[0], item[-1]) for i, item in enumerate(slices) if i not in ignoreIterations]
237241
return it
238242

239-
@staticmethod
240-
def getExpandWrappedCmd(cmdArgs, rezPackages):
243+
def getExpandWrappedCmd(self, cmdArgs, rezPackages):
241244
# Wrap with create_chunks
242245
cmdBin = wrapMeshroomBin("meshroom_createChunks")
243246
cmd = f"{cmdBin} --submitter LocalFarm {cmdArgs}"
244247
# Wrap with rez
245-
cmd = rezWrapCommand(cmd, otherRezPkg=rezPackages)
248+
cmd = rezWrapCommand(cmd, otherRezPkg=rezPackages, additionalEnv=self.jobEnv)
246249
return cmd
247250

248251
def createFarmTask(self, meshroomFile: str, orderedTask: OrderedTask, createdTasks: Dict[OrderedTask, Task]) -> Task:
249252
metadata = dict()
250253
if orderedTask.node:
251-
metadata = {"nodeUid": orderedTask.node._uid, "iteration": orderedTask.iteration}
254+
metadata = {"nodeUid": orderedTask.node._uid}
252255

256+
if orderedTask.iteration >= 0:
257+
metadata["iteration"] = orderedTask.iteration
258+
elif orderedTask.taskType == OrderedTaskType.PREPROCESS:
259+
metadata["iteration"] = "preprocess"
260+
elif orderedTask.taskType == OrderedTaskType.POSTPROCESS:
261+
metadata["iteration"] = "postprocess"
262+
253263
if orderedTask.taskType == OrderedTaskType.PLACEHOLDER:
254264
return Task(name=orderedTask.node.name if orderedTask.node else "", command="", metadata=metadata)
255265

256266
cmdArgs = f"--node {orderedTask.node.name} \"{meshroomFile}\" --extern"
257-
metadata = {"nodeUid": orderedTask.node._uid, "iteration": orderedTask.iteration}
258267

259268
if orderedTask.taskType == OrderedTaskType.EXPANDING:
260269
cmd = self.getExpandWrappedCmd(cmdArgs, self.reqPackages)
261270
task = Task(name=orderedTask.node.name, command=cmd, metadata=metadata, env=self.jobEnv)
262271
else:
263272
cmdBin = wrapMeshroomBin("meshroom_compute")
264-
cmd = f"{cmdBin} {cmdArgs} --iteration {orderedTask.iteration}"
265-
cmd = rezWrapCommand(cmd, otherRezPkg=self.reqPackages)
273+
cmd = f"{cmdBin} {cmdArgs}"
274+
if orderedTask.taskType == OrderedTaskType.PREPROCESS:
275+
cmd += f" --preprocess"
276+
elif orderedTask.taskType == OrderedTaskType.POSTPROCESS:
277+
cmd += f" --postprocess"
278+
elif orderedTask.taskType == OrderedTaskType.CHUNK:
279+
cmd += f" --iteration {orderedTask.iteration}"
280+
cmd = rezWrapCommand(cmd, otherRezPkg=self.reqPackages, additionalEnv=self.jobEnv)
266281
task = Task(name=orderedTask.node.name, command=cmd, metadata=metadata, env=self.jobEnv)
267282

268283
return task
@@ -272,24 +287,25 @@ def createJob(self, orderedTasks, filepath, submitLabel="{projectName}") -> Loca
272287
name = submitLabel.format(projectName=projectName)
273288
# Create job
274289
job = Job(name)
275-
290+
276291
# Create tasks
277292
orderedTasks.display()
278293
createdTasks: Dict[OrderedTask, Task] = dict()
279294
for taskToCreate in orderedTasks.iterOnTasks():
280295
if taskToCreate in createdTasks.keys():
281296
continue
282297
createdTask = self.createFarmTask(filepath, taskToCreate, createdTasks)
298+
job.addTask(createdTask)
283299
createdTasks[taskToCreate] = createdTask
284-
300+
285301
for orderedTask, task in createdTasks.items():
286302
print(orderedTask, "->", task)
287-
303+
288304
for orderedTask, task in createdTasks.items():
289305
deps = [createdTasks.get(t) for t in orderedTask.dependencies]
290306
for dependency in deps:
291-
job.addTaskDependency(dependency, task)
292-
307+
job.addTaskDependency(task, dependency)
308+
293309
# Submit job
294310
engine = LocalFarmEngine(self.farmPath)
295311
res = job.submit(engine)
@@ -328,7 +344,7 @@ def createChunkTask(self, node, graphFile, **kwargs):
328344
metadata = {"nodeUid": node._uid, "iteration": chunk.iteration}
329345
cmdBin = wrapMeshroomBin("meshroom_compute")
330346
cmd = f"{cmdBin} {cmdArgs} --iteration {chunk.iteration}"
331-
cmd = rezWrapCommand(cmd, otherRezPkg=self.reqPackages)
347+
cmd = rezWrapCommand(cmd, otherRezPkg=self.reqPackages, additionalEnv=self.jobEnv)
332348
print("Additional chunk task command: ", cmd)
333349
task = Task(name=name, command=cmd, metadata=metadata, env=taskEnv)
334350
engine.create_additional_task(currentJid, currentTid, task)

tests/test_submit.py

Lines changed: 14 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -52,18 +52,25 @@ def waitForNodeCompletion(job: LocalFarmJob, node: Node, timeout=25):
5252
print(f"Waiting for node {node.name} to complete...")
5353
startTime = time.time()
5454
while True:
55+
time.sleep(1)
56+
if time.time() - startTime > timeout:
57+
raise TimeoutError(f"Node {node.name} did not complete within {timeout} seconds")
58+
# Check for job error
59+
err = job.getJobErrors()
60+
if err:
61+
raise RuntimeError(f"Job encountered an error: {err}")
62+
# Check that all tasks are finished
63+
for task in job.localfarmTasks.values():
64+
if task.get("status") not in (Status.NONE.name, Status.SUCCESS.name, Status.STOPPED.name, Status.ERROR.name):
65+
continue
66+
break
67+
# Stop if the node switched to done
5568
node.updateStatusFromCache()
5669
nodeStatus = node.getGlobalStatus()
5770
if nodeStatus not in (Status.SUBMITTED, Status.RUNNING):
5871
print(f"Node status switched to {nodeStatus}")
5972
return
60-
# Check for job error
61-
err = job.getJobErrors()
62-
if err:
63-
raise RuntimeError(f"Job encountered an error: {err}")
64-
if time.time() - startTime > timeout:
65-
raise TimeoutError(f"Node {node.name} did not complete within {timeout} seconds")
66-
time.sleep(1)
73+
6774

6875
def processSubmit(node: Node, graph, tmp_path):
6976
"""

0 commit comments

Comments
 (0)