Skip to content
Merged
3 changes: 3 additions & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,8 @@
.DS_Store
# Windows
Thumbs.db
# vscode
.vscode

# python
*.pyc
Expand All @@ -19,6 +21,7 @@ __pycache__

# backup files
*.json
!*Config.json

# datas or personal files
/data
Expand Down
51 changes: 37 additions & 14 deletions meshroom/core/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -120,16 +120,19 @@ def loadClasses(folder: str, packageName: str, classType: type) -> list[type]:
else:
classes.append(p)
except Exception as e:
tb = traceback.extract_tb(e.__traceback__)
last_call = tb[-1]
errors.append(f' * {pluginName} ({type(e).__name__}): {e}\n'
# filename:lineNumber functionName
f'{last_call.filename}:{last_call.lineno} {last_call.name}\n'
# line of code with the error
f'{last_call.line}'
# Full traceback
f'\n{traceback.format_exc()}\n\n'
)
if classType == BaseSubmitter:
logging.warning(f" Could not load submitter {pluginName} from package '{package.__name__}'")
else:
tb = traceback.extract_tb(e.__traceback__)
last_call = tb[-1]
errors.append(f' * {pluginName} ({type(e).__name__}): {e}\n'
# filename:lineNumber functionName
f'{last_call.filename}:{last_call.lineno} {last_call.name}\n'
# line of code with the error
f'{last_call.line}'
# Full traceback
f'\n{traceback.format_exc()}\n\n'
)

if errors:
logging.warning(' The following "{package}" plugins could not be loaded:\n'
Expand Down Expand Up @@ -381,14 +384,24 @@ def registerSubmitter(s: BaseSubmitter):
submitters[s.name] = s


def loadSubmitters(folder, packageName):
def loadSubmitters(folder, packageName) -> list[BaseSubmitter]:
if not os.path.isdir(folder):
logging.error(f"Submitters folder '{folder}' does not exist.")
return

return loadClassesSubmitters(folder, packageName)


def loadAllSubmitters(folder) -> list[BaseSubmitter]:
submitters = []
for _, package, ispkg in pkgutil.iter_modules([folder]):
if ispkg:
subs = loadSubmitters(folder, package)
if subs:
submitters.extend(subs)
return submitters


def loadPipelineTemplates(folder: str):
if not os.path.isdir(folder):
logging.error(f"Pipeline templates folder '{folder}' does not exist.")
Expand All @@ -409,10 +422,20 @@ def initNodes():


def initSubmitters():
""" Detect and register submitter plugins
Note: Make sure the package name (folder inside the additionalPaths folders)
are unique : so we cannot name them "submitters" because it's already taken
by the submitters package inside meshroom
"""
# Load meshroom default submitters
# Use directly loadSubmitters because we don't want any folder except submitters to be registered
subs = loadSubmitters(meshroomFolder, "submitters")
for sub in subs:
registerSubmitter(sub())
# Load additional submitters
additionalPaths = EnvVar.getList(EnvVar.MESHROOM_SUBMITTERS_PATH)
allSubmittersFolders = [meshroomFolder] + additionalPaths
for folder in allSubmittersFolders:
subs = loadSubmitters(folder, "submitters")
for folder in additionalPaths:
subs = loadAllSubmitters(folder)
for sub in subs:
registerSubmitter(sub())

Expand Down
2 changes: 1 addition & 1 deletion meshroom/core/graph.py
Original file line number Diff line number Diff line change
Expand Up @@ -1711,7 +1711,7 @@ def submitGraph(graph, submitter, toNodes=None, submitLabel="{projectName}"):
res = sub.submit(nodesToProcess, edgesToProcess, graph.filepath, submitLabel=submitLabel)
if res:
for node in nodesToProcess:
node.submit() # update node status
node.initStatusOnSubmit() # update node status
except Exception as e:
logging.error(f"Error on submit : {e}")

Expand Down
3 changes: 2 additions & 1 deletion meshroom/core/node.py
Original file line number Diff line number Diff line change
Expand Up @@ -1320,7 +1320,8 @@ def updateStatusFromCache(self):
# logging.warning(f"updateStatusFromCache: {self.name}, status: {s} => {self.globalStatus}")
self.updateOutputAttr()

def submit(self, forceCompute=False):
def initStatusOnSubmit(self, forceCompute=False):
""" Prepare chunks status when the node is in a graph that was submitted """
for chunk in self._chunks:
if forceCompute or chunk.status.status != Status.SUCCESS:
chunk._status.setNode(self)
Expand Down
4 changes: 2 additions & 2 deletions meshroom/core/taskManager.py
Original file line number Diff line number Diff line change
Expand Up @@ -33,64 +33,64 @@
def isRunning(self):
return self._state == State.RUNNING

def run(self):
""" Consume compute tasks. """
self._state = State.RUNNING

stopAndRestart = False

for nId, node in enumerate(self._manager._nodesToProcess):

# skip already finished/running nodes
if node.isFinishedOrRunning():
continue

# if a node does not exist anymore, node.chunks becomes a PySide property
try:
multiChunks = len(node.chunks) > 1
except TypeError:
continue

node.preprocess()
for cId, chunk in enumerate(node.chunks):
if chunk.isFinishedOrRunning() or not self.isRunning():
continue

if multiChunks:
logging.info('[{node}/{nbNodes}]({chunk}/{nbChunks}) {nodeName}'.format(
node=nId+1, nbNodes=len(self._manager._nodesToProcess),
chunk=cId+1, nbChunks=len(node.chunks), nodeName=node.nodeType))
else:
logging.info('[{node}/{nbNodes}] {nodeName}'.format(
node=nId+1, nbNodes=len(self._manager._nodesToProcess), nodeName=node.nodeType))
try:
chunk.process(self.forceCompute)
except Exception as e:
if chunk.isStopped():
stopAndRestart = True
break
else:
logging.error(f"Error on node computation: {e}.")
nodesToRemove, _ = self._manager._graph.dfsOnDiscover(startNodes=[node], reverse=True)
# remove following nodes from the task queue
for n in nodesToRemove[1:]: # exclude current node
try:
self._manager._nodesToProcess.remove(n)
except ValueError:
# Node already removed (for instance a global clear of _nodesToProcess)
pass
n.clearSubmittedChunks()
node.postprocess()

if stopAndRestart:
break

if stopAndRestart:
self._state = State.STOPPED
self._manager.restartRequested.emit()
else:
self._manager._nodesToProcess = []
self._state = State.DEAD

Check notice on line 93 in meshroom/core/taskManager.py

View check run for this annotation

codefactor.io / CodeFactor

meshroom/core/taskManager.py#L36-L93

Complex Method


class TaskManager(BaseObject):
Expand Down Expand Up @@ -388,7 +388,7 @@
sub = None
if submitter:
sub = meshroom.core.submitters.get(submitter, None)
elif len(meshroom.core.submitters) == 1:
elif len(meshroom.core.submitters) >= 1:
# if only one submitter available use it
allSubmitters = meshroom.core.submitters.values()
sub = next(iter(allSubmitters)) # retrieve the first element
Expand Down Expand Up @@ -434,7 +434,7 @@
if res:
for node in nodesToProcess:
node.destroyed.connect(lambda obj=None, name=node.name: self.onNodeDestroyed(obj, name))
node.submit() # update node status
node.initStatusOnSubmit() # update node status
self._nodes.update(nodesToProcess)
self._nodesExtern.extend(nodesToProcess)

Expand Down
22 changes: 9 additions & 13 deletions meshroom/submitters/simpleFarmSubmitter.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@

class SimpleFarmSubmitter(BaseSubmitter):

filepath = os.environ.get('SIMPLEFARMCONFIG', os.path.join(currentDir, 'simpleFarmConfig.json'))
filepath = os.environ.get('SIMPLEFARMCONFIG', os.path.join(currentDir, 'tractorConfig.json'))
config = json.load(open(filepath))

reqPackages = []
Expand Down Expand Up @@ -82,20 +82,16 @@ def createTask(self, meshroomFile, node):

tags['nbFrames'] = nbFrames
tags['prod'] = self.prod
allRequirements = list()
allRequirements.extend(self.config['CPU'].get(node.nodeDesc.cpu.name, []))
allRequirements.extend(self.config['RAM'].get(node.nodeDesc.ram.name, []))
allRequirements.extend(self.config['GPU'].get(node.nodeDesc.gpu.name, []))
allRequirements = set()
allRequirements.update(self.config['CPU'].get(node.nodeDesc.cpu.name, []))
allRequirements.update(self.config['RAM'].get(node.nodeDesc.ram.name, []))
allRequirements.update(self.config['GPU'].get(node.nodeDesc.gpu.name, []))

executable = 'meshroom_compute' if self.reqPackages else os.path.join(binDir, 'meshroom_compute')
taskCommand = f"{executable} --node {node.name} \"{meshroomFile}\" {parallelArgs} --extern"
task = simpleFarm.Task(
name=node.name,
command='{exe} --node {nodeName} "{meshroomFile}" {parallelArgs} --extern'.format(
exe='meshroom_compute' if self.reqPackages else os.path.join(binDir, 'meshroom_compute'),
nodeName=node.name, meshroomFile=meshroomFile, parallelArgs=parallelArgs),
tags=tags,
rezPackages=self.reqPackages,
requirements={'service': str(','.join(allRequirements))},
**arguments)
name=node.name, command=taskCommand, tags=tags, rezPackages=self.reqPackages,
requirements={'service': str(','.join(allRequirements))}, **arguments)
return task

def submit(self, nodes, edges, filepath, submitLabel="{projectName}"):
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
{
"BASE": ["mikrosRender", "!ld7"],
"BASE": ["mikrosRender"],
"CPU": {
"NONE": [],
"NORMAL": ["mikrosRender"],
Expand All @@ -16,3 +16,4 @@
"INTENSIVE": ["cuda16G"]
}
}

Loading
Loading