Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
40 commits
Select commit Hold shift + click to select a range
2eca1dc
[core] Update submitter API
Alxiice Oct 13, 2025
b7c9594
[core] node/taskManager: create _chunksCreated to delay chunk creatio…
Alxiice Oct 13, 2025
90df8e2
[node] Add licenses list on node desc to provide a source where we ca…
Alxiice Oct 13, 2025
1d64ca9
[core] computation : update computation levels
Alxiice Oct 13, 2025
3ad4182
[bin] Add createChunks script
Alxiice Oct 13, 2025
577f4c3
[submitter] Fix SubmitterOptionsEnum.ALL mode on py 3.9
Alxiice Oct 13, 2025
5ed11ef
[qml] Fix anchor issue when chunks are emptied
Alxiice Oct 13, 2025
6a459d5
[core] Node : add defaultStatus in _createChunks
Alxiice Oct 17, 2025
254c106
[core] Start updating taskmanager and submitter for new chunk process
Alxiice Oct 21, 2025
0dda5c1
[core] First implementation to kill submitted tasks
Alxiice Oct 21, 2025
c48d0c4
[core] graph : Manage nodeStatus file monitoring
Alxiice Oct 21, 2025
b28417f
[code] submitter : fix issues in dynamic chunks & submitting
Alxiice Oct 24, 2025
280ad9b
[code] graph : Better management of statuses after task/job actions
Alxiice Oct 24, 2025
945e375
[core] Fix issues on missing chunks fro sfm node & node chunk indicator
Alxiice Oct 27, 2025
ac02b20
[core] submitter : retrieve job on node update + fix some ui issues
Alxiice Oct 28, 2025
747e8e8
[chunks] Apply typos/cleaning suggestions from @cbentejac
Alxiice Nov 6, 2025
527cff5
[submitter] Add tools to avoid autoretry on farm
Alxiice Nov 6, 2025
bf666e7
[submitter] Fix interruptJob UI updates
Alxiice Nov 6, 2025
7770d5c
[bin] Update permissions on `meshroom_createChunks`
cbentejac Nov 18, 2025
9492acb
[core] node: Correctly use custom size for non-parallelized nodes
cbentejac Nov 24, 2025
445cab2
[bin] Fix typo: Replace occurrences of "infos" with "info"
cbentejac Nov 24, 2025
5aca21f
[core] Fix typo: Replace all occurrences of "infos" with "info"
cbentejac Nov 24, 2025
d12e434
[core] Linting: Remove all trailing whitespaces
cbentejac Nov 24, 2025
c169a1d
[core] node: Remove references to `packageVersion` in `NodeStatusData`
cbentejac Nov 26, 2025
02f50d9
[core] node: Remove static info from the chunks' status file
cbentejac Nov 26, 2025
af397c2
Linting: Remove trailing whitespaces
cbentejac Nov 26, 2025
ced2c05
[core] node: Use explicit keys for chunks' blockSize, fullSize and nb…
cbentejac Nov 27, 2025
32c280e
[core] node: Detect whether external jobs can be stopped or canceled
cbentejac Nov 27, 2025
67a67b0
[ui] Add `stoppable` state for the `Submit` button
cbentejac Dec 1, 2025
4f96f9d
[ui] NodeActions: Add `Retry` button for submitted tasks on error state
cbentejac Dec 1, 2025
5c468f4
[GraphEditor] NodeChunks: Remove specific color for dynamic chunks
cbentejac Dec 1, 2025
1222ec5
[ui] NodeActions: Fix status of `compute` and `submit` in `deletable`…
cbentejac Dec 2, 2025
c22e40f
[GraphEditor] Add "Retry Error Tasks" menu to match NodeActions
cbentejac Dec 2, 2025
b2d9ab6
[GraphEditor] Add "Interrupt/Cancel Job" menus
cbentejac Dec 2, 2025
60074bd
[GraphEditor] NodeChunks: Don't add specific display when there's no …
cbentejac Dec 3, 2025
e58dce2
[core] node: Add `chunkPlaceholder` property
cbentejac Dec 3, 2025
ccbc890
[ui] Use chunk placeholder for uncreated dynamic chunks
cbentejac Dec 3, 2025
4764d71
[core] graph: Trigger `onGraphUpdated` slot when chunks change
cbentejac Dec 3, 2025
22ae573
[ui] Application: Update state of the global "Submit" icon when needed
cbentejac Dec 3, 2025
b25aeb7
`.git-blame-ignore-revs`: Add linting commits
cbentejac Dec 4, 2025
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions .git-blame-ignore-revs
Original file line number Diff line number Diff line change
@@ -1,3 +1,7 @@
# Linting: Remove trailing whitespaces
af397c2ab6b9a8bc446a81ce7fd162a351895673
# [core] Linting: Remove all trailing whitespaces
d12e434998dcdbe9d014bccae4f9294665bd96d4
# [tests] Linting: Remove trailing whitespaces
5fe886b6b08fa19082dc0e1bf837fa34c2e2de2d
# [core] Linting: Remove remaining trailing whitespaces
Expand Down
41 changes: 37 additions & 4 deletions bin/meshroom_compute
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ import argparse
import logging
import os
import sys
from typing import NoReturn

try:
import meshroom
Expand All @@ -16,7 +17,7 @@ meshroom.setupEnvironment()

import meshroom.core
import meshroom.core.graph
from meshroom.core.node import Status, ExecMode
from meshroom.core.node import Status


parser = argparse.ArgumentParser(description='Execute a Graph of processes.')
Expand Down Expand Up @@ -63,12 +64,28 @@ else:

meshroom.core.initPlugins()
meshroom.core.initNodes()
meshroom.core.initSubmitters()

graph = meshroom.core.graph.loadGraph(args.graphFile)
if args.cache:
graph.cacheDir = args.cache
graph.update()


def killRunningJob(node) -> NoReturn:
    """ Kill the node's submitted job and exit without allowing a farm auto-retry.

    Looks up the submitter recorded in the node's job info and, if it is
    registered, asks it to kill the running job. In every case the process
    exits with ERROR_NO_RETRY so farm tools do not restart the task.

    Args:
        node: the node whose nodeStatus.jobInfo identifies the submitter.
    """
    jobInfo = node.nodeStatus.jobInfo
    submitterName = jobInfo.get("submitterName")
    if submitterName:
        from meshroom.core import submitters
        # `submitters` maps submitter names to instances: a direct lookup
        # replaces the original linear scan over items().
        submitter = submitters.get(submitterName)
        if submitter is not None:
            submitter.killRunningJob()
    sys.exit(meshroom.MeshroomExitStatus.ERROR_NO_RETRY)


if args.node:
node = graph.findNode(args.node)
submittedStatuses = [Status.RUNNING]
Expand All @@ -83,6 +100,15 @@ if args.node:
# If running as "extern", the task is supposed to have the status SUBMITTED.
# If not running as "extern", the SUBMITTED status should generate a warning.
submittedStatuses.append(Status.SUBMITTED)

if not node._chunksCreated:
print(f"Error: Node {node} has been submitted before chunks have been created." \
"See file: \"{node.nodeStatusFile}\".")
sys.exit(-1)

if node._isInputNode():
print(f"InputNode: No computation to do.")

if not args.forceStatus and not args.forceCompute:
if args.iteration != -1:
chunks = [node.chunks[args.iteration]]
Expand All @@ -91,10 +117,11 @@ if args.node:
for chunk in chunks:
if chunk.status.status in submittedStatuses:
# Particular case for the local isolated, the node status is set to RUNNING by the submitter directly.
# We ensure that no other instance has started to compute, by checking that the sessionUid is empty.
if chunk.node.getMrNodeType() == meshroom.core.MrNodeType.NODE and not chunk.status.sessionUid and chunk.status.submitterSessionUid:
# We ensure that no other instance has started to compute, by checking that the computeSessionUid is empty.
if chunk.node.getMrNodeType() == meshroom.core.MrNodeType.NODE and \
not chunk.status.computeSessionUid and node._nodeStatus.submitterSessionUid:
continue
print(f'Warning: Node is already submitted with status "{chunk.status.status.name}". See file: "{chunk.statusFile}". ExecMode: {chunk.status.execMode.name}, SessionUid: {chunk.status.sessionUid}, submitterSessionUid: {chunk.status.submitterSessionUid}')
print(f'Warning: Node is already submitted with status "{chunk.status.status.name}". See file: "{chunk.statusFile}". ExecMode: {chunk.status.execMode.name}, computeSessionUid: {chunk.status.computeSessionUid}, submitterSessionUid: {node._nodeStatus.submitterSessionUid}')
# sys.exit(-1)

if args.extern:
Expand All @@ -105,8 +132,14 @@ if args.node:
node.preprocess()
if args.iteration != -1:
chunk = node.chunks[args.iteration]
if chunk._status.status == Status.STOPPED:
print(f"Chunk {chunk} : status is STOPPED")
killRunningJob(node)
chunk.process(args.forceCompute, args.inCurrentEnv)
else:
if node.nodeStatus.status == Status.STOPPED:
print(f"Node {node} : status is STOPPED")
killRunningJob(node)
node.process(args.forceCompute, args.inCurrentEnv)
node.postprocess()
node.restoreLogger()
Expand Down
145 changes: 145 additions & 0 deletions bin/meshroom_createChunks
Original file line number Diff line number Diff line change
@@ -0,0 +1,145 @@
#!/usr/bin/env python

"""
This is a script used to wrap the process of processing a node on the farm.
It will handle chunk creation and create all the jobs for these chunks.
If the submitter cannot create chunks (or no submitter is found), then it will
process the chunks serially in the current process.
"""

import argparse
import logging
import os
import sys
try:
    import meshroom
except Exception:
    # If meshroom module is not in the PYTHONPATH, add our root using the relative path
    import pathlib
    meshroomRootFolder = pathlib.Path(__file__).parent.parent.resolve()
    # sys.path entries must be strings; non-str entries are ignored by the import system.
    sys.path.append(str(meshroomRootFolder))
    import meshroom
meshroom.setupEnvironment()

import meshroom.core
import meshroom.core.graph
from meshroom.core import submitters
from meshroom.core.submitter import SubmitterOptionsEnum
from meshroom.core.node import Status


parser = argparse.ArgumentParser(description='Execute a Graph of processes.')
parser.add_argument('graphFile', metavar='GRAPHFILE.mg', type=str,
                    help='Filepath to a graph file.')

parser.add_argument('--submitter', type=str, required=True,
                    help='Name of the submitter used to create the job.')
parser.add_argument('--node', metavar='NODE_NAME', type=str, required=True,
                    help='Process the node. It will generate an error if the dependencies are not already computed.')
parser.add_argument('--inCurrentEnv', help='Execute process in current env without creating a dedicated runtime environment.',
                    action='store_true')
parser.add_argument('--forceStatus', help='Force computation if status is RUNNING or SUBMITTED.',
                    action='store_true')
parser.add_argument('--forceCompute', help='Compute in all cases even if already computed.',
                    action='store_true')
parser.add_argument('--extern', help='Use this option when you compute externally after submission to a render farm from meshroom.',
                    action='store_true')
parser.add_argument('--cache', metavar='FOLDER', type=str,
                    default=None,
                    help='Override the cache folder')
parser.add_argument('-v', '--verbose',
                    help='Set the verbosity level for logging:\n'
                         ' - fatal: Show only critical errors.\n'
                         ' - error: Show errors only.\n'
                         ' - warning: Show warnings and errors.\n'
                         ' - info: Show standard informational messages.\n'
                         ' - debug: Show detailed debug information.\n'
                         ' - trace: Show all messages, including trace-level details.',
                    default=os.environ.get('MESHROOM_VERBOSE', 'info'),
                    choices=['fatal', 'error', 'warning', 'info', 'debug', 'trace'])

args = parser.parse_args()

# For extern computation, we want to focus on the node computation log.
# So, we avoid polluting the log with general warning about plugins, versions of nodes in file, etc.
logging.getLogger().setLevel(level=logging.INFO)

meshroom.core.initPlugins()
meshroom.core.initNodes()
meshroom.core.initSubmitters()  # Required to spool child job

graph = meshroom.core.graph.loadGraph(args.graphFile)
if args.cache:
    graph.cacheDir = args.cache
graph.update()

# Execute the node
node = graph.findNode(args.node)
submittedStatuses = [Status.RUNNING]

# Find the submitter by name; it's required if we want to spool chunks on different machines.
# `submitters` maps submitter names to instances, so a direct lookup replaces the scan.
submitter = submitters.get(args.submitter)

if node._nodeStatus.status in (Status.STOPPED, Status.KILLED):
    logging.error("Node status is STOPPED or KILLED.")
    if submitter:
        submitter.killRunningJob()
    sys.exit(meshroom.MeshroomExitStatus.ERROR_NO_RETRY)

if not node._chunksCreated:
    # Create node chunks
    # Once created we don't have to do it again even if we relaunch the job
    node.createChunks()
    # Set the chunks statuses
    for chunk in node._chunks:
        if args.forceCompute or chunk._status.status != Status.SUCCESS:
            chunk._status.setNode(node)
            chunk._status.initExternSubmit()
            chunk.upgradeStatusFile()

# Get chunks to process in the current process
chunksToProcess = []
if submitter:
    if not submitter._options.includes(SubmitterOptionsEnum.EDIT_TASKS):
        chunksToProcess = node.chunks
else:
    # Cannot retrieve job -> execute process serially
    chunksToProcess = node.chunks

logging.info(f"[MeshroomCreateChunks] Chunks to process here : {chunksToProcess}")

if not args.forceStatus and not args.forceCompute:
    for chunk in chunksToProcess:
        if chunk.status.status in submittedStatuses:
            # Particular case for the local isolated, the node status is set to RUNNING by the submitter directly.
            # We ensure that no other instance has started to compute, by checking that the computeSessionUid is empty.
            if chunk.node.getMrNodeType() == meshroom.core.MrNodeType.NODE and \
                    not chunk.status.computeSessionUid and node._nodeStatus.submitterSessionUid:
                continue
            logging.warning(
                f"[MeshroomCreateChunks] Node is already submitted with status " \
                f"\"{chunk.status.status.name}\". See file: \"{chunk.statusFile}\". " \
                f"ExecMode: {chunk.status.execMode.name}, computeSessionUid: {chunk.status.computeSessionUid}, " \
                f"submitterSessionUid: {node._nodeStatus.submitterSessionUid}")

if chunksToProcess:
    # Process the chunks serially in this process.
    node.prepareLogger()
    node.preprocess()
    for chunk in chunksToProcess:
        logging.info(f"[MeshroomCreateChunks] process chunk {chunk}")
        chunk.process(args.forceCompute, args.inCurrentEnv)
    node.postprocess()
    node.restoreLogger()
else:
    # No chunk to run locally: delegate the chunks to the submitter as a dedicated job.
    if submitter is None:
        # Defensive: this branch needs a submitter; fail explicitly instead of
        # raising an AttributeError on `None`.
        logging.error(f"[MeshroomCreateChunks] No chunk to process and submitter '{args.submitter}' was not found.")
        sys.exit(meshroom.MeshroomExitStatus.ERROR)
    logging.info(f"[MeshroomCreateChunks] -> create job to process chunks {node.chunks}")
    submitter.createChunkTask(node, graphFile=args.graphFile, cache=args.cache,
                              forceStatus=args.forceStatus, forceCompute=args.forceCompute)

# Restore the log level
logging.getLogger().setLevel(meshroom.logStringToPython[args.verbose])
14 changes: 13 additions & 1 deletion meshroom/__init__.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
from distutils import util
from enum import Enum
from enum import Enum, IntEnum
import logging
import os
import sys
Expand Down Expand Up @@ -76,6 +76,18 @@ def logToRoot(message, *args, **kwargs):
logging.getLogger().setLevel(logStringToPython[os.environ.get('MESHROOM_VERBOSE', 'warning')])


class MeshroomExitStatus(IntEnum):
    """ Process exit codes, in case we want to catch some special cases from the
    parent process.
    We could use 3-125 for custom exit codes:
    https://tldp.org/LDP/abs/html/exitcodes.html
    """
    SUCCESS = 0
    ERROR = 1
    # In some farm tools jobs are automatically re-tried;
    # using ERROR_NO_RETRY will try to prevent that.
    ERROR_NO_RETRY = -999  # The reported status is actually -999 % 256 => 25


def setupEnvironment(backend=Backend.STANDALONE):
"""
Setup environment for Meshroom to work in a prebuilt, standalone configuration.
Expand Down
2 changes: 1 addition & 1 deletion meshroom/core/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -119,7 +119,7 @@ def loadClasses(folder: str, packageName: str, classType: type) -> list[type]:
classes.append(p)
except Exception as exc:
if classType == BaseSubmitter:
logging.warning(f" Could not load submitter {pluginName} from package '{package.__name__}'")
logging.warning(f" Could not load submitter {pluginName} from package '{package.__name__}'\n{exc}")
else:
tb = traceback.extract_tb(exc.__traceback__)
last_call = tb[-1]
Expand Down
9 changes: 7 additions & 2 deletions meshroom/core/desc/computation.py
Original file line number Diff line number Diff line change
@@ -1,13 +1,15 @@
import math
from enum import Enum
from enum import IntEnum

from .attribute import ListAttribute, IntParam


class Level(Enum):
class Level(IntEnum):
    """ Computation intensity level of a node.

    IntEnum so that levels can be compared and ordered numerically.
    Member order is preserved (Enum iteration follows definition order).
    """
    NONE = 0
    NORMAL = 1
    INTENSIVE = 2
    EXTREME = 3
    SCRIPT = -1  # Below NONE — presumably a lightweight scripting step; confirm with callers


class Range:
Expand Down Expand Up @@ -46,6 +48,9 @@ def toDict(self):
"rangeBlocksCount": self.nbBlocks
}

def __repr__(self):
    # Debug-friendly form: <Range iteration(blockSize)/nbBlocks(fullSize)>.
    return f"<Range {self.iteration}({self.blockSize})/{self.nbBlocks}({self.fullSize})>"


class Parallelization:
def __init__(self, staticNbBlocks=0, blockSize=0):
Expand Down
40 changes: 37 additions & 3 deletions meshroom/core/desc/node.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,8 @@
import shlex
import shutil
import sys
import signal
import subprocess

import psutil

Expand All @@ -20,6 +22,34 @@
_MESHROOM_COMPUTE_DEPS = ["psutil"]


# Handle cleanup
class ExitCleanup:
"""
Make sure we kill child subprocesses when the main process exits receive SIGTERM.
"""

def __init__(self):
self._subprocesses = []
signal.signal(signal.SIGTERM, self.exit)

def addSubprocess(self, process):
logging.debug(f"[ExitCleanup] Register subprocess {process}")
self._subprocesses.append(process)

def exit(self, signum, frame):
for proc in self._subprocesses:
logging.debug(f"[ExitCleanup] Kill subprocess {proc}")
try:
if proc.is_running():
proc.terminate()
proc.wait(timeout=5)
except subprocess.TimeoutExpired:
proc.kill()
sys.exit(0)

exitCleanup = ExitCleanup()


class MrNodeType(enum.Enum):
NONE = enum.auto()
BASENODE = enum.auto()
Expand Down Expand Up @@ -90,6 +120,9 @@ class BaseNode(object):
documentation = ""
category = "Other"
plugin = None
# Licenses required to run the plugin
# Only used to select machines on the farm when the node is submitted
_licenses = []

def __init__(self):
super(BaseNode, self).__init__()
Expand Down Expand Up @@ -158,7 +191,7 @@ def processChunk(self, chunk):

def executeChunkCommandLine(self, chunk, cmd, env=None):
try:
with open(chunk.logFile, 'w') as logF:
with open(chunk.getLogFile(), 'w') as logF:
chunk.status.commandLine = cmd
chunk.saveStatusFile()
cmdList = shlex.split(cmd)
Expand All @@ -167,7 +200,7 @@ def executeChunkCommandLine(self, chunk, cmd, env=None):

print(f"Starting Process for '{chunk.node.name}'")
print(f" - commandLine: {cmd}")
print(f" - logFile: {chunk.logFile}")
print(f" - logFile: {chunk.getLogFile()}")
if prog:
cmdList[0] = Path(prog).as_posix()
print(f" - command full path: {cmdList[0]}")
Expand All @@ -192,6 +225,7 @@ def executeChunkCommandLine(self, chunk, cmd, env=None):
env=env,
**platformArgs,
)
exitCleanup.addSubprocess(chunk.subprocess)

if hasattr(chunk, "statThread"):
# We only have a statThread if the node is running in the current process
Expand All @@ -212,7 +246,7 @@ def executeChunkCommandLine(self, chunk, cmd, env=None):
pass

if chunk.subprocess.returncode != 0:
with open(chunk.logFile, "r") as logF:
with open(chunk.getLogFile(), "r") as logF:
logContent = "".join(logF.readlines())
raise RuntimeError(f'Error on node "{chunk.name}":\nLog:\n{logContent}')
finally:
Expand Down
Loading