Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
59 changes: 38 additions & 21 deletions bin/meshroom_compute
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ meshroom.setupEnvironment()
import meshroom.core
import meshroom.core.graph
from meshroom.core.node import Status
from meshroom.core.node import ChunkIndex


parser = argparse.ArgumentParser(description='Execute a Graph of processes.')
Expand Down Expand Up @@ -49,11 +50,17 @@ parser.add_argument('-v', '--verbose',
default=os.environ.get('MESHROOM_VERBOSE', 'info'),
choices=['fatal', 'error', 'warning', 'info', 'debug', 'trace'])

parser.add_argument('-i', '--iteration', type=int,
default=-1, help='')
parser.add_argument('-i', '--iteration', type=int, default=ChunkIndex.NONE, help='')
parser.add_argument('--preprocess', help='Execute preprocess chunk', action='store_true')
parser.add_argument('--postprocess', help='Execute postprocess chunk', action='store_true')

args = parser.parse_args()

if args.preprocess:
args.iteration = ChunkIndex.PREPROCESS
elif args.postprocess:
args.iteration = ChunkIndex.POSTPROCESS

# Setup the verbose level
if args.extern:
# For extern computation, we want to focus on the node computation log.
Expand Down Expand Up @@ -102,7 +109,7 @@ if args.node:
# If not running as "extern", the SUBMITTED status should generate a warning.
submittedStatuses.append(Status.SUBMITTED)

if not node._chunksCreated:
if not node._chunksCreated and args.iteration >= 0:
print(f"Error: Node {node} has been submitted before chunks have been created." \
f"See file: \"{node.nodeStatusFile}\".")
sys.exit(-1)
Expand All @@ -111,15 +118,21 @@ if args.node:
print(f"InputNode: No computation to do.")
sys.exit(0)

if args.preprocess:
chunks = [node._preprocessChunk]
elif args.postprocess:
chunks = [node._postprocessChunk]
elif args.iteration == ChunkIndex.NONE: # Warning : default value
chunks = node.chunks
else:
chunks = [node.chunks[args.iteration]]

if not args.forceStatus and not args.forceCompute:
if args.iteration != -1:
chunks = [node.chunks[args.iteration]]
else:
chunks = node.chunks
for chunk in chunks:
if chunk.status.status in submittedStatuses:
# Particular case for the local isolated, the node status is set to RUNNING by the submitter directly.
# We ensure that no other instance has started to compute, by checking that the computeSessionUid is empty.
                # We ensure that no other instance has started to compute,
                # by checking that the computeSessionUid is empty.
if chunk.node.getMrNodeType() == meshroom.core.MrNodeType.NODE and \
not chunk.status.computeSessionUid and node._nodeStatus.submitterSessionUid:
continue
Expand All @@ -129,24 +142,28 @@ if args.node:
if args.extern:
# Restore the log level
logging.getLogger().setLevel(meshroom.logStringToPython[args.verbose])

node.prepareLogger(args.iteration)
node.preprocess()
if args.iteration != -1:
chunk = node.chunks[args.iteration]
if chunk._status.status == Status.STOPPED:
print(f"Chunk {chunk}: status is STOPPED")
killRunningJob(node)
chunk.process(args.forceCompute, args.inCurrentEnv)
else:

if args.iteration == ChunkIndex.NONE:
# Process the whole node
if node.nodeStatus.status == Status.STOPPED:
print(f"Node {node}: status is STOPPED")
killRunningJob(node)
node.preprocess(args.forceCompute, args.inCurrentEnv)
node.prepareLogger(args.iteration)
node.process(args.forceCompute, args.inCurrentEnv)
node.postprocess()
node.restoreLogger()
node.restoreLogger()
node.postprocess(args.forceCompute, args.inCurrentEnv)
else:
chunk = chunks[0]
if chunk._status.status == Status.STOPPED:
print(f"Chunk {chunk}: status is STOPPED")
killRunningJob(node)
node.prepareLogger(args.iteration)
chunk.process(args.forceCompute, args.inCurrentEnv)
node.restoreLogger()

else:
if args.iteration != -1:
if args.iteration != ChunkIndex.NONE:
print('Error: "--iteration" only makes sense when used with "--node".')
sys.exit(-1)
toNodes = None
Expand Down
4 changes: 2 additions & 2 deletions bin/meshroom_createChunks
Original file line number Diff line number Diff line change
Expand Up @@ -129,13 +129,13 @@ if not args.forceStatus and not args.forceCompute:
f"submitterSessionUid: {node._nodeStatus.submitterSessionUid}")

if chunksToProcess:
node.prepareLogger()
node.preprocess()
node.prepareLogger()
for chunk in chunksToProcess:
logging.info(f"[MeshroomCreateChunks] process chunk {chunk}")
chunk.process(args.forceCompute, args.inCurrentEnv)
node.postprocess()
node.restoreLogger()
node.postprocess()
else:
logging.info(f"[MeshroomCreateChunks] -> create job to process chunks {[c for c in node.chunks]}")
submitter.createChunkTask(node, graphFile=args.graphFile, cache=args.cache,
Expand Down
174 changes: 174 additions & 0 deletions localfarm/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,174 @@
# Meshroom Local Farm

This folder contains a local farm tool for Meshroom. It can be used in various ways:
- For testing, we set up and launch the farm backend process and use it to test the submitting process.
- We also added a submitter so the farm can be used inside Meshroom.

> [!NOTE]
> Note that the local farm only works on Unix for now because we use `fork` for daemonization.
> We could implement the [`DETACHED_PROCESS`](https://stackoverflow.com/a/12854376) flag with `subprocess.Popen`
> to handle the farm in Windows.

## How to use

### Launch

First launch the farm process
```sh
python localfarm/localFarmLauncher.py start --root <FARM_ROOT>
```

The `FARM_ROOT` folder will contain the logs for each process and for the main process.

### Commands

- _start_ : Launch the farm
- _clean_ : Clean the files
- _stop_ : Stop the farm process
- _restart_ : Restart the farm process
- _status_ : Check the status
- _fullInfo_ : Display additional info

### Add jobs

The `test.py` script can be used to find examples of how to use it.
Basically, here's how to create jobs and tasks:

```py
import os
import datetime
from time import sleep
from collections import defaultdict
from localfarm.localFarm import Task, Job, LocalFarmEngine

def now():
    """Return the current local time formatted as 'HH:MM:SS ' (note the trailing space)."""
    return datetime.datetime.now().strftime("%H:%M:%S ")

def createTask(job, command, dependencies=[], _tasks=[]):
    """Create a shell task on *job* that echoes its own name and then runs *command*.

    Args:
        job: the Job the new task is added to.
        command: shell command string executed after the echo.
        dependencies: tasks that must finish before the new task may start.
        _tasks: internal — the mutable default is deliberately shared across
            calls and serves as a persistent counter so each task gets a
            unique sequential name. Callers should not pass it.

    Returns:
        The newly created Task.
    """
    i = len(_tasks)
    task = Task(f"Task {i}", f"echo '> Task {i}' && {command}")
    job.addTask(task)
    for parentTask in dependencies:
        job.addTaskDependency(task, parentTask)
    # Record the task so the next call gets the next index; without this,
    # len(_tasks) stayed at 0 and every task was named "Task 0".
    _tasks.append(task)
    return task

def getTasksByStatus(jid):
    """Group the task ids of job *jid* by status.

    Returns a plain dict mapping status name -> set of task ids, or an
    empty dict when the engine has no info for the job.
    """
    info = engine.get_job_status(jid)
    if not info:
        return {}
    grouped = defaultdict(set)
    for entry in info.get("tasks", []):
        grouped[entry.get("status", "UNKNOWN")].add(entry.get("tid"))
    return dict(grouped)

# Get engine
engine = LocalFarmEngine(FARM_ROOT)
# Create job
job = Job("Example Job")
job.setEngine(engine)
# Add tasks
task1 = createTask(job, command="sleep 2", dependencies=[])
task2 = createTask(job, command="sleep 2", dependencies=[task1])
task3 = createTask(job, command="sleep 2", dependencies=[task1])
task4 = createTask(job, command="sleep 2", dependencies=[task2, task3])
task5 = createTask(job, command="sleep 2", dependencies=[task4])
# Submit job
res = job.submit()
jid = res['jid']
print(now() + f"-> job: {res}")

# Monitor job
currentRunningTids = set()
while True:
sleep(1)
tasks = getTasksByStatus(jid)
if not tasks:
print("No tasks found for job")
break
runningTids = tasks.get("RUNNING")
activeTasks = tasks.get("SUBMITTED", set()).union(tasks.get("RUNNING", set()))
if not activeTasks:
print(now() + "All tasks completed")
break
if runningTids:
runningTids = [int(t) for t in runningTids]
newRunningTasks = set(runningTids)
if currentRunningTids != newRunningTasks:
print(now() + f"Now running tasks: {runningTids} (active tasks: {activeTasks})")
currentRunningTids = newRunningTasks
```

And this gives :

```
10:54:36 -> job: {'jid': 1}
10:54:37 Now running tasks: [1] (active tasks: {1, 2, 3, 4, 5})
10:54:39 Now running tasks: [2, 3] (active tasks: {2, 3, 4, 5})
10:54:41 Now running tasks: [4] (active tasks: {4, 5})
10:54:44 Now running tasks: [5] (active tasks: {5})
10:54:47 All tasks completed
```

### Launch the backend from a python process

Instead of using the command line, you can also use the launcher as an API:

```py
from localfarm.localFarmLauncher import FarmLauncher

# Launch
launcher = FarmLauncher(root=FARM_ROOT)
launcher.start()
# Add jobs & tasks & submit
...

# Check status
launcher.status()

# Stop the farm
launcher.stop()
```

And here are the logs :
```
<!-- Launch -->
Clean farm files...
Done.
Starting farm backend...
Farm root is: /homes/$USER/.local_farm
Farm backend started (PID: 6776)
Logs: /homes/$USER/.local_farm/backend.log

<!-- Interrogate status -->
Farm backend is running (PID: 6776)
[LocalFarm][INFO] Connect to farm located at FARM_ROOT
Active jobs: 1
- 1: RUNNING (5 tasks) -> {'SUCCESS': {1}, 'RUNNING': {2, 3}, 'SUBMITTED': {4, 5}}

<!-- Stop the farm -->
Stopping farm backend (PID: 6776)...
Farm backend stopped
```

## Logs

Here are the files we can find in the farm root:
```
.
├── backend.log
├── backend.port
├── farm.pid
└── jobs
└── jid
└── tasks
├── tid_min.log
├── ...
└── tid_max.log
```

- _backend.log_ contains the logs for the backend process
- _farm.pid_ contains the PID for the backend process
- _backend.port_ contains the port used for the TCP connection
- In the jobs folder you can find all logs for the tasks of each job. The structure is : `jobs/{jid}/tasks/{tid}.log`
5 changes: 3 additions & 2 deletions localfarm/localFarm.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,8 @@

logging.basicConfig(
level=logging.INFO,
format='%(asctime)s [%(name)s][%(levelname)s] %(message)s'
format='%(asctime)s [%(name)s][%(levelname)s] %(message)s',
datefmt='%Y-%m-%d %H:%M:%S',
)
logger = logging.getLogger("LocalFarm")
logger.setLevel(logging.INFO)
Expand All @@ -31,7 +32,7 @@ def __init__(self, root):

def connect(self):
""" Connect to the backend. """
print("Connect to farm located at", self.root)
logger.info(f"Connect to farm located at {self.root}")
if self.tcpPortFile.exists():
try:
port = int(self.tcpPortFile.read_text())
Expand Down
1 change: 1 addition & 0 deletions localfarm/localFarmBackend.py
Original file line number Diff line number Diff line change
Expand Up @@ -358,6 +358,7 @@ def startTask(self, task: Task):
with open(task.logFile, "w") as log:
log.write(f"# ========== Starting task {task.tid} at {task.started_at.isoformat()}"
f" (command=\"{task.command}\") ==========\n")
log.write(f"# metadata: {task.metadata}\n")
log.write(f"# process_env:\n")
log.write(f"# Additional env variables:\n")
for _k, _v in additional_env.items():
Expand Down
19 changes: 17 additions & 2 deletions meshroom/core/desc/node.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
# desc/node.py

import enum
from inspect import getfile
from pathlib import Path
Expand Down Expand Up @@ -232,6 +234,11 @@ def preprocess(self, node):
"""
pass

@property
def _hasPreprocess(self):
""" Returns True if the class has a preprocess """
return type(self).preprocess is not BaseNode.preprocess

def postprocess(self, node):
""" Gets invoked after the processChunk method for the node.

Expand All @@ -240,6 +247,11 @@ def postprocess(self, node):
"""
pass

@property
def _hasPostprocess(self):
""" Returns True if the class has a postprocess """
return type(self).postprocess is not BaseNode.postprocess

def process(self, node):
raise NotImplementedError(f'No process implementation on node: "{node.name}"')

Expand Down Expand Up @@ -395,8 +407,11 @@ def processChunkInEnvironment(self, chunk):
meshroomComputeCmd = f"{chunk.node.nodeDesc.pythonExecutable} {_MESHROOM_COMPUTE}" + \
f" \"{chunk.node.graph.filepath}\" --node {chunk.node.name}" + \
" --extern --inCurrentEnv"

if len(chunk.node.getChunks()) > 1:
if chunk.isPreprocess:
meshroomComputeCmd += f" --preprocess"
elif chunk.isPostprocess:
meshroomComputeCmd += f" --postprocess"
elif len(chunk.node.getChunks()) >= 1:
meshroomComputeCmd += f" --iteration {chunk.range.iteration}"

runtimeEnv = chunk.node.nodeDesc.plugin.runtimeEnv
Expand Down
4 changes: 2 additions & 2 deletions meshroom/core/graph.py
Original file line number Diff line number Diff line change
Expand Up @@ -1781,7 +1781,7 @@ def executeGraph(graph, toNodes=None, forceCompute=False, forceStatus=False):
logging.warning(f"{node.name} is in Compatibility Mode and cannot be computed: {node.issueDetails}.")
continue

node.preprocess()
node.preprocess(forceCompute)
if not node._chunksCreated:
node.createChunks()
multiChunks = len(node.chunks) > 1
Expand All @@ -1793,7 +1793,7 @@ def executeGraph(graph, toNodes=None, forceCompute=False, forceStatus=False):
else:
print(f'\n[{n + 1}/{len(nodes)}] {node.nodeType}')
chunk.process(forceCompute)
node.postprocess()
node.postprocess(forceCompute)
except Exception as exc:
logging.error(f"Error on node computation: {exc}")
graph.clearSubmittedNodes()
Expand Down
Loading
Loading