Skip to content

Commit 6593892

Browse files
committed
wipppp
1 parent 598b8d6 commit 6593892

File tree

5 files changed

+90
-48
lines changed

5 files changed

+90
-48
lines changed

meshroom/core/node.py

Lines changed: 8 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -837,8 +837,10 @@ def __init__(self, nodeType: str, position: Position = None, parent: BaseObject
837837
self.graph = None
838838
self.dirty: bool = True # whether this node's outputs must be re-evaluated on next Graph update
839839
self._chunks: list[NodeChunk] = ListModel(parent=self)
840-
self._preprocessChunk = NodeChunk(self, desc.Range(ChunkIndex.PREPROCESS)) if self.nodeDesc._hasPreprocess else None
841-
self._postprocessChunk = NodeChunk(self, desc.Range(ChunkIndex.POSTPROCESS)) if self.nodeDesc._hasPostprocess else None
840+
self._preprocessChunk = NodeChunk(self, desc.Range(ChunkIndex.PREPROCESS)) if \
841+
self.nodeDesc and self.nodeDesc._hasPreprocess else None
842+
self._postprocessChunk = NodeChunk(self, desc.Range(ChunkIndex.POSTPROCESS)) if \
843+
self.nodeDesc and self.nodeDesc._hasPostprocess else None
842844
self._chunksCreated = False # Only initialize chunks on compute
843845
self._chunkPlaceholder: list[NodeChunk] = ListModel(parent=self) # Placeholder chunk for nodes with dynamic ones
844846
self._uid: str = uid
@@ -1716,10 +1718,10 @@ def processIteration(self, iteration):
17161718

17171719
def preprocess(self, forceCompute=False, inCurrentEnv=False):
17181720
""" Prepare the node processing """
1719-
self.prepareLogger(ChunkIndex.PREPROCESS)
17201721
if self.nodeDesc._hasPreprocess:
1722+
self.prepareLogger(ChunkIndex.PREPROCESS)
17211723
self._preprocessChunk.process(forceCompute, inCurrentEnv)
1722-
self.restoreLogger()
1724+
self.restoreLogger()
17231725

17241726
def process(self, forceCompute=False, inCurrentEnv=False):
17251727
for chunk in self._chunks:
@@ -1730,10 +1732,10 @@ def postprocess(self, forceCompute=False, inCurrentEnv=False):
17301732
Invoke the post process on Client Node to execute after the processing on the
17311733
node is completed
17321734
"""
1733-
self.prepareLogger(ChunkIndex.POSTPROCESS)
17341735
if self.nodeDesc._hasPostprocess:
1736+
self.prepareLogger(ChunkIndex.POSTPROCESS)
17351737
self._postprocessChunk.process(forceCompute, inCurrentEnv)
1736-
self.restoreLogger()
1738+
self.restoreLogger()
17371739

17381740
def getLogHandlers(self):
17391741
return self._handlers

meshroom/core/submitter.py

Lines changed: 12 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -104,7 +104,7 @@ def __repr__(self):
104104

105105

106106
class OrderedNode:
107-
""" A task = a node"""
107+
""" Intermediate structure used to order tasks """
108108

109109
def __init__(self, node, dependencies=None):
110110
# node can be None for placeholder tasks (task that don't do anything else than regrouping dependencies)
@@ -214,32 +214,32 @@ def createNodeTasks(self, orderedNode: OrderedNode, parentTask: OrderedTask):
214214
""" Create tasks corresponding to a node and link them correctly.
215215
Also link them to the parent task, and recursively create children tasks.
216216
"""
217-
print(f"* (createNodeTasks) node {orderedNode.node._name}, parent {parentTask.node}")
217+
logger.debug(f"* (createNodeTasks) node {orderedNode.node._name}, parent {parentTask.node}")
218218
# Check if task has already been created
219219
visited = (nodeUid:=orderedNode.node._uid) in self._nodeUidToLastTask
220220
if visited:
221-
print(" -> is visited")
221+
logger.debug(" -> is visited")
222222
# If task is already created simply create the connection
223223
lastTask = self._nodeUidToLastTask[nodeUid]
224224
parentTask.addDependency(lastTask)
225225
return
226226
# Create node tasks
227227
if orderedNode.isPlaceholder:
228-
print(" -> is placeholder")
228+
logger.debug(" -> is placeholder")
229229
task = OrderedTask(OrderedTaskType.PLACEHOLDER, orderedNode)
230230
firstTask = lastTask = task
231231
else:
232232
lastTask = firstTask = None
233233
# Create pre/post tasks if needed
234234
if orderedNode.hasPostprocess:
235-
print(" -> postprocess")
235+
logger.debug(" -> postprocess")
236236
lastTask = OrderedTask(OrderedTaskType.POSTPROCESS, orderedNode)
237237
if orderedNode.hasPreprocess:
238-
print(" -> preprocess")
238+
logger.debug(" -> preprocess")
239239
firstTask = OrderedTask(OrderedTaskType.PREPROCESS, orderedNode)
240240
# Process
241241
if orderedNode.isExpanding:
242-
print(" -> is expanding")
242+
logger.debug(" -> is expanding")
243243
expandingTask = OrderedTask(OrderedTaskType.EXPANDING, orderedNode)
244244
if lastTask:
245245
lastTask.addDependency(expandingTask)
@@ -250,7 +250,7 @@ def createNodeTasks(self, orderedNode: OrderedNode, parentTask: OrderedTask):
250250
else:
251251
firstTask = expandingTask
252252
else:
253-
print(" -> has chunks :", orderedNode.chunksIterations)
253+
logger.debug(f" -> has chunks : {orderedNode.chunksIterations}")
254254
# Handle 0 chunks case
255255
if len(orderedNode.chunksIterations) == 0:
256256
if firstTask and lastTask:
@@ -267,19 +267,19 @@ def createNodeTasks(self, orderedNode: OrderedNode, parentTask: OrderedTask):
267267
lastTask = lastTask if lastTask else OrderedTask(OrderedTaskType.PLACEHOLDER, orderedNode)
268268
firstTask = firstTask if firstTask else OrderedTask(OrderedTaskType.PLACEHOLDER, orderedNode)
269269
for iteration in orderedNode.chunksIterations:
270-
print(f" - chunk {iteration}")
270+
logger.debug(f" - chunk {iteration}")
271271
chunkTask = OrderedTask(OrderedTaskType.CHUNK, orderedNode, iteration=iteration)
272272
lastTask.addDependency(chunkTask)
273273
chunkTask.addDependency(firstTask)
274274
# Add parent dependency
275275
parentTask.addDependency(lastTask)
276276
# Create children
277277
for n in orderedNode.dependencies:
278-
print(" -> create deps", n)
278+
logger.debug(f" -> create deps {n}")
279279
self.createNodeTasks(n, firstTask)
280280
# Add the last task to execute for this node to _nodeUidToLastTask
281281
self._nodeUidToLastTask[nodeUid] = lastTask
282-
print(f" -> done {orderedNode.node._name}")
282+
logger.debug(f" -> done {orderedNode.node._name}")
283283

284284
def _orderTasks(self):
285285
""" Use the nodesByLevel info to create all tasks to send to the submitter """
@@ -292,7 +292,7 @@ def _orderTasks(self):
292292
def display(self, task:OrderedTask=None, level=0):
293293
if task is None:
294294
task = self.rootTask
295-
print(f"{' '*4*level}[{level:02d}] {task}")
295+
logger.debug(f"{' '*4*level}[{level:02d}] {task}")
296296
for child in task.dependencies:
297297
self.display(child, level+1)
298298

tests/plugins/meshroom/pluginSubmitter/PluginSubmitter.py

Lines changed: 7 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -8,7 +8,7 @@
88
LOGGER = logging.getLogger("TestSubmit")
99

1010

11-
class PluginSubmitterA(desc.BaseNode):
11+
class PluginSubmitterA(desc.Node):
1212
"""
1313
Test process no parallelization
1414
"""
@@ -58,27 +58,19 @@ class PluginSubmitterB(PluginSubmitterA):
5858
size = desc.StaticNodeSize(2)
5959
parallelization = desc.Parallelization(blockSize=1)
6060

61+
def postprocess(self, node):
62+
LOGGER.info(f"> PluginSubmitterB postprocess Done")
63+
6164

6265
class PluginSubmitterC(PluginSubmitterA):
6366
"""
6467
Test process with parallelization and dynamic node size
6568
"""
6669
size = desc.DynamicNodeSize("nbChunks")
6770
parallelization = desc.Parallelization(blockSize=1)
68-
69-
70-
class PluginSubmitterAPrePost(PluginSubmitterA):
71+
7172
def preprocess(self, node):
72-
LOGGER.info(f"> Done")
73-
def postprocess(self, node):
74-
LOGGER.info(f"> Done")
75-
73+
LOGGER.info(f"> PluginSubmitterC preprocess Done")
7674

77-
class PluginSubmitterBPrePost(PluginSubmitterB):
7875
def postprocess(self, node):
79-
LOGGER.info(f"> Done")
80-
81-
82-
class PluginSubmitterCPrePost(PluginSubmitterC):
83-
def postprocess(self, node):
84-
LOGGER.info(f"> Done")
76+
LOGGER.info(f"> PluginSubmitterC postprocess Done")

tests/test_compute.py

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -13,7 +13,7 @@
1313

1414
from meshroom.core.graph import Graph, loadGraph
1515
from meshroom.core import desc, pluginManager, loadClassesNodes
16-
from meshroom.core.node import Status
16+
from meshroom.core.node import Status, ChunkIndex
1717
from meshroom.core.plugins import Plugin
1818
from .utils import registerNodeDesc, unregisterNodeDesc
1919

@@ -25,7 +25,8 @@ def executeChunks(node, size):
2525
logFiles = {}
2626
node.preprocess()
2727
for chunkIndex in range(size):
28-
iteration = chunkIndex if size > 1 else -1
28+
iteration = chunkIndex if size >= 0 else ChunkIndex.NONE
29+
node.prepareLogger(iteration)
2930
logFileName = f"{chunkIndex}.log"
3031
logFile = Path(node.internalFolder) / logFileName
3132
logFiles[chunkIndex] = logFile

tests/test_submit.py

Lines changed: 60 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -17,10 +17,15 @@
1717
from meshroom.core.node import Node, Status
1818
from meshroom.core.submitter import BaseSubmitter
1919
from meshroom.core.submitter import jobManager
20+
from meshroom.core.submitter import OrderedTask, OrderedTasks, OrderedTaskType
2021
from meshroom.submitters.localFarmSubmitter import LocalFarmSubmitter, LocalFarmJob
2122

2223
from localfarm.localFarmLauncher import FarmLauncher
2324

25+
import logging
26+
from meshroom.core.submitter import logger
27+
logger.setLevel(logging.DEBUG)
28+
2429

2530
IS_LINUX = (platform == "linux" or platform == "linux2")
2631

@@ -133,26 +138,68 @@ def registerNode(self, name):
133138
registerNodeDesc(nodeType)
134139
return nodeType.__name__
135140

136-
def addNewNode(self, graph, name, nodeParams):
141+
def addNewNode(self, graph, name, nodeParams=None):
137142
nodeTypeName = self.registerNode(name)
138-
if nodeParams:
139-
node = graph.addNewNode(nodeTypeName, **nodeParams)
140-
else:
141-
node = graph.addNewNode(nodeTypeName)
143+
nodeParams = nodeParams or {}
144+
node = graph.addNewNode(nodeTypeName, **nodeParams)
142145
return node
143146

144-
def test_buildTaskGraph(self):
147+
def test_orderTasks(self):
148+
"""
149+
phd=placeholder
150+
chk=chunk
151+
*" [B chk_0] "*
152+
[phd (start_A)] - [A chks] - [phd (end_A)] - [phd (start_B)] [B post] - [C pre] - [C expand] - [C post] - [phd (root)]
153+
*_ [B chk_1] _*
154+
"""
145155
graph = Graph("")
146156
# Add nodes
147-
nodeA = self.addNewNode(graph, "PluginSubmitter"+"A"+"PrePost", nodeParams={})
148-
nodeB = self.addNewNode(graph, "PluginSubmitter"+"B"+"PrePost", nodeParams={"inputs": [nodeA.output]})
149-
nodeC = self.addNewNode(graph, "PluginSubmitter"+"C"+"PrePost", nodeParams={"inputs": [nodeB.output]})
150-
# Submit
157+
nodeA = self.addNewNode(graph, "PluginSubmitter"+"A", nodeParams={})
158+
nodeB = self.addNewNode(graph, "PluginSubmitter"+"B", nodeParams={"inputs": [nodeA.output]})
159+
nodeC = self.addNewNode(graph, "PluginSubmitter"+"C", nodeParams={"inputs": [nodeB.output]})
160+
# Order tasks
151161
submitter = get_submitter()
152162
nodes, edges = graph.dfsOnFinish(startNodes=[nodeC])
153-
print(nodes, edges)
154-
res = submitter.submit(nodes, edges, "")
155-
print("res", res)
163+
orderedTasks = OrderedTasks(nodes, edges)
164+
# === Test result ===
165+
def checkTask(task, taskType, nbDependencies):
166+
assert task.taskType == taskType
167+
assert len(task.dependencies) == nbDependencies
168+
# root
169+
rootTask = orderedTasks.rootTask
170+
checkTask(rootTask, OrderedTaskType.PLACEHOLDER, 1)
171+
# C (post)
172+
task: OrderedTask = rootTask.dependencies[0]
173+
checkTask(task, OrderedTaskType.POSTPROCESS, 1)
174+
# C (expand)
175+
task: OrderedTask = task.dependencies[0]
176+
checkTask(task, OrderedTaskType.EXPANDING, 1)
177+
# C (pre)
178+
task: OrderedTask = task.dependencies[0]
179+
checkTask(task, OrderedTaskType.PREPROCESS, 1)
180+
# B (post)
181+
task: OrderedTask = task.dependencies[0]
182+
checkTask(task, OrderedTaskType.POSTPROCESS, 2)
183+
# B (chunks)
184+
task_0: OrderedTask = task.dependencies[0]
185+
task_1: OrderedTask = task.dependencies[1]
186+
checkTask(task_0, OrderedTaskType.CHUNK, 1)
187+
checkTask(task_1, OrderedTaskType.CHUNK, 1)
188+
assert (task_0.iteration, task_1.iteration) == (0, 1)
189+
assert task_0.dependencies[0] == task_1.dependencies[0]
190+
# B (pre)
191+
task: OrderedTask = task_0.dependencies[0]
192+
checkTask(task, OrderedTaskType.PLACEHOLDER, 1)
193+
# A (post)
194+
task: OrderedTask = task.dependencies[0]
195+
checkTask(task, OrderedTaskType.PLACEHOLDER, 1)
196+
# A (chunks)
197+
task: OrderedTask = task.dependencies[0]
198+
checkTask(task, OrderedTaskType.CHUNK, 1)
199+
assert task.iteration == -1
200+
# A (pre)
201+
task: OrderedTask = task.dependencies[0]
202+
checkTask(task, OrderedTaskType.PLACEHOLDER, 0)
156203

157204
def test_submitNoParallel(self, tmp_path):
158205
graph = Graph("")

0 commit comments

Comments
 (0)