Skip to content

Commit bcbdb65

Browse files
committed
[core] First implementation to kill submitted tasks
1 parent d0c2997 commit bcbdb65

File tree

8 files changed

+108
-22
lines changed

8 files changed

+108
-22
lines changed

meshroom/core/__init__.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -121,7 +121,7 @@ def loadClasses(folder: str, packageName: str, classType: type) -> list[type]:
121121
classes.append(p)
122122
except Exception as e:
123123
if classType == BaseSubmitter:
124-
logging.warning(f" Could not load submitter {pluginName} from package '{package.__name__}'")
124+
logging.warning(f" Could not load submitter {pluginName} from package '{package.__name__}'\n{e}")
125125
else:
126126
tb = traceback.extract_tb(e.__traceback__)
127127
last_call = tb[-1]

meshroom/core/desc/node.py

Lines changed: 28 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,8 @@
77
import shlex
88
import shutil
99
import sys
10+
import signal
11+
import subprocess
1012

1113
from .computation import Level, StaticNodeSize
1214
from .attribute import StringParam, ColorParam, ChoiceParam
@@ -20,6 +22,29 @@
2022
_MESHROOM_COMPUTE_DEPS = ["psutil"]
2123

2224

25+
# Handle cleanup
26+
class ExitCleanup:
27+
def __init__(self):
28+
self._subprocesses = []
29+
signal.signal(signal.SIGTERM, self.exit)
30+
31+
def addSubprocess(self, process):
32+
self._subprocesses.append(process)
33+
34+
def exit(self, signum, frame):
35+
for proc in self._subprocesses:
36+
print(f"[ExitCleanup] (exit) kill subprocess {proc}")
37+
try:
38+
if proc.is_running():
39+
proc.terminate()
40+
proc.wait(timeout=5)
41+
except subprocess.TimeoutExpired:
42+
proc.kill()
43+
raise RuntimeError("Process has been killed")
44+
45+
exitCleanup = ExitCleanup()
46+
47+
2348
class MrNodeType(enum.Enum):
2449
NONE = enum.auto()
2550
BASENODE = enum.auto()
@@ -196,6 +221,7 @@ def executeChunkCommandLine(self, chunk, cmd, env=None):
196221
env=env,
197222
**platformArgs,
198223
)
224+
exitCleanup.addSubprocess(chunk.subprocess)
199225

200226
if hasattr(chunk, "statThread"):
201227
# We only have a statThread if the node is running in the current process
@@ -281,6 +307,8 @@ def processChunkInEnvironment(self, chunk):
281307

282308
if len(chunk.node.getChunks()) > 1:
283309
meshroomComputeCmd += f" --iteration {chunk.range.iteration}"
310+
311+
print(f"(processChunkInEnvironment) meshroomComputeCmd={meshroomComputeCmd}")
284312

285313
runtimeEnv = chunk.node.nodeDesc.plugin.runtimeEnv
286314
cmdPrefix = chunk.node.nodeDesc.plugin.commandPrefix

meshroom/core/node.py

Lines changed: 0 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -720,9 +720,6 @@ def _processInIsolatedEnvironment(self):
720720
self.node.updateOutputAttr()
721721

722722
def stopProcess(self):
723-
if self.isExtern():
724-
raise ValueError("Cannot stop process: node is computed externally (another instance of Meshroom)")
725-
726723
# Ensure that we are up-to-date
727724
self.updateStatusFromCache()
728725

meshroom/core/submitter.py

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -75,6 +75,12 @@ def __init__(self, jobId, submitter):
7575
def __repr__(self):
7676
return f"<{self.__class__.__name__} {self.jid}>"
7777

78+
def stopChunkTask(self, iteration):
79+
if self.submitterOptions.includes(SubmitterOptionsEnum.INTERRUPT_JOB):
80+
raise NotImplementedError("'stopChunkTask' method must be implemented in subclasses")
81+
else:
82+
raise RuntimeError(f"Submitter {self.__class__.__name__} cannot interrupt the job")
83+
7884
def interruptJob(self):
7985
if self.submitterOptions.includes(SubmitterOptionsEnum.INTERRUPT_JOB):
8086
raise NotImplementedError("'interruptJob' method must be implemented in subclasses")
@@ -137,6 +143,11 @@ def retreiveJob(self, submitter, jid) -> Optional[BaseSubmittedJob]:
137143
job = submitter.retrieveJob(jid)
138144
return job
139145

146+
def stopChunkTask(self, chunk):
147+
print(f"[JobManager] (stopChunkTask) {chunk}")
148+
job = self.getNodeJob(chunk.node)
149+
job.stopChunkTask(chunk.range.iteration)
150+
140151

141152
# Global instance that manages submitted jobs
142153
jobManager = JobManager()

meshroom/core/taskManager.py

Lines changed: 18 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -98,6 +98,10 @@ def run(self):
9898
for cId, chunk in enumerate(node.chunks):
9999
if chunk.isFinishedOrRunning() or not self.isRunning():
100100
continue
101+
102+
if self._manager.isChunkCancelled(chunk):
103+
print(f"[TaskThread] Skip cancelled chunk {chunk}")
104+
continue
101105

102106
_nodeName, _node, _nbNodes = node.nodeType, nId+1, len(self._manager._nodesToProcess)
103107

@@ -148,13 +152,18 @@ def __init__(self, parent: BaseObject = None):
148152
self._graph = None
149153
self._nodes = DictModel(keyAttrName='_name', parent=self)
150154
self._nodesToProcess = []
155+
self._cancelledChunks = []
151156
self._nodesExtern = []
152157
# internal thread in which local tasks are executed
153158
self._thread = TaskThread(self)
154159

155160
self._blockRestart = False
156161
self.restartRequested.connect(self.restart)
157162

163+
def join(self):
164+
self._thread.wait()
165+
self._cancelledChunks = []
166+
158167
@Slot(BaseObject)
159168
def createChunks(self, node: Node):
160169
""" Create chunks on main process """
@@ -168,6 +177,13 @@ def createChunks(self, node: Node):
168177
logging.error(f"Failed to create chunks for {node.name}: {e}")
169178
self.chunksCreated.emit(node) # Still emit to unblock waiting thread
170179

180+
def isChunkCancelled(self, chunk):
181+
for i, ch in enumerate(self._cancelledChunks):
182+
if ch == chunk:
183+
del self._cancelledChunks[i]
184+
return True
185+
return False
186+
171187
def requestBlockRestart(self):
172188
"""
173189
Block computing.
@@ -188,6 +204,7 @@ def blockRestart(self):
188204

189205
self._blockRestart = False
190206
self._nodesToProcess = []
207+
self._cancelledChunks = []
191208
self._thread._state = State.DEAD
192209

193210
@Slot()
@@ -198,7 +215,7 @@ def restart(self):
198215
"""
199216
# Make sure to wait the end of the current thread
200217
if self._thread.isRunning():
201-
self._thread.wait()
218+
self.join()
202219

203220
# Avoid restart if thread was globally stopped
204221
if self._blockRestart:

meshroom/ui/graph.py

Lines changed: 12 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -28,6 +28,7 @@
2828
from meshroom.core.graphIO import GraphIO
2929

3030
from meshroom.core.taskManager import TaskManager
31+
from meshroom.core.submitter import jobManager
3132

3233
from meshroom.core.node import NodeChunk, Node, Status, ExecMode, CompatibilityNode, Position
3334
from meshroom.core import submitters, MrNodeType
@@ -377,6 +378,7 @@ def __init__(self, undoStack: commands.UndoStack, taskManager: TaskManager, pare
377378
self._sortedDFSChunks: QObjectListModel = QObjectListModel(parent=self)
378379
self._layout: GraphLayout = GraphLayout(self)
379380
self._selectedNode = None
381+
self._selectedChunk = None
380382
self._nodeSelection: QItemSelectionModel = QItemSelectionModel(self._graph.nodes, parent=self)
381383
self._hoveredNode = None
382384

@@ -561,7 +563,7 @@ def stopExecution(self):
561563
return
562564
self._taskManager.requestBlockRestart()
563565
self._graph.stopExecution()
564-
self._taskManager._thread.wait()
566+
self._taskManager.join()
565567

566568
@Slot(Node)
567569
def stopNodeComputation(self, node):
@@ -572,7 +574,7 @@ def stopNodeComputation(self, node):
572574

573575
# Stop the node and wait Task Manager
574576
node.stopComputation()
575-
self._taskManager._thread.wait()
577+
self._taskManager.join()
576578

577579
@Slot(Node)
578580
def cancelNodeComputation(self, node):
@@ -608,13 +610,12 @@ def stopTask(self, chunk: NodeChunk):
608610
if self.isChunkComputingLocally(chunk):
609611
print(f"-> is local")
610612
chunk.stopProcess()
611-
chunk.upgradeStatusTo(Status.STOPPED)
612-
# TODO : remove the chunk from the thread process
613-
self._taskManager._thread.wait()
613+
# Remove the chunk from the thread process
614+
self._taskManager._cancelledChunks.append(chunk)
614615
elif self.isChunkComputingExternally(chunk):
615-
print("[UIGraph] (stopTask) Stop task is not implemented for ")
616-
# TODO
617-
pass
616+
jobManager.stopChunkTask(chunk)
617+
chunk.upgradeStatusTo(Status.STOPPED)
618+
chunk.stopProcess()
618619

619620
@Slot(NodeChunk)
620621
def pauseTask(self, chunk: NodeChunk):
@@ -1319,6 +1320,9 @@ def canSubmitNode(self, node: Node) -> bool:
13191320
selectedNodeChanged = Signal()
13201321
# Current main selected node
13211322
selectedNode = makeProperty(QObject, "_selectedNode", selectedNodeChanged, resetOnDestroy=True)
1323+
# Current chunk selected (used to send signals from TaskManager to ChunksListView)
1324+
selectedChunkChanged = Signal()
1325+
selectedChunk = makeProperty(QObject, "_selectedChunk", selectedChunkChanged, resetOnDestroy=True)
13221326

13231327
nodeSelection = makeProperty(QObject, "_nodeSelection")
13241328

meshroom/ui/qml/GraphEditor/ChunksListView.qml

Lines changed: 16 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -5,11 +5,13 @@ import QtQuick.Layouts
55
import Utils 1.0
66

77
/**
8-
* ChunkListView
8+
* ChunksListView
99
*/
1010

1111
ColumnLayout {
1212
id: root
13+
14+
property var uigraph: null
1315
property variant chunks
1416
property int currentIndex: 0
1517
property variant currentChunk: (chunks && currentIndex >= 0) ? chunks.at(currentIndex) : undefined
@@ -90,4 +92,17 @@ ColumnLayout {
9092
}
9193
}
9294
}
95+
96+
Connections {
97+
target: _reconstruction
98+
function onSelectedChunkChanged() {
99+
for (var i = 0; i < root.chunks.count; i++) {
100+
if (_reconstruction.selectedChunk === root.chunks.at(i)) {
101+
root.currentIndex = i
102+
break;
103+
}
104+
}
105+
}
106+
ignoreUnknownSignals: true
107+
}
93108
}

meshroom/ui/qml/GraphEditor/TaskManager.qml

Lines changed: 22 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -28,6 +28,11 @@ Item {
2828
function selectNode(node) {
2929
uigraph.selectedNode = node
3030
}
31+
32+
function selectChunk(chunk) {
33+
root.selectedChunk = chunk
34+
uigraph.selectedChunk = chunk
35+
}
3136

3237
TextMetrics {
3338
id: nbMetrics
@@ -234,8 +239,21 @@ Item {
234239

235240
MouseArea {
236241
anchors.fill: parent
237-
onPressed: {
238-
selectNode(object)
242+
acceptedButtons: Qt.LeftButton | Qt.RightButton
243+
onPressed: (mouse) => {
244+
if (mouse.button === Qt.LeftButton) {
245+
selectNode(object)
246+
} else if (mouse.button === Qt.RightButton) {
247+
contextMenu.popup()
248+
}
249+
}
250+
Menu {
251+
id: contextMenu
252+
MenuItem {
253+
text: "Open Folder"
254+
height: visible ? implicitHeight : 0
255+
onTriggered: Qt.openUrlExternally(Filepath.stringToUrl(object.internalFolder))
256+
}
239257
}
240258
}
241259
}
@@ -306,10 +324,6 @@ Item {
306324
model: object.chunks
307325
property var node: object
308326

309-
onModelChanged: {
310-
console.log("TaskManager model size :", chunkList.model.count)
311-
}
312-
313327
spacing: 3
314328

315329
delegate: Loader {
@@ -341,7 +355,7 @@ Item {
341355
anchors.fill: parent
342356
onPressed: {
343357
selectNode(chunkList.node)
344-
selectedChunk = object
358+
selectChunk(object)
345359
}
346360
}
347361
}
@@ -363,7 +377,7 @@ Item {
363377
anchors.fill: parent
364378
onPressed: {
365379
selectNode(chunkList.node)
366-
selectedChunk = null
380+
selectChunk(null)
367381
}
368382
}
369383
}

0 commit comments

Comments
 (0)