Skip to content

Commit de6a44e

Browse files
committed
[submitter] Update local farm to work with preprocess/postprocess chunks
1 parent 3914422 commit de6a44e

5 files changed

Lines changed: 55 additions & 32 deletions

File tree

bin/meshroom_compute

Lines changed: 6 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -138,20 +138,21 @@ if args.node:
138138
print(f"InputNode: No computation to do.")
139139
sys.exit(0)
140140

141-
if args.iteration == ChunkIndex.NONE:
142-
chunks = node.chunks
143-
elif args.preprocess:
141+
if args.preprocess:
144142
chunks = [node._preprocessChunk]
145143
elif args.postprocess:
146144
chunks = [node._postprocessChunk]
145+
elif args.iteration == ChunkIndex.NONE: # Warning : default value
146+
chunks = node.chunks
147147
else:
148148
chunks = [node.chunks[args.iteration]]
149-
149+
150150
if not args.forceStatus and not args.forceCompute:
151151
for chunk in chunks:
152152
if chunk.status.status in submittedStatuses:
153153
# Particular case for the local isolated, the node status is set to RUNNING by the submitter directly.
154-
# We ensure that no other instance has started to compute, by checking that the computeSessionUid is empty.
154+
# We ensure that no other instance has start
155+
# ed to compute, by checking that the computeSessionUid is empty.
155156
if chunk.node.getMrNodeType() == meshroom.core.MrNodeType.NODE and \
156157
not chunk.status.computeSessionUid and node._nodeStatus.submitterSessionUid:
157158
continue

localfarm/localFarmBackend.py

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -358,6 +358,7 @@ def startTask(self, task: Task):
358358
with open(task.logFile, "w") as log:
359359
log.write(f"# ========== Starting task {task.tid} at {task.started_at.isoformat()}"
360360
f" (command=\"{task.command}\") ==========\n")
361+
log.write(f"# metadata: {task.metadata}\n")
361362
log.write(f"# process_env:\n")
362363
log.write(f"# Additional env variables:\n")
363364
for _k, _v in additional_env.items():

meshroom/core/submitter.py

Lines changed: 3 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -299,14 +299,12 @@ def display(self, task:OrderedTask=None, level=0):
299299
def iterOnTasks(self, current:OrderedTask=None):
300300
if current is None:
301301
current = self.rootTask
302-
items = [current]
302+
yield current
303303
for task in current.dependencies:
304-
items.extend(self.iterOnTasks(task))
305-
return items
304+
yield from self.iterOnTasks(task)
306305

307306
def __iter__(self):
308-
for item in self.iterOnTasks():
309-
yield item
307+
yield from self.iterOnTasks()
310308

311309

312310
class BaseSubmittedJob:

meshroom/submitters/localFarmSubmitter.py

Lines changed: 31 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -78,7 +78,7 @@ def getRequestPackages(packagesDelimiter="=="):
7878
return list(reqPackages)
7979

8080

81-
def rezWrapCommand(cmd, useCurrentContext=False, useRequestedContext=True, otherRezPkg: list[str] = None):
81+
def rezWrapCommand(cmd, useCurrentContext=False, useRequestedContext=True, otherRezPkg: list[str] = None, additionalEnv: dict=None):
8282
""" Wrap command to be runned using rez.
8383
:param cmd: command to run
8484
:type cmd: bool
@@ -108,7 +108,11 @@ def rezWrapCommand(cmd, useCurrentContext=False, useRequestedContext=True, other
108108
rezBin = os.path.join(os.environ["REZ_PACKAGES_ROOT"], "bin/rez")
109109
elif shutil.which("rez"):
110110
rezBin = shutil.which("rez")
111-
return f"{rezBin} env {packagesStr} -- {cmd}"
111+
addEnvCmd = ""
112+
if additionalEnv:
113+
envVars = " ".join([f'{k}="{v}"' for k, v in additionalEnv.items()])
114+
addEnvCmd = f"env {envVars} "
115+
return f"{rezBin} env {packagesStr} -- {addEnvCmd}{cmd}"
112116
return cmd
113117

114118

@@ -236,33 +240,44 @@ def getChunks(chunkParams) -> list[Chunk]:
236240
it = [Chunk(i, item[0], item[-1]) for i, item in enumerate(slices) if i not in ignoreIterations]
237241
return it
238242

239-
@staticmethod
240-
def getExpandWrappedCmd(cmdArgs, rezPackages):
243+
def getExpandWrappedCmd(self, cmdArgs, rezPackages):
241244
# Wrap with create_chunks
242245
cmdBin = wrapMeshroomBin("meshroom_createChunks")
243246
cmd = f"{cmdBin} --submitter LocalFarm {cmdArgs}"
244247
# Wrap with rez
245-
cmd = rezWrapCommand(cmd, otherRezPkg=rezPackages)
248+
cmd = rezWrapCommand(cmd, otherRezPkg=rezPackages, additionalEnv=self.jobEnv)
246249
return cmd
247250

248251
def createFarmTask(self, meshroomFile: str, orderedTask: OrderedTask, createdTasks: Dict[OrderedTask, Task]) -> Task:
249252
metadata = dict()
250253
if orderedTask.node:
251-
metadata = {"nodeUid": orderedTask.node._uid, "iteration": orderedTask.iteration}
254+
metadata = {"nodeUid": orderedTask.node._uid}
252255

256+
if orderedTask.iteration >= 0:
257+
metadata["iteration"] = orderedTask.iteration
258+
elif orderedTask.taskType == OrderedTaskType.PREPROCESS:
259+
metadata["iteration"] = "preprocess"
260+
elif orderedTask.taskType == OrderedTaskType.POSTPROCESS:
261+
metadata["iteration"] = "postprocess"
262+
253263
if orderedTask.taskType == OrderedTaskType.PLACEHOLDER:
254264
return Task(name=orderedTask.node.name if orderedTask.node else "", command="", metadata=metadata)
255265

256266
cmdArgs = f"--node {orderedTask.node.name} \"{meshroomFile}\" --extern"
257-
metadata = {"nodeUid": orderedTask.node._uid, "iteration": orderedTask.iteration}
258267

259268
if orderedTask.taskType == OrderedTaskType.EXPANDING:
260269
cmd = self.getExpandWrappedCmd(cmdArgs, self.reqPackages)
261270
task = Task(name=orderedTask.node.name, command=cmd, metadata=metadata, env=self.jobEnv)
262271
else:
263272
cmdBin = wrapMeshroomBin("meshroom_compute")
264-
cmd = f"{cmdBin} {cmdArgs} --iteration {orderedTask.iteration}"
265-
cmd = rezWrapCommand(cmd, otherRezPkg=self.reqPackages)
273+
cmd = f"{cmdBin} {cmdArgs}"
274+
if orderedTask.taskType == OrderedTaskType.PREPROCESS:
275+
cmd += f" --preprocess"
276+
elif orderedTask.taskType == OrderedTaskType.POSTPROCESS:
277+
cmd += f" --postprocess"
278+
elif orderedTask.taskType == OrderedTaskType.CHUNK:
279+
cmd += f" --iteration {orderedTask.iteration}"
280+
cmd = rezWrapCommand(cmd, otherRezPkg=self.reqPackages, additionalEnv=self.jobEnv)
266281
task = Task(name=orderedTask.node.name, command=cmd, metadata=metadata, env=self.jobEnv)
267282

268283
return task
@@ -303,24 +318,25 @@ def createJob(self, orderedTasks, filepath, submitLabel="{projectName}") -> Loca
303318
name = submitLabel.format(projectName=projectName)
304319
# Create job
305320
job = Job(name)
306-
321+
307322
# Create tasks
308323
orderedTasks.display()
309324
createdTasks: Dict[OrderedTask, Task] = dict()
310325
for taskToCreate in orderedTasks.iterOnTasks():
311326
if taskToCreate in createdTasks.keys():
312327
continue
313328
createdTask = self.createFarmTask(filepath, taskToCreate, createdTasks)
329+
job.addTask(createdTask)
314330
createdTasks[taskToCreate] = createdTask
315-
331+
316332
for orderedTask, task in createdTasks.items():
317333
print(orderedTask, "->", task)
318-
334+
319335
for orderedTask, task in createdTasks.items():
320336
deps = [createdTasks.get(t) for t in orderedTask.dependencies]
321337
for dependency in deps:
322-
job.addTaskDependency(dependency, task)
323-
338+
job.addTaskDependency(task, dependency)
339+
324340
# Submit job
325341
engine = LocalFarmEngine(self.farmPath)
326342
res = job.submit(engine)
@@ -359,7 +375,7 @@ def createChunkTask(self, node, graphFile, **kwargs):
359375
metadata = {"nodeUid": node._uid, "iteration": chunk.iteration}
360376
cmdBin = wrapMeshroomBin("meshroom_compute")
361377
cmd = f"{cmdBin} {cmdArgs} --iteration {chunk.iteration}"
362-
cmd = rezWrapCommand(cmd, otherRezPkg=self.reqPackages)
378+
cmd = rezWrapCommand(cmd, otherRezPkg=self.reqPackages, additionalEnv=self.jobEnv)
363379
print("Additional chunk task command: ", cmd)
364380
task = Task(name=name, command=cmd, metadata=metadata, env=taskEnv)
365381
engine.create_additional_task(currentJid, currentTid, task)

tests/test_submit.py

Lines changed: 14 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -52,18 +52,25 @@ def waitForNodeCompletion(job: LocalFarmJob, node: Node, timeout=25):
5252
print(f"Waiting for node {node.name} to complete...")
5353
startTime = time.time()
5454
while True:
55+
time.sleep(1)
56+
if time.time() - startTime > timeout:
57+
raise TimeoutError(f"Node {node.name} did not complete within {timeout} seconds")
58+
# Check for job error
59+
err = job.getJobErrors()
60+
if err:
61+
raise RuntimeError(f"Job encountered an error: {err}")
62+
# Check that all tasks are finished
63+
for task in job.localfarmTasks.values():
64+
if task.get("status") not in (Status.NONE.name, Status.SUCCESS.name, Status.STOPPED.name, Status.ERROR.name):
65+
continue
66+
break
67+
# Stop if the node switched to done
5568
node.updateStatusFromCache()
5669
nodeStatus = node.getGlobalStatus()
5770
if nodeStatus not in (Status.SUBMITTED, Status.RUNNING):
5871
print(f"Node status switched to {nodeStatus}")
5972
return
60-
# Check for job error
61-
err = job.getJobErrors()
62-
if err:
63-
raise RuntimeError(f"Job encountered an error: {err}")
64-
if time.time() - startTime > timeout:
65-
raise TimeoutError(f"Node {node.name} did not complete within {timeout} seconds")
66-
time.sleep(1)
73+
6774

6875
def processSubmit(node: Node, graph, tmp_path):
6976
"""

0 commit comments

Comments
 (0)