diff --git a/meshroom/tractorSubmitter/api/__init__.py b/meshroom/tractorSubmitter/api/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/meshroom/tractorSubmitter/api/base.py b/meshroom/tractorSubmitter/api/base.py index 11ce377..f5a1a54 100644 --- a/meshroom/tractorSubmitter/api/base.py +++ b/meshroom/tractorSubmitter/api/base.py @@ -204,28 +204,30 @@ def cook(self): class TaskInfo: def __init__(self, name, cmdArgs, nodeUid, cacheFolder="", - environment=None, rezPackages=None, - service=None, licenses=None, tags=None, - expandingTask=False, chunkParams=None): + environment=None, reqPackages=None, service=None, + licenses=None, taskType=None, tags=None): self.name = name self.uid = nodeUid self.taskCommandArgs = cmdArgs # Env self.environment = environment or {} - # Rez packages - self.rezPackages = rezPackages or [] + # Requested packages + self.reqPackages = reqPackages or [] # self.limits self.service = service or os.environ.get("DEFAULT_TRACTOR_SERVICE", "") self.limits = self.getLimits(licenses) # Tags self.tags = tags or {} self.tags["nodeUid"] = nodeUid + # Expanding / Chunks - self.expandingTask = expandingTask - # self.expandingFile = self._setExpandingTaskFile(cacheFolder) - self.chunks = [] - if not expandingTask: - self.chunks = self.getChunks(chunkParams) + taskType_, iteration_ = taskType or ("placeholder", None) + self.placeholderTask = (taskType_=="placeholder") + self.expandingTask = (taskType_=="expanding") + self.preprocessTask = (taskType_=="preprocess") + self.postprocessTask = (taskType_=="postprocess") + self.chunkTask = (taskType_=="chunk") + self.iteration = iteration_ @staticmethod def getLimits(licenses=None): @@ -235,21 +237,6 @@ def getLimits(licenses=None): taskLimits.append(os.environ['DEFAULT_TRACTOR_LIMIT']) return taskLimits - @staticmethod - def getChunks(chunkParams) -> list[Chunk]: - """ Get list of chunks """ - it = None - ignoreIterations = chunkParams.get("ignoreIterations", []) - if chunkParams: - start, 
end = chunkParams.get("start", -1), chunkParams.get("end", -2) - size = chunkParams.get("packetSize", 1) - frameRange = list(range(start, end+1, 1)) - if frameRange: - slices = [frameRange[i:i + size] for i in range(0, len(frameRange), size)] - it = [Chunk(i, item[0], item[-1]) for i, item in enumerate(slices) - if i not in ignoreIterations] - return it - def _setExpandingTaskFile(self, cacheFolder): """ Doesn't work with current python API ! It should be possible starting Tractor 1.7 to give a file path to cmd.expand @@ -273,62 +260,41 @@ def _setExpandingTaskFile(self, cacheFolder): def envkey(self): return toTractorEnv(self.environment) - def getExpandWrappedCmd(self): - cmd = self.taskCommandArgs - # Wrap with create_chunks - cmd = f"meshroom_createChunks --submitter Tractor {cmd}" - # Wrap with rez - cmd = rezWrapCommand(cmd, otherRezPkg=self.rezPackages) - # Wrap with tractor wrapper (will redirect stdout to stderr) - # to make sure stdout only has the - wrapperModule = "tractorSubtaskWrapper.py" - wrapperPath = os.path.join(os.environ["MR_SUBMITTERS_SCRITPS"], wrapperModule) - cmd = f"{sys.executable} {wrapperPath} {cmd}" - return cmd - def cook(self): + title = f"{self.name}" + tags = self.tags + cmd = self.taskCommandArgs + if self.preprocessTask: + cmd += f" --preprocess" + title += "_preprocess" + tags["iteration"] = "preprocess" + elif self.postprocessTask: + cmd += f" --postprocess" + title += "_postprocess" + tags["iteration"] = "postprocess" + elif self.chunkTask: + if self.iteration >= 0: + title += f"_{self.iteration}" + else: + title += f"_0" + cmd += f" --iteration {self.iteration}" + tags["iteration"] = self.iteration + if self.expandingTask: - # Chunks are not created yet so we use the wrapper and the task will expand itself - cmd = self.getExpandWrappedCmd() - - elif self.chunks: - # Empty task with multiple commands (sub-tasks) to execute in parallel + cmd = rezWrapCommand(cmd, otherRezPkg=self.reqPackages) + # Wrap with tractor wrapper (will 
redirect stdout to stderr) + # to make sure stdout only has the + wrapperModule = "tractorExpander.py" + wrapperPath = os.path.join(os.environ["MR_SUBMITTERS_SCRITPS"], wrapperModule) + cmd = f"{sys.executable} {wrapperPath} {cmd}" + elif self.placeholderTask: cmd = None else: - # Simple task with only one command to execute - cmd = f"meshroom_compute {self.taskCommandArgs}" - cmd = rezWrapCommand(cmd, otherRezPkg=self.rezPackages) + cmd = rezWrapCommand(cmd, otherRezPkg=self.reqPackages) + return { - "title": self.name, + "title": title, "argv": shlex.split(cmd) if cmd else cmd, "service": self.service, "metadata": json.dumps(self.tags) } - - -class ChunkTaskInfo: - """ - In the case where chunks are already created, and that there are multiple chunks - we will create the chunks from the submitter process. - Here the taskInfo corresponds to the task for the node, and we create an instance of - ChunkTaskInfo per chunk that handles generating information for the chunk task - """ - def __init__(self, taskInfo, chunk): - self.taskInfo: TaskInfo = taskInfo - self.chunk: Chunk = chunk - - def cook(self): - title = f"{self.taskInfo.name}_{self.chunk.start}_{self.chunk.end}" - # Update cmd - cmd = f"meshroom_compute {self.taskInfo.taskCommandArgs}" - cmd = f"{cmd} --iteration {self.chunk.iteration}" - cmd = rezWrapCommand(cmd, otherRezPkg=self.taskInfo.rezPackages) - # Update tags - chunkTags = self.taskInfo.tags.copy() - chunkTags["iteration"] = self.chunk.iteration - return { - "title": title, - "argv": shlex.split(cmd), # Never None - "service": self.taskInfo.service, - "metadata": json.dumps(chunkTags), - } diff --git a/meshroom/tractorSubmitter/api/subtaskCreator.py b/meshroom/tractorSubmitter/api/subtaskCreator.py index 806c64e..e4d4b4d 100644 --- a/meshroom/tractorSubmitter/api/subtaskCreator.py +++ b/meshroom/tractorSubmitter/api/subtaskCreator.py @@ -4,7 +4,7 @@ Helper functions to create subtasks Provides queueSubtask() to write Tractor subtask definitions to 
stdout. -Works with tractorSubtaskWrapper.py to ensure proper stream handling. +Works with tractorExpander.py to ensure proper stream handling. Example : >>> from tractorSubmitter.api.subtaskCreator import queueSubtask @@ -17,7 +17,7 @@ import os import json import shlex -from tractorSubmitter.api.base import TaskInfo, ChunkTaskInfo +from tractorSubmitter.api.base import TaskInfo, Chunk # Original stdout file descriptor @@ -38,16 +38,16 @@ def _getCachedSubtaskStdout(): """ global _stdout if _stdout is None: - if 'TRACTOR_SUBTASK_STDOUT_FD' in os.environ: + if 'TRACTOR_STDOUT_FD' in os.environ: try: - fd = int(os.environ['TRACTOR_SUBTASK_STDOUT_FD']) + fd = int(os.environ['TRACTOR_STDOUT_FD']) # Open the file descriptor for writing _stdout = os.fdopen(fd, 'w', buffering=1) except (ValueError, OSError): - raise RuntimeError("(_getCachedSubtaskStdout) Could not open TRACTOR_SUBTASK_STDOUT_FD") + raise RuntimeError("(_getCachedSubtaskStdout) Could not open TRACTOR_STDOUT_FD") log(f"(_getCachedSubtaskStdout) stdout={_stdout}") else: - raise FileNotFoundError("(_getCachedSubtaskStdout) Could not find TRACTOR_SUBTASK_STDOUT_FD") + raise FileNotFoundError("(_getCachedSubtaskStdout) Could not find TRACTOR_STDOUT_FD") return _stdout @@ -144,30 +144,41 @@ def queueSubtask(title, argv, service="", limits=None, metadata=None, envkey=Non log(f"Queued subtask: {title}") -def queueChunkTask(node, cmdArgs, service, tags=None, rezPackages=None, environment=None): - chunkRangeParams = None +def getChunks(chunkParams): + it = None + ignoreIterations = chunkParams.get("ignoreIterations", []) + if chunkParams: + start, end = chunkParams.get("start", -1), chunkParams.get("end", -2) + size = 1 + frameRange = list(range(start, end+1, 1)) + if frameRange: + slices = [frameRange[i : i+1] for i in range(0, len(frameRange))] + it = [Chunk(i, item[0], item[-1]) for i, item in enumerate(slices) + if i not in ignoreIterations] + return it + + +def queueChunkTask(node, cmdArgs, 
service, tags=None, reqPackages=None, environment=None): blockSize, fullSize, nbBlocks = node.nodeDesc.parallelization.getSizes(node) if nbBlocks <= 0: return - chunkRangeParams = {'start': 0, 'end': nbBlocks - 1, 'step': 1} licenses = node.nodeDesc._licenses - taskInfo = TaskInfo( - node.name, - cmdArgs, - nodeUid=node._uid, - environment=environment, - rezPackages=rezPackages, - service=service, - licenses=licenses, - tags=tags.copy() if tags else None, - expandingTask=False, - chunkParams=chunkRangeParams - ) - for chunk in TaskInfo.getChunks(chunkRangeParams): - chunkInfo = ChunkTaskInfo(taskInfo, chunk) + + for iteration in range(nbBlocks): + taskInfo = TaskInfo( + name=node.name, + cmdArgs=cmdArgs, + nodeUid=node._uid, + environment=environment, + reqPackages=reqPackages, + service=service, + licenses=licenses, + taskType=("chunk", iteration), + tags=tags.copy() if tags else None, + ) # title, argv, service, metadata - chunkParams = chunkInfo.cook() + taskArgs = taskInfo.cook() # limits, envkey - chunkParams['limits'] = taskInfo.limits - chunkParams['envkey'] = taskInfo.envkey - queueSubtask(**chunkParams) + taskArgs['limits'] = taskInfo.limits + taskArgs['envkey'] = taskInfo.envkey + queueSubtask(**taskArgs) diff --git a/meshroom/tractorSubmitter/api/tractorJobCreation.py b/meshroom/tractorSubmitter/api/tractorJobCreation.py deleted file mode 100644 index 99e27b4..0000000 --- a/meshroom/tractorSubmitter/api/tractorJobCreation.py +++ /dev/null @@ -1,199 +0,0 @@ -#!/usr/bin/env python - -import logging - -from tractorSubmitter.api.base import Chunk -from tractorSubmitter.api.base import TRACTOR_JOB_URL, PRIORITY_DICT -from tractorSubmitter.api.base import toTractorEnv -from tractorSubmitter.api.base import TaskInfo, ChunkTaskInfo, JobInfo - -from tractor.api import author as tractorAuthor - - -class TractorTask: - """ Stores a task and the additional tasks spawned for each chunks - Will be helpful later to resubmit only failed chunks for example - """ - - def 
__init__(self, task): - self.task: tractorAuthor.Task = task - self.chunkTasks: dict[Chunk, tractorAuthor.Task] = {} - - def addChunkTask(self, chunk: Chunk, task: tractorAuthor.Task): - self.chunkTasks[chunk] = task - - -def cookTractorTask(taskInfo: TaskInfo) -> TractorTask: - """ - Cook a tractor task depending on taskInfo - Returns a TractorTask object with the tractor task, and chunk tasks - - # TODO : there is only one command for each task so looping over .cmds seems useless - """ - taskKwargs = taskInfo.cook() - tractorTask = tractorAuthor.Task(**taskKwargs) - res = TractorTask(tractorTask) - if taskInfo.chunks: - for chk in taskInfo.chunks: - chunkTaskKwargs = ChunkTaskInfo(taskInfo, chk).cook() - chunkTractorTask = tractorTask.newTask(**chunkTaskKwargs) - for cmd in chunkTractorTask.cmds: - cmd.tags = taskInfo.limits - cmd.envkey = taskInfo.envkey - res.addChunkTask(chk, chunkTractorTask) - else: - for cmd in tractorTask.cmds: - cmd.tags = taskInfo.limits - cmd.envkey = taskInfo.envkey - cmd.expand = taskInfo.expandingTask - # if taskInfo.expandingTask: - # cmd.expand = taskInfo.expandingFile - return res - - -class Task: - """ Object that represent a Node in meshroom that has been submitted to the farm. - Each node should be created as a Task in the tractor submitter. - However one Task object can spool multiple tractor task because we will create individual - tasks for chunks. 
- """ - - def __init__(self, taskInfo: TaskInfo): - self.taskInfo = taskInfo - self._children = set() - self._parents = set() - - def __repr__(self): - return f"" - - def __hash__(self): - return hash(frozenset(["TractorTask", self.taskInfo.name, self.taskInfo.uid])) - - def __eq__(self, __value: object) -> bool: - return hash(self) == hash(__value) - - def addChild(self, task): - """ Add a task in the children of the current task - """ - if isinstance(task, (tuple, list)): - for t in task: - self.addChild(t) - else: - self._children.add(task) # Add task as current object children - task._parents.add(self) # Add current object as task parent - - -class TaskGraph: - """ Graph with Task objects - The point of this class is to delegate task creation and to make sure we - don't create multiple times the same task. - Also we store the created tasks and chunks info so that might be useful in the future - """ - - def __init__(self, job): - self.job = job - self._tasks = set() - self.__cooked = {} - - def __len__(self): - return len(self._tasks) - - @property - def roots(self): - return [task for task in self._tasks if not task._parents] - - @property - def leaves(self): - return [task for task in self._tasks if not task._children] - - def cookTask(self, task: Task): - """ Cook task, chunk tasks, and set tasks dependencies """ - if task.taskInfo.uid not in self.__cooked: - logging.info(f"TractorSubmitter: Create Tractor Task: {task.taskInfo.name}") - tractorTask = cookTractorTask(task.taskInfo) - self.__cooked[task.taskInfo.uid] = tractorTask - for child in task._children: - childTask = self.cookTask(child) - if tractorTask.chunkTasks: - for chkTask in tractorTask.chunkTasks.values(): - chkTask.addChild(childTask) - else: - tractorTask.task.addChild(childTask) - return self.__cooked[task.taskInfo.uid].task - - def cook(self, jobTask): - """ Cook the graph (i.e. 
create all tractor tasks) and dependencies - jobTask is the root task for the whole job - """ - for task in self.roots: - child = self.cookTask(task) - jobTask.addChild(child) - - -class Job: - def __init__(self, name, tags=None, requirements=None, environment=None, user=None, comment="", paused=False): - self.jobInfo = JobInfo( - name, - share="", - service=requirements, - environment=environment, - tags=tags, - user=user, - comment=comment, - paused=paused - ) - self._graph = TaskGraph(self) - - def createTask(self, name, commandArgs, uid, nodeCache="", tags=None, rezPackages=None, service=None, - licenses=None, expandingTask=False, chunkParams=None) -> Task: - """ Add task and make sure it is unique """ - taskInfo = TaskInfo( - name=name, - cmdArgs=commandArgs, - nodeUid=uid, - cacheFolder=nodeCache, - rezPackages=rezPackages, - service=service, - licenses=licenses, - tags=tags.copy() if tags else None, - expandingTask=expandingTask, - chunkParams=chunkParams - ) - task = Task(taskInfo) - # Dont add the task if it has already been created - for t in self._graph._tasks: - if t == task: - logging.error(f"TractorSubmitter: Task already created : {t}") - return t - self._graph._tasks.add(task) - return task - - def cook(self): - """ Cook job and tasks graph """ - # Create job - tractorJob = tractorAuthor.Job(**self.jobInfo.cook()) - serialsubtasks = (len(self._graph.leaves) == 1) - # Create the job task (no command, at the graph root) - jobTask = tractorJob.newTask(title=self.jobInfo.name, argv=None, serialsubtasks=serialsubtasks) - self._graph.cook(jobTask) - if len(self._graph) == 0: - # tractor API will raise a RequiredValueError if no task are in job so we add a dummy one - # note that the job will not even appear in Tractor web ui - _ = tractorJob.newTask(title='dummy') - return tractorJob - - def submit(self, priority="normal", share="", dryRun=False, block=False): - """Submit to Tractor, or print TCL if dryRun.""" - if share: - self.jobInfo.share = share - - 
job = self.cook() - job.priority = PRIORITY_DICT.get(priority, PRIORITY_DICT["normal"]) - - if dryRun: - logging.info("TractorSubmitter: Job in TCL format :") - logging.info(job.asTcl()) - return {} - else: - jid = job.spool(block=block, owner=self.jobInfo.user) - return {"id": jid, "url": TRACTOR_JOB_URL.format(jid=jid)} diff --git a/meshroom/tractorSubmitter/tractorSubmitter.py b/meshroom/tractorSubmitter/tractorSubmitter.py index cb37745..756c49e 100644 --- a/meshroom/tractorSubmitter/tractorSubmitter.py +++ b/meshroom/tractorSubmitter/tractorSubmitter.py @@ -2,20 +2,148 @@ import os import sys +import shutil import getpass -from meshroom.core.submitter import BaseSubmitter, SubmitterOptions, SubmitterOptionsEnum +import logging +from collections import namedtuple +from typing import Dict, List + +# ========== Tractor ========== +from tractor.api import author as tractorAuthor +from tractorSubmitter.api.base import ( + getRequestPackages, + TaskInfo, JobInfo, + TRACTOR_JOB_URL, PRIORITY_DICT +) import tractorSubmitter.api.tractorJobQuery as tq -from tractorSubmitter.api.base import getRequestPackages -from tractorSubmitter.api.tractorJobCreation import Task, Job from tractorSubmitter.api.subtaskCreator import queueChunkTask +# ========== Meshroom ========== import meshroom -from meshroom.core.submitter import BaseSubmittedJob +from meshroom.core import MESHROOM_ROOT from meshroom.core.node import Status +from meshroom.core.submitter import ( + BaseSubmitter, BaseSubmittedJob, + SubmitterOptions, SubmitterOptionsEnum, + OrderedTask, OrderedTasks, OrderedTaskType +) currentDir = os.path.dirname(os.path.realpath(__file__)) binDir = os.path.dirname(os.path.dirname(os.path.dirname(currentDir))) +CreatedTask = namedtuple("task", ["task", "chunkParams"]) + + +def wrapMeshroomBin(_bin): + if shutil.which(_bin): + # The alias exists so use it directly + return _bin + binFolder = str(MESHROOM_ROOT / "bin") + return os.path.join(binFolder, _bin) + + +class Task: + def 
__init__(self, name, command, uid=None, nodeCache="", tags=None, + reqPackages=None, service=None, licenses=None, taskType=None): + self.taskInfos = TaskInfo( + name=name, + cmdArgs=command, + nodeUid=uid, + cacheFolder=nodeCache, + reqPackages=reqPackages, + service=service, + licenses=licenses, + taskType=taskType, + tags=tags.copy() if tags else None, + ) + taskKwargs = self.taskInfos.cook() + self.tractorTask: tractorAuthor.Task = tractorAuthor.Task(**taskKwargs) + for cmd in self.tractorTask.cmds: + cmd.tags = self.taskInfos.limits + cmd.envkey = self.taskInfos.envkey + cmd.expand = self.taskInfos.expandingTask + # if taskInfos.expandingTask: + # cmd.expand = taskInfos.expandingFile + + +class Job: + def __init__(self, name, tags=None, requirements=None, environment=None, user=None, comment="", paused=False): + self.jobInfo = JobInfo( + name, + share="", + service=requirements, + environment=environment, + tags=tags, + user=user, + comment=comment, + paused=paused + ) + self.tasks : List[Task] = [] + self.taskTependencies = {} # task: [tasks that the task depends on] + + def addTask(self, task: Task): + self.tasks.append(task) + self.taskTependencies[task] = [] + + def addTaskDependency(self, parentTask: Task, childTask: Task): + parentTask.tractorTask.addChild(childTask.tractorTask) + self.taskTependencies[parentTask].append(childTask) + + def getRootTasks(self): + """ Get all tasks that are not children of other tasks """ + tasksWithoutDeps = set(self.tasks) + for _, childTasks in self.taskTependencies.items(): + for task in childTasks: + if task not in tasksWithoutDeps: + continue + tasksWithoutDeps.remove(task) + return list(tasksWithoutDeps) + + @staticmethod + def createDummyTask(tractorJob: tractorAuthor.Job): + """ tractor API will raise a RequiredValueError if no task are + in the job so we add a dummy one. + Note that the job will not even appear in Tractor web ui. 
+ """ + return tractorJob.newTask(title='dummy') + + def cook(self, tractorJob: tractorAuthor.Job): + if len(self.tasks) == 0: + self.createDummyTask(tractorJob) + return + # Create the job task (no command, at the graph root) + rootTasks = self.getRootTasks() + serialsubtasks = len(rootTasks) == 1 + jobTask = tractorJob.newTask(title=self.jobInfo.name, argv=None, serialsubtasks=serialsubtasks) + for task in rootTasks: + jobTask.addChild(task.tractorTask) + # Cook tasks + taskToTractorTask = {} + for task in self.tasks: + tractorTask = task.tractorTask + tractorJob.addChild(tractorTask) + taskToTractorTask[task] = tractorTask + # Create dependencies + # TODO + + def submit(self, priority="normal", share="", dryRun=False, block=False): + """Submit to Tractor, or print TCL if dryRun.""" + if share: + self.jobInfo.share = share + + # Create job + tractorJob = tractorAuthor.Job(**self.jobInfo.cook()) + self.cook(tractorJob) + tractorJob.priority = PRIORITY_DICT.get(priority, PRIORITY_DICT["normal"]) + + if dryRun: + logging.info("TractorSubmitter: Job in TCL format :") + logging.info(tractorJob.asTcl()) + return {} + else: + jid = tractorJob.spool(block=block, owner=self.jobInfo.user) + return {"id": jid, "url": TRACTOR_JOB_URL.format(jid=jid)} + class TractorTaskReturnCode: SUCCESS = 0 @@ -181,51 +309,62 @@ def retrieveJob(self, jid) -> TractorJob: job = TractorJob(jid, self) return job - def createTask(self, job: Job, meshroomFile: str, node) -> Task: - tags = self.DEFAULT_TAGS.copy() # copy to not modify default tags - optionalArgs = {} - if not node._chunksCreated: - # Chunks will be created by the process - optionalArgs["expandingTask"] = True - elif node.isParallelized: - blockSize, fullSize, nbBlocks = node.nodeDesc.parallelization.getSizes(node) - iterationsToIgnore = [] - for c in node._chunks: - if c._status.status == Status.SUCCESS: - iterationsToIgnore.append(c.range.iteration) - if nbBlocks > 0: - optionalArgs["chunkParams"] = { - "start": 0, "end": 
nbBlocks - 1, "step": 1, - "ignoreIterations": iterationsToIgnore - } + def createTask(self, meshroomFile: str, orderedTask: OrderedTask, createdTasks: Dict[OrderedTask, Task]) -> Task: + node = orderedTask.node + print(f"[CreateTask] orderedTask={orderedTask}, node={node}") + if orderedTask.taskType == OrderedTaskType.PLACEHOLDER: + return Task( + name=orderedTask.node.name if orderedTask.node else "", + command="", + ) + + if orderedTask.taskType == OrderedTaskType.CHUNK: + taskType = ("chunk", orderedTask.iteration) + elif orderedTask.taskType == OrderedTaskType.PREPROCESS: + taskType = ("preprocess", None) + elif orderedTask.taskType == OrderedTaskType.POSTPROCESS: + taskType = ("postprocess", None) + elif orderedTask.taskType == OrderedTaskType.EXPANDING: + taskType = ("expanding", None) + elif orderedTask.taskType == OrderedTaskType.PLACEHOLDER: + taskType = (None, None) else: - optionalArgs["chunkParams"] = {"start": 0, "end": 0, "step": 1} - tags['nbFrames'] = node.size + raise ValueError(f"Unknown OrderedTaskType type {orderedTask.taskType}") + + tags = self.DEFAULT_TAGS.copy() # copy to not modify default tags tags['prod'] = self.prod - # Fetch licenses - licenses = node.nodeDesc._licenses - cmdArgs = f"--node {node.name} \"{meshroomFile}\" --extern" - task = job.createTask( - name=node.name, - commandArgs=cmdArgs, - uid=node._uid, # Provide unicity info - nodeCache=node._internalFolder, - tags=tags, - rezPackages=self.reqPackages, - service=self.getTaskService(node), - licenses=licenses, - **optionalArgs - ) + + taskParams = { + "name": node.name, + "uid": node._uid, # Provide unicity info + "nodeCache": node._internalFolder, + "tags": tags, + "reqPackages": self.reqPackages, + "service": self.getTaskService(node), + "licenses": node.nodeDesc._licenses, + "taskType": taskType + } + + cmdArgs = f"--node {orderedTask.node.name} \"{meshroomFile}\" --extern" + + if orderedTask.taskType == OrderedTaskType.EXPANDING: + cmdBin = "meshroom_createChunks" + 
cmdArgs = f"--submitter {self._name} {cmdArgs}" + else: + cmdBin = "meshroom_compute" + cmdBin = wrapMeshroomBin(cmdBin) + + cmdArgs = f"{cmdBin} {cmdArgs}" + task = Task(command=cmdArgs, **taskParams) return task - def createJob(self, nodes, edges, filepath, submitLabel="{projectName}"): + def createJob(self, orderedTasks: OrderedTasks, filepath, submitLabel="{projectName}") -> TractorJob: + # Create job projectName = os.path.splitext(os.path.basename(filepath))[0] name = submitLabel.format(projectName=projectName) comment = filepath - maxNodeSize = max([node.size for node in nodes]) mainTags = { 'prod': self.prod, - 'nbFrames': str(maxNodeSize), 'comment': comment, } # Create job @@ -235,21 +374,25 @@ def createJob(self, nodes, edges, filepath, submitLabel="{projectName}"): environment=self.environment, user=os.environ.get('FARM_USER', os.environ.get('USER', getpass.getuser())), ) - # Create tasks - nodeUidToTask: dict[str, Task] = {} - for node in nodes: - if node._uid in nodeUidToTask: - continue # HACK: Should not be necessary - # It would be better to skip inputNodes but at the same time tricky if used in between - # of other nodes: - # if node._isInputNode(): - # continue - task = self.createTask(job, filepath, node) - nodeUidToTask[node._uid] = task - # Connect tasks - for u, v in edges: - nodeUidToTask[u._uid].addChild(nodeUidToTask[v._uid]) - # Submit job + # Add tasks + print("Ordered Tasks:") + orderedTasks.display() + createdTasks: Dict[OrderedTask, Task] = dict() + for taskToCreate in orderedTasks.iterOnTasks(): + if taskToCreate in createdTasks.keys(): + continue + createdTask = self.createTask(filepath, taskToCreate, createdTasks) + job.addTask(createdTask) + createdTasks[taskToCreate] = createdTask + + for orderedTask, task in createdTasks.items(): + print(orderedTask, "->", task) + + for orderedTask, task in createdTasks.items(): + deps = [createdTasks.get(t) for t in orderedTask.dependencies] + for dependency in deps: + 
job.addTaskDependency(task, dependency) + res = job.submit(share=self.share, dryRun=self.dryRun) if self.dryRun: return True @@ -264,19 +407,18 @@ def createChunkTask(self, node, graphFile, environment=None, **kwargs): Keyword args : cache, forceStatus, forceCompute """ taskTags = self.DEFAULT_TAGS.copy() - taskTags['nbFrames'] = node.size taskTags['prod'] = self.prod # Environment environment = environment or {} # Command - cmdArgs = f"--node {node.name} \"{graphFile}\" --extern" + cmdArgs = f"meshroom_compute --node {node.name} \"{graphFile}\" --extern" # Add task to the queue queueChunkTask( node=node, cmdArgs=cmdArgs, service=self.getTaskService(node), tags=taskTags, - rezPackages=self.reqPackages, + reqPackages=self.reqPackages, environment=environment ) diff --git a/script/tractorSubtaskWrapper.py b/script/tractorExpander.py similarity index 79% rename from script/tractorSubtaskWrapper.py rename to script/tractorExpander.py index eea7f4a..c54d218 100644 --- a/script/tractorSubtaskWrapper.py +++ b/script/tractorExpander.py @@ -1,11 +1,11 @@ #!/usr/bin/env python """ -Tractor Subtask Wrapper -Redirects all normal output to stderr, leaving stdout for Tractor subtask definitions. +Tractor Expander +Redirects all normal output to stderr, leaving stdout for Tractor commands. Usage: - python tractorSubtaskWrapper.py createTasks.py arg1 arg2 --option=value + python tractorExpander.py script.py arg1 arg2 --option=value """ import sys @@ -35,22 +35,22 @@ def kill_current_process(cls, allow_auto_retry=True): def main(): if len(sys.argv) < 2: - sys.stderr.write("Usage: tractorSubtaskWrapper.py