Skip to content

Commit 7874d82

Browse files
committed
Complete localFarm implementation on tests
1 parent 3f6c99f commit 7874d82

File tree

15 files changed

+583
-199
lines changed

15 files changed

+583
-199
lines changed

.codecov.yml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -50,3 +50,4 @@ ignore:
5050
- "docs/"
5151
- "scripts/"
5252
- "bin/"
53+
- "localFarm/" # For now ignore localFarm as it has no coverage yet

bin/meshroom_compute

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -72,6 +72,7 @@ graph.update()
7272
if args.node:
7373
# Execute the node
7474
node = graph.findNode(args.node)
75+
node.updateStatusFromCache()
7576
submittedStatuses = [Status.RUNNING]
7677
if not args.extern:
7778
# If running as "extern", the task is supposed to have the status SUBMITTED.
@@ -80,7 +81,7 @@ if args.node:
8081

8182
if not node._chunksCreated:
8283
print(f"Error: Node {node} has been submitted before chunks have been created." \
83-
"See file: \"{node.nodeStatusFile}\".")
84+
f"See file: \"{node.nodeStatusFile}\".")
8485
sys.exit(-1)
8586

8687
if node._isInputNode():

bin/meshroom_createChunks

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -137,7 +137,7 @@ if chunksToProcess:
137137
node.postprocess()
138138
node.restoreLogger()
139139
else:
140-
logging.info(f"[MeshroomCreateChunks] -> create job to process chunks {node.chunks}")
140+
logging.info(f"[MeshroomCreateChunks] -> create job to process chunks {[c for c in node.chunks]}")
141141
submitter.createChunkTask(node, graphFile=args.graphFile, cache=args.cache,
142142
forceStatus=args.forceStatus, forceCompute=args.forceCompute)
143143

localfarm/__init__.py

Whitespace-only changes.

meshroom/submitters/localFarm/localFarm.py renamed to localfarm/localFarm.py

Lines changed: 36 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -4,14 +4,16 @@
44
Local Farm : A simple local job runner
55
"""
66

7+
from __future__ import annotations # For forward references in type hints
8+
79
import logging
810
import json
911
import socket
1012
import logging
1113
import uuid
12-
from collections import defaultdict, deque
14+
from collections import defaultdict
1315
from pathlib import Path
14-
from typing import Dict, List
16+
from typing import Dict, List, Generator
1517

1618
logging.basicConfig(
1719
level=logging.INFO,
@@ -30,6 +32,7 @@ def __init__(self, root):
3032

3133
def connect(self):
3234
"""Connect to the backend"""
35+
print("Connect to farm located at", self.root)
3336
if self.tcpPortFile.exists():
3437
try:
3538
port = int(self.tcpPortFile.read_text())
@@ -68,7 +71,7 @@ def _call(self, method, **params):
6871
finally:
6972
sock.close()
7073

71-
def submit_job(self, job):
74+
def submit_job(self, job: Job):
7275
""" Submit the job to the farm """
7376
# Create the job
7477
createdJob = self._call("create_job", name=job.name)
@@ -83,22 +86,23 @@ def submit_job(self, job):
8386
raise RuntimeError(f"Parent task {parentTask.name} not created yet")
8487
deps.append(tasksCreated[parentTask])
8588
createdTask = self._call("create_task",
86-
jid=jid, name=task.name, command=task.command, metadata=task.metadata, dependencies=deps)
89+
jid=jid, name=task.name, command=task.command,
90+
metadata=task.metadata, dependencies=deps, env=task.env)
8791
tasksCreated[task] = createdTask["tid"]
8892
# Submit the job
8993
self._call("submit_job", jid=jid)
9094
return {"jid": jid}
9195

9296
def create_additional_task(self, jid, tid, task):
    """
    Create a new task inside an already existing job.

    Sends an "expand_task" request carrying the task's name, command, metadata
    and env, with `tid` forwarded as `parentTid`.

    Returns a dict of the form {"tid": <tid found in the reply>}.
    """
    request = {
        "jid": jid,
        "name": task.name,
        "command": task.command,
        "metadata": task.metadata,
        "parentTid": tid,
        "env": task.env,
    }
    reply = self._call("expand_task", **request)
    return {"tid": reply["tid"]}
98102

99103
def get_job_infos(self, jid):
    """
    Fetch the description of job `jid`.

    Sends a "get_job_infos" request and returns the "result" entry of the reply.
    """
    reply = self._call("get_job_infos", jid=jid)
    return reply["result"]
102106

103107
def pause_job(self, jid):
104108
"""Pause a job"""
@@ -132,9 +136,15 @@ def restart_task(self, jid, tid):
132136
"""Restart a task"""
133137
return self._call("restart_task", jid=jid, tid=tid)
134138

135-
def list_jobs(self):
139+
def list_jobs(self) -> list:
    """
    List all jobs.

    Sends a "list_jobs" request and returns the "jobs" entry of the reply.
    """
    reply = self._call("list_jobs")
    return reply["jobs"]
142+
143+
def getJobStatus(self, jid: int) -> dict:
    """
    Return the entry of `list_jobs()` whose "jid" equals `jid`.

    The first matching entry wins; an empty dict is returned when no job matches.
    """
    matches = (entry for entry in self.list_jobs() if entry["jid"] == jid)
    return next(matches, {})
138148

139149
def ping(self):
140150
"""Check if backend is alive"""
@@ -146,12 +156,13 @@ def ping(self):
146156

147157

148158
class Task:
    """
    Client-side description of a task: a name, a command, and optional
    metadata / environment dictionaries.
    """

    def __init__(self, name, command, metadata=None, env=None):
        """
        Args:
            name: Task name.
            command: Command attached to the task.
            metadata: Optional metadata dict; a falsy value is replaced by a new empty dict.
            env: Optional environment dict; a falsy value is replaced by a new empty dict.
        """
        # Each task gets its own generated identifier, stored as a string.
        self.uid = str(uuid.uuid1())
        self.name = name
        self.command = command
        self.metadata = metadata if metadata else {}
        self.env = env if env else {}

    def __repr__(self):
        return f"<Task {self.name}|{self.uid}>"
157168

@@ -165,13 +176,17 @@ def __init__(self, name):
165176
self.tasks: Dict[str, Task] = {}
166177
self.dependencies: Dict[str: List[str]] = defaultdict(set)
167178
self.reverseDependencies: Dict[str: List[str]] = defaultdict(set)
179+
self._engine: LocalFarmEngine = None
180+
181+
def setEngine(self, engine: LocalFarmEngine):
    """Store `engine` on the job as `_engine`, replacing any previous value."""
    self._engine = engine
168183

169184
def addTask(self, task):
    """
    Register `task` in this job.

    Args:
        task: Object exposing a `uid` attribute; it is stored in `self.tasks`
            under that uid.

    Raises:
        ValueError: If a task with the same uid is already registered.
    """
    # `self.tasks` is keyed by uid, so the duplicate check must use the uid too.
    # Testing `task.name` against those keys could never match.
    if task.uid in self.tasks:
        raise ValueError(f"Task {task} already exists in job")
    self.tasks[task.uid] = task
173188

174-
def addTaskDependency(self, task, dependsOn):
189+
def addTaskDependency(self, task: Task, dependsOn: Task):
175190
if task.uid not in self.tasks:
176191
raise ValueError(f"Task {task} not found in job")
177192
if dependsOn.uid not in self.tasks:
@@ -217,7 +232,7 @@ def exploreTask(taskUid, taskParents=None):
217232
return True
218233
return False
219234

220-
def tasksDFS(self) -> List[Task]:
235+
def tasksDFS(self) -> Generator[Task]:
221236
"""
222237
Return tasks in topological order (dependencies before dependents).
223238
Tasks closer to roots appear first.
@@ -243,10 +258,13 @@ def exploreTask(task: str, currentLevel=0):
243258
for task in tasks:
244259
yield task
245260

246-
def submit(self, farmPath):
247-
farm = LocalFarmEngine(farmPath)
248-
result = farm.submit_job(self)
249-
return result
261+
def submit(self, engine: LocalFarmEngine = None):
    """
    Submit this job through a farm engine.

    Args:
        engine: Engine to use; when falsy, the job's own `_engine` is used instead.

    Returns:
        Whatever the engine's `submit_job(self)` returns.

    Raises:
        ValueError: If neither `engine` nor `self._engine` is set.
    """
    target = engine or self._engine
    if not target:
        raise ValueError("No LocalFarmEngine set for this job")
    return target.submit_job(self)
250268

251269

252270
def test():

meshroom/submitters/localFarm/localFarmBackend.py renamed to localfarm/localFarmBackend.py

Lines changed: 45 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -23,7 +23,7 @@
2323
import threading
2424
from socketserver import BaseRequestHandler, ThreadingTCPServer
2525

26-
FARM_MAX_PARALLEL_TASKS = 2
26+
FARM_MAX_PARALLEL_TASKS = 10
2727
MAX_BYTES_REQUEST = 4096 # 8192 / 65536 if needed
2828

2929
PathLike = Union[str, Path]
@@ -48,14 +48,15 @@ class Status(Enum):
4848

4949

5050
class Task:
51-
def __init__(self, jid: str, tid: str, label: str, command: str, metadata: dict, jobDir: PathLike):
51+
def __init__(self, jid: str, tid: str, label: str, command: str, metadata: dict, jobDir: PathLike, env: dict = None):
5252
self.jid: str = jid
5353
self.tid: str = tid
5454
self.parentTids = [] # Tasks that must be completed before this one
5555
self.childTids = [] # Task that depend on this one
5656
self.label: str = label
5757
self.command: str = command
58-
self.metadata: dict = metadata
58+
self.metadata: dict = metadata or {}
59+
self.env: dict = env or {}
5960
self.taskDir: Path = Path(jobDir) / "tasks"
6061
self.taskDir.mkdir(parents=True, exist_ok=True)
6162
self.status: Status = Status.NONE
@@ -73,6 +74,7 @@ def to_dict(self):
7374
"label": self.label,
7475
"command": self.command,
7576
"metadata": self.metadata,
77+
"env": self.env,
7678
"status": self.status.name,
7779
"created_at": self.created_at.isoformat(),
7880
"started_at": self.started_at.isoformat() if self.started_at else None,
@@ -300,14 +302,7 @@ def processJobs(self):
300302
# Check if process finished
301303
returncode = task.process.poll()
302304
if returncode is not None:
303-
task.finished_at = datetime.now()
304-
task.return_code = returncode
305-
if returncode == 0:
306-
task.status = Status.SUCCESS
307-
logger.info(f"Task {task.tid} completed")
308-
else:
309-
task.status = Status.ERROR
310-
logger.error(f"Task {task.tid} failed with code {returncode}")
305+
self.finishTask(task, returncode)
311306

312307
# Check if job is complete
313308
if any(t.status in [Status.ERROR, Status.STOPPED, Status.KILLED] for t in job.tasks):
@@ -342,20 +337,48 @@ def startTask(self, task: Task):
342337
task.status = Status.RUNNING
343338
task.started_at = datetime.now()
344339
# Create log file
340+
additional_env = {
341+
"LOCALFARM_CURRENT_JID": str(task.jid),
342+
"LOCALFARM_CURRENT_TID": str(task.tid),
343+
"MR_LOCAL_FARM_PATH": str(self.root)
344+
}
345+
additional_env.update(task.env)
346+
process_env = os.environ.copy()
347+
process_env.update(additional_env)
345348
try:
349+
346350
with open(task.logFile, "w") as log:
351+
log.write(f"# ========== Starting task {task.tid} at {task.started_at.isoformat()}" \
352+
f" (command=\"{task.command}\") ==========\n")
353+
log.write(f"# Additional env variables:\n")
354+
for _k, _v in additional_env.items():
355+
log.write(f"# - {str(_k)}={str(_v)}\n")
356+
log.write(f"\n")
347357
task.process = subprocess.Popen(
348358
task.command,
349359
# shlex.split(task.command),
350360
stdout=log,
351361
stderr=log,
352362
cwd=task.taskDir,
363+
env=process_env,
353364
shell=True
354365
)
355366
except Exception as e:
356367
logger.error(f"Failed to start task {task.tid}: {e}")
357368
task.status = "error"
358369
task.finished_at = datetime.now()
370+
371+
def finishTask(self, task: Task, returncode: int):
    """
    Record the end of a task whose process has exited.

    Stores the finish time and return code on the task, sets its status to
    SUCCESS when `returncode` is 0 and ERROR otherwise, logs the outcome, and
    appends a closing banner to the task's log file.
    """
    task.finished_at = datetime.now()
    task.return_code = returncode
    succeeded = returncode == 0
    task.status = Status.SUCCESS if succeeded else Status.ERROR
    if succeeded:
        logger.info(f"Task {task.tid} completed")
    else:
        logger.error(f"Task {task.tid} failed with code {returncode}")
    # Append mode: the log file already holds the output written while the task ran.
    with open(task.logFile, "a") as log:
        log.write(f"\n# ========== Task {task.tid} finished at {task.finished_at.isoformat()} ==========\n")
359382

360383
def cleanup(self):
361384
logger.info("Cleaning up...")
@@ -393,15 +416,15 @@ def create_job(self, name):
393416
logger.info(f"Created job {jid}")
394417
return {"success": True, "jid": jid}
395418

396-
def create_task(self, jid, name, command, metadata, dependencies):
419+
def create_task(self, jid, name, command, metadata, dependencies, env=None):
397420
"""Add a task to a job"""
398421
with self.lock:
399422
if jid not in self.jobs:
400423
return {"success": False, "error": "Job not found"}
401424
job = self.jobs[jid]
402425
job.lastJid += 1
403426
tid = job.lastJid
404-
task = Task(jid, tid, name, command, metadata, job.jobDir)
427+
task = Task(jid, tid, name, command, metadata, job.jobDir, env=env)
405428
job.tasks.append(task)
406429
for parentTid in dependencies:
407430
parentTask = next((t for t in job.tasks if t.tid == parentTid), None)
@@ -412,14 +435,15 @@ def create_task(self, jid, name, command, metadata, dependencies):
412435
logger.info(f"Added task {tid} to job {jid}")
413436
return {"success": True, "tid": tid}
414437

415-
def expand_task(self, jid, name, command, metadata, parentTid):
438+
def expand_task(self, jid, name, command, metadata, parentTid, env=None):
416439
with self.lock:
417440
if jid not in self.jobs:
441+
logger.info(f"Available jobs: {list(self.jobs.keys())}")
418442
return {"success": False, "error": "Job not found"}
419443
job = self.jobs[jid]
420444
job.lastJid += 1
421445
tid = job.lastJid
422-
task = Task(jid, tid, name, command, metadata, job.jobDir)
446+
task = Task(jid, tid, name, command, metadata, job.jobDir, env=env)
423447
task.status = Status.SUBMITTED
424448
job.tasks.append(task)
425449
parentTask = next((t for t in job.tasks if t.tid == parentTid), None)
@@ -456,7 +480,6 @@ def get_job_infos(self, jid):
456480
if jid not in self.jobs:
457481
return {'success': False, "error": "Job not found"}
458482
job = self.jobs[jid]
459-
logger.info(f"Job infos : {job.to_dict()}")
460483
return {"success": True, "result": job.to_dict()}
461484

462485
def pause_job(self, jid):
@@ -538,7 +561,7 @@ def list_jobs(self):
538561
with self.lock:
539562
return {
540563
"success": True,
541-
"jobs": {jid: job.to_dict() for jid, job in self.jobs.items()}
564+
"jobs": [job.to_dict() for job in self.jobs.values()]
542565
}
543566

544567

@@ -609,6 +632,9 @@ def main(root):
609632

610633
if __name__ == "__main__":
    # Farm root resolution order: --root argument, then the MR_LOCAL_FARM_PATH
    # environment variable, then ~/.local_farm.
    cli = argparse.ArgumentParser(description='Execute a Graph of processes.')
    cli.add_argument('--root', type=str, required=False, help='Root path for the farm.')
    requestedRoot = cli.parse_args().root
    if requestedRoot:
        main(requestedRoot)
    else:
        fallbackRoot = os.path.join(os.path.expanduser("~"), ".local_farm")
        main(os.getenv("MR_LOCAL_FARM_PATH", fallbackRoot))

0 commit comments

Comments
 (0)