Skip to content

Commit 0ee1757

Browse files
committed
New notion of local isolated computation for python nodes using meshroom_compute
Reoganization - BaseNode: is the base class for all nodes - Node: is now dedicated to python nodes, with the implentation directly in the process function - CommandLineNode: dedicated to generate and run external command lines
1 parent 455c53e commit 0ee1757

6 files changed

Lines changed: 285 additions & 152 deletions

File tree

bin/meshroom_compute

Lines changed: 10 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -16,7 +16,7 @@ meshroom.setupEnvironment()
1616

1717
import meshroom.core
1818
import meshroom.core.graph
19-
from meshroom.core.node import Status
19+
from meshroom.core.node import Status, ExecMode
2020

2121

2222
parser = argparse.ArgumentParser(description='Execute a Graph of processes.')
@@ -26,6 +26,8 @@ parser.add_argument('--node', metavar='NODE_NAME', type=str,
2626
help='Process the node. It will generate an error if the dependencies are not already computed.')
2727
parser.add_argument('--toNode', metavar='NODE_NAME', type=str,
2828
help='Process the node with its dependencies.')
29+
parser.add_argument('--inCurrentEnv', help='Execute process in current env without creating a dedicated runtime environment.',
30+
action='store_true')
2931
parser.add_argument('--forceStatus', help='Force computation if status is RUNNING or SUBMITTED.',
3032
action='store_true')
3133
parser.add_argument('--forceCompute', help='Compute in all cases even if already computed.',
@@ -81,7 +83,11 @@ if args.node:
8183
chunks = node.chunks
8284
for chunk in chunks:
8385
if chunk.status.status in submittedStatuses:
84-
print('Warning: Node is already submitted with status "{}". See file: "{}"'.format(chunk.status.status.name, chunk.statusFile))
86+
# Particular case for the LOCAL_ISOLATED, the node status is set to RUNNING by the submitter directly.
87+
# We ensure that no other instance has started to compute, by checking that the sessionUid is empty.
88+
if chunk.status.execMode == ExecMode.LOCAL_ISOLATED and not chunk.status.sessionUid and chunk.status.submitterSessionUid:
89+
continue
90+
print(f'Warning: Node is already submitted with status "{chunk.status.status.name}". See file: "{chunk.statusFile}". ExecMode: {chunk.status.execMode.name}, SessionUid: {chunk.status.sessionUid}, submitterSessionUid: {chunk.status.submitterSessionUid}')
8591
# sys.exit(-1)
8692

8793
if args.extern:
@@ -91,9 +97,9 @@ if args.node:
9197
node.preprocess()
9298
if args.iteration != -1:
9399
chunk = node.chunks[args.iteration]
94-
chunk.process(args.forceCompute)
100+
chunk.process(args.forceCompute, args.inCurrentEnv)
95101
else:
96-
node.process(args.forceCompute)
102+
node.process(args.forceCompute, args.inCurrentEnv)
97103
node.postprocess()
98104
else:
99105
if args.iteration != -1:

meshroom/core/desc/__init__.py

Lines changed: 1 addition & 28 deletions
Original file line numberDiff line numberDiff line change
@@ -21,36 +21,9 @@
2121
)
2222
from .node import (
2323
AVCommandLineNode,
24+
BaseNode,
2425
CommandLineNode,
2526
InitNode,
2627
InputNode,
2728
Node,
2829
)
29-
30-
__all__ = [
31-
# attribute
32-
"Attribute",
33-
"BoolParam",
34-
"ChoiceParam",
35-
"ColorParam",
36-
"File",
37-
"FloatParam",
38-
"GroupAttribute",
39-
"IntParam",
40-
"ListAttribute",
41-
"PushButtonParam",
42-
"StringParam",
43-
# computation
44-
"DynamicNodeSize",
45-
"Level",
46-
"MultiDynamicNodeSize",
47-
"Parallelization",
48-
"Range",
49-
"StaticNodeSize",
50-
# node
51-
"AVCommandLineNode",
52-
"CommandLineNode",
53-
"InitNode",
54-
"InputNode",
55-
"Node",
56-
]

meshroom/core/desc/node.py

Lines changed: 141 additions & 54 deletions
Original file line numberDiff line numberDiff line change
@@ -1,16 +1,34 @@
11
from inspect import getfile
22
from pathlib import Path
3+
import logging
34
import os
45
import psutil
56
import shlex
7+
import shutil
8+
import sys
69

710
from .computation import Level, StaticNodeSize
811
from .attribute import StringParam, ColorParam
12+
from .process import ProcessEnvironment, _MESHROOM_COMPUTE
13+
from .config import getProcessEnvironment
914

1015
from meshroom.core import cgroup
1116

1217

13-
class Node(object):
18+
def isNodeSaved(node):
19+
"""Returns whether a node is identical to its serialized counterpart in the current graph file."""
20+
filepath = node.graph.filepath
21+
if not filepath:
22+
return False
23+
24+
from meshroom.core.graph import loadGraph
25+
graphSaved = loadGraph(filepath)
26+
nodeSaved = graphSaved.node(node.name)
27+
if nodeSaved is None:
28+
return False
29+
return nodeSaved._uid == node._uid
30+
31+
class BaseNode(object):
1432
"""
1533
"""
1634
cpu = Level.NORMAL
@@ -62,7 +80,7 @@ class Node(object):
6280
category = 'Other'
6381

6482
def __init__(self):
65-
super(Node, self).__init__()
83+
super(BaseNode, self).__init__()
6684
self.hasDynamicOutputAttribute = any(output.isDynamicValue for output in self.outputs)
6785
self.sourceCodeFolder = Path(getfile(self.__class__)).parent.resolve().as_posix()
6886

@@ -113,13 +131,102 @@ def postprocess(self, node):
113131
pass
114132

115133
def stopProcess(self, chunk):
116-
raise NotImplementedError('No stopProcess implementation on node: {}'.format(chunk.node.name))
134+
logging.warning(f'No stopProcess implementation on node: {chunk.node.name}')
117135

118136
def processChunk(self, chunk):
119137
raise NotImplementedError(f'No processChunk implementation on node: "{chunk.node.name}"')
120138

139+
def executeChunkCommandLine(self, chunk, cmd, env=None):
140+
try:
141+
with open(chunk.logFile, 'w') as logF:
142+
chunk.status.commandLine = cmd
143+
chunk.saveStatusFile()
144+
cmdList = shlex.split(cmd)
145+
# Resolve executable to full path
146+
prog = shutil.which(cmdList[0], path=env.get('PATH') if env else None)
147+
148+
print(f"Starting Process for '{chunk.node.name}'")
149+
print(f' - commandLine: {cmd}')
150+
print(f' - logFile: {chunk.logFile}')
151+
if prog:
152+
cmdList[0] = prog
153+
print(f' - command full path: {prog}')
154+
155+
# Change the process group to avoid Meshroom main process being killed if the subprocess
156+
# gets terminated by the user or an Out Of Memory (OOM kill).
157+
if sys.platform == "win32":
158+
platformArgs = {"creationflags": psutil.CREATE_NEW_PROCESS_GROUP}
159+
# Note: DETACHED_PROCESS means fully detached process.
160+
# We don't want a fully detached process to ensure that if Meshroom is killed,
161+
# the subprocesses are killed too.
162+
else:
163+
platformArgs = {"start_new_session": True}
164+
# Note: "preexec_fn"=os.setsid is the old way before python-3.2
165+
166+
chunk.subprocess = psutil.Popen(
167+
cmdList,
168+
stdout=logF,
169+
stderr=logF,
170+
cwd=chunk.node.internalFolder,
171+
env=env,
172+
**platformArgs,
173+
)
174+
175+
if hasattr(chunk, "statThread"):
176+
# We only have a statThread if the node is running in the current process
177+
# and not in a dedicated environment/process.
178+
chunk.statThread.proc = chunk.subprocess
179+
180+
stdout, stderr = chunk.subprocess.communicate()
121181

122-
class InputNode(Node):
182+
chunk.status.returnCode = chunk.subprocess.returncode
183+
184+
if chunk.subprocess.returncode and chunk.subprocess.returncode < 0:
185+
signal_num = -chunk.subprocess.returncode
186+
logF.write(f"Process was killed by signal: {signal_num}")
187+
try:
188+
status = chunk.subprocess.status()
189+
logF.write(f"Process status: {status}")
190+
except:
191+
pass
192+
193+
if chunk.subprocess.returncode != 0:
194+
with open(chunk.logFile, 'r') as logF:
195+
logContent = ''.join(logF.readlines())
196+
raise RuntimeError('Error on node "{}":\nLog:\n{}'.format(chunk.name, logContent))
197+
finally:
198+
chunk.subprocess = None
199+
200+
def stopProcess(self, chunk):
201+
# The same node could exists several times in the graph and
202+
# only one would have the running subprocess; ignore all others
203+
if not chunk.subprocess:
204+
print(f"[{chunk.node.name}] stopProcess: no subprocess")
205+
return
206+
207+
# Retrieve process tree
208+
processes = chunk.subprocess.children(recursive=True) + [chunk.subprocess]
209+
logging.debug(f"[{chunk.node.name}] Processes to stop: {len(processes)}")
210+
for process in processes:
211+
try:
212+
# With terminate, the process has a chance to handle cleanup
213+
process.terminate()
214+
except psutil.NoSuchProcess:
215+
pass
216+
217+
# If it is still running, force kill it
218+
for process in processes:
219+
try:
220+
# Use is_running() instead of poll() as we use a psutil.Process object
221+
if process.is_running(): # Check if process is still alive
222+
process.kill() # Forcefully kill it
223+
except psutil.NoSuchProcess:
224+
logging.info(f"[{chunk.node.name}] Process already terminated.")
225+
except psutil.AccessDenied:
226+
logging.info(f"[{chunk.node.name}] Permission denied to kill the process.")
227+
228+
229+
class InputNode(BaseNode):
123230
"""
124231
Node that does not need to be processed, it is just a placeholder for inputs.
125232
"""
@@ -130,7 +237,24 @@ def processChunk(self, chunk):
130237
pass
131238

132239

133-
class CommandLineNode(Node):
240+
class Node(BaseNode):
241+
242+
def __init__(self):
243+
super(Node, self).__init__()
244+
245+
def processChunkInEnvironment(self, chunk):
246+
if not isNodeSaved(chunk.node):
247+
raise RuntimeError("File must be saved before computing in isolated environment.")
248+
249+
meshroomComputeCmd = f"python {_MESHROOM_COMPUTE} {chunk.node.graph.filepath} --node {chunk.node.name} --extern --inCurrentEnv"
250+
if len(chunk.node.getChunks()) > 1:
251+
meshroomComputeCmd += f" --iteration {chunk.range.iteration}"
252+
253+
runtimeEnv = None
254+
self.executeChunkCommandLine(chunk, meshroomComputeCmd, env=runtimeEnv)
255+
256+
257+
class CommandLineNode(BaseNode):
134258
"""
135259
"""
136260
commandLine = '' # need to be defined on the node
@@ -143,63 +267,25 @@ def __init__(self):
143267
def buildCommandLine(self, chunk):
144268

145269
cmdPrefix = ''
146-
# If rez available in env, we use it
147-
if "REZ_ENV" in os.environ and chunk.node.packageVersion:
148-
# If the node package is already in the environment, we don't need a new dedicated rez environment
149-
alreadyInEnv = os.environ.get("REZ_{}_VERSION".format(chunk.node.packageName.upper()),
150-
"").startswith(chunk.node.packageVersion)
151-
if not alreadyInEnv:
152-
cmdPrefix = '{rez} {packageFullName} -- '.format(rez=os.environ.get("REZ_ENV"),
153-
packageFullName=chunk.node.packageFullName)
270+
# # If rez available in env, we use it
271+
# if "REZ_ENV" in os.environ and chunk.node.packageVersion:
272+
# # If the node package is already in the environment, we don't need a new dedicated rez environment
273+
# alreadyInEnv = os.environ.get("REZ_{}_VERSION".format(chunk.node.packageName.upper()),
274+
# "").startswith(chunk.node.packageVersion)
275+
# if not alreadyInEnv:
276+
# cmdPrefix = '{rez} {packageFullName} -- '.format(rez=os.environ.get("REZ_ENV"),
277+
# packageFullName=chunk.node.packageFullName)
154278

155279
cmdSuffix = ''
156280
if chunk.node.isParallelized and chunk.node.size > 1:
157281
cmdSuffix = ' ' + self.commandLineRange.format(**chunk.range.toDict())
158282

159283
return cmdPrefix + chunk.node.nodeDesc.commandLine.format(**chunk.node._cmdVars) + cmdSuffix
160284

161-
def stopProcess(self, chunk):
162-
# The same node could exists several times in the graph and
163-
# only one would have the running subprocess; ignore all others
164-
if not hasattr(chunk, "subprocess"):
165-
return
166-
if chunk.subprocess:
167-
# Kill process tree
168-
processes = chunk.subprocess.children(recursive=True) + [chunk.subprocess]
169-
try:
170-
for process in processes:
171-
process.terminate()
172-
except psutil.NoSuchProcess:
173-
pass
174-
175285
def processChunk(self, chunk):
176-
try:
177-
with open(chunk.logFile, 'w') as logF:
178-
cmd = self.buildCommandLine(chunk)
179-
chunk.status.commandLine = cmd
180-
chunk.saveStatusFile()
181-
print(' - commandLine: {}'.format(cmd))
182-
print(' - logFile: {}'.format(chunk.logFile))
183-
chunk.subprocess = psutil.Popen(shlex.split(cmd), stdout=logF, stderr=logF, cwd=chunk.node.internalFolder)
184-
185-
# Store process static info into the status file
186-
# chunk.status.env = node.proc.environ()
187-
# chunk.status.createTime = node.proc.create_time()
188-
189-
chunk.statThread.proc = chunk.subprocess
190-
stdout, stderr = chunk.subprocess.communicate()
191-
chunk.subprocess.wait()
192-
193-
chunk.status.returnCode = chunk.subprocess.returncode
194-
195-
if chunk.subprocess.returncode != 0:
196-
with open(chunk.logFile, 'r') as logF:
197-
logContent = ''.join(logF.readlines())
198-
raise RuntimeError('Error on node "{}":\nLog:\n{}'.format(chunk.name, logContent))
199-
except Exception:
200-
raise
201-
finally:
202-
chunk.subprocess = None
286+
cmd = self.buildCommandLine(chunk)
287+
# TODO: Setup runtime env
288+
self.executeChunkCommandLine(chunk, cmd)
203289

204290

205291
# Specific command line node for AliceVision apps
@@ -282,3 +368,4 @@ def setAttributes(self, node, attributesDict):
282368
for attr in attributesDict:
283369
if node.hasAttribute(attr):
284370
node.attribute(attr).value = attributesDict[attr]
371+

0 commit comments

Comments
 (0)