diff --git a/.git-blame-ignore-revs b/.git-blame-ignore-revs index 70c67b9696..2648f77e67 100644 --- a/.git-blame-ignore-revs +++ b/.git-blame-ignore-revs @@ -1,3 +1,7 @@ +# Linting: Remove trailing whitespaces +af397c2ab6b9a8bc446a81ce7fd162a351895673 +# [core] Linting: Remove all trailing whitespaces +d12e434998dcdbe9d014bccae4f9294665bd96d4 # [tests] Linting: Remove trailing whitespaces 5fe886b6b08fa19082dc0e1bf837fa34c2e2de2d # [core] Linting: Remove remaining trailing whitespaces diff --git a/bin/meshroom_compute b/bin/meshroom_compute index 99c225a86c..5876d511d7 100755 --- a/bin/meshroom_compute +++ b/bin/meshroom_compute @@ -3,6 +3,7 @@ import argparse import logging import os import sys +from typing import NoReturn try: import meshroom @@ -16,7 +17,7 @@ meshroom.setupEnvironment() import meshroom.core import meshroom.core.graph -from meshroom.core.node import Status, ExecMode +from meshroom.core.node import Status parser = argparse.ArgumentParser(description='Execute a Graph of processes.') @@ -63,12 +64,28 @@ else: meshroom.core.initPlugins() meshroom.core.initNodes() +meshroom.core.initSubmitters() graph = meshroom.core.graph.loadGraph(args.graphFile) if args.cache: graph.cacheDir = args.cache graph.update() + +def killRunningJob(node) -> NoReturn: + """ Kills current job and try to avoid job restarting """ + jobInfo = node.nodeStatus.jobInfo + submitterName = jobInfo.get("submitterName") + if not submitterName: + sys.exit(meshroom.MeshroomExitStatus.ERROR_NO_RETRY) + from meshroom.core import submitters + for subName, sub in submitters.items(): + if submitterName == subName: + sub.killRunningJob() + break + sys.exit(meshroom.MeshroomExitStatus.ERROR_NO_RETRY) + + if args.node: node = graph.findNode(args.node) submittedStatuses = [Status.RUNNING] @@ -83,6 +100,15 @@ if args.node: # If running as "extern", the task is supposed to have the status SUBMITTED. # If not running as "extern", the SUBMITTED status should generate a warning. submittedStatuses.append(Status.SUBMITTED) + + if not node._chunksCreated: + print(f"Error: Node {node} has been submitted before chunks have been created." \ + "See file: \"{node.nodeStatusFile}\".") + sys.exit(-1) + + if node._isInputNode(): + print(f"InputNode: No computation to do.") + if not args.forceStatus and not args.forceCompute: if args.iteration != -1: chunks = [node.chunks[args.iteration]] @@ -91,10 +117,11 @@ if args.node: for chunk in chunks: if chunk.status.status in submittedStatuses: # Particular case for the local isolated, the node status is set to RUNNING by the submitter directly. - # We ensure that no other instance has started to compute, by checking that the sessionUid is empty. - if chunk.node.getMrNodeType() == meshroom.core.MrNodeType.NODE and not chunk.status.sessionUid and chunk.status.submitterSessionUid: + # We ensure that no other instance has started to compute, by checking that the computeSessionUid is empty. + if chunk.node.getMrNodeType() == meshroom.core.MrNodeType.NODE and \ + not chunk.status.computeSessionUid and node._nodeStatus.submitterSessionUid: continue - print(f'Warning: Node is already submitted with status "{chunk.status.status.name}". See file: "{chunk.statusFile}". ExecMode: {chunk.status.execMode.name}, SessionUid: {chunk.status.sessionUid}, submitterSessionUid: {chunk.status.submitterSessionUid}') + print(f'Warning: Node is already submitted with status "{chunk.status.status.name}". See file: "{chunk.statusFile}". 
ExecMode: {chunk.status.execMode.name}, computeSessionUid: {chunk.status.computeSessionUid}, submitterSessionUid: {node._nodeStatus.submitterSessionUid}') # sys.exit(-1) if args.extern: @@ -105,8 +132,14 @@ if args.node: node.preprocess() if args.iteration != -1: chunk = node.chunks[args.iteration] + if chunk._status.status == Status.STOPPED: + print(f"Chunk {chunk} : status is STOPPED") + killRunningJob(node) chunk.process(args.forceCompute, args.inCurrentEnv) else: + if node.nodeStatus.status == Status.STOPPED: + print(f"Node {node} : status is STOPPED") + killRunningJob(node) node.process(args.forceCompute, args.inCurrentEnv) node.postprocess() node.restoreLogger() diff --git a/bin/meshroom_createChunks b/bin/meshroom_createChunks new file mode 100755 index 0000000000..2fd42f9395 --- /dev/null +++ b/bin/meshroom_createChunks @@ -0,0 +1,145 @@ +#!/usr/bin/env python + +""" +This is a script used to wrap the process of processing a node on the farm +It will handle chunk creation and create all the jobs for these chunks +If the submitter cannot create chunks, then it will process the chunks serially +in the current process +""" + +import argparse +import logging +import os +import sys +try: + import meshroom +except Exception: + # If meshroom module is not in the PYTHONPATH, add our root using the relative path + import pathlib + meshroomRootFolder = pathlib.Path(__file__).parent.parent.resolve() + sys.path.append(meshroomRootFolder) + import meshroom +meshroom.setupEnvironment() + +import meshroom.core +import meshroom.core.graph +from meshroom.core import submitters +from meshroom.core.submitter import SubmitterOptionsEnum +from meshroom.core.node import Status + + +parser = argparse.ArgumentParser(description='Execute a Graph of processes.') +parser.add_argument('graphFile', metavar='GRAPHFILE.mg', type=str, + help='Filepath to a graph file.') + +parser.add_argument('--submitter', type=str, required=True, + help='Name of the submitter used to create the job.') +parser.add_argument('--node', metavar='NODE_NAME', type=str, required=True, + help='Process the node. It will generate an error if the dependencies are not already computed.') +parser.add_argument('--inCurrentEnv', help='Execute process in current env without creating a dedicated runtime environment.', + action='store_true') +parser.add_argument('--forceStatus', help='Force computation if status is RUNNING or SUBMITTED.', + action='store_true') +parser.add_argument('--forceCompute', help='Compute in all cases even if already computed.', + action='store_true') +parser.add_argument('--extern', help='Use this option when you compute externally after submission to a render farm from meshroom.', + action='store_true') +parser.add_argument('--cache', metavar='FOLDER', type=str, + default=None, + help='Override the cache folder') +parser.add_argument('-v', '--verbose', + help='Set the verbosity level for logging:\n' + ' - fatal: Show only critical errors.\n' + ' - error: Show errors only.\n' + ' - warning: Show warnings and errors.\n' + ' - info: Show standard informational messages.\n' + ' - debug: Show detailed debug information.\n' + ' - trace: Show all messages, including trace-level details.', + default=os.environ.get('MESHROOM_VERBOSE', 'info'), + choices=['fatal', 'error', 'warning', 'info', 'debug', 'trace']) + +args = parser.parse_args() + +# For extern computation, we want to focus on the node computation log. +# So, we avoid polluting the log with general warning about plugins, versions of nodes in file, etc. 
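+# The verbosity requested through --verbose is only restored at the very end of this script.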
+logging.getLogger().setLevel(level=logging.INFO) + +meshroom.core.initPlugins() +meshroom.core.initNodes() +meshroom.core.initSubmitters() # Required to spool child job + +graph = meshroom.core.graph.loadGraph(args.graphFile) +if args.cache: + graph.cacheDir = args.cache +graph.update() + +# Execute the node +node = graph.findNode(args.node) +submittedStatuses = [Status.RUNNING] + +# Find submitter +submitter = None +# It's required if we want to spool chunks on different machines +for subName, sub in submitters.items(): + if args.submitter == subName: + submitter = sub + break + +if node._nodeStatus.status in (Status.STOPPED, Status.KILLED): + logging.error("Node status is STOPPED or KILLED.") + if submitter: + submitter.killRunningJob() + sys.exit(meshroom.MeshroomExitStatus.ERROR_NO_RETRY) + +if not node._chunksCreated: + # Create node chunks + # Once created we don't have to do it again even if we relaunch the job + node.createChunks() + # Set the chunks statuses + for chunk in node._chunks: + if args.forceCompute or chunk._status.status != Status.SUCCESS: + hasChunkToLaunch = True + chunk._status.setNode(node) + chunk._status.initExternSubmit() + chunk.upgradeStatusFile() + +# Get chunks to process in the current process +chunksToProcess = [] +if submitter: + if not submitter._options.includes(SubmitterOptionsEnum.EDIT_TASKS): + chunksToProcess = node.chunks +else: + # Cannot retrieve job -> execute process serially + chunksToProcess = node.chunks + +logging.info(f"[MeshroomCreateChunks] Chunks to process here : {chunksToProcess}") + +if not args.forceStatus and not args.forceCompute: + for chunk in chunksToProcess: + if chunk.status.status in submittedStatuses: + # Particular case for the local isolated, the node status is set to RUNNING by the submitter directly. + # We ensure that no other instance has started to compute, by checking that the sessicomputeSessionUidonUid is empty. + if chunk.node.getMrNodeType() == meshroom.core.MrNodeType.NODE and \ + not chunk.status.computeSessionUid and node._nodeStatus.submitterSessionUid: + continue + logging.warning( + f"[MeshroomCreateChunks] Node is already submitted with status " \ + f"\"{chunk.status.status.name}\". See file: \"{chunk.statusFile}\". 
" \ + f"ExecMode: {chunk.status.execMode.name}, computeSessionUid: {chunk.status.computeSessionUid}, " \ + f"submitterSessionUid: {node._nodeStatus.submitterSessionUid}") + +if chunksToProcess: + node.prepareLogger() + node.preprocess() + for chunk in chunksToProcess: + logging.info(f"[MeshroomCreateChunks] process chunk {chunk}") + chunk.process(args.forceCompute, args.inCurrentEnv) + node.postprocess() + node.restoreLogger() +else: + logging.info(f"[MeshroomCreateChunks] -> create job to process chunks {node.chunks}") + submitter.createChunkTask(node, graphFile=args.graphFile, cache=args.cache, + forceStatus=args.forceStatus, forceCompute=args.forceCompute) + +# Restore the log level +logging.getLogger().setLevel(meshroom.logStringToPython[args.verbose]) diff --git a/meshroom/__init__.py b/meshroom/__init__.py index 45f409ccb1..7fb69d72d4 100644 --- a/meshroom/__init__.py +++ b/meshroom/__init__.py @@ -1,5 +1,5 @@ from distutils import util -from enum import Enum +from enum import Enum, IntEnum import logging import os import sys @@ -76,6 +76,18 @@ def logToRoot(message, *args, **kwargs): logging.getLogger().setLevel(logStringToPython[os.environ.get('MESHROOM_VERBOSE', 'warning')]) +class MeshroomExitStatus(IntEnum): + """ In case we want to catch some special case from the parent process + We could use 3-125 for custom exist codes : + https://tldp.org/LDP/abs/html/exitcodes.html + """ + SUCCESS = 0 + ERROR = 1 + # In some farm tools jobs are automatically re-tried, + # using ERROR_NO_RETRY will try to prevent that + ERROR_NO_RETRY = -999 # It's actually -999 % 256 => 25 + + def setupEnvironment(backend=Backend.STANDALONE): """ Setup environment for Meshroom to work in a prebuilt, standalone configuration. diff --git a/meshroom/core/__init__.py b/meshroom/core/__init__.py index 4f007ee963..ff10ba5416 100644 --- a/meshroom/core/__init__.py +++ b/meshroom/core/__init__.py @@ -119,7 +119,7 @@ def loadClasses(folder: str, packageName: str, classType: type) -> list[type]: classes.append(p) except Exception as exc: if classType == BaseSubmitter: - logging.warning(f" Could not load submitter {pluginName} from package '{package.__name__}'") + logging.warning(f" Could not load submitter {pluginName} from package '{package.__name__}'\n{exc}") else: tb = traceback.extract_tb(exc.__traceback__) last_call = tb[-1] diff --git a/meshroom/core/desc/computation.py b/meshroom/core/desc/computation.py index 0386fab474..72bb404b77 100644 --- a/meshroom/core/desc/computation.py +++ b/meshroom/core/desc/computation.py @@ -1,13 +1,15 @@ import math -from enum import Enum +from enum import IntEnum from .attribute import ListAttribute, IntParam -class Level(Enum): +class Level(IntEnum): NONE = 0 NORMAL = 1 INTENSIVE = 2 + EXTREME = 3 + SCRIPT=-1 class Range: @@ -46,6 +48,9 @@ def toDict(self): "rangeBlocksCount": self.nbBlocks } + def __repr__(self): + return f"" + class Parallelization: def __init__(self, staticNbBlocks=0, blockSize=0): diff --git a/meshroom/core/desc/node.py b/meshroom/core/desc/node.py index c8c4e99711..9469f3ede0 100644 --- a/meshroom/core/desc/node.py +++ b/meshroom/core/desc/node.py @@ -5,6 +5,8 @@ import shlex import shutil import sys +import signal +import subprocess import psutil @@ -20,6 +22,34 @@ _MESHROOM_COMPUTE_DEPS = ["psutil"] +# Handle cleanup +class ExitCleanup: + """ + Make sure we kill child subprocesses when the main process exits receive SIGTERM. 
+ """ + + def __init__(self): + self._subprocesses = [] + signal.signal(signal.SIGTERM, self.exit) + + def addSubprocess(self, process): + logging.debug(f"[ExitCleanup] Register subprocess {process}") + self._subprocesses.append(process) + + def exit(self, signum, frame): + for proc in self._subprocesses: + logging.debug(f"[ExitCleanup] Kill subprocess {proc}") + try: + if proc.is_running(): + proc.terminate() + proc.wait(timeout=5) + except subprocess.TimeoutExpired: + proc.kill() + sys.exit(0) + +exitCleanup = ExitCleanup() + + class MrNodeType(enum.Enum): NONE = enum.auto() BASENODE = enum.auto() @@ -90,6 +120,9 @@ class BaseNode(object): documentation = "" category = "Other" plugin = None + # Licenses required to run the plugin + # Only used to select machines on the farm when the node is submitted + _licenses = [] def __init__(self): super(BaseNode, self).__init__() @@ -158,7 +191,7 @@ def processChunk(self, chunk): def executeChunkCommandLine(self, chunk, cmd, env=None): try: - with open(chunk.logFile, 'w') as logF: + with open(chunk.getLogFile(), 'w') as logF: chunk.status.commandLine = cmd chunk.saveStatusFile() cmdList = shlex.split(cmd) @@ -167,7 +200,7 @@ def executeChunkCommandLine(self, chunk, cmd, env=None): print(f"Starting Process for '{chunk.node.name}'") print(f" - commandLine: {cmd}") - print(f" - logFile: {chunk.logFile}") + print(f" - logFile: {chunk.getLogFile()}") if prog: cmdList[0] = Path(prog).as_posix() print(f" - command full path: {cmdList[0]}") @@ -192,6 +225,7 @@ def executeChunkCommandLine(self, chunk, cmd, env=None): env=env, **platformArgs, ) + exitCleanup.addSubprocess(chunk.subprocess) if hasattr(chunk, "statThread"): # We only have a statThread if the node is running in the current process @@ -212,7 +246,7 @@ def executeChunkCommandLine(self, chunk, cmd, env=None): pass if chunk.subprocess.returncode != 0: - with open(chunk.logFile, "r") as logF: + with open(chunk.getLogFile(), "r") as logF: logContent = "".join(logF.readlines()) raise RuntimeError(f'Error on node "{chunk.name}":\nLog:\n{logContent}') finally: diff --git a/meshroom/core/graph.py b/meshroom/core/graph.py index b2c4eaf29e..a39f15738f 100644 --- a/meshroom/core/graph.py +++ b/meshroom/core/graph.py @@ -15,12 +15,14 @@ import meshroom.core from meshroom.common import BaseObject, DictModel, Slot, Signal, Property from meshroom.core import Version +from meshroom.core import submitters from meshroom.core.attribute import Attribute, ListAttribute, GroupAttribute from meshroom.core.exception import GraphCompatibilityError, StopGraphVisit, StopBranchVisit from meshroom.core.graphIO import GraphIO, GraphSerializer, TemplateGraphSerializer, PartialGraphSerializer from meshroom.core.node import BaseNode, Status, Node, CompatibilityNode from meshroom.core.nodeFactory import nodeFactory from meshroom.core.mtyping import PathLike +from meshroom.core.submitter import BaseSubmittedJob, jobManager # Replace default encoder to support Enums @@ -498,6 +500,7 @@ def _addNode(self, node, uniqueName): node._name = uniqueName node.graph = self self._nodes.add(node) + node.chunksChanged.connect(self.updated) def addNode(self, node, uniqueName=None): """ @@ -1107,6 +1110,10 @@ def discoverVertex(vertex, graph): raise StopBranchVisit() def finishVertex(vertex, graph): + if not vertex.chunks: + # Chunks have not been initialized + nodes.append(vertex) + return chunksToProcess = [] for chunk in vertex.chunks: if chunk.status.status is not Status.SUCCESS: @@ -1468,6 +1475,19 @@ def updateNodesPerUid(self): # Now, 
update each individual node for node in self.nodes: node.updateDuplicates(nodesPerUid) + + def updateJobManagerWithNode(self, node): + if node._uid in jobManager._nodeToJob.keys(): + return + jobInfo = node._nodeStatus.jobInfo + if not jobInfo: + return + jid, subName = jobInfo.get("jid"), jobInfo.get("submitterName") + for _subName, sub in submitters.items(): + if _subName == subName: + job = sub.retrieveJob(int(jid)) + jobManager.addJob(job, [node]) + break def update(self): if not self._updateEnabled: @@ -1480,6 +1500,7 @@ def update(self): self.updateStatusFromCache() for node in self.nodes: node.dirty = False + self.updateJobManagerWithNode(node) self.updateNodesPerUid() @@ -1490,6 +1511,9 @@ def update(self): self.dirtyTopology = False self.updated.emit() + + def updateMonitoredFiles(self): + self.statusUpdated.emit() def markNodesDirty(self, fromNode): """ @@ -1615,6 +1639,7 @@ def setVerbose(self, v): cacheDirChanged = Signal() cacheDir = Property(str, cacheDir.fget, cacheDir.fset, notify=cacheDirChanged) updated = Signal() + statusUpdated = Signal() canComputeLeavesChanged = Signal() canComputeLeaves = Property(bool, lambda self: self._canComputeLeaves, notify=canComputeLeavesChanged) @@ -1679,7 +1704,7 @@ def executeGraph(graph, toNodes=None, forceCompute=False, forceStatus=False): graph.save() for node in nodes: - node.beginSequence(forceCompute) + node.initStatusOnCompute(forceCompute) for n, node in enumerate(nodes): try: @@ -1730,11 +1755,19 @@ def submitGraph(graph, submitter, toNodes=None, submitLabel="{projectName}"): raise RuntimeError("Unknown Submitter: '{submitter}'. Available submitters are: '{allSubmitters}'.".format( submitter=submitter, allSubmitters=str(meshroom.core.submitters.keys()))) + for node in nodesToProcess: + node.initStatusOnSubmit() + jobManager.resetNodeJob(node) + try: res = sub.submit(nodesToProcess, edgesToProcess, graph.filepath, submitLabel=submitLabel) if res: + if isinstance(res, BaseSubmittedJob): + jobManager.addJob(res, nodesToProcess) + else: for node in nodesToProcess: - node.initStatusOnSubmit() # update node status + # TODO : Notify the node that there was an issue on submit + pass except Exception as exc: logging.error(f"Error on submit: {exc}") diff --git a/meshroom/core/node.py b/meshroom/core/node.py index 5e0540fd29..7f1594cb04 100644 --- a/meshroom/core/node.py +++ b/meshroom/core/node.py @@ -31,7 +31,7 @@ def renameWritingToFinalPath(writingFilepath: str, filepath: str) -> str: if platform.system() == 'Windows': # On Windows, attempting to remove a file that is in use causes an exception to be raised. # So we may need multiple trials, if someone is reading it at the same time. 
- for i in range(20): + for _ in range(20): try: os.remove(filepath) # If remove is successful, we can stop the iterations @@ -40,7 +40,6 @@ def renameWritingToFinalPath(writingFilepath: str, filepath: str) -> str: pass os.rename(writingFilepath, filepath) - class Status(Enum): """ """ @@ -62,31 +61,37 @@ class ExecMode(Enum): EXTERN = auto() -class StatusData(BaseObject): - """ - """ - dateTimeFormatting = '%Y-%m-%d %H:%M:%S.%f' +# Simple structure for storing chunk information +NodeChunkSetup = namedtuple("NodeChunks", ["blockSize", "fullSize", "nbBlocks"]) + +class NodeStatusData(BaseObject): + __slots__ = ("nodeName", "nodeType", "status", "execMode", "packageName", "mrNodeType", + "submitterSessionUid", "chunksBlockSize", "chunksFullSize", "chunksNbBlocks", "jobInfo") def __init__(self, nodeName='', nodeType='', packageName='', mrNodeType: MrNodeType = MrNodeType.NONE, parent: BaseObject = None): super().__init__(parent) - self.nodeName: str = nodeName self.nodeType: str = nodeType self.packageName: str = packageName - self.mrNodeType = mrNodeType + self.mrNodeType: str = mrNodeType - self.sessionUid: Optional[str] = None + # Session UID where the node was submitted self.submitterSessionUid: Optional[str] = None - self.execMode: ExecMode = ExecMode.NONE + self.reset() + def reset(self): + self.resetChunkInfo() self.resetDynamicValues() - def setNode(self, node): - """ Set the node information from one node instance. """ - self.nodeName = node.name - self.setNodeType(node) + def resetChunkInfo(self): + self.chunks: NodeChunkSetup = None + + def resetDynamicValues(self): + self.status: Status = Status.NONE + self.execMode: ExecMode = ExecMode.NONE + self.jobInfo: dict = {} def setNodeType(self, node): """ @@ -97,17 +102,131 @@ def setNodeType(self, node): self.packageName = node.packageName self.mrNodeType = node.getMrNodeType() - def merge(self, other): - self.startDateTime = min(self.startDateTime, other.startDateTime) - self.endDateTime = max(self.endDateTime, other.endDateTime) - self.elapsedTime += other.elapsedTime + def setNode(self, node): + """ Set the node information from one node instance. """ + self.nodeName = node.name + self.setNodeType(node) + + def setJob(self, jid, submitterName): + """ Set Job information on the node. """ + self.jobInfo = { + "jid": str(jid), + "submitterName": str(submitterName), + } + + @property + def jobName(self): + if self.jobInfo: + return f"{self.jobInfo['submitterName']}<{self.jobInfo['jid']}>" + else: + return "UNKNOWN" + + def initExternSubmit(self): + """ + When submitting a node, we reset the status information to ensure that we do not keep + outdated information. + """ + self.resetDynamicValues() + self.submitterSessionUid = meshroom.core.sessionUid + self.status = Status.SUBMITTED + self.execMode = ExecMode.EXTERN + + def initLocalSubmit(self): + """ + When submitting a node, we reset the status information to ensure that we do not keep + outdated information. 
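+        This is identical to initExternSubmit except that the execution mode is
+        set to ExecMode.LOCAL rather than ExecMode.EXTERN.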
+ """ + self.resetDynamicValues() + self.submitterSessionUid = meshroom.core.sessionUid + self.status = Status.SUBMITTED + self.execMode = ExecMode.LOCAL + + def toDict(self): + keys = list(self.__slots__) or [] + d = {key:getattr(self, key, "") for key in keys} + for _k, _v in d.items(): + if isinstance(_v, Enum): + d[_k] = _v.name + if self.chunks: + d["chunksBlockSize"] = self.chunks.blockSize + d["chunksFullSize"] = self.chunks.fullSize + d["chunksNbBlocks"] = self.chunks.nbBlocks + return d + + def fromDict(self, d): + self.reset() + if "mrNodeType" in d: + self.mrNodeType = MrNodeType[d.pop("mrNodeType")] + if "chunksBlockSize" in d and "chunksFullSize" in d and "chunksNbBlocks" in d: + blockSize = int(d.pop("chunksBlockSize") or 0) + fullSize = int(d.pop("chunksFullSize") or 0) + nbBlocks = int(d.pop("chunksNbBlocks") or 0) + self.chunks = NodeChunkSetup(blockSize, fullSize, nbBlocks) + if "status" in d: + self.status: Status = Status[d.pop("status")] + if "execMode" in d: + self.execMode = ExecMode[d.pop("execMode")] + for _key, _value in d.items(): + if _key in self.__slots__: + setattr(self, _key, _value) + + def loadFromCache(self, statusFile): + self.reset() + try: + with open(statusFile) as jsonFile: + statusData = json.load(jsonFile) + self.fromDict(statusData) + except Exception as e: + logging.warning(f"(loadFromCache) {self.nodeName}: Error while loading status file {statusFile}: {e}") + self.reset() + + @property + def nbChunks(self): + nbBlocks = self.chunks.nbBlocks if self.chunks else -1 + return nbBlocks + + def getChunkRanges(self): + if not self.chunks: + return [] + ranges = [] + for i in range(self.chunks.nbBlocks): + ranges.append(desc.Range( + iteration=i, + blockSize=self.chunks.blockSize, + fullSize=self.chunks.fullSize, + nbBlocks=self.chunks.nbBlocks + )) + return ranges + + def setChunks(self, chunks): + blockSize, fullSize, nbBlocks = 1, 1, 1 + for c in chunks: + r = c.range + blockSize, fullSize, nbBlocks = r.blockSize, r.fullSize, r.nbBlocks + break + self.chunks = NodeChunkSetup(blockSize, fullSize, nbBlocks) + + +class ChunkStatusData(BaseObject): + """ + """ + dateTimeFormatting = '%Y-%m-%d %H:%M:%S.%f' + + __slots__ = ( + "nodeName", "mrNodeType", "computeSessionUid", "execMode", "status", + "commandLine", "startDateTime", "endDateTime", "elapsedTime", "hostname" + ) + + def __init__(self, nodeName='', mrNodeType: MrNodeType = MrNodeType.NONE, parent: BaseObject = None): + super().__init__(parent) + + self.nodeName: str = nodeName + self.mrNodeType = mrNodeType + + self.computeSessionUid: Optional[str] = None # Session where computation is done - def reset(self): - self.nodeName: str = "" - self.nodeType: str = "" - self.packageName: str = "" - self.mrNodeType: MrNodeType = MrNodeType.NONE self.execMode: ExecMode = ExecMode.NONE + self.resetDynamicValues() def resetDynamicValues(self): @@ -119,9 +238,25 @@ def resetDynamicValues(self): self.elapsedTime: float = 0.0 self.hostname: str = "" + def setNode(self, node): + """ Set the node information from one node instance. 
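+        Only the node name and the MrNodeType are kept at the chunk level; the
+        node type and package name are now stored in NodeStatusData.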
""" + self.nodeName = node.name + self.mrNodeType = node.getMrNodeType() + + def merge(self, other): + self.startDateTime = min(self.startDateTime, other.startDateTime) + self.endDateTime = max(self.endDateTime, other.endDateTime) + self.elapsedTime += other.elapsedTime + + def reset(self): + self.nodeName: str = "" + self.mrNodeType: MrNodeType = MrNodeType.NONE + self.execMode: ExecMode = ExecMode.NONE + self.resetDynamicValues() + def initStartCompute(self): import platform - self.sessionUid = meshroom.core.sessionUid + self.computeSessionUid = meshroom.core.sessionUid self.hostname = platform.node() self._startTime = time.time() self.startDateTime = datetime.datetime.now().strftime(self.dateTimeFormatting) @@ -139,8 +274,7 @@ def initIsolatedCompute(self): self.resetDynamicValues() self.initStartCompute() assert self.mrNodeType == MrNodeType.NODE - self.sessionUid = None - self.submitterSessionUid = meshroom.core.sessionUid + self.computeSessionUid = None def initExternSubmit(self): """ @@ -148,8 +282,7 @@ def initExternSubmit(self): outdated information. """ self.resetDynamicValues() - self.sessionUid = None - self.submitterSessionUid = meshroom.core.sessionUid + self.computeSessionUid = None self.status = Status.SUBMITTED self.execMode = ExecMode.EXTERN @@ -159,13 +292,12 @@ def initLocalSubmit(self): outdated information. """ self.resetDynamicValues() - self.sessionUid = None - self.submitterSessionUid = meshroom.core.sessionUid + self.computeSessionUid = None self.status = Status.SUBMITTED self.execMode = ExecMode.LOCAL def initEndCompute(self): - self.sessionUid = meshroom.core.sessionUid + self.computeSessionUid = meshroom.core.sessionUid self.endDateTime = datetime.datetime.now().strftime(self.dateTimeFormatting) if self._startTime != None: self.elapsedTime = time.time() - self._startTime @@ -175,38 +307,24 @@ def elapsedTimeStr(self): return str(datetime.timedelta(seconds=self.elapsedTime)) def toDict(self): - d = self.__dict__.copy() - d["elapsedTimeStr"] = self.elapsedTimeStr - - # Skip some attributes (some are from BaseObject) - d.pop("destroyed", None) - d.pop("objectNameChanged", None) - d.pop("_parent", None) - d.pop("_startTime", None) - + keys = list(self.__slots__) or [] + d = {key:getattr(self, key) for key in keys} + for _k, _v in d.items(): + if isinstance(_v, Enum): + d[_k] = _v.name return d def fromDict(self, d): - self.status = d.get("status", Status.NONE) - if not isinstance(self.status, Status): - self.status = Status[self.status] - self.execMode = d.get("execMode", ExecMode.NONE) - if not isinstance(self.execMode, ExecMode): - self.execMode = ExecMode[self.execMode] - self.mrNodeType = d.get("mrNodeType", MrNodeType.NONE) - if not isinstance(self.mrNodeType, MrNodeType): - self.mrNodeType = MrNodeType[self.mrNodeType] - - self.nodeName = d.get("nodeName", "") - self.nodeType = d.get("nodeType", "") - self.packageName = d.get("packageName", "") - self.commandLine = d.get("commandLine", "") - self.startDateTime = d.get("startDateTime", "") - self.endDateTime = d.get("endDateTime", "") - self.elapsedTime = d.get("elapsedTime", 0) - self.hostname = d.get("hostname", "") - self.sessionUid = d.get("sessionUid", "") - self.submitterSessionUid = d.get("submitterSessionUid", "") + self.reset() + if "status" in d: + self.status: Status = Status[d.pop("status")] + if "execMode" in d: + self.execMode = ExecMode[d.pop("execMode")] + if "mrNodeType" in d: + self.mrNodeType = MrNodeType[d.pop("mrNodeType")] + for _key, _value in d.items(): + if _key in self.__slots__: + 
setattr(self, _key, _value) class LogManager: @@ -333,14 +451,16 @@ def __init__(self, node, range, parent=None): self.node = node self.range = range self._logManager = None - self._status: StatusData = StatusData(node.name, node.nodeType, node.packageName, - node.getMrNodeType()) + self._status: ChunkStatusData = ChunkStatusData(nodeName=node.name, mrNodeType=node.getMrNodeType()) self.statistics: stats.Statistics = stats.Statistics() self.statusFileLastModTime = -1 self.subprocess = None # Notify update in filepaths when node's internal folder changes self.node.internalFolderChanged.connect(self.nodeFolderChanged) + def __repr__(self): + return f"" + @property def index(self): return self.range.iteration @@ -356,77 +476,84 @@ def name(self): def logManager(self): if self._logManager is None: logger = logging.getLogger(self.node.getName()) - self._logManager = LogManager(logger, self.logFile) + self._logManager = LogManager(logger, self.getLogFile()) return self._logManager - @property - def statusName(self): + def getStatusName(self): return self._status.status.name @property def logger(self): return self.logManager.logger - @property - def execModeName(self): + def getExecModeName(self): return self._status.execMode.name + def shouldMonitorChanges(self): + """ + Check whether we should monitor changes in minimal mode. + Only chunks that are run externally or local_isolated should be monitored, + when run locally, status changes are already notified. + Chunks with an ERROR status may be re-submitted externally and should thus still be + monitored. + """ + return (self.isExtern() and self._status.status in (Status.SUBMITTED, Status.RUNNING, Status.ERROR)) or \ + (self.node.getMrNodeType() == MrNodeType.NODE and self._status.status in (Status.SUBMITTED, Status.RUNNING)) + def updateStatusFromCache(self): """ - Update node status based on status file content/existence. + Update chunk status based on status file content/existence. 
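+        If the status file does not exist, the chunk status is reset to NONE.
+        The statusChanged signal is only emitted when the status actually changed.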
""" - statusFile = self.statusFile + # TODO : If this is a placeholder chunk + # Then we shouldn't do anything here + + statusFile = self.getStatusFile() oldStatus = self._status.status # No status file => reset status to Status.None if not os.path.exists(statusFile): self.statusFileLastModTime = -1 self._status.reset() - self._status.setNodeType(self.node) + self._status.setNode(self.node) else: try: with open(statusFile) as jsonFile: statusData = json.load(jsonFile) - # logging.debug(f"updateStatusFromCache({self.node.name}): From status {self.status.status} to {statusData['status']}") + # logging.debug(f"updateStatusFromCache({self.node.name}): From status {self._status.status} to {statusData['status']}") self._status.fromDict(statusData) self.statusFileLastModTime = os.path.getmtime(statusFile) except Exception as exc: logging.debug(f"updateStatusFromCache({self.node.name}): Error while loading status file {statusFile}: {exc}") self.statusFileLastModTime = -1 self._status.reset() - self._status.setNodeType(self.node) + self._status.setNode(self.node) - if oldStatus != self.status.status: + if oldStatus != self._status.status: self.statusChanged.emit() - @property - def statusFile(self): + def getStatusFile(self): if self.range.blockSize == 0: return os.path.join(self.node.internalFolder, "status") else: - return os.path.join(self.node.internalFolder, - str(self.index) + ".status") + return os.path.join(self.node.internalFolder, str(self.index) + ".status") - @property - def statisticsFile(self): + def getStatisticsFile(self): if self.range.blockSize == 0: return os.path.join(self.node.internalFolder, "statistics") else: return os.path.join(self.node.internalFolder, str(self.index) + ".statistics") - @property - def logFile(self): + def getLogFile(self): if self.range.blockSize == 0: return os.path.join(self.node.internalFolder, "log") else: - return os.path.join(self.node.internalFolder, - str(self.index) + ".log") + return os.path.join(self.node.internalFolder, str(self.index) + ".log") def saveStatusFile(self): """ Write node status on disk. """ data = self._status.toDict() - statusFilepath = self.statusFile + statusFilepath = self.getStatusFile() folder = os.path.dirname(statusFilepath) os.makedirs(folder, exist_ok=True) @@ -440,6 +567,8 @@ def upgradeStatusFile(self): Upgrade node status file based on the current status. 
""" self.saveStatusFile() + # We want to make sure the nodeStatus is up to date too + self.node.upgradeStatusFile() self.statusChanged.emit() def upgradeStatusTo(self, newStatus, execMode=None): @@ -455,7 +584,7 @@ def updateStatisticsFromCache(self): """ """ oldTimes = self.statistics.times - statisticsFile = self.statisticsFile + statisticsFile = self.getStatisticsFile() if not os.path.exists(statisticsFile): return with open(statisticsFile) as jsonFile: @@ -466,7 +595,7 @@ def updateStatisticsFromCache(self): def saveStatistics(self): data = self.statistics.toDict() - statisticsFilepath = self.statisticsFile + statisticsFilepath = self.getStatisticsFile() folder = os.path.dirname(statisticsFilepath) os.makedirs(folder, exist_ok=True) statisticsFilepathWriting = getWritingFilepath(statisticsFilepath) @@ -567,9 +696,6 @@ def _processInIsolatedEnvironment(self): self.node.updateOutputAttr() def stopProcess(self): - if self.isExtern(): - raise ValueError("Cannot stop process: node is computed externally (another instance of Meshroom)") - # Ensure that we are up-to-date self.updateStatusFromCache() @@ -604,20 +730,20 @@ def isExtern(self): return True elif self._status.execMode == ExecMode.LOCAL: if self._status.status in (Status.SUBMITTED, Status.RUNNING): - return meshroom.core.sessionUid not in (self._status.submitterSessionUid, self._status.sessionUid) + return meshroom.core.sessionUid not in (self.node._nodeStatus.submitterSessionUid, self._status.computeSessionUid) return False return False statusChanged = Signal() status = Property(Variant, lambda self: self._status, notify=statusChanged) - statusName = Property(str, statusName.fget, notify=statusChanged) - execModeName = Property(str, execModeName.fget, notify=statusChanged) + statusName = Property(str, getStatusName, notify=statusChanged) + execModeName = Property(str, getExecModeName, notify=statusChanged) statisticsChanged = Signal() nodeFolderChanged = Signal() - statusFile = Property(str, statusFile.fget, notify=nodeFolderChanged) - logFile = Property(str, logFile.fget, notify=nodeFolderChanged) - statisticsFile = Property(str, statisticsFile.fget, notify=nodeFolderChanged) + statusFile = Property(str, getStatusFile, notify=nodeFolderChanged) + logFile = Property(str, getLogFile, notify=nodeFolderChanged) + statisticsFile = Property(str, getStatisticsFile, notify=nodeFolderChanged) nodeName = Property(str, lambda self: self.node.name, constant=True) statusNodeName = Property(str, lambda self: self._status.nodeName, notify=statusChanged) @@ -670,7 +796,9 @@ def __init__(self, nodeType: str, position: Position = None, parent: BaseObject self._name: str = f"_{nodeType}_{uuid.uuid1()}" self.graph = None self.dirty: bool = True # whether this node's outputs must be re-evaluated on next Graph update - self._chunks = ListModel(parent=self) + self._chunks: list[NodeChunk] = ListModel(parent=self) + self._chunksCreated = False # Only initialize chunks on compute + self._chunkPlaceholder: list[NodeChunk] = ListModel(parent=self) # Placeholder chunk for nodes with dynamic ones self._uid: str = uid self._expVars: dict = {} self._size: int = 0 @@ -684,6 +812,10 @@ def __init__(self, nodeType: str, position: Position = None, parent: BaseObject self._duplicates = ListModel(parent=self) # list of nodes with the same uid self._hasDuplicates: bool = False + self._nodeStatus: NodeStatusData = NodeStatusData(self._name, nodeType, self.packageName, + self.getMrNodeType()) + self.nodeStatusFileLastModTime = -1 + 
self.globalStatusChanged.connect(self.updateDuplicatesStatusAndLocked) self._staticExpVars = { @@ -1076,13 +1208,11 @@ def _buildAttributeCmdLineVars(cmdLineVars, name, attr): def isParallelized(self): return bool(self.nodeDesc.parallelization) if meshroom.useMultiChunks else False - @property - def nbParallelizationBlocks(self): - return len(self._chunks) - def hasStatus(self, status: Status): - if not self._chunks: - return status == Status.INPUT + if not self._chunks or not self._chunksCreated: + if self.isInputNode: + return status == Status.INPUT + return status == Status.NONE for chunk in self._chunks: if chunk.status.status != status: return False @@ -1105,6 +1235,10 @@ def clearData(self): """ Delete this Node internal folder. Status will be reset to Status.NONE """ + # Clear cache + self._nodeStatus.reset() + # Reset chunks + self._resetChunks() if self.internalFolder and os.path.exists(self.internalFolder): try: shutil.rmtree(self.internalFolder) @@ -1125,16 +1259,16 @@ def getStartDateTime(self): return min(dateTime) if len(dateTime) != 0 else "" def isAlreadySubmitted(self): - for chunk in self._chunks: - if chunk.isAlreadySubmitted(): - return True - return False + if self._chunksCreated: + return any(c.isAlreadySubmitted() for c in self._chunks) + else: + return self._nodeStatus.status in (Status.SUBMITTED, Status.RUNNING) def isAlreadySubmittedOrFinished(self): - for chunk in self._chunks: - if not chunk.isAlreadySubmittedOrFinished(): - return False - return True + if self._chunksCreated: + return all(c.isAlreadySubmittedOrFinished() for c in self._chunks) + else: + return self._nodeStatus.status in (Status.SUBMITTED, Status.RUNNING, Status.SUCCESS) @Slot(result=bool) def isSubmittedOrRunning(self): @@ -1142,6 +1276,8 @@ def isSubmittedOrRunning(self): Return True if all chunks are at least submitted and there is one running chunk, False otherwise. """ + if not self._chunksCreated: + return False if not self.isAlreadySubmittedOrFinished(): return False for chunk in self._chunks: @@ -1160,6 +1296,8 @@ def isFinishedOrRunning(self): Return True if all chunks of this Node is either finished or running, False otherwise. """ + if not self._chunks: + return False return all(chunk.isFinishedOrRunning() for chunk in self._chunks) @Slot(result=bool) @@ -1167,9 +1305,6 @@ def isPartiallyFinished(self): """ Return True is at least one chunk of this Node is finished, False otherwise. """ return any(chunk.isFinished() for chunk in self._chunks) - def alreadySubmittedChunks(self): - return [ch for ch in self._chunks if ch.isAlreadySubmitted()] - def isExtern(self): """ Return True if at least one chunk of this Node has an external execution mode, @@ -1181,7 +1316,11 @@ def isExtern(self): interrupted, its execution mode will always be local, even if computations resume externally. """ - if len(self._chunks) == 0: + if not self._chunksCreated: + if self._nodeStatus.execMode == ExecMode.EXTERN: + return True + elif self._nodeStatus.execMode == ExecMode.LOCAL and self._nodeStatus.status in (Status.SUBMITTED, Status.RUNNING): + return meshroom.core.sessionUid != self._nodeStatus.submitterSessionUid return False return any(chunk.isExtern() for chunk in self._chunks) @@ -1195,26 +1334,54 @@ def clearSubmittedChunks(self): This must be used with caution. This could lead to inconsistent node status if the graph is still being computed. 
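+        When the chunks have not been created yet, the node-level status is
+        reset instead of the per-chunk statuses.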
""" - for chunk in self._chunks: - if chunk.isAlreadySubmitted(): - chunk.upgradeStatusTo(Status.NONE, ExecMode.NONE) + if self._chunksCreated: + for chunk in self._chunks: + if chunk.isAlreadySubmitted(): + chunk.upgradeStatusTo(Status.NONE, ExecMode.NONE) + else: + if self.isAlreadySubmitted(): + self.upgradeStatusTo(Status.NONE, ExecMode.NONE) + self.globalStatusChanged.emit() def clearLocallySubmittedChunks(self): """ Reset all locally submitted chunks to Status.NONE. """ - for chunk in self._chunks: - if chunk.isAlreadySubmitted() and not chunk.isExtern(): - chunk.upgradeStatusTo(Status.NONE, ExecMode.NONE) + if self._chunksCreated: + for chunk in self._chunks: + if chunk.isAlreadySubmitted() and not chunk.isExtern(): + chunk.upgradeStatusTo(Status.NONE, ExecMode.NONE) + else: + if self.isAlreadySubmitted() and not self.isExtern(): + self.upgradeStatusTo(Status.NONE, ExecMode.NONE) + self.globalStatusChanged.emit() - def upgradeStatusTo(self, newStatus): + def upgradeStatusTo(self, newStatus, execMode=None): """ Upgrade node to the given status and save it on disk. """ - for chunk in self._chunks: - chunk.upgradeStatusTo(newStatus) + if self._chunksCreated: + for chunk in self._chunks: + chunk.upgradeStatusTo(newStatus) + else: + if execMode is not None: + self._nodeStatus.execMode = execMode + self._nodeStatus.status = newStatus + self.upgradeStatusFile() + chunkPlaceholder = NodeChunk(self, desc.computation.Range()) + chunkPlaceholder._status.execMode = self._nodeStatus.execMode + chunkPlaceholder._status.status = self._nodeStatus.status + self.chunkPlaceholder.setObjectList([chunkPlaceholder]) + self.chunksChanged.emit() + self.globalStatusChanged.emit() def updateStatisticsFromCache(self): for chunk in self._chunks: chunk.updateStatisticsFromCache() - def _updateChunks(self): + def _resetChunks(self): + pass + + def createChunksFromCache(self): + pass + + def _createChunks(self): pass def _updateNodeSize(self): @@ -1303,8 +1470,8 @@ def updateInternals(self, cacheDir=None): self._updateNodeSize() - # Update chunks splitting - self._updateChunks() + # Reset chunks splitting + self._resetChunks() # Retrieve current internal folder (if possible) try: folder = self.internalFolder @@ -1332,30 +1499,128 @@ def internalFolder(self): def sourceCodeFolder(self): return self._sourceCodeFolder - def updateStatusFromCache(self): + @property + def nodeStatusFile(self): + return os.path.join(self.graph.cacheDir, self.internalFolder, "nodeStatus") + + def shouldMonitorChanges(self): + """ Check whether we should monitor changes in minimal mode. + Only chunks that are run externally or local_isolated should be monitored, + when run locally, status changes are already notified. + Chunks with an ERROR status may be re-submitted externally and should thus still be monitored """ - Update node status based on status file content/existence. + if self._chunksCreated: + # Only monitor when chunks are not created (in this case monitor chunk status files instead) + return False + return (self.isExtern() and self._nodeStatus.status in (Status.SUBMITTED, Status.RUNNING, Status.ERROR)) or \ + (self.getMrNodeType() == MrNodeType.NODE and self._nodeStatus.status in (Status.SUBMITTED, Status.RUNNING)) + + def updateNodeStatusFromCache(self): """ + Update node status based on status file content/existence. + # TODO : integrate nodeStatusFileLastModTime ? 
+ Returns True if a change on the chunk setup has been detected + """ + chunksRangeHasChanged = False + if os.path.exists(self.nodeStatusFile): + oldChunkSetup = self._nodeStatus.chunks + self._nodeStatus.loadFromCache(self.nodeStatusFile) + if self._nodeStatus.chunks != oldChunkSetup: + chunksRangeHasChanged = True + self.nodeStatusFileLastModTime = os.path.getmtime(self.nodeStatusFile) + else: + # No status file => reset status to Status.None + self.nodeStatusFileLastModTime = -1 + self._nodeStatus.reset() + self._nodeStatus.setNodeType(self) + return chunksRangeHasChanged + + def updateStatusFromCache(self): + """ Update node status based on status file content/existence. """ + # Update nodeStatus from cache + chunkChanged = self.updateNodeStatusFromCache() + # Create chunks if we found info on them on the node cache + if chunkChanged and self._nodeStatus.nbChunks > 0: + # Update number of chunks + try: + self.createChunksFromCache() + except Exception as e: + logging.warning(f"Could not create chunks from cache: {e}") + return s = self.globalStatus - for chunk in self._chunks: - chunk.updateStatusFromCache() - # logging.warning(f"updateStatusFromCache: {self.name}, status: {s} => {self.globalStatus}") + if self._chunksCreated: + for chunk in self._chunks: + chunk.updateStatusFromCache() + else: + # Restore placeholder chunk if needed + chunkPlaceholder = NodeChunk(self, desc.computation.Range()) + chunkPlaceholder._status.execMode = self._nodeStatus.execMode + chunkPlaceholder._status.status = self._nodeStatus.status + self._chunkPlaceholder.setObjectList([chunkPlaceholder]) + # logging.debug(f"updateStatusFromCache: {self.name}, status: {s} => {self.globalStatus}") self.updateOutputAttr() + def upgradeStatusFile(self): + """ Write node status on disk. 
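+        The global status is recomputed before serializing, and the file is
+        written through a temporary path and then renamed into place.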
""" + # Make sure the node has the globalStatus before saving it + self._nodeStatus.status = self.getGlobalStatus() + data = self._nodeStatus.toDict() + statusFilepath = self.nodeStatusFile + folder = os.path.dirname(statusFilepath) + os.makedirs(folder, exist_ok=True) + statusFilepathWriting = getWritingFilepath(statusFilepath) + with open(statusFilepathWriting, 'w') as jsonFile: + json.dump(data, jsonFile, indent=4) + renameWritingToFinalPath(statusFilepathWriting, statusFilepath) + + def setJobId(self, jid, submitterName): + self._nodeStatus.setJob(jid, submitterName) + self.upgradeStatusFile() + def initStatusOnSubmit(self, forceCompute=False): """ Prepare chunks status when the node is in a graph that was submitted """ + hasChunkToLaunch = False + if not self._chunksCreated: + hasChunkToLaunch = True for chunk in self._chunks: - if forceCompute or chunk.status.status != Status.SUCCESS: + if forceCompute or chunk._status.status != Status.SUCCESS: + hasChunkToLaunch = True chunk._status.setNode(self) chunk._status.initExternSubmit() chunk.upgradeStatusFile() - - def beginSequence(self, forceCompute=False): + if hasChunkToLaunch: + self._nodeStatus.setNode(self) + self._nodeStatus.initExternSubmit() + self.upgradeStatusFile() + self.globalStatusChanged.emit() + if self._nodeStatus.execMode == ExecMode.EXTERN and self._nodeStatus.status in (Status.RUNNING, Status.SUBMITTED): + chunkPlaceholder = NodeChunk(self, desc.computation.Range()) + chunkPlaceholder._status.execMode = self._nodeStatus.execMode + chunkPlaceholder._status.status = self._nodeStatus.status + self._chunkPlaceholder.setObjectList([chunkPlaceholder]) + self.chunksChanged.emit() + + def initStatusOnCompute(self, forceCompute=False): + hasChunkToLaunch = False + if not self._chunksCreated: + hasChunkToLaunch = True for chunk in self._chunks: - if forceCompute or (chunk.status.status not in (Status.RUNNING, Status.SUCCESS)): + if forceCompute or (chunk._status.status not in (Status.RUNNING, Status.SUCCESS)): + hasChunkToLaunch = True chunk._status.setNode(self) chunk._status.initLocalSubmit() chunk.upgradeStatusFile() + if hasChunkToLaunch: + self._nodeStatus.setNode(self) + self._nodeStatus.initLocalSubmit() + self.upgradeStatusFile() + self.globalStatusChanged.emit() + if self._nodeStatus.execMode == ExecMode.LOCAL and self._nodeStatus.status in (Status.RUNNING, Status.SUBMITTED): + chunkPlaceholder = NodeChunk(self, desc.computation.Range()) + chunkPlaceholder._status.execMode = self._nodeStatus.execMode + chunkPlaceholder._status.status = self._nodeStatus.status + self._chunkPlaceholder.setObjectList([chunkPlaceholder]) + self.chunksChanged.emit() def processIteration(self, iteration): self._chunks[iteration].process() @@ -1465,8 +1730,15 @@ def endSequence(self): def stopComputation(self): """ Stop the computation of this node. 
""" - for chunk in self._chunks.values(): - chunk.stopProcess() + if self._chunks: + for chunk in self._chunks.values(): + chunk.stopProcess() + else: + # Ensure that we are up-to-date + self.updateNodeStatusFromCache() + # The only status possible here is submitted + if self._nodeStatus.status is Status.SUBMITTED: + self.upgradeStatusTo(Status.NONE) def getGlobalStatus(self): """ @@ -1477,12 +1749,15 @@ def getGlobalStatus(self): """ if isinstance(self.nodeDesc, desc.InputNode): return Status.INPUT + if not self._chunksCreated: + # Get status from nodeStatus + return self._nodeStatus.status if not self._chunks: return Status.NONE if len( self._chunks) == 1: - return self._chunks[0].status.status + return self._chunks[0]._status.status - chunksStatus = [chunk.status.status for chunk in self._chunks] + chunksStatus = [chunk._status.status for chunk in self._chunks] anyOf = (Status.ERROR, Status.STOPPED, Status.KILLED, Status.RUNNING, Status.SUBMITTED) @@ -1497,18 +1772,18 @@ def getGlobalStatus(self): return Status.NONE - @Slot(result=StatusData) + @Slot(result=ChunkStatusData) def getFusedStatus(self): if not self._chunks: - return StatusData() - fusedStatus = StatusData() - fusedStatus.fromDict(self._chunks[0].status.toDict()) + return ChunkStatusData() + fusedStatus = ChunkStatusData() + fusedStatus.fromDict(self._chunks[0]._status.toDict()) for chunk in self._chunks[1:]: - fusedStatus.merge(chunk.status) + fusedStatus.merge(chunk._status) fusedStatus.status = self.getGlobalStatus() return fusedStatus - @Slot(result=StatusData) + @Slot(result=ChunkStatusData) def getRecursiveFusedStatus(self): fusedStatus = self.getFusedStatus() nodes = self.getInputNodes(recursive=True, dependenciesOnly=True) @@ -1524,11 +1799,22 @@ def _isInputNode(self): @property def globalExecMode(self): + if not self._chunksCreated: + return self._nodeStatus.execMode.name if len(self._chunks): - return self._chunks.at(0).execModeName + return self._chunks.at(0).getExecModeName() else: return ExecMode.NONE + def _getJobName(self): + execMode = self._nodeStatus.execMode + if execMode == ExecMode.LOCAL: + return "LOCAL" + elif execMode == ExecMode.EXTERN: + return self._nodeStatus.jobName + else: + return "NONE" + def getChunks(self) -> list[NodeChunk]: return self._chunks @@ -1644,26 +1930,28 @@ def updateDuplicates(self, nodesPerUid): self.hasDuplicatesChanged.emit() def statusInThisSession(self) -> bool: + """ Check if chunks of the node are being computed in the current session + TODO: Not used -> depreciate ? + """ if not self._chunks: return False for chunk in self._chunks: - if chunk.status.sessionUid != meshroom.core.sessionUid: + if chunk._status.computeSessionUid != meshroom.core.sessionUid: return False return True def submitterStatusInThisSession(self) -> bool: - if not self._chunks: - return False - for chunk in self._chunks: - if chunk.status.submitterSessionUid != meshroom.core.sessionUid: - return False - return True + """ Check if the node is submitted by the current session + TODO: Not used -> depreciate ? 
+ """ def initFromThisSession(self) -> bool: - if len(self._chunks) == 0: - return False + """ Check if the node was submitted from the current session """ + if not self._chunksCreated or not self._chunks: + return meshroom.core.sessionUid == self._nodeStatus.submitterSessionUid for chunk in self._chunks: - if meshroom.core.sessionUid not in (chunk.status.sessionUid, chunk.status.submitterSessionUid): + # Technically the check on chunk._status.computeSessionUid is useless + if meshroom.core.sessionUid not in (chunk._status.computeSessionUid, self._nodeStatus.submitterSessionUid): return False return True @@ -1679,29 +1967,49 @@ def isMainNode(self) -> bool: @Slot(result=bool) def canBeStopped(self) -> bool: + """ + Return True if this node can be stopped, False otherwise. A node can be stopped if: + - it has the "RUNNING" status (it is currently being computed) + - it is executed locally and started from this Meshroom session OR it is executed externally on a render farm + (and is thus associated to a job name). A node that is executed externally but without an associated job is + likely a node that was started from another Meshroom instance, and thus cannot be stopped from this one. + """ if not self.isComputableType: return False if self.isCompatibilityNode: return False # Only locked nodes running in local with the same - # sessionUid as the Meshroom instance can be stopped - return (self.getGlobalStatus() == Status.RUNNING and - self.globalExecMode == ExecMode.LOCAL.name and - self.isMainNode() and - self.initFromThisSession()) + # computeSessionUid as the Meshroom instance can be stopped + return (self.getGlobalStatus() == Status.RUNNING and self.isMainNode() and + ( + (self.globalExecMode == ExecMode.LOCAL.name and self.initFromThisSession()) + or + (self.globalExecMode == ExecMode.EXTERN.name and self._nodeStatus.jobName != "UNKNOWN") + ) + ) @Slot(result=bool) def canBeCanceled(self) -> bool: + """ + Return True if this node can be canceled, False otherwise. A node can be canceled if: + - it has the "SUBMITTED" status (it is not running yet, but is expected to be in the near future) + - it is executed locally and started from this Meshroom session OR it is executed externally on a render farm + (and is thus associated to a job name). A node that is executed externally but without an associated job is + likely a node that was started from another Meshroom instance, and thus cannot be canceled from this one. 
+ """ if not self.isComputableType: return False if self.isCompatibilityNode: return False # Only locked nodes submitted in local with the same - # sessionUid as the Meshroom instance can be canceled - return (self.getGlobalStatus() == Status.SUBMITTED and - self.globalExecMode == ExecMode.LOCAL.name and - self.isMainNode() and - self.initFromThisSession()) + # computeSessionUid as the Meshroom instance can be canceled + return (self.getGlobalStatus() == Status.SUBMITTED and self.isMainNode() and + ( + (self.globalExecMode == ExecMode.LOCAL.name and self.initFromThisSession()) + or + (self.globalExecMode == ExecMode.EXTERN.name and self._nodeStatus.jobName != "UNKNOWN") + ) + ) def hasImageOutputAttribute(self) -> bool: """ @@ -1749,6 +2057,9 @@ def _hasDisplayableShape(self): nodeType = Property(str, nodeType.fget, constant=True) documentation = Property(str, getDocumentation, constant=True) nodeInfo = Property(Variant, getNodeInfo, constant=True) + nodeStatusChanged = Signal() + nodeStatus = Property(Variant, lambda self: self._nodeStatus, notify=nodeStatusChanged) + nodeStatusNodeName = Property(str, lambda self: self._nodeStatus.nodeName, notify=nodeStatusChanged) positionChanged = Signal() position = Property(Variant, position.fget, position.fset, notify=positionChanged) x = Property(float, lambda self: self._position.x, notify=positionChanged) @@ -1766,13 +2077,17 @@ def _hasDisplayableShape(self): depthChanged = Signal() depth = Property(int, depth.fget, notify=depthChanged) minDepth = Property(int, minDepth.fget, notify=depthChanged) + chunksCreatedChanged = Signal() + chunksCreated = Property(bool, lambda self: self._chunksCreated, notify=chunksCreatedChanged) chunksChanged = Signal() chunks = Property(Variant, getChunks, notify=chunksChanged) + chunkPlaceholder = Property(Variant, lambda self: self._chunkPlaceholder, notify=chunksChanged) + nbParallelizationBlocks = Property(int, lambda self: len(self._chunks) if self._chunksCreated else 0, notify=chunksChanged) sizeChanged = Signal() size = Property(int, getSize, notify=sizeChanged) globalStatusChanged = Signal() globalStatus = Property(str, lambda self: self.getGlobalStatus().name, notify=globalStatusChanged) - fusedStatus = Property(StatusData, getFusedStatus, notify=globalStatusChanged) + fusedStatus = Property(ChunkStatusData, getFusedStatus, notify=globalStatusChanged) elapsedTime = Property(float, lambda self: self.getFusedStatus().elapsedTime, notify=globalStatusChanged) recursiveElapsedTime = Property(float, lambda self: self.getRecursiveFusedStatus().elapsedTime, notify=globalStatusChanged) @@ -1781,6 +2096,7 @@ def _hasDisplayableShape(self): isInputNode = Property(bool, lambda self: self._isInputNode(), constant=True) globalExecMode = Property(str, globalExecMode.fget, notify=globalStatusChanged) + jobName = Property(str, lambda self: self._getJobName(), notify=globalStatusChanged) isExternal = Property(bool, isExtern, notify=globalStatusChanged) isComputed = Property(bool, _isComputed, notify=globalStatusChanged) isComputableType = Property(bool, _isComputableType, notify=globalStatusChanged) @@ -1903,18 +2219,62 @@ def toDict(self): 'outputs': outputs, } - def _updateChunks(self): - """ Update Node's computation task splitting into NodeChunks based on its description """ + def _resetChunks(self): + """ Set chunks on the node. + # TODO : Maybe don't delete chunks if we will recreate them as before ? 
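+        InputNode descriptions get no chunks. Non-parallelized nodes and nodes
+        with a static size get their chunks immediately; nodes with a dynamic
+        size only get a placeholder chunk until createChunks() is called.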
+ """ if isinstance(self.nodeDesc, desc.InputNode): + self._chunksCreated = True return - if self.isParallelized: + # Disconnect signals + for chunk in self._chunks: + chunk.statusChanged.disconnect(self.globalStatusChanged) + # Empty list + self._chunks.setObjectList([]) + self._chunkPlaceholder.setObjectList([]) + # Recreate list with reset values (1 chunk or the static size) + if not self.isParallelized: + if not self.nodeDesc.size: + self.setSize(1) + else: + self.setSize(self.nodeDesc.size.computeSize(self)) + self._chunks.setObjectList([NodeChunk(self, desc.Range())]) + self._chunks[0].statusChanged.connect(self.globalStatusChanged) + self._chunksCreated = True + elif isinstance(self.nodeDesc.size, desc.computation.StaticNodeSize): + self._chunksCreated = True + self.setSize(self.nodeDesc.size.computeSize(self)) + self._chunks.setObjectList([NodeChunk(self, desc.Range())]) + self._chunks[0].statusChanged.connect(self.globalStatusChanged) try: ranges = self.nodeDesc.parallelization.getRanges(self) + self._chunks.setObjectList([NodeChunk(self, range) for range in ranges]) + for c in self._chunks: + c.statusChanged.connect(self.globalStatusChanged) + logging.debug(f"Created {len(self._chunks)} chunks for node: {self.name}") + except RuntimeError: + # TODO: set node internal status to error + logging.warning(f"Invalid Parallelization on node {self._name}") + self._chunks.clear() + else: + self._chunksCreated = False + self.setSize(0) + self._chunkPlaceholder.setObjectList([NodeChunk(self, desc.computation.Range())]) + + # Create chunks when possible + self.chunksCreatedChanged.emit() + self.chunksChanged.emit() + self.globalStatusChanged.emit() + + def __createChunks(self, ranges): + if self.isParallelized: + try: if len(ranges) != len(self._chunks): self._chunks.setObjectList([NodeChunk(self, range) for range in ranges]) for c in self._chunks: c.statusChanged.connect(self.globalStatusChanged) - else: + logging.debug(f"Created {len(self._chunks)} chunks for node: {self.name}") + else: for chunk, range in zip(self._chunks, ranges): chunk.range = range except RuntimeError: @@ -1927,6 +2287,51 @@ def _updateChunks(self): self._chunks[0].statusChanged.connect(self.globalStatusChanged) else: self._chunks[0].range = desc.Range() + self._chunksCreated = True + # Update node status + # TODO : update all chunks status ? + # TODO : update node status ? + # Emit signals for UI updates + self.chunksChanged.emit() + self.chunksCreatedChanged.emit() + + def createChunksFromCache(self): + """ Create chunks when a node cache exists. """ + try: + # Get size from cache + size = self._nodeStatus.nbChunks + self.setSize(size) + ranges = self._nodeStatus.getChunkRanges() + self.__createChunks(ranges) + except Exception as e: + logging.error(f"Failed to create chunks for {self.name}") + self._chunks.clear() + self._chunksCreated = False + raise e + + def createChunks(self): + """ Create chunks when computation is about to start. 
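+        The resulting chunk setup (block size, full size, number of blocks) is
+        persisted in the node status file so that other processes can rebuild
+        the same chunks through createChunksFromCache(). A minimal sketch of the
+        expected call order (forceCompute / inCurrentEnv being the command-line
+        booleans used by meshroom_createChunks):
+
+            node.createChunks()
+            for chunk in node.chunks:
+                chunk.process(forceCompute, inCurrentEnv)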
""" + if self._chunksCreated: + return + if isinstance(self.nodeDesc, desc.InputNode): + self._chunksCreated = True + self.chunksChanged.emit() + return + # Grab current chunk information + logging.debug(f"Creating chunks for node: {self.name}") + try: + size = self.nodeDesc.size.computeSize(self) + self.setSize(size) + ranges = self.nodeDesc.parallelization.getRanges(self) + self.__createChunks(ranges) + except Exception as e: + logging.error(f"Failed to create chunks for {self.name}: {e}") + self._chunks.clear() + self._chunksCreated = False + raise e + # Update status + self._nodeStatus.setChunks(self._chunks) + self.upgradeStatusFile() class CompatibilityIssue(Enum): diff --git a/meshroom/core/submitter.py b/meshroom/core/submitter.py index fc9996398f..2b47ecb622 100644 --- a/meshroom/core/submitter.py +++ b/meshroom/core/submitter.py @@ -1,23 +1,241 @@ #!/usr/bin/env python +import sys import logging +import operator + +from enum import IntFlag, auto +from typing import Optional +from itertools import accumulate + +import meshroom from meshroom.common import BaseObject, Property + logger = logging.getLogger("Submitter") logger.setLevel(logging.INFO) +class SubmitterOptionsEnum(IntFlag): + RETRIEVE = auto() # Can retrieve job (read job tasks, ...) + INTERRUPT_JOB = auto() # Can interrupt + RESUME_JOB = auto() # Can resume after interruption + EDIT_TASKS = auto() # Can edit tasks + ATTACH_JOB = auto() # Can attach a job that will execute after another job + + @classmethod + def get(cls, option): + if isinstance(option, str): + # Try to cast to SubmitterOptionsEnum + option = getattr(cls, option.upper(), None) + elif isinstance(option, int): + option = cls(option) + if isinstance(option, cls): + return option + return 0 + +# SubmitterOptionsEnum.ALL = SubmitterOptionsEnum(SubmitterOptionsEnum._all_bits_) # _all_bits_ -> py 3.11 +SubmitterOptionsEnum.ALL = list(accumulate(SubmitterOptionsEnum, operator.__ior__))[-1] + + +class SubmitterOptions: + def __init__(self, *args): + self._options = 0 + for option in args: + self.addOption(option) + + def addOption(self, option): + option = SubmitterOptionsEnum.get(option) + self._options |= option + + def includes(self, option): + option = SubmitterOptionsEnum.get(option) + return self._options & option > 0 + + def __iter__(self): + for o in SubmitterOptionsEnum: + if self.includes(o): + yield(o) + + def __repr__(self): + if self._options == 0: + return f"" + if self._options == SubmitterOptionsEnum.ALL: + return f"" + return f"" + + +class BaseSubmittedJob: + """ + Interface to manipulate the job via Meshroom + """ + + def __init__(self, jobId, submitter): + self.jid = jobId + self.submitterName: str = submitter._name + self.submitterOptions: SubmitterOptions = submitter._options + + def __repr__(self): + return f"<{self.__class__.__name__} {self.jid}>" + + # Task actions + # For all methods if If iteration is -1 then it kills all the tasks for the given node + + def stopChunkTask(self, node, iteration): + """ This will kill one task. 
+
+
+class BaseSubmittedJob:
+    """
+    Interface to manipulate the job via Meshroom
+    """
+
+    def __init__(self, jobId, submitter):
+        self.jid = jobId
+        self.submitterName: str = submitter._name
+        self.submitterOptions: SubmitterOptions = submitter._options
+
+    def __repr__(self):
+        return f"<{self.__class__.__name__} {self.jid}>"
+
+    # Task actions
+    # For all methods, if iteration is -1 then the action applies to all the tasks of the given node
+
+    def stopChunkTask(self, node, iteration):
+        """ This will kill one task.
+        If iteration is -1 then it kills all the tasks for the given node
+        """
+        if self.submitterOptions.includes(SubmitterOptionsEnum.INTERRUPT_JOB):
+            raise NotImplementedError("'stopChunkTask' method must be implemented in subclasses")
+        else:
+            raise RuntimeError(f"Submitter {self.__class__.__name__} cannot interrupt the job")
+
+    def skipChunkTask(self, node, iteration):
+        """ This will skip one task """
+        if self.submitterOptions.includes(SubmitterOptionsEnum.INTERRUPT_JOB):
+            raise NotImplementedError("'skipChunkTask' method must be implemented in subclasses")
+        else:
+            raise RuntimeError(f"Submitter {self.__class__.__name__} cannot interrupt the job")
+
+    def restartChunkTask(self, node, iteration):
+        """ This will restart one task """
+        if self.submitterOptions.includes(SubmitterOptionsEnum.RESUME_JOB):
+            raise NotImplementedError("'restartChunkTask' method must be implemented in subclasses")
+        else:
+            raise RuntimeError(f"Submitter {self.__class__.__name__} cannot restart the task")
+
+    # Job actions
+
+    def pauseJob(self):
+        """ This will pause the job: new tasks will not be processed """
+        if self.submitterOptions.includes(SubmitterOptionsEnum.INTERRUPT_JOB):
+            raise NotImplementedError("'pauseJob' method must be implemented in subclasses")
+        else:
+            raise RuntimeError(f"Submitter {self.__class__.__name__} cannot interrupt the job")
+
+    def resumeJob(self):
+        """ This will unpause the job """
+        if self.submitterOptions.includes(SubmitterOptionsEnum.RESUME_JOB):
+            raise NotImplementedError("'resumeJob' method must be implemented in subclasses")
+        else:
+            raise RuntimeError(f"Submitter {self.__class__.__name__} cannot resume the job")
+
+    def interruptJob(self):
+        """ This will interrupt the job (and kill running tasks) """
+        if self.submitterOptions.includes(SubmitterOptionsEnum.INTERRUPT_JOB):
+            raise NotImplementedError("'interruptJob' method must be implemented in subclasses")
+        else:
+            raise RuntimeError(f"Submitter {self.__class__.__name__} cannot interrupt the job")
+
+    def restartErrorTasks(self):
+        if self.submitterOptions.includes(SubmitterOptionsEnum.RESUME_JOB):
+            raise NotImplementedError("'restartErrorTasks' method must be implemented in subclasses")
+        else:
+            raise RuntimeError(f"Submitter {self.__class__.__name__} cannot restart the job")
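A concrete job only overrides the actions its farm supports: the base methods raise NotImplementedError when the capability is declared but missing, and RuntimeError when the capability is absent. A minimal illustrative subclass (the logging bodies are placeholders, not a real farm API):

    class LoggingJob(BaseSubmittedJob):
        """ Illustrative only: logs the requested actions instead of calling a farm API. """

        def pauseJob(self):
            logger.info(f"Pausing job {self.jid}")  # a real implementation would call the farm API here

        def interruptJob(self):
            logger.info(f"Interrupting job {self.jid}")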
+
+
+class JobManager(BaseObject):
+    """ Central manager for all jobs """
+
+    def __init__(self):
+        super().__init__()
+        self._jobs = {}  # jobId -> BaseSubmittedJob
+        self._nodeToJob = {}  # node uid -> Job
+        self._lock = Lock()  # guards _jobs, as removeJob may be called from worker threads
+
+    def addJob(self, job: BaseSubmittedJob, nodes):
+        jid = job.jid
+        if jid not in self._jobs:
+            self._jobs[jid] = job
+        for node in nodes:
+            nodeUid = node._uid
+            self._nodeToJob[nodeUid] = jid
+            # Update the node status file to store the job ID
+            node.setJobId(jid, job.submitterName)
+
+    def resetNodeJob(self, node):
+        node._nodeStatus.jobInfo = {}
+        if node._uid in self._nodeToJob:
+            del self._nodeToJob[node._uid]
+
+    def getJob(self, jobId: str) -> Optional[BaseSubmittedJob]:
+        return self._jobs.get(jobId)
+
+    def removeJob(self, jobId: str):
+        with self._lock:
+            if jobId in self._jobs:
+                del self._jobs[jobId]
+
+    def getNodeJob(self, node):
+        nodeUid = node._uid
+        jobId = self._nodeToJob.get(nodeUid)
+        if jobId:
+            return self.getJob(jobId)
+        return None
+
+    def getAllNodesUIDForJob(self, job):
+        return [n for n, j in self._nodeToJob.items() if j == job.jid]
+
+    def retrieveJob(self, submitter, jid) -> Optional[BaseSubmittedJob]:
+        if not submitter._options.includes(SubmitterOptionsEnum.RETRIEVE):
+            return None
+        job = submitter.retrieveJob(jid)
+        return job
+
+
+# Global instance that manages submitted jobs
+jobManager = JobManager()
+
+
 class BaseSubmitter(BaseObject):
-    def __init__(self, name, parent=None):
+    _options: SubmitterOptions = SubmitterOptions()
+    _name = ""
+
+    def __init__(self, parent=None):
+        if not self._name:
+            raise ValueError("Cannot register a submitter without a name")
         super().__init__(parent)
-        self._name = name
-        logger.info(f"Registered submitter {self._name}")
+        logger.info(f"Registered submitter {self._name} (options={self._options})")
+
+    @property
+    def name(self):
+        return self._name
+
+    def createJob(self, nodes, edges, filepath, submitLabel="{projectName}"):
+        """ Create the job for the given graph
+        Returns:
+            BaseSubmittedJob: the created job, or None if the job could not be created
+        """
+        raise NotImplementedError("'createJob' method must be implemented in subclasses")
+
+    def createChunkTask(self, node, graphFile, **kwargs):
+        if self._options.includes(SubmitterOptionsEnum.EDIT_TASKS):
+            raise NotImplementedError("'createChunkTask' method must be implemented in subclasses")
+        else:
+            raise RuntimeError(f"Submitter {self.name} cannot edit the job")
 
-    def submit(self, nodes, edges, filepath, submitLabel="{projectName}"):
+    def retrieveJob(self, jobId) -> BaseSubmittedJob:
+        raise NotImplementedError("'retrieveJob' method must be implemented in subclasses")
+
+    def submit(self, nodes, edges, filepath, submitLabel="{projectName}") -> BaseSubmittedJob:
         """ Submit the given graph
         Returns:
-            bool: whether the submission succeeded
+            BaseSubmittedJob: the created job, or None if the submission failed
         """
-        raise NotImplementedError("'submit' method must be implemented in subclasses")
+        job = self.createJob(nodes, edges, filepath, submitLabel)
+        if not job:
+            # Failed to create the job
+            return None
+        return job
+
+    @staticmethod
+    def killRunningJob():
+        """ Some farms automatically retry a job once in case it was killed by a user
+        who does not want their machine to be used. Unfortunately, this means jobs may
+        be launched twice even if they failed for a good reason.
+        This function can be used to make sure the current job will not restart.
+        Note: the ERROR_NO_RETRY exit status itself won't do anything; this function must be
+        implemented on a case-by-case basis for each possible farm system.
+        """
+        sys.exit(meshroom.MeshroomExitStatus.ERROR_NO_RETRY)
 
     name = Property(str, lambda self: self._name, constant=True)
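For reference, the smallest submitter fitting this interface could look like the sketch below (the class, its name, and the placeholder job id are illustrative, not part of the patch):

    class DebugSubmitter(BaseSubmitter):
        _name = "Debug"
        _options = SubmitterOptions(SubmitterOptionsEnum.RETRIEVE,
                                    SubmitterOptionsEnum.INTERRUPT_JOB)

        def createJob(self, nodes, edges, filepath, submitLabel="{projectName}"):
            # A real submitter would hand the graph file to the farm and get a job id back
            return BaseSubmittedJob(f"debug-{len(nodes)}", self)

        def retrieveJob(self, jobId):
            return BaseSubmittedJob(jobId, self)

`submit()` then returns this job, and the task manager registers it with `jobManager.addJob(...)`.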
diff --git a/meshroom/core/taskManager.py b/meshroom/core/taskManager.py
index 8fb6b7cc70..4b4acb4c83 100644
--- a/meshroom/core/taskManager.py
+++ b/meshroom/core/taskManager.py
@@ -1,11 +1,14 @@
+import traceback
 import logging
 from threading import Thread
+from PySide6.QtCore import QThread, QEventLoop, QTimer
 from enum import Enum
 
 import meshroom
 from meshroom.common import BaseObject, DictModel, Property, Signal, Slot
-from meshroom.core.node import Status, Node
+from meshroom.core.node import Node, Status
 from meshroom.core.graph import Graph
+from meshroom.core.submitter import jobManager, BaseSubmittedJob
 import meshroom.core.graph
 
@@ -20,31 +23,73 @@ class State(Enum):
     ERROR = 4
 
-class TaskThread(Thread):
+class TaskThread(QThread):
     """ A thread with a pile of nodes to compute """
     def __init__(self, manager):
-        Thread.__init__(self, target=self.run)
+        QThread.__init__(self)
         self._state = State.IDLE
         self._manager = manager
         self.forceCompute = False
+        # Connect to manager's chunk creation handler
+        self.createChunksSignal.connect(manager.createChunks)
 
     def isRunning(self):
         return self._state == State.RUNNING
 
+    def waitForChunkCreation(self, node):
+        if hasattr(node, "_chunksCreated") and node._chunksCreated:
+            return True
+
+        loop = QEventLoop()
+
+        # A timer is used to make sure we don't indefinitely block the taskManager
+        timer = QTimer()
+        timer.timeout.connect(loop.quit)
+        timer.setSingleShot(True)
+        timer.start(1 * 60 * 1000)  # 1 min timeout
+
+        # Connect to completion signal
+        def onChunksCreated(createdNode):
+            if createdNode == node:
+                loop.quit()
+
+        self._manager.chunksCreated.connect(onChunksCreated)
+
+        try:
+            # Start the event loop - will block until signal or timeout
+            loop.exec()
+            success = hasattr(node, "_chunksCreated") and node._chunksCreated
+            if not success:
+                logging.error(f"Timeout or failure creating chunks for {node.name}")
+            return success
+        finally:
+            self._manager.chunksCreated.disconnect(onChunksCreated)
+            timer.stop()
+
     def run(self):
         """ Consume compute tasks.
""" self._state = State.RUNNING - stopAndRestart = False for nId, node in enumerate(self._manager._nodesToProcess): + if node not in self._manager._nodesToProcess: + # Node was removed from the processing list + continue # Skip already finished/running nodes or nodes in compatibility mode if node.isFinishedOrRunning() or node.isCompatibilityNode: continue + # Request chunk creation if not already done + if not (hasattr(node, "_chunksCreated") and node._chunksCreated): + self.createChunksSignal.emit(node) + # Wait for chunk creation to complete + if not self.waitForChunkCreation(node): + logging.error(f"Failed to create chunks for {node.name}, stopping the process") + break + # if a node does not exist anymore, node.chunks becomes a PySide property try: multiChunks = len(node.chunks) > 1 @@ -56,10 +101,16 @@ def run(self): if chunk.isFinishedOrRunning() or not self.isRunning(): continue + if self._manager.isChunkCancelled(chunk): + continue + + _nodeName, _node, _nbNodes = node.nodeType, nId+1, len(self._manager._nodesToProcess) + if multiChunks: - logging.info(f'[{nId+1}/{len(self._manager._nodesToProcess)}]({cId+1}/{len(node.chunks)}) {node.nodeType}') + _chunk, _nbChunks = cId+1, len(node.chunks) + logging.info(f"[{_node}/{_nbNodes}]({_chunk}/{_nbChunks}) {_nodeName}") else: - logging.info(f'[{nId+1}/{len(self._manager._nodesToProcess)}] {node.nodeType}') + logging.info(f"[{_node}/{_nbNodes}] {_nodeName}") try: chunk.process(self.forceCompute) except Exception as exc: @@ -89,6 +140,9 @@ def run(self): self._manager._nodesToProcess = [] self._state = State.DEAD + # Signals and properties + createChunksSignal = Signal(BaseObject) + class TaskManager(BaseObject): """ @@ -99,6 +153,7 @@ def __init__(self, parent: BaseObject = None): self._graph = None self._nodes = DictModel(keyAttrName='_name', parent=self) self._nodesToProcess = [] + self._cancelledChunks = [] self._nodesExtern = [] # internal thread in which local tasks are executed self._thread = TaskThread(self) @@ -106,6 +161,30 @@ def __init__(self, parent: BaseObject = None): self._blockRestart = False self.restartRequested.connect(self.restart) + def join(self): + self._thread.wait() + self._cancelledChunks = [] + + @Slot(BaseObject) + def createChunks(self, node: Node): + """ Create chunks on main process """ + try: + if not node._chunksCreated: + node.createChunks() + # Prepare all chunks + node.initStatusOnCompute() + self.chunksCreated.emit(node) + except Exception as e: + logging.error(f"Failed to create chunks for {node.name}: {e}") + self.chunksCreated.emit(node) # Still emit to unblock waiting thread + + def isChunkCancelled(self, chunk): + for i, ch in enumerate(self._cancelledChunks): + if ch == chunk: + del self._cancelledChunks[i] + return True + return False + def requestBlockRestart(self): """ Block computing. 
@@ -126,8 +205,25 @@ def blockRestart(self): self._blockRestart = False self._nodesToProcess = [] + self._cancelledChunks = [] self._thread._state = State.DEAD + @Slot() + def pauseProcess(self): + if self._thread.isRunning(): + self.join() + for node in self._nodesToProcess: + if node.getGlobalStatus() == Status.STOPPED: + # Remove node from the computing list + self.removeNode(node, displayList=False, processList=True) + + # Remove output nodes from display and computing lists + outputNodes = node.getOutputNodes(recursive=True, dependenciesOnly=True) + for n in outputNodes: + if n.getGlobalStatus() in (Status.ERROR, Status.SUBMITTED): + n.upgradeStatusTo(Status.NONE) + self.removeNode(n, displayList=True, processList=True) + @Slot() def restart(self): """ @@ -135,7 +231,8 @@ def restart(self): Note: this is done like this to avoid app freezing. """ # Make sure to wait the end of the current thread - self._thread.join() + if self._thread.isRunning(): + self.join() # Avoid restart if thread was globally stopped if self._blockRestart: @@ -171,9 +268,11 @@ def compute(self, graph: Graph = None, toNodes: list[Node] = None, forceCompute: :param forceCompute: force the computation despite nodes status. :param forceStatus: force the computation even if some nodes are submitted externally. """ + self._graph = graph self.updateNodes() + self._cancelledChunks = [] if forceCompute: nodes, edges = graph.dfsOnFinish(startNodes=toNodes) @@ -219,7 +318,7 @@ def compute(self, graph: Graph = None, toNodes: list[Node] = None, forceCompute: for node in nodes: node.destroyed.connect(lambda obj=None, name=node.name: self.onNodeDestroyed(obj, name)) - node.beginSequence(forceCompute) + node.initStatusOnCompute(forceCompute) self._nodes.update(nodes) self._nodesToProcess.extend(nodes) @@ -379,7 +478,6 @@ def submit(self, graph, submitter=None, toNodes=None, submitLabel="{projectName} :param toNodes: :return: """ - # Ensure submitter is properly set sub = None if submitter: @@ -395,6 +493,8 @@ def submit(self, graph, submitter=None, toNodes=None, submitLabel="{projectName} f"Unknown Submitter called '{submitter}'. 
" f"Available submitters are: '{str(meshroom.core.submitters.keys())}'.") + # TODO : If possible with the submitter (ATTACH_JOB) + # Update task manager's lists self.updateNodes() graph.update() @@ -417,6 +517,14 @@ def submit(self, graph, submitter=None, toNodes=None, submitLabel="{projectName} self.checkCompatibilityNodes(graph, nodesToProcess, "SUBMITTING") # name of the context is important for QML self.checkDuplicates(nodesToProcess, "SUBMITTING") # name of the context is important for QML + # Update nodes status + for node in nodesToProcess: + node.destroyed.connect(lambda obj=None, name=node.name: self.onNodeDestroyed(obj, name)) + node.initStatusOnSubmit() + jobManager.resetNodeJob(node) + + graph.updateMonitoredFiles() + flowEdges = graph.flowEdges(startNodes=toNodes) edgesToProcess = set(edgesToProcess).intersection(flowEdges) @@ -426,9 +534,12 @@ def submit(self, graph, submitter=None, toNodes=None, submitLabel="{projectName} try: res = sub.submit(nodesToProcess, edgesToProcess, graph.filepath, submitLabel=submitLabel) if res: + if isinstance(res, BaseSubmittedJob): + jobManager.addJob(res, nodesToProcess) + else: for node in nodesToProcess: - node.destroyed.connect(lambda obj=None, name=node.name: self.onNodeDestroyed(obj, name)) - node.initStatusOnSubmit() # update node status + # TODO : Notify the node that there was an issue on submit + pass self._nodes.update(nodesToProcess) self._nodesExtern.extend(nodesToProcess) @@ -436,7 +547,7 @@ def submit(self, graph, submitter=None, toNodes=None, submitLabel="{projectName} if not allReady: self.raiseDependenciesMessage("SUBMITTING") except Exception as exc: - logging.error(f"Error on submit: {exc}") + logging.error(f"Error on submit : {exc}\n{traceback.format_exc()}") def submitFromFile(self, graphFile, submitter, toNode=None, submitLabel="{projectName}"): """ @@ -460,4 +571,5 @@ def getAlreadySubmittedChunks(self, nodes): return out nodes = Property(BaseObject, lambda self: self._nodes, constant=True) + chunksCreated = Signal(BaseObject) restartRequested = Signal() diff --git a/meshroom/ui/graph.py b/meshroom/ui/graph.py index 178097d956..d1a0e5716b 100644 --- a/meshroom/ui/graph.py +++ b/meshroom/ui/graph.py @@ -8,6 +8,7 @@ from multiprocessing.pool import ThreadPool from typing import Optional, Union from collections.abc import Iterator +from collections import OrderedDict from PySide6.QtCore import ( Slot, @@ -28,6 +29,7 @@ from meshroom.core.graphIO import GraphIO from meshroom.core.taskManager import TaskManager +from meshroom.core.submitter import jobManager, SubmitterOptionsEnum from meshroom.core.node import NodeChunk, Node, Status, ExecMode, CompatibilityNode, Position from meshroom.core import submitters, MrNodeType @@ -87,6 +89,7 @@ def setFiles(self, files): Args: files: the list of files to monitor """ + logging.debug(f"FilesModTimePollerThread: Watch files {files}") with self._mutex: self._files = files @@ -129,68 +132,59 @@ def onFilePollerRefreshChanged(self, value): filePollerRefreshReady = Signal() # The refresh status has been updated and is ready to be used -class ChunksMonitor(QObject): +class NodeStatusMonitor(QObject): """ - ChunksMonitor regularly check NodeChunks' status files for modification and trigger their update on change. + NodeStatusMonitor regularly check status files for modification and trigger their update on change. When working locally, status changes are reflected through the emission of 'statusChanged' signals. 
diff --git a/meshroom/ui/graph.py b/meshroom/ui/graph.py
index 178097d956..d1a0e5716b 100644
--- a/meshroom/ui/graph.py
+++ b/meshroom/ui/graph.py
@@ -8,6 +8,7 @@
 from multiprocessing.pool import ThreadPool
 from typing import Optional, Union
 from collections.abc import Iterator
+from collections import OrderedDict
 
 from PySide6.QtCore import (
     Slot,
@@ -28,6 +29,7 @@
 from meshroom.core.graphIO import GraphIO
 from meshroom.core.taskManager import TaskManager
+from meshroom.core.submitter import jobManager, SubmitterOptionsEnum
 from meshroom.core.node import NodeChunk, Node, Status, ExecMode, CompatibilityNode, Position
 from meshroom.core import submitters, MrNodeType
 
@@ -87,6 +89,7 @@ def setFiles(self, files):
         Args:
             files: the list of files to monitor
         """
+        logging.debug(f"FilesModTimePollerThread: Watch files {files}")
         with self._mutex:
             self._files = files
 
@@ -129,68 +132,59 @@ def onFilePollerRefreshChanged(self, value):
     filePollerRefreshReady = Signal()  # The refresh status has been updated and is ready to be used
 
-class ChunksMonitor(QObject):
+class NodeStatusMonitor(QObject):
     """
-    ChunksMonitor regularly check NodeChunks' status files for modification and trigger their update on change.
+    NodeStatusMonitor regularly checks status files for modification and triggers their update on change.
     When working locally, status changes are reflected through the emission of 'statusChanged' signals.
     But when a graph is being computed externally - either via a Submitter or on another machine,
-    NodeChunks status files are modified by another instance, potentially outside this machine file system scope.
+    status files are modified by another instance, potentially outside this machine's file system scope.
     Same goes when status files are deleted/modified manually.
     Thus, for genericity, monitoring is based on regular polling and not file system watching.
     """
-    def __init__(self, chunks=(), parent=None):
+    def __init__(self, parent=None):
         super().__init__(parent)
-        self.monitorableChunks = []
-        self.monitoredChunks = []
+        self.monitorableNodes = []
+        self.monitoredItems = {}  # Dict {filepath: ("chunk"|"node", item)}
         self._filesTimePoller = FilesModTimePollerThread(parent=self)
         self._filesTimePoller.timesAvailable.connect(self.compareFilesTimes)
         self._filesTimePoller.start()
-        self.setChunks(chunks)
-
+        self.setMonitored([])
         self.filePollerRefreshChanged.connect(self._filesTimePoller.onFilePollerRefreshChanged)
         self._filesTimePoller.filePollerRefreshReady.connect(self.onFilePollerRefreshUpdated)
 
-    def setChunks(self, chunks):
-        """
-        Set the lists of chunks that can be monitored and that are monitored.
-        When the file poller status is set to AUTO_ENABLED, the lists of monitorable and monitored chunks are identical.
-        """
-        self.monitorableChunks = chunks
-        files, monitoredChunks = self.watchedStatusFiles
-        self._filesTimePoller.setFiles(files)
-        self.monitoredChunks = monitoredChunks
+    def setWatchedFiles(self):
+        self.monitoredItems = self.getMonitoredFiles()
+        monitoredFiles = list(self.monitoredItems.keys())
+        self._filesTimePoller.setFiles(monitoredFiles)
+
+    def setMonitored(self, nodes):
+        self.monitorableNodes = nodes
+        self.setWatchedFiles()
 
     def stop(self):
         """ Stop the status files monitoring. """
         self._filesTimePoller.stop()
 
-    @property
-    def statusFiles(self):
-        """ Get status file paths from the monitorable chunks. """
-        return [c.statusFile for c in self.monitorableChunks]
-
-    @property
-    def watchedStatusFiles(self):
-        """
-        Get the status file paths from the currently monitored chunks.
-        Depending on the file poller status, the paths may either be those of all the current chunks, or those from the currently submitted/running chunks.
-        """
-
-        files = []
-        chunks = []
-        if self.filePollerRefresh is PollerRefreshStatus.AUTO_ENABLED.value:
-            return self.statusFiles, self.monitorableChunks
-        elif self.filePollerRefresh is PollerRefreshStatus.MINIMAL_ENABLED.value:
-            for c in self.monitorableChunks:
+    def getMonitoredFiles(self):
+        monitoredItems = OrderedDict()
+        for node in self.monitorableNodes:
+            if node._chunksCreated:
+                fileItems = {c.getStatusFile(): ("chunk", c) for c in node._chunks}
+            else:
+                fileItems = {node.nodeStatusFile: ("node", node)}
+            if self.filePollerRefresh is PollerRefreshStatus.AUTO_ENABLED.value:
+                # Add everything
+                monitoredItems.update(fileItems)
+            elif self.filePollerRefresh is PollerRefreshStatus.MINIMAL_ENABLED.value:
                 # Only chunks that are run externally or local_isolated should be monitored,
                 # when run locally, status changes are already notified.
                 # Chunks with an ERROR status may be re-submitted externally and should thus still be monitored
-                if (c.isExtern() and c._status.status in (Status.SUBMITTED, Status.RUNNING, Status.ERROR)) or (
-                        (c.node.getMrNodeType() == MrNodeType.NODE) and (c._status.status in (Status.SUBMITTED, Status.RUNNING))):
-                    files.append(c.statusFile)
-                    chunks.append(c)
-        return files, chunks
+                for file, (_type, _item) in fileItems.items():
+                    if not _item.shouldMonitorChanges():
+                        continue
+                    monitoredItems[file] = (_type, _item)
+        return monitoredItems
 
     def compareFilesTimes(self, times):
         """
@@ -200,14 +194,27 @@ def compareFilesTimes(self, times):
         Args:
             times: the last modification times for currently monitored files.
         """
-        newRecords = dict(zip(self.monitoredChunks, times))
         hasChangesAndSuccess = False
-        for chunk, fileModTime in newRecords.items():
-            # update chunk status if last modification time has changed since previous record
-            if fileModTime != chunk.statusFileLastModTime:
-                chunk.updateStatusFromCache()
-                if chunk.status.status == Status.SUCCESS:
-                    hasChangesAndSuccess = True
+        newRecords = dict(zip(self.monitoredItems.items(), times))
+        successNodes = set()
+        for monitoredItem, fileModTime in newRecords.items():
+            _, (_type, _item) = monitoredItem
+            if _type == "chunk":
+                chunk = _item
+                # update chunk status if last modification time has changed since previous record
+                if fileModTime != chunk.statusFileLastModTime:
+                    chunk.updateStatusFromCache()
+                    if chunk._status.status == Status.SUCCESS:
+                        hasChangesAndSuccess = True
+                        successNodes.add(chunk.node)
+            elif _type == "node":
+                node = _item
+                if fileModTime != node.nodeStatusFileLastModTime:
+                    node.updateStatusFromCache()
+                    # Check for success
+                    if node.getGlobalStatus() == Status.SUCCESS:
+                        hasChangesAndSuccess = True
+                        successNodes.add(node)
+                    elif node._chunksCreated:
+                        # Chunks have been created -> set the watched files again
+                        self.setWatchedFiles()
 
         if hasChangesAndSuccess:
-            chunk.node.loadOutputAttr()
+            # Reload output attributes for every node that reached SUCCESS
+            for node in successNodes:
+                node.loadOutputAttr()
 
@@ -219,9 +226,7 @@ def onFilePollerRefreshUpdated(self):
         In minimal auto-refresh mode, this includes only the chunks that are submitted or running.
         """
         if self.filePollerRefresh is not PollerRefreshStatus.DISABLED.value:
-            files, chunks = self.watchedStatusFiles
-            self._filesTimePoller.setFiles(files)
-            self.monitoredChunks = chunks
+            self.setWatchedFiles()
 
     def onComputeStatusChanged(self):
         """
@@ -229,9 +234,7 @@ def onComputeStatusChanged(self):
         file poller status is minimal auto-refresh.
""" if self.filePollerRefresh is PollerRefreshStatus.MINIMAL_ENABLED.value: - files, chunks = self.watchedStatusFiles - self._filesTimePoller.setFiles(files) - self.monitoredChunks = chunks + self.setWatchedFiles() filePollerRefreshChanged = Signal(int) filePollerRefresh = Property(int, lambda self: self._filesTimePoller.filePollerRefresh, notify=filePollerRefreshChanged) @@ -371,12 +374,13 @@ def __init__(self, undoStack: commands.UndoStack, taskManager: TaskManager, pare self._graph: Graph = Graph('', self) self._modificationCount = 0 - self._chunksMonitor: ChunksMonitor = ChunksMonitor(parent=self) + self._chunksMonitor: NodeStatusMonitor = NodeStatusMonitor(parent=self) self._computeThread: Thread = Thread() self._computingLocally = self._submitted = False self._sortedDFSChunks: QObjectListModel = QObjectListModel(parent=self) self._layout: GraphLayout = GraphLayout(self) self._selectedNode = None + self._selectedChunk = None self._nodeSelection: QItemSelectionModel = QItemSelectionModel(self._graph.nodes, parent=self) self._hoveredNode = None @@ -398,6 +402,7 @@ def setGraph(self, g): oldGraph.deleteLater() self._graph.updated.connect(self.onGraphUpdated) + self._graph.statusUpdated.connect(self.updateChunkMonitor) self._taskManager.update(self._graph) # update and connect chunks when the graph is set for the first time @@ -425,11 +430,20 @@ def onGraphUpdated(self): def updateChunks(self): dfsNodes = self._graph.dfsOnFinish(None)[0] - chunks = self._graph.getChunks(dfsNodes) - # Nothing has changed, return + chunks = [] + for node in dfsNodes: + if node._chunksCreated: + nodechunks = node.getChunks() + chunks.extend(nodechunks) + else: + chunks.extend(node.chunkPlaceholder) if self._sortedDFSChunks.objectList() == chunks: + # Nothing has changed, return return for chunk in self._sortedDFSChunks: + if chunk not in chunks: + # Chunk have been already deleted + continue chunk.statusChanged.disconnect(self.updateGraphComputingStatus) chunk.statusChanged.disconnect(self._chunksMonitor.onComputeStatusChanged) self._sortedDFSChunks.setObjectList(chunks) @@ -437,13 +451,19 @@ def updateChunks(self): chunk.statusChanged.connect(self.updateGraphComputingStatus) chunk.statusChanged.connect(self._chunksMonitor.onComputeStatusChanged) # provide ChunkMonitor with the update list of chunks - self.updateChunkMonitor(self._sortedDFSChunks) + self.updateChunkMonitor() # update graph computing status based on the new list of NodeChunks self.updateGraphComputingStatus() - def updateChunkMonitor(self, chunks): + def updateChunkMonitor(self): """ Update the list of chunks for status files monitoring. 
""" - self._chunksMonitor.setChunks(chunks) + nodes = set() + for node in self._graph.dfsOnFinish(None)[0]: + if not node._chunksCreated: + nodes.add(node) + for chunk in self._sortedDFSChunks: + nodes.add(chunk.node) + self._chunksMonitor.setMonitored(list(nodes)) def clear(self): if self._graph: @@ -512,7 +532,7 @@ def _saveAs(self, url, setupProjectFile=True, template=False): self._undoStack.setClean() # saving file on disk impacts cache folder location # => force re-evaluation of monitored status files paths - self.updateChunkMonitor(self._sortedDFSChunks) + self.updateChunkMonitor() @Slot() def saveAsTemp(self): @@ -542,25 +562,28 @@ def execute(self, nodes: Optional[Union[list[Node], Node]] = None): @Slot() def stopExecution(self): + self.updateChunks() if not self.isComputingLocally(): return self._taskManager.requestBlockRestart() self._graph.stopExecution() - self._taskManager._thread.join() + self._taskManager.join() @Slot(Node) def stopNodeComputation(self, node): """ Stop the computation of the node and update all the nodes depending on it. """ + self.updateChunks() if not self.isComputingLocally(): return # Stop the node and wait Task Manager node.stopComputation() - self._taskManager._thread.join() + self._taskManager.join() @Slot(Node) def cancelNodeComputation(self, node): """ Cancel the computation of the node and all the nodes depending on it. """ + self.updateChunks() if node.getGlobalStatus() == Status.SUBMITTED: # Status from SUBMITTED to NONE # Make sure to remove the nodes from the Task Manager list @@ -571,6 +594,222 @@ def cancelNodeComputation(self, node): n.clearSubmittedChunks() self._taskManager.removeNode(n, displayList=True, processList=True) + def isChunkComputingLocally(self, chunk): + # update graph computing status + computingLocally = chunk._status.execMode == ExecMode.LOCAL and \ + (sessionUid in (chunk.node._nodeStatus.submitterSessionUid, chunk._status.computeSessionUid)) and \ + (chunk._status.status in (Status.RUNNING, Status.SUBMITTED)) + return computingLocally + + def isChunkComputingExternally(self, chunk): + # Note: We do not check computeSessionUid for the submitted status, + # as the source instance of the submit has no importance. + return (chunk._status.execMode == ExecMode.EXTERN) and \ + chunk._status.status in (Status.RUNNING, Status.SUBMITTED) + + @Slot(NodeChunk) + def stopTask(self, chunk: NodeChunk): + """ Stop the selected task """ + chunk.updateStatusFromCache() + if not chunk.isAlreadySubmitted(): + return + node = chunk.node + job = jobManager.getNodeJob(node) + if job: + chunkIteration = chunk.range.iteration + try: + job.stopChunkTask(node, chunkIteration) + except Exception as e: + self.parent().showMessage(f"Failed to stop chunk {chunkIteration} of {node.label}", "error") + logging.warning(f"Error on stopTask:\n{e}") + else: + chunk.updateStatusFromCache() + chunk.upgradeStatusTo(Status.STOPPED) + # TODO : Stop depending nodes ? 
+ self.parent().showMessage(f"Stopped chunk {chunkIteration} of {node.label}") + else: + chunk.stopProcess() + self._taskManager._cancelledChunks.append(chunk) + for chunk in node._chunks: + if chunk._status.status == Status.SUBMITTED: + chunk.stopProcess() + self._taskManager._cancelledChunks.append(chunk) + for n in node.getOutputNodes(recursive=True, dependenciesOnly=True): + n.clearSubmittedChunks() + self._taskManager.removeNode(n, displayList=True, processList=True) + + @Slot(Node) + def stopNode(self, node: Node): + """ Stop the selected task """ + job = jobManager.getNodeJob(node) + if job: + try: + job.stopChunkTask(node, -1) + except Exception as e: + self.parent().showMessage(f"Failed to stop node {node.label}", "error") + logging.warning(f"Error on stopTask:\n{e}") + else: + node.updateNodeStatusFromCache() + node.upgradeStatusTo(Status.STOPPED) + # TODO : Stop depending nodes ? + self.parent().showMessage(f"Stopped node {node.label}") + else: + self.cancelNodeComputation(node) + node.stopComputation() + + @Slot(NodeChunk) + def restartTask(self, chunk: NodeChunk): + """ Relaunch a stopped task """ + node = chunk.node + job = jobManager.getNodeJob(node) + if job: + chunkIteration = chunk.range.iteration + try: + chunk.updateStatusFromCache() + chunk.upgradeStatusTo(Status.SUBMITTED) + job.restartChunkTask(node, chunkIteration) + except Exception as e: + chunk.updateStatusFromCache() + chunk.upgradeStatusTo(Status.ERROR) + self.parent().showMessage(f"Failed to relaunch chunk {chunkIteration} of {node.label}", "error") + logging.warning(f"Error on restartTask:\n{e}") + else: + self.parent().showMessage(f"Relaunched chunk {chunkIteration} of {node.label}") + else: + # For this we would need to use a pool (with either chunks or nodes) + # instead of the list of nodes that are processed serially + self.parent().showMessage(f"Chunks cannot be launched individually locally", "warning") + if self.canComputeNode(node): + self.execute([node]) + + @Slot(NodeChunk) + def skipTask(self, chunk: NodeChunk): + """ Skip the task : the job will continue as if the task succeeded + In local mode, the chunk status will be set to success + """ + chunk.updateStatusFromCache() + node = chunk.node + chunkIteration = chunk.range.iteration + job = jobManager.getNodeJob(node) + if job: + try: + job.skipChunkTask(node, chunkIteration) + except Exception as e: + self.parent().showMessage(f"Failed to skip chunk {chunkIteration} of {node.label}", "error") + logging.warning(f"Error on skipTask:\n{e}") + else: + chunk.upgradeStatusTo(Status.SUCCESS) + self.parent().showMessage(f"Skipped chunk {chunkIteration} of {node.label}") + else: + chunk.stopProcess() + chunk.upgradeStatusTo(Status.SUCCESS) + self._taskManager._cancelledChunks.append(chunk) + self.parent().showMessage(f"Skipped chunk {chunkIteration} of {node.label}") + + @Slot(Node) + def pauseJob(self, node: Node): + """ Pause the running job : cancel all scheduled tasks. 
+        Currently running tasks are not stopped, but future tasks won't be launched.
+        """
+        job = jobManager.getNodeJob(node)
+        if job:
+            try:
+                job.pauseJob()
+            except Exception as e:
+                logging.warning(f"Error on pauseJob:\n{e}")
+                self.parent().showMessage(f"Failed to pause the job for node {node.label}", "error")
+            else:
+                self.parent().showMessage(f"Paused node {node.label} on farm")
+        elif not node.isExtern():
+            self.parent().showMessage("PauseJob is only available in external computation mode!", "warning")
+        else:
+            self.parent().showMessage("Cannot retrieve the job", "error")
+
+    @Slot(Node)
+    def resumeJob(self, node: Node):
+        """ Resume the paused job
+        """
+        job = jobManager.getNodeJob(node)
+        if job:
+            # Node is submitted to farm
+            try:
+                job.resumeJob()
+            except Exception as e:
+                self.parent().showMessage(f"Failed to resume node {node.label} on farm", "error")
+                logging.warning(f"Error on resumeJob:\n{e}")
+            else:
+                self.parent().showMessage(f"Resumed the job for node {node.label}")
+        else:
+            # In this case the user can just relaunch the node computation.
+            # Could be implemented if we had a paused state on the task manager
+            # where unprocessed nodes are retained.
+            pass
+
+    @Slot(Node)
+    def interruptJob(self, node: Node):
+        """ Interrupt the job that processes the node
+        """
+        job = jobManager.getNodeJob(node)
+        if job:
+            try:
+                job.interruptJob()
+            except Exception as e:
+                self.parent().showMessage(f"Failed to interrupt node {node.label} on farm", "error")
+                logging.warning(f"Error on interruptJob:\n{e}")
+            else:
+                for chunk in self._sortedDFSChunks:
+                    if jobManager.getNodeJob(chunk.node) == job:
+                        if chunk._status.status in (Status.SUBMITTED, Status.RUNNING):
+                            chunk.updateStatusFromCache()
+                            chunk.upgradeStatusTo(Status.STOPPED)
+                for _node in self._graph.dfsOnFinish(None)[0]:
+                    if jobManager.getNodeJob(_node) == job and not _node._chunksCreated and \
+                            _node._nodeStatus.status in (Status.SUBMITTED, Status.RUNNING):
+                        _node.upgradeStatusTo(Status.STOPPED)
+                self.parent().showMessage(f"Interrupted the job for node {node.label}")
+        elif not node.isExtern():
+            for chunk in self._sortedDFSChunks:
+                if not chunk.isExtern() and chunk._status.status in (Status.SUBMITTED, Status.RUNNING):
+                    chunk.updateStatusFromCache()
+                    chunk.upgradeStatusTo(Status.STOPPED)
+            for _node in self._graph.dfsOnFinish(None)[0]:
+                if not _node.isExtern() and not _node._chunksCreated and \
+                        _node._nodeStatus.status in (Status.SUBMITTED, Status.RUNNING):
+                    _node.upgradeStatusTo(Status.STOPPED)
+            self.stopExecution()
+            self.parent().showMessage("Stopped the local job process")
+        else:
+            self.parent().showMessage(f"Could not retrieve job for node {node.label}", "error")
+
+    @Slot(Node)
+    def restartJobErrorTasks(self, node: Node):
+        """ Restart all tasks in the job that have failed
+        """
+        job = jobManager.getNodeJob(node)
+        if job:
+            try:
+                # First update the status of each chunk to SUBMITTED
+                for chunk in self._sortedDFSChunks:
+                    if chunk._status.status not in (Status.ERROR, Status.STOPPED, Status.KILLED):
+                        continue
+                    if jobManager.getNodeJob(chunk.node) == job:
+                        chunk.upgradeStatusTo(Status.SUBMITTED)
+                for _node in self._graph.dfsOnFinish(None)[0]:
+                    if not _node._chunksCreated and _node._nodeStatus.status in (Status.ERROR, Status.STOPPED, Status.KILLED):
+                        _node.upgradeStatusTo(Status.SUBMITTED)
+                job.restartErrorTasks()
+                job.resumeJob()
+            except Exception as e:
+                self.parent().showMessage(f"Failed to restart error tasks for node {node.label} on farm", "error")
+                logging.warning(f"Error on restartJobErrorTasks:\n{e}")
+            else:
+                self.parent().showMessage(f"Restarted
error tasks for the node {node}") + else: + # In this case user can just relaunch the node computation + # Could be implemented if we had a paused state on the task manager + # Where error/failed nodes are retained + pass + @Slot() @Slot(Node) @Slot(list) @@ -587,18 +826,37 @@ def submit(self, nodes: Optional[Union[list[Node], Node]] = None): nodes = [nodes] if not isinstance(nodes, Iterable) and nodes else nodes mrDefaultSubmitter = os.environ.get('MESHROOM_DEFAULT_SUBMITTER', '') chosenSubmitter = self.parent()._defaultSubmitterName or mrDefaultSubmitter + self.parent().showMessage(f"Submit job on farm through {chosenSubmitter}") + self.parent().showMessage(f"Nodes to submit : {nodes}") self._taskManager.submit(self._graph, chosenSubmitter, nodes, submitLabel=self.submitLabel) def updateGraphComputingStatus(self): + dfsNodes = self._graph.dfsOnFinish(None)[0] + # TODO : these functions should go on the node part + # We should do any([node.isRunning for node in dfsNodes]) + # update graph computing status computingLocally = any([ - ch.status.execMode == ExecMode.LOCAL and - (sessionUid in (ch.status.submitterSessionUid, ch.status.sessionUid)) and ( - ch.status.status in (Status.RUNNING, Status.SUBMITTED)) - for ch in self._sortedDFSChunks]) - # Note: We do not check sessionUid for the submitted status, + ch._status.execMode == ExecMode.LOCAL and \ + (sessionUid in (ch.node._nodeStatus.submitterSessionUid, ch._status.computeSessionUid)) and \ + (ch._status.status in (Status.RUNNING, Status.SUBMITTED)) + for ch in self._sortedDFSChunks + ]) + # Note: We do not check computeSessionUid for the submitted status, # as the source instance of the submit has no importance. - submitted = any([ch.status.execMode == ExecMode.EXTERN and ch.status.status in (Status.RUNNING, Status.SUBMITTED) for ch in self._sortedDFSChunks]) + submitted = any([ch._status.execMode == ExecMode.EXTERN and ch._status.status in (Status.RUNNING, Status.SUBMITTED) for ch in self._sortedDFSChunks]) + + # Handle nodes with uninitialized chunks + for node in dfsNodes: + if node._chunksCreated: + continue + if node._nodeStatus.status in (Status.RUNNING, Status.SUBMITTED): + # TODO : save session ID in node + if node._nodeStatus.execMode == ExecMode.LOCAL: + computingLocally = True + elif node._nodeStatus.execMode == ExecMode.EXTERN: + submitted = True + if self._computingLocally != computingLocally or self._submitted != submitted: self._computingLocally = computingLocally self._submitted = submitted @@ -737,11 +995,6 @@ def alignVertically(self): for selectedNode in selectedNodes: self.moveNode(selectedNode, Position(meanX, selectedNode.y)) - @Slot() - def removeSelectedNodes(self): - """Remove selected nodes from the graph.""" - self.removeNodes(list(self.iterSelectedNodes())) - @Slot(list) def removeNodes(self, nodes: list[Node]): """ @@ -757,6 +1010,11 @@ def removeNodes(self, nodes: list[Node]): for node in nodes: self.push(commands.RemoveNodeCommand(self._graph, node)) + @Slot() + def removeSelectedNodes(self): + """Remove selected nodes from the graph.""" + self.removeNodes(list(self.iterSelectedNodes())) + @Slot(list) def removeNodesFrom(self, nodes: list[Node]): """ @@ -1244,6 +1502,9 @@ def canSubmitNode(self, node: Node) -> bool: selectedNodeChanged = Signal() # Current main selected node selectedNode = makeProperty(QObject, "_selectedNode", selectedNodeChanged, resetOnDestroy=True) + # Current chunk selected (used to send signals from TaskManager to ChunksListView) + selectedChunkChanged = Signal() + selectedChunk = 
makeProperty(QObject, "_selectedChunk", selectedChunkChanged, resetOnDestroy=True) nodeSelection = makeProperty(QObject, "_nodeSelection") diff --git a/meshroom/ui/qml/Application.qml b/meshroom/ui/qml/Application.qml index 0d56b1595c..e17106ccd3 100644 --- a/meshroom/ui/qml/Application.qml +++ b/meshroom/ui/qml/Application.qml @@ -1040,9 +1040,9 @@ Page { font.pointSize: 18 visible: _reconstruction ? _reconstruction.canSubmit : false - text: MaterialIcons.rocket_launch + text: !(_reconstruction.computingExternally) ? MaterialIcons.rocket_launch : MaterialIcons.paragliding - ToolTip.text: "Submit on Render Farm" + ToolTip.text: !(_reconstruction.computingExternally) ? "Submit on Render Farm" : "Interrupt Job" ToolTip.visible: hovered background: Rectangle { @@ -1050,7 +1050,7 @@ Page { border.color: Qt.darker(activePalette.window, 1.15) } - onClicked: computeManager.submit(null) + onClicked: !(_reconstruction.computingExternally) ? computeManager.submit(null) : _reconstruction.stopExecution() } } diff --git a/meshroom/ui/qml/Controls/NodeActions.qml b/meshroom/ui/qml/Controls/NodeActions.qml index 592cf930ac..691cc32067 100644 --- a/meshroom/ui/qml/Controls/NodeActions.qml +++ b/meshroom/ui/qml/Controls/NodeActions.qml @@ -18,10 +18,12 @@ Item { property var nodeRepeater: null // Reference to nodeRepeater to find delegates // Signals - signal computeRequest(var node) - signal stopComputeRequest(var node) - signal deleteDataRequest(var node) - signal submitRequest(var node) + signal computeRequest(var node) // Start local computation + signal stopComputeRequest(var node) // Stop local computation + signal deleteDataRequest(var node) // Delete node data + signal submitRequest(var node) // Start external computation (submission on farm) + signal stopSubmitRequest(var node) // Stop external computation (interrupt tasks on farm) + signal retrySubmitRequest(var node) // Retry error tasks on farm SystemPalette { id: activePalette } @@ -109,9 +111,10 @@ Item { property bool nodeIsLocked: false property bool canComputeNode: false property bool canStopNode: false - property bool canRestartNode: false + property bool canRestartNode: false // Node can be restarted, locally or externally property bool canSubmitNode: false property bool nodeSubmitted: false + property bool canRetryNode: false // Error tasks can be restarted for external node property int computeButtonState: NodeActions.ButtonState.LAUNCHABLE property string computeButtonIcon: { @@ -120,7 +123,26 @@ Item { default: return MaterialIcons.send } } + property string computeButtonTooltip: { + switch (computeButtonState) { + case NodeActions.ButtonState.STOPPABLE: return "Stop Compute" + default: return "Start Compute" + } + } + property int submitButtonState: NodeActions.ButtonState.LAUNCHABLE + property string submitButtonIcon: { + switch (submitButtonState) { + case NodeActions.ButtonState.STOPPABLE: return MaterialIcons.paragliding + default: return MaterialIcons.rocket_launch + } + } + property string submitButtonTooltip: { + switch (submitButtonState) { + case NodeActions.ButtonState.STOPPABLE: return "Interrupt Job on Render Farm" + default: return "Submit on Render Farm" + } + } function getComputeButtonState(node) { if (actionHeader.canStopNode) @@ -133,10 +155,10 @@ Item { } function getSubmitButtonState(node) { - if (actionHeader.nodeIsLocked || actionHeader.canStopNode) - return NodeActions.ButtonState.DISABLED + if (actionHeader.canStopNode) + return NodeActions.ButtonState.STOPPABLE if (!actionHeader.nodeIsLocked && 
node.globalStatus == "SUCCESS") - return NodeActions.ButtonState.DISABLED + return NodeActions.ButtonState.DELETABLE if (actionHeader.canSubmitNode) return NodeActions.ButtonState.LAUNCHABLE return NodeActions.ButtonState.DISABLED @@ -151,6 +173,10 @@ Item { ["ERROR", "STOPPED", "KILLED"].includes(node.globalStatus) } + function isNodeRetriable(node) { + return node.globalExecMode == "EXTERN" && ["ERROR", "STOPPED", "KILLED"].includes(node.globalStatus) + } + function updateProperties(node) { if (!node) return // Update properties values @@ -163,6 +189,7 @@ Item { actionHeader.computeButtonState = getComputeButtonState(node) actionHeader.submitButtonState = getSubmitButtonState(node) actionHeader.canRestartNode = isNodeRestartable(node) + actionHeader.canRetryNode = isNodeRetriable(node) } // Set initial state & position @@ -205,13 +232,13 @@ Item { font.pointSize: 16 text: actionHeader.computeButtonIcon padding: 6 - ToolTip.text: "Start/Stop/Restart Compute" + ToolTip.text: actionHeader.computeButtonTooltip ToolTip.visible: hovered ToolTip.delay: 1000 - visible: actionHeader.computeButtonState != NodeActions.ButtonState.DELETABLE - enabled: actionHeader.computeButtonState % 2 == 1 // Launchable & Stoppable + visible: actionHeader.computeButtonState != NodeActions.ButtonState.DISABLED + enabled: visible && !actionHeader.nodeSubmitted // Launchable & Stoppable, local // Icon color - textColor: (!enabled && actionHeader.nodeSubmitted) ? Colors.statusColors["SUBMITTED"] : (checked ? palette.highlight : palette.text) + textColor: checked ? palette.highlight : palette.text // Background color background: Rectangle { color: { @@ -228,12 +255,18 @@ Item { } onClicked: { switch (actionHeader.computeButtonState) { - case NodeActions.ButtonState.STOPPABLE: + case NodeActions.ButtonState.STOPPABLE: root.stopComputeRequest(actionHeader.selectedNode) break - case NodeActions.ButtonState.LAUNCHABLE: + case NodeActions.ButtonState.LAUNCHABLE: + root.computeRequest(actionHeader.selectedNode) + break + case NodeActions.ButtonState.DELETABLE: + root.deleteDataRequest(actionHeader.selectedNode) root.computeRequest(actionHeader.selectedNode) break + default: + break } } } @@ -244,7 +277,7 @@ Item { font.pointSize: 16 text: MaterialIcons.delete_ padding: 6 - ToolTip.text: "Delete data" + ToolTip.text: "Delete Data" ToolTip.visible: hovered ToolTip.delay: 1000 visible: actionHeader.canRestartNode || actionHeader.computeButtonState == NodeActions.ButtonState.DELETABLE @@ -265,20 +298,23 @@ Item { MaterialToolButton { id: submitButton font.pointSize: 16 - text: MaterialIcons.rocket_launch + text: actionHeader.submitButtonIcon padding: 6 - ToolTip.text: "Submit on Render Farm" + ToolTip.text: actionHeader.submitButtonTooltip ToolTip.visible: hovered ToolTip.delay: 1000 - visible: root.uigraph ? root.uigraph.canSubmit : false - enabled: actionHeader.submitButtonState != NodeActions.ButtonState.DISABLED + visible: actionHeader.submitButtonState != NodeActions.ButtonState.DISABLED + enabled: visible && (actionHeader.nodeSubmitted || !actionHeader.nodeIsLocked) // Launchable & Stoppable, external // Icon color - textColor: (!enabled && actionHeader.nodeSubmitted) ? Colors.statusColors["SUBMITTED"] : (checked ? palette.highlight : palette.text) + textColor: checked ? palette.highlight : palette.text // Background color background: Rectangle { color: { if (!submitButton.enabled) return activePalette.button + + if (actionHeader.submitButtonState == NodeActions.ButtonState.STOPPABLE) + return submitButton.hovered ? 
Colors.orange : Qt.darker(Colors.orange, 1.3) return submitButton.hovered ? activePalette.highlight : activePalette.button } opacity: submitButton.hovered ? 1 : root._opacity @@ -287,9 +323,49 @@ Item { radius: 3 } onClicked: { - if (actionHeader.selectedNode) { - root.submitRequest(actionHeader.selectedNode) + switch (actionHeader.submitButtonState) { + case NodeActions.ButtonState.STOPPABLE: + root.stopSubmitRequest(actionHeader.selectedNode) + break + case NodeActions.ButtonState.LAUNCHABLE: + root.submitRequest(actionHeader.selectedNode) + actionHeader.updateProperties(actionHeader.selectedNode) + break + case NodeActions.ButtonState.DELETABLE: + root.deleteDataRequest(actionHeader.selectedNode) + root.submitRequest(actionHeader.selectedNode) + break + default: + break + } + } + } + + // Retry button (for farm submissions that have failed) + MaterialToolButton { + id: retryButton + font.pointSize: 16 + text: MaterialIcons.cloud_sync + padding: 6 + ToolTip.text: "Retry Submission On Render Farm" + ToolTip.visible: hovered + ToolTip.delay: 1000 + visible: actionHeader.canRetryNode + enabled: visible + + // Background color + background: Rectangle { + color: { + return retryButton.hovered ? activePalette.highlight : activePalette.button } + opacity: retryButton.hovered ? 1 : root._opacity + border.color: retryButton.hovered ? activePalette.highlight : Qt.darker(activePalette.window, 1.3) + border.width: 1 + radius: 3 + } + + onClicked: { + root.retrySubmitRequest(actionHeader.selectedNode) } } } diff --git a/meshroom/ui/qml/GraphEditor/ChunksListView.qml b/meshroom/ui/qml/GraphEditor/ChunksListView.qml index 1afc3182b9..6ddd6d2c26 100644 --- a/meshroom/ui/qml/GraphEditor/ChunksListView.qml +++ b/meshroom/ui/qml/GraphEditor/ChunksListView.qml @@ -5,11 +5,13 @@ import QtQuick.Layouts import Utils 1.0 /** - * ChunkListView + * ChunksListView */ ColumnLayout { id: root + + property var uigraph: null property variant chunks property int currentIndex: 0 property variant currentChunk: (chunks && currentIndex >= 0) ? chunks.at(currentIndex) : undefined @@ -90,4 +92,17 @@ ColumnLayout { } } } + + Connections { + target: _reconstruction + function onSelectedChunkChanged() { + for (var i = 0; i < root.chunks.count; i++) { + if (_reconstruction.selectedChunk === root.chunks.at(i)) { + root.currentIndex = i + break; + } + } + } + ignoreUnknownSignals: true + } } diff --git a/meshroom/ui/qml/GraphEditor/GraphEditor.qml b/meshroom/ui/qml/GraphEditor/GraphEditor.qml index 2c29d962af..9d6564e8cd 100755 --- a/meshroom/ui/qml/GraphEditor/GraphEditor.qml +++ b/meshroom/ui/qml/GraphEditor/GraphEditor.qml @@ -699,18 +699,39 @@ Item { } MenuItem { text: "Stop Computation" - enabled: nodeMenu.currentNode.canBeStopped() + enabled: nodeMenu.currentNode.canBeStopped() && nodeMenu.currentNode.globalExecMode == "LOCAL" visible: enabled height: visible ? implicitHeight : 0 onTriggered: uigraph.stopNodeComputation(nodeMenu.currentNode) } MenuItem { text: "Cancel Computation" - enabled: nodeMenu.currentNode.canBeCanceled() + enabled: nodeMenu.currentNode.canBeCanceled() && nodeMenu.currentNode.globalExecMode == "LOCAL" visible: enabled height: visible ? implicitHeight : 0 onTriggered: uigraph.cancelNodeComputation(nodeMenu.currentNode) } + MenuItem { + text: "Interrupt Job" + enabled: nodeMenu.currentNode.canBeStopped() && nodeMenu.currentNode.globalExecMode == "EXTERN" + visible: enabled + height: visible ? 
implicitHeight : 0 + onTriggered: uigraph.stopNode(nodeMenu.currentNode) + } + MenuItem { + text: "Cancel Job" + enabled: nodeMenu.currentNode.canBeCanceled() && nodeMenu.currentNode.globalExecMode == "EXTERN" + visible: enabled + height: visible ? implicitHeight : 0 + onTriggered: uigraph.stopNode(nodeMenu.currentNode) + } + MenuItem { + text: "Retry Error Tasks" + enabled: nodeMenu.currentNode.globalExecMode == "EXTERN" && ["ERROR", "STOPPED", "KILLED"].includes(nodeMenu.currentNode.globalStatus) + visible: enabled + height: visible ? implicitHeight : 0 + onTriggered: uigraph.restartJobErrorTasks(nodeMenu.currentNode) + } MenuItem { text: "Open Folder" visible: nodeMenu.currentNode.isComputableType @@ -1093,14 +1114,24 @@ Item { uigraph.cancelNodeComputation(node) } } - + onDeleteDataRequest: function(node) { - uigraph.clearSelectedNodesData(); + uigraph.clearSelectedNodesData() } onSubmitRequest: function(node) { root.submitRequest([node]) } + + onStopSubmitRequest: function(node) { + if (node.canBeStopped() || node.canBeCanceled()) { + uigraph.stopNode(node) + } + } + + onRetrySubmitRequest: function(node) { + uigraph.restartJobErrorTasks(node) + } } MessageDialog { diff --git a/meshroom/ui/qml/GraphEditor/Node.qml b/meshroom/ui/qml/GraphEditor/Node.qml index 438aad0535..4cd88080bb 100755 --- a/meshroom/ui/qml/GraphEditor/Node.qml +++ b/meshroom/ui/qml/GraphEditor/Node.qml @@ -496,10 +496,18 @@ Item { // Node Chunks NodeChunks { visible: node.isComputableType + targetNode: node defaultColor: Colors.sysPalette.mid implicitHeight: 3 width: parent.width - model: node ? node.chunks : undefined + model: { + if (node && node.chunksCreated) + return node.chunks + else if (node && !node.chunksCreated) + return node.chunkPlaceholder + + return undefined + } Rectangle { anchors.fill: parent diff --git a/meshroom/ui/qml/GraphEditor/NodeChunks.qml b/meshroom/ui/qml/GraphEditor/NodeChunks.qml index 5505da7c38..069cf8b084 100644 --- a/meshroom/ui/qml/GraphEditor/NodeChunks.qml +++ b/meshroom/ui/qml/GraphEditor/NodeChunks.qml @@ -9,13 +9,15 @@ ListView { SystemPalette { id: activePalette } + property var targetNode: null + property color defaultColor: Qt.darker(activePalette.window, 1.1) property real chunkHeight: height - property bool modelIsBig: (3 * model.count >= width) + property int modelSize: model ? model.count : 0 + property bool modelIsBig: (3 * modelSize >= width) property real chunkWidth: { - if (!model || model.count == 0) - return 0 - return (width / model.count) - spacing + if (modelSize == 0) return 0 + return (width / modelSize) - spacing } orientation: ListView.Horizontal @@ -28,7 +30,7 @@ ListView { width: root.chunkWidth property var chunkColor: Colors.getChunkColor(object, { "NONE": root.defaultColor }) color: { - if (!highlightChunks || model.count == 1) + if (!highlightChunks || modelSize == 1) return chunkColor if (index % 2 == 0) return Qt.lighter(chunkColor, 1.1) diff --git a/meshroom/ui/qml/GraphEditor/NodeEditor.qml b/meshroom/ui/qml/GraphEditor/NodeEditor.qml index db824eebdb..9a4579cfd4 100644 --- a/meshroom/ui/qml/GraphEditor/NodeEditor.qml +++ b/meshroom/ui/qml/GraphEditor/NodeEditor.qml @@ -284,8 +284,9 @@ Panel { // The list of chunks ChunksListView { id: chunksLV - visible: (tabBar.currentIndex >= 1 && tabBar.currentIndex <= 3) - chunks: root.node.chunks + enabled: root.node ? root.node.chunksCreated : false + chunks: root.node ? 
root.node.chunks : null + visible: enabled && (tabBar.currentIndex >= 1 && tabBar.currentIndex <= 3) SplitView.preferredWidth: 55 SplitView.minimumWidth: 20 } diff --git a/meshroom/ui/qml/GraphEditor/TaskManager.qml b/meshroom/ui/qml/GraphEditor/TaskManager.qml index aa3bbc79aa..96ab531cb0 100644 --- a/meshroom/ui/qml/GraphEditor/TaskManager.qml +++ b/meshroom/ui/qml/GraphEditor/TaskManager.qml @@ -2,6 +2,7 @@ import QtQuick import QtQuick.Controls import QtQuick.Layouts +import MaterialIcons 2.2 import Controls 1.0 import Utils 1.0 @@ -22,10 +23,20 @@ Item { property color tableBorder: Colors.sysPalette.window property int borderWidth: 3 + // Max wifth for some columns + readonly property int maxExecWidth: 200 + + property var selectedChunk: null + function selectNode(node) { uigraph.selectedNode = node } + function selectChunk(chunk) { + root.selectedChunk = chunk + uigraph.selectedChunk = chunk + } + TextMetrics { id: nbMetrics text: root.taskManager ? root.taskManager.nodes.count : "0" @@ -51,222 +62,424 @@ Item { text: "Progress" } - ListView { - id: taskList + RowLayout { anchors.fill: parent - ScrollBar.vertical: MScrollBar {} - model: parent.taskManager ? parent.taskManager.nodes : null - spacing: 3 + ColumnLayout { + Layout.alignment: Qt.AlignLeft | Qt.AlignTop + width: childrenRect.width + spacing: 8 - headerPositioning: ListView.OverlayHeader + // TODO : enable/disable buttons depending on selectedChunk + // TODO : Also handle case where uigraph.selectedNode and selectedNode.chunksCreated==false - header: RowLayout { - height: 30 - spacing: 3 + // Task toolbar + Rectangle { + Layout.preferredWidth: 40 + Layout.preferredHeight: taskColumn.height + 8 + color: "transparent" + border.color: Colors.darkpurple + border.width: 2 + radius: 8 - width: parent.width + ColumnLayout { + id: taskColumn + anchors.centerIn: parent + spacing: 2 - z: 2 + MaterialToolButton { + ToolTip.text: "Stop Task" + Layout.alignment: Qt.AlignHCenter + enabled: selectedChunk !== null || root.uigraph.selectedNode !== null + text: MaterialIcons.stop_circle + font.pointSize: 15 + onClicked: { + if (selectedChunk !== null) { + root.uigraph.stopTask(selectedChunk) + } else { + root.uigraph.stopNode(root.uigraph.selectedNode) + } + } + } - Label { - text: qsTr("Nb") - Layout.preferredWidth: nbMetrics.width + 20 - Layout.preferredHeight: parent.height - horizontalAlignment: Label.AlignHCenter - verticalAlignment: Label.AlignVCenter - background: Rectangle { - color: headBgColor - } - } - Label { - text: qsTr("Node") - Layout.preferredWidth: 250 - Layout.preferredHeight: parent.height - horizontalAlignment: Label.AlignHCenter - verticalAlignment: Label.AlignVCenter - background: Rectangle { - color: headBgColor - } - } - Label { - text: qsTr("State") - Layout.preferredWidth: statusMetrics.width + 20 - Layout.preferredHeight: parent.height - horizontalAlignment: Label.AlignHCenter - verticalAlignment: Label.AlignVCenter - background: Rectangle { - color: headBgColor - } - } - Label { - text: qsTr("Chunks Done") - Layout.preferredWidth: chunksMetrics.width + 20 - Layout.preferredHeight: parent.height - horizontalAlignment: Label.AlignHCenter - verticalAlignment: Label.AlignVCenter - background: Rectangle { - color: headBgColor - } - } - Label { - text: qsTr("Exec Mode") - Layout.preferredWidth: execMetrics.width + 20 - Layout.preferredHeight: parent.height - horizontalAlignment: Label.AlignHCenter - verticalAlignment: Label.AlignVCenter - background: Rectangle { - color: headBgColor + MaterialToolButton { + 
+                        ToolTip.text: "Restart Task"
+                        Layout.alignment: Qt.AlignHCenter
+                        enabled: selectedChunk !== null
+                        text: MaterialIcons.replay_circle_filled
+                        font.pointSize: 15
+                        onClicked: {
+                            uigraph.restartTask(selectedChunk)
+                        }
+                    }
+
+                    MaterialToolButton {
+                        ToolTip.text: "Skip Task"
+                        Layout.alignment: Qt.AlignHCenter
+                        enabled: selectedChunk !== null
+                        text: MaterialIcons.skip_next
+                        font.pointSize: 15
+                        onClicked: {
+                            uigraph.skipTask(selectedChunk)
+                        }
+                    }
+
+                    Item {
+                        Layout.preferredWidth: 40
+                        Layout.preferredHeight: 50
+
+                        Text {
+                            text: "TASK"
+                            anchors.centerIn: parent
+                            color: Colors.sysPalette.text
+                            font.pixelSize: 11
+                            font.bold: true
+                            rotation: -90
+                            transformOrigin: Item.Center
+                        }
+                    }
+                }
+            }
-            Label {
-                text: qsTr("Progress")
-                Layout.fillWidth: true
-                Layout.minimumWidth: progressMetrics.width + 20
-                Layout.preferredHeight: parent.height
-                horizontalAlignment: Label.AlignHCenter
-                verticalAlignment: Label.AlignVCenter
-                background: Rectangle {
-                    color: headBgColor
+
+            // Job toolbar
+            Rectangle {
+                Layout.preferredWidth: 40
+                Layout.preferredHeight: jobColumn.height + 8
+                color: "transparent"
+                border.color: Colors.darkpurple
+                border.width: 2
+                radius: 8
+
+                ColumnLayout {
+                    id: jobColumn
+                    anchors.centerIn: parent
+                    spacing: 2
+
+                    MaterialToolButton {
+                        ToolTip.text: "Pause Job"
+                        Layout.alignment: Qt.AlignHCenter
+                        enabled: root.uigraph.selectedNode !== null
+                        text: MaterialIcons.pause_circle_filled
+                        font.pointSize: 15
+                        onClicked: {
+                            uigraph.pauseJob(uigraph.selectedNode)
+                        }
+                    }
+
+                    MaterialToolButton {
+                        ToolTip.text: "Resume Job"
+                        Layout.alignment: Qt.AlignHCenter
+                        enabled: root.uigraph.selectedNode !== null
+                        text: MaterialIcons.play_circle_filled
+                        font.pointSize: 15
+                        onClicked: {
+                            uigraph.resumeJob(uigraph.selectedNode)
+                        }
+                    }
+
+                    MaterialToolButton {
+                        ToolTip.text: "Interrupt Job"
+                        Layout.alignment: Qt.AlignHCenter
+                        enabled: root.uigraph.selectedNode !== null
+                        text: MaterialIcons.stop_circle
+                        font.pointSize: 15
+                        onClicked: {
+                            uigraph.interruptJob(uigraph.selectedNode)
+                        }
+                    }
+
+                    MaterialToolButton {
+                        ToolTip.text: "Restart All Error Tasks"
+                        Layout.alignment: Qt.AlignHCenter
+                        enabled: root.uigraph.selectedNode !== null
+                        text: MaterialIcons.replay_circle_filled
+                        font.pointSize: 15
+                        onClicked: {
+                            uigraph.restartJobErrorTasks(uigraph.selectedNode)
+                        }
+                    }
+
+                    Item {
+                        Layout.preferredWidth: 40
+                        Layout.preferredHeight: 40
+
+                        Text {
+                            text: "JOB"
+                            anchors.centerIn: parent
+                            color: Colors.sysPalette.text
+                            font.pixelSize: 11
+                            font.bold: true
+                            rotation: -90
+                            transformOrigin: Item.Center
+                        }
+                    }
+                }
+            }
+        }

-        delegate: RowLayout {
-            width: ListView.view.width
-            height: 18
+        ListView {
+            id: taskList
+            Layout.alignment: Qt.AlignLeft | Qt.AlignTop
+            Layout.fillWidth: true
+            Layout.fillHeight: true
+            ScrollBar.vertical: MScrollBar {}
+
+            model: root.taskManager ? root.taskManager.nodes : null
             spacing: 3

-            function getNbFinishedChunks(chunks) {
-                var nbSuccess = 0
-                for (var i = 0; i < chunks.count; i++) {
-                    if (chunks.at(i).statusName === "SUCCESS") {
-                        nbSuccess += 1
+            headerPositioning: ListView.OverlayHeader
+
+            header: RowLayout {
+                height: 30
+                spacing: 3
+
+                width: parent.width
+
+                z: 2
+
+                Label {
+                    text: qsTr("Nb")
+                    Layout.preferredWidth: nbMetrics.width + 20
+                    Layout.preferredHeight: parent.height
+                    horizontalAlignment: Label.AlignHCenter
+                    verticalAlignment: Label.AlignVCenter
+                    background: Rectangle {
+                        color: headBgColor
                     }
                 }
-                return nbSuccess
-            }
-
-            Label {
-                text: index + 1
-                Layout.preferredWidth: nbMetrics.width + 20
-                Layout.preferredHeight: parent.height
-                horizontalAlignment: Label.AlignHCenter
-                verticalAlignment: Label.AlignVCenter
-                color: object === uigraph.selectedNode ? Colors.sysPalette.window : Colors.sysPalette.text
-                background: Rectangle {
-                    color: object === uigraph.selectedNode ? Colors.sysPalette.text : bgColor
+                Label {
+                    text: qsTr("Node")
+                    Layout.preferredWidth: 200
+                    Layout.preferredHeight: parent.height
+                    horizontalAlignment: Label.AlignHCenter
+                    verticalAlignment: Label.AlignVCenter
+                    background: Rectangle {
+                        color: headBgColor
+                    }
                 }
-
-                MouseArea {
-                    anchors.fill: parent
-                    onPressed: {
-                        selectNode(object)
+                Label {
+                    text: qsTr("State")
+                    Layout.preferredWidth: statusMetrics.width + 20
+                    Layout.preferredHeight: parent.height
+                    horizontalAlignment: Label.AlignHCenter
+                    verticalAlignment: Label.AlignVCenter
+                    background: Rectangle {
+                        color: headBgColor
                     }
                 }
-            }
-            Label {
-                text: object.label
-                Layout.preferredWidth: 250
-                Layout.preferredHeight: parent.height
-                horizontalAlignment: Label.AlignHCenter
-                verticalAlignment: Label.AlignVCenter
-                color: object === uigraph.selectedNode ? Colors.sysPalette.window : Colors.sysPalette.text
-                background: Rectangle {
-                    color: object === uigraph.selectedNode ? Colors.sysPalette.text : bgColor
+                Label {
+                    text: qsTr("Chunks Done")
+                    Layout.preferredWidth: chunksMetrics.width + 20
+                    Layout.preferredHeight: parent.height
+                    horizontalAlignment: Label.AlignHCenter
+                    verticalAlignment: Label.AlignVCenter
+                    background: Rectangle {
+                        color: headBgColor
+                    }
                 }
-
-                MouseArea {
-                    anchors.fill: parent
-                    onPressed: {
-                        selectNode(object)
+                Label {
+                    text: qsTr("Exec Mode")
+                    Layout.preferredWidth: execMetrics.width + 60
+                    Layout.preferredHeight: parent.height
+                    horizontalAlignment: Label.AlignHCenter
+                    verticalAlignment: Label.AlignVCenter
+                    background: Rectangle {
+                        color: headBgColor
+                    }
+                }
+                Label {
+                    text: qsTr("Progress")
+                    Layout.fillWidth: true
+                    Layout.minimumWidth: progressMetrics.width + 20
+                    Layout.preferredHeight: parent.height
+                    horizontalAlignment: Label.AlignHCenter
+                    verticalAlignment: Label.AlignVCenter
+                    background: Rectangle {
+                        color: headBgColor
                     }
                 }
             }
-            Label {
-                text: object.globalStatus
-                Layout.preferredWidth: statusMetrics.width + 20
-                Layout.preferredHeight: parent.height
-                horizontalAlignment: Label.AlignHCenter
-                verticalAlignment: Label.AlignVCenter
-                color: object === uigraph.selectedNode ? Colors.sysPalette.window : Colors.sysPalette.text
-                background: Rectangle {
-                    color: object === uigraph.selectedNode ? Colors.sysPalette.text : bgColor
+
+            delegate: RowLayout {
+                width: ListView.view.width
+                height: 18
+                spacing: 3
+
+                function getNbFinishedChunks(chunks) {
+                    var nbSuccess = 0
+                    for (var i = 0; i < chunks.count; i++) {
+                        if (chunks.at(i).statusName === "SUCCESS") {
+                            nbSuccess += 1
+                        }
+                    }
+                    return nbSuccess
                 }

-                MouseArea {
-                    anchors.fill: parent
-                    onPressed: {
-                        selectNode(object)
+                Label {
+                    text: index + 1
+                    Layout.preferredWidth: nbMetrics.width + 20
+                    Layout.preferredHeight: parent.height
+                    horizontalAlignment: Label.AlignHCenter
+                    verticalAlignment: Label.AlignVCenter
+                    color: object === uigraph.selectedNode ? Colors.sysPalette.window : Colors.sysPalette.text
+                    background: Rectangle {
+                        color: object === uigraph.selectedNode ? Colors.sysPalette.text : bgColor
+                    }
+
+                    MouseArea {
+                        anchors.fill: parent
+                        onPressed: {
+                            selectNode(object)
+                        }
                     }
                 }
-            }
-            Label {
-                text: object.label
-                Layout.preferredWidth: 250
-                Layout.preferredHeight: parent.height
-                horizontalAlignment: Label.AlignHCenter
-                verticalAlignment: Label.AlignVCenter
-                color: object === uigraph.selectedNode ? Colors.sysPalette.window : Colors.sysPalette.text
-                background: Rectangle {
-                    color: object === uigraph.selectedNode ? Colors.sysPalette.text : bgColor
+                Label {
+                    text: object.label
+                    elide: Text.ElideRight
+                    Layout.preferredWidth: 200
+                    Layout.preferredHeight: parent.height
+                    horizontalAlignment: Label.AlignHCenter
+                    verticalAlignment: Label.AlignVCenter
+                    color: object === uigraph.selectedNode ? Colors.sysPalette.window : Colors.sysPalette.text
+                    background: Rectangle {
+                        color: object === uigraph.selectedNode ? Colors.sysPalette.text : bgColor
+                    }
+
+                    MouseArea {
+                        anchors.fill: parent
+                        acceptedButtons: Qt.LeftButton | Qt.RightButton
+                        onPressed: (mouse) => {
+                            if (mouse.button === Qt.LeftButton) {
+                                selectNode(object)
+                            } else if (mouse.button === Qt.RightButton) {
+                                contextMenu.popup()
+                            }
+                        }
+                        Menu {
+                            id: contextMenu
+                            MenuItem {
+                                text: "Open Folder"
+                                height: visible ? implicitHeight : 0
+                                onTriggered: Qt.openUrlExternally(Filepath.stringToUrl(object.internalFolder))
+                            }
+                        }
+                    }
                 }
+                Label {
+                    text: object.globalStatus
+                    Layout.preferredWidth: statusMetrics.width + 20
+                    Layout.preferredHeight: parent.height
+                    horizontalAlignment: Label.AlignHCenter
+                    verticalAlignment: Label.AlignVCenter
+                    color: object === uigraph.selectedNode ? Colors.sysPalette.window : Colors.sysPalette.text
+                    background: Rectangle {
+                        color: object === uigraph.selectedNode ? Colors.sysPalette.text : bgColor
+                    }

-                MouseArea {
-                    anchors.fill: parent
-                    onPressed: {
-                        selectNode(object)
+                    MouseArea {
+                        anchors.fill: parent
+                        onPressed: {
+                            selectNode(object)
+                        }
                     }
                 }
-            }
-            Label {
-                text: object.globalExecMode
-                Layout.preferredWidth: execMetrics.width + 20
-                Layout.preferredHeight: parent.height
-                horizontalAlignment: Label.AlignHCenter
-                verticalAlignment: Label.AlignVCenter
-                color: object === uigraph.selectedNode ? Colors.sysPalette.window : Colors.sysPalette.text
-                background: Rectangle {
-                    color: object === uigraph.selectedNode ? Colors.sysPalette.text : bgColor
+                Label {
+                    text: getNbFinishedChunks(object.chunks) + "/" + object.chunks.count
+                    Layout.preferredWidth: chunksMetrics.width + 20
+                    Layout.preferredHeight: parent.height
+                    horizontalAlignment: Label.AlignHCenter
+                    verticalAlignment: Label.AlignVCenter
+                    color: object === uigraph.selectedNode ? Colors.sysPalette.window : Colors.sysPalette.text
+                    background: Rectangle {
+                        color: object === uigraph.selectedNode ? Colors.sysPalette.text : bgColor
+                    }
+
+                    MouseArea {
+                        anchors.fill: parent
+                        onPressed: {
+                            selectNode(object)
+                        }
+                    }
                 }
+                Label {
+                    text: object.jobName
+                    elide: Text.ElideRight
+                    Layout.preferredWidth: execMetrics.width + 60
+                    Layout.preferredHeight: parent.height
+                    horizontalAlignment: Label.AlignHCenter
+                    verticalAlignment: Label.AlignVCenter
+                    color: object === uigraph.selectedNode ? Colors.sysPalette.window : Colors.sysPalette.text
+                    background: Rectangle {
+                        color: object === uigraph.selectedNode ? Colors.sysPalette.text : bgColor
+                    }

-                MouseArea {
-                    anchors.fill: parent
-                    onPressed: {
-                        selectNode(object)
+                    MouseArea {
+                        anchors.fill: parent
+                        onPressed: {
+                            selectNode(object)
+                        }
                     }
                 }
-            }
-            Item {
-                Layout.fillWidth: true
-                Layout.minimumWidth: progressMetrics.width + 20
-                Layout.preferredHeight: parent.height
-
-                ListView {
-                    id: chunkList
-                    width: parent.width
-                    height: parent.height
-                    orientation: ListView.Horizontal
-                    model: object.chunks
-                    property var node: object
-
-                    spacing: 3
-
-                    delegate: Label {
-                        width: ListView.view.model ? (ListView.view.width / ListView.view.model.count) - 3 : 0
-                        height: ListView.view.height
-                        anchors.verticalCenter: parent.verticalCenter
-                        background: Rectangle {
-                            color: Colors.getChunkColor(object, {"NONE": bgColor})
-                            radius: 3
-                            border.width: 2
-                            border.color: chunkList.node === uigraph.selectedNode ? Colors.sysPalette.text : Colors.getChunkColor(object, {"NONE": bgColor})
+                Item {
+                    Layout.fillWidth: true
+                    Layout.minimumWidth: progressMetrics.width + 20
+                    Layout.preferredHeight: parent.height
+
+                    ListView {
+                        id: chunkList
+                        width: parent.width
+                        height: parent.height
+                        orientation: ListView.Horizontal
+                        model: object.chunks
+                        property var node: object
+
+                        spacing: 3
+
+                        delegate: Loader {
+                            id: chunkDelegate
+                            width: ListView.view.model
+                                ? (ListView.view.width - (ListView.view.model.count - 1) * chunkList.spacing) / ListView.view.model.count
+                                : 0
+
+                            height: ListView.view.height
+
+                            sourceComponent: Label {
+                                anchors.fill: parent
+                                background: Rectangle {
+                                    color: Colors.getChunkColor(object, {"NONE": bgColor})
+                                    radius: 3
+                                    border.width: 2
+                                    border.color: (root.selectedChunk == object) ? Qt.darker(color, 1.3) : "transparent"
+                                }
+
+                                MouseArea {
+                                    anchors.fill: parent
+                                    onPressed: {
+                                        selectNode(chunkList.node)
+                                        selectChunk(object)
+                                    }
+                                }
+                            }
                         }
-                        MouseArea {
+
+                        // Placeholder for uninitialized chunks
+                        Label {
+                            enabled: chunkList.model.count == 0
+                            visible: enabled
                             anchors.fill: parent
-                            onPressed: {
-                                selectNode(chunkList.node)
+                            background: Rectangle {
+                                color: Colors.getNodeColor(chunkList.node, {"NONE": Colors.darkpurple})
+                                radius: 3
+                                border.width: 2
+                                border.color: (chunkList.node === uigraph.selectedNode) ? Qt.lighter(color, 1.3) : "transparent"
+                            }
+
+                            MouseArea {
+                                anchors.fill: parent
+                                onPressed: {
+                                    selectNode(chunkList.node)
+                                    selectChunk(null)
+                                }
                             }
                         }
                     }
                 }
             }
         }
     }
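The previous chunk delegate sized items as `width / count - 3`, which hard-coded the 3px spacing and left a small remainder of the row unused; the Loader-based delegate above subtracts the total inter-item spacing before dividing, so N delegates plus (N - 1) gaps fill the view exactly. A minimal sketch of that arithmetic in isolation (assuming a horizontal ListView; names are illustrative, not part of the patch):

    // Sketch only: per-delegate width so the row fills the view with no leftover.
    delegate: Item {
        readonly property int count: ListView.view.model ? ListView.view.model.count : 0
        width: count > 0 ? (ListView.view.width - (count - 1) * ListView.view.spacing) / count : 0
        height: ListView.view.height
    }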
diff --git a/meshroom/ui/qml/Utils/Colors.qml b/meshroom/ui/qml/Utils/Colors.qml
index af27c91ce5..749ce63dfa 100644
--- a/meshroom/ui/qml/Utils/Colors.qml
+++ b/meshroom/ui/qml/Utils/Colors.qml
@@ -20,6 +20,7 @@ QtObject {
     readonly property color lime: "#CDDC39"
     readonly property color grey: "#555555"
     readonly property color lightgrey: "#999999"
+    readonly property color darkpurple: "#5c4885"

     readonly property var statusColors: {
         "NONE": "transparent",
@@ -27,7 +28,8 @@ QtObject {
         "RUNNING": orange,
         "ERROR": red,
         "SUCCESS": green,
-        "STOPPED": pink
+        "STOPPED": pink,
+        "INPUT": "transparent"
     }

     readonly property var ghostColors: {
@@ -62,6 +64,22 @@ QtObject {
         console.warn("Unknown status : " + chunk.status)
         return "magenta"
     }
+
+    function getNodeColor(node, overrides) {
+        if (node === undefined)
+            return "transparent"
+        if (overrides && node.globalStatus in overrides) {
+            return overrides[node.globalStatus]
+        } else if (node.globalExecMode === "EXTERN" && node.globalStatus in statusColorsExternOverrides) {
+            return statusColorsExternOverrides[node.globalStatus]
+        } else if (node.name !== node.nodeStatusNodeName && node.globalStatus in ghostColors) {
+            return ghostColors[node.globalStatus]
+        } else if (node.globalStatus in statusColors) {
+            return statusColors[node.globalStatus]
+        }
+        console.warn("Unknown status : " + node.globalStatus)
+        return "magenta"
+    }

     function toRgb(color) {
         return [
diff --git a/meshroom/ui/reconstruction.py b/meshroom/ui/reconstruction.py
index 3448e1611c..08466934e0 100755
--- a/meshroom/ui/reconstruction.py
+++ b/meshroom/ui/reconstruction.py
@@ -1031,7 +1031,8 @@ def _setSfm(self, node):
         # disconnection step in 'setSfm' (at this point, 'self._sfm' underlying object
         # has been destroyed and can't be evaluated anymore)
         self._sfm.destroyed.connect(self._unsetSfm)
-        self._sfm.chunks[0].statusChanged.connect(self.updateSfMResults)
+        if len(self._sfm._chunks) > 0:
+            self._sfm.chunks[0].statusChanged.connect(self.updateSfMResults)
         self.sfmChanged.emit()

     def setSfm(self, node):
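The new `Colors.getNodeColor` mirrors the existing `getChunkColor` helper at node granularity: explicit per-call overrides win first, then the extern and ghost tables, then the regular status colors, with magenta as the unknown-status fallback. A hedged usage sketch, not part of the patch (it assumes a `node` exposing `globalStatus` and `globalExecMode` as in the patch):

    // Sketch only: node-level coloring with a custom fallback for the NONE status,
    // matching how the chunk placeholder in TaskManager.qml calls the helper.
    Rectangle {
        color: Colors.getNodeColor(node, { "NONE": Colors.darkpurple })
    }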