-
-
Notifications
You must be signed in to change notification settings - Fork 1.2k
[desc] Execute pre & post process in the farm #2984
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: develop
Are you sure you want to change the base?
Changes from 17 commits
2a70b02
838661f
60e2b56
518f2f4
2834d94
fefd761
dd9719d
8042632
dd5ca13
f8ea4cf
235cf4e
204219d
3450dbb
384a175
6593cd5
ceb00aa
f36f672
10782bf
d5c1e3f
31e04e6
e23ee67
5afa76d
192652a
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -18,6 +18,7 @@ meshroom.setupEnvironment() | |
| import meshroom.core | ||
| import meshroom.core.graph | ||
| from meshroom.core.node import Status | ||
| from meshroom.core.node import ChunkIndex | ||
|
|
||
|
|
||
| parser = argparse.ArgumentParser(description='Execute a Graph of processes.') | ||
|
|
@@ -49,11 +50,17 @@ parser.add_argument('-v', '--verbose', | |
| default=os.environ.get('MESHROOM_VERBOSE', 'info'), | ||
| choices=['fatal', 'error', 'warning', 'info', 'debug', 'trace']) | ||
|
|
||
| parser.add_argument('-i', '--iteration', type=int, | ||
| default=-1, help='') | ||
| parser.add_argument('-i', '--iteration', type=int, default=ChunkIndex.NONE, help='') | ||
| parser.add_argument('--preprocess', help='Execute preprocess chunk', action='store_true') | ||
| parser.add_argument('--postprocess', help='Execute postprocess chunk', action='store_true') | ||
|
|
||
| args = parser.parse_args() | ||
|
|
||
| if args.preprocess: | ||
| args.iteration = ChunkIndex.PREPROCESS | ||
| elif args.postprocess: | ||
| args.iteration = ChunkIndex.POSTPROCESS | ||
|
|
||
| # Setup the verbose level | ||
| if args.extern: | ||
| # For extern computation, we want to focus on the node computation log. | ||
|
|
@@ -131,15 +138,21 @@ if args.node: | |
| print(f"InputNode: No computation to do.") | ||
| sys.exit(0) | ||
|
|
||
| if args.preprocess: | ||
| chunks = [node._preprocessChunk] | ||
|
Alxiice marked this conversation as resolved.
|
||
| elif args.postprocess: | ||
| chunks = [node._postprocessChunk] | ||
| elif args.iteration == ChunkIndex.NONE: # Warning : default value | ||
| chunks = node.chunks | ||
| else: | ||
| chunks = [node.chunks[args.iteration]] | ||
|
|
||
| if not args.forceStatus and not args.forceCompute: | ||
| if args.iteration != -1: | ||
| chunks = [node.chunks[args.iteration]] | ||
| else: | ||
| chunks = node.chunks | ||
| for chunk in chunks: | ||
| if chunk.status.status in submittedStatuses: | ||
| # Particular case for the local isolated, the node status is set to RUNNING by the submitter directly. | ||
| # We ensure that no other instance has started to compute, by checking that the computeSessionUid is empty. | ||
| # We ensure that no other instance has start | ||
|
Alxiice marked this conversation as resolved.
Outdated
|
||
| # ed to compute, by checking that the computeSessionUid is empty. | ||
| if chunk.node.getMrNodeType() == meshroom.core.MrNodeType.NODE and \ | ||
| not chunk.status.computeSessionUid and node._nodeStatus.submitterSessionUid: | ||
| continue | ||
|
|
@@ -149,25 +162,29 @@ if args.node: | |
| if args.extern: | ||
| # Restore the log level | ||
| logging.getLogger().setLevel(meshroom.logStringToPython[args.verbose]) | ||
|
|
||
| node.prepareLogger(args.iteration) | ||
| node.preprocess() | ||
| if args.iteration != -1: | ||
| chunk = node.chunks[args.iteration] | ||
| if chunk._status.status == Status.STOPPED: | ||
| print(f"Chunk {chunk}: status is STOPPED") | ||
| killRunningJob(node) | ||
| chunk.process(args.forceCompute, args.inCurrentEnv) | ||
| else: | ||
|
|
||
| if args.iteration == ChunkIndex.NONE: | ||
| # Process the whole node | ||
| if node.nodeStatus.status == Status.STOPPED: | ||
| print(f"Node {node}: status is STOPPED") | ||
| killRunningJob(node) | ||
| node.createChunks() | ||
| node.prepareLogger(args.iteration) | ||
| node.preprocess(args.forceCompute, args.inCurrentEnv) | ||
| node.process(args.forceCompute, args.inCurrentEnv) | ||
| node.postprocess() | ||
| node.restoreLogger() | ||
| node.postprocess(args.forceCompute, args.inCurrentEnv) | ||
| node.restoreLogger() | ||
| else: | ||
| chunk = chunks[0] | ||
| if chunk._status.status == Status.STOPPED: | ||
| print(f"Chunk {chunk}: status is STOPPED") | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
Contributor
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. If it's okay I will not do the modification here because we already used this before and it's used a lot (like a 100 times or so). But we should do another PR for this |
||
| killRunningJob(node) | ||
| node.prepareLogger(args.iteration) | ||
| chunk.process(args.forceCompute, args.inCurrentEnv) | ||
| node.restoreLogger() | ||
|
|
||
| else: | ||
| if args.iteration != -1: | ||
| if args.iteration != ChunkIndex.NONE: | ||
| print('Error: "--iteration" only makes sense when used with "--node".') | ||
| sys.exit(-1) | ||
| toNodes = None | ||
|
|
||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,174 @@ | ||
| # Meshroom Local Farm | ||
|
|
||
| This folder contains a local farm tool for meshroom. It can be used in various ways: | ||
| - For testing we set up and launch the farm backend process and use it to test the submitting process | ||
| - We also added a submitter to be able to use it inside meshroom | ||
|
|
||
| > [!NOTE] | ||
| > Note that the local farm only works on Unix for now because we use `fork` for daemonization. | ||
| > We could implement the [`DETACHED_PROCESS`](https://stackoverflow.com/a/12854376) flag with `subprocess.Popen` | ||
| > to handle the farm in Windows. | ||
|
|
||
| ## How to use | ||
|
|
||
| ### Launch | ||
|
|
||
| First launch the farm process | ||
| ```sh | ||
| python localfarm/localFarmLauncher.py start --root <FARM_ROOT> | ||
| ``` | ||
|
|
||
| The `FARM_ROOT` folder will contain the logs for each process and for the main process. | ||
|
|
||
| ### Commands | ||
|
|
||
| - _start_ : Launch the farm | ||
| - _clean_ : Clean the files | ||
| - _stop_ : Stop the farm process | ||
| - _restart_ : Restart the farm process | ||
| - _status_ : Check the status | ||
| - _fullInfo_ : Display additional info | ||
|
|
||
| ### Add jobs | ||
|
|
||
| The `test.py` script can be used to find examples on how to use it. | ||
| Basically here's how to create jobs and tasks: | ||
|
|
||
| ```py | ||
| import os | ||
| import datetime | ||
| from time import sleep | ||
| from collections import defaultdict | ||
| from localfarm.localFarm import Task, Job, LocalFarmEngine | ||
|
|
||
| def now(): | ||
| now = datetime.datetime.now() | ||
| return now.strftime("%H:%M:%S ") | ||
|
|
||
| def createTask(job, command, dependencies=[], _tasks=[]): | ||
| i = len(_tasks) | ||
| task = Task(f"Task {i}", f"echo '> Task {i}' && {command}") | ||
| job.addTask(task) | ||
| for parentTask in dependencies: | ||
| job.addTaskDependency(task, parentTask) | ||
| return task | ||
|
|
||
| def getTasksByStatus(jid): | ||
| jobInfo = engine.get_job_status(jid) | ||
| if not jobInfo: | ||
| return {} | ||
| taskByStatus = defaultdict(set) | ||
| for task in jobInfo.get("tasks", []): | ||
| status = task.get("status", "UNKNOWN") | ||
| taskByStatus[status].add(task.get("tid")) | ||
| return dict(taskByStatus) | ||
|
|
||
| # Get engine | ||
| engine = LocalFarmEngine(FARM_ROOT) | ||
| # Create job | ||
| job = Job("Example Job") | ||
| job.setEngine(engine) | ||
| # Add tasks | ||
| task1 = createTask(job, command="sleep 2", dependencies=[]) | ||
| task2 = createTask(job, command="sleep 2", dependencies=[task1]) | ||
| task3 = createTask(job, command="sleep 2", dependencies=[task1]) | ||
| task4 = createTask(job, command="sleep 2", dependencies=[task2, task3]) | ||
| task5 = createTask(job, command="sleep 2", dependencies=[task4]) | ||
| # Submit job | ||
| res = job.submit() | ||
| jid = res['jid'] | ||
| print(now() + f"-> job: {res}") | ||
|
|
||
| # Monitor job | ||
| currentRunningTids = set() | ||
| while True: | ||
| sleep(1) | ||
| tasks = getTasksByStatus(jid) | ||
| if not tasks: | ||
| print("No tasks found for job") | ||
| break | ||
| runningTids = tasks.get("RUNNING") | ||
| activeTasks = tasks.get("SUBMITTED", set()).union(tasks.get("RUNNING", set())) | ||
| if not activeTasks: | ||
| print(now() + "All tasks completed") | ||
| break | ||
| if runningTids: | ||
| runningTids = [int(t) for t in runningTids] | ||
| newRunningTasks = set(runningTids) | ||
| if currentRunningTids != newRunningTasks: | ||
| print(now() + f"Now running tasks: {runningTids} (active tasks: {activeTasks})") | ||
| currentRunningTids = newRunningTasks | ||
| ``` | ||
|
|
||
| And this gives : | ||
|
|
||
| ``` | ||
| 10:54:36 -> job: {'jid': 1} | ||
| 10:54:37 Now running tasks: [1] (active tasks: {1, 2, 3, 4, 5}) | ||
| 10:54:39 Now running tasks: [2, 3] (active tasks: {2, 3, 4, 5}) | ||
| 10:54:41 Now running tasks: [4] (active tasks: {4, 5}) | ||
| 10:54:44 Now running tasks: [5] (active tasks: {5}) | ||
| 10:54:47 All tasks completed | ||
| ``` | ||
|
|
||
| ### Launch the backend from a python process | ||
|
|
||
| Instead of using the command line you can also use the launcher as an API : | ||
|
|
||
| ```py | ||
| from localfarm.localFarmLauncher import FarmLauncher | ||
|
|
||
| # Launch | ||
| launcher = FarmLauncher(root=FARM_ROOT) | ||
| launcher.start() | ||
| # Add jobs & tasks & submit | ||
| ... | ||
|
|
||
| # Check status | ||
| launcher.status() | ||
|
|
||
| # Stop the farm | ||
| launcher.stop() | ||
| ``` | ||
|
|
||
| And here are the logs : | ||
| ``` | ||
| <!-- Launch --> | ||
| Clean farm files... | ||
| Done. | ||
| Starting farm backend... | ||
| Farm root is: /homes/$USER/.local_farm | ||
| Farm backend started (PID: 6776) | ||
| Logs: /homes/$USER/.local_farm/backend.log | ||
|
|
||
| <!-- Interrogate status --> | ||
| Farm backend is running (PID: 6776) | ||
| [LocalFarm][INFO] Connect to farm located at FARM_ROOT | ||
| Active jobs: 1 | ||
| - 1: RUNNING (5 tasks) -> {'SUCCESS': {1}, 'RUNNING': {2, 3}, 'SUBMITTED': {4, 5}} | ||
|
|
||
| <!-- Stop the farm --> | ||
| Stopping farm backend (PID: 6776)... | ||
| Farm backend stopped | ||
| ``` | ||
|
|
||
| ## Logs | ||
|
|
||
| Here are the files we can find on the farm root: | ||
| ``` | ||
| . | ||
| ├── backend.log | ||
| ├── backend.port | ||
| ├── farm.pid | ||
| └── jobs | ||
| └── jid | ||
| └── tasks | ||
| ├── tid_min.log | ||
| ├── ... | ||
| └── tid_max.log | ||
| ``` | ||
|
|
||
| - _backend.log_ contains the logs for the backend process | ||
| - _farm.pid_ contains the PID for the backend process | ||
| - _backend.port_ contains the port used for the TCP connection | ||
| - In the jobs folder you can find all logs for the tasks of each job. The structure is : `jobs/{jid}/tasks/{tid}.log` |
Uh oh!
There was an error while loading. Please reload this page.