-
Notifications
You must be signed in to change notification settings - Fork 1
Expand file tree
/
Copy pathsubtaskCreator.py
More file actions
187 lines (156 loc) · 5.57 KB
/
subtaskCreator.py
File metadata and controls
187 lines (156 loc) · 5.57 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
#!/usr/bin/env python
"""
Helper functions to create subtasks
Provides queueSubtask() to write Tractor subtask definitions to stdout.
Works with tractorExpander.py to ensure proper stream handling.
Example :
>>> from tractorSubmitter.api.subtaskCreator import queueSubtask
>>> queueSubtask(command1, **args)
>>> queueSubtask(command2, **args)
>>> ...
"""
import sys
import os
import json
import shlex
from tractorSubmitter.api.base import TaskInfo
# Original stdout file descriptor
# Cached to avoid reopening file descriptor multiple times
EXPAND_MODE = "stdout" # Or "file"
_stdout = None
_expandTaskFile = None
def log(*text):
text = " ".join(text)
sys.stderr.write(text + "\n")
def _getCachedSubtaskStdout():
"""
Get cached subtask stdout
"""
global _stdout
if _stdout is None:
if 'TRACTOR_STDOUT_FD' in os.environ:
try:
fd = int(os.environ['TRACTOR_STDOUT_FD'])
# Open the file descriptor for writing
_stdout = os.fdopen(fd, 'w', buffering=1)
except (ValueError, OSError):
raise RuntimeError("(_getCachedSubtaskStdout) Could not open TRACTOR_STDOUT_FD")
log(f"(_getCachedSubtaskStdout) stdout={_stdout}")
else:
raise FileNotFoundError("(_getCachedSubtaskStdout) Could not find TRACTOR_STDOUT_FD")
return _stdout
def _getCachedTaskFile():
"""
Not used ! It would be a better alternative but since we cannot
pass a string to cmd.expand (although it should be possible since tractor 1.7)
we cannot use this
"""
global _expandTaskFile
if _expandTaskFile is None:
if 'EXPAND_FILE' in os.environ:
try:
_expandTaskFile = os.environ['EXPAND_FILE']
except (ValueError, OSError):
raise RuntimeError("(_getCachedTaskFile) Could not open EXPAND_FILE")
log(f"(_getCachedTaskFile) expand file: {_expandTaskFile}")
else:
raise FileNotFoundError("(_getCachedTaskFile) Could not find EXPAND_FILE")
return _expandTaskFile
def sendTractorCmd(task_def):
""" Write the tractor command to the stdout """
if EXPAND_MODE == "stdout":
tractor_stdout = _getCachedSubtaskStdout()
tractor_stdout.write(task_def)
tractor_stdout.flush()
elif EXPAND_MODE == "file":
expandFile = _getCachedTaskFile()
with open(expandFile, "a+") as f:
f.write("\n" + task_def + "\n")
def queueSubtask(title, argv, service="", limits=None, metadata=None, envkey=None):
"""
Queue a subtask to be created in Tractor.
Args:
title (str): Task title
cmd (str or list): Command to run (string or argv list)
service (str): Tractor service key
limits (list): Limit tags (e.g. ["blender", "nuke"])
metadata (dict): Metadata as key:value pairs
envkey (list): Environment key list
# TODO : Add possibility to specify blades ?
Example:
queueSubtask(
title="render_frame_0001",
cmd="render --frame 1 scene.ma",
service="mikrosRender",
limits=["blender"],
metadata={'user': 'john', 'iteration': '1', 'prod': 'mvg'}
)
"""
# Parse command
if isinstance(argv, str):
cmd_argv = shlex.split(argv)
else:
cmd_argv = list(argv)
cmd_str = " ".join(cmd_argv)
# Build tags string
tags_str = ""
if limits:
tags_str = f"-tags {{{' '.join(limits)}}}"
# Build metadata string
if isinstance(metadata, dict):
metadata = json.dumps(metadata)
metadata_str = f"-metadata {{{metadata}}}"
# Build envkey string
envkey_str = ""
if envkey:
envkey_str = f"-envkey {{{' '.join(envkey)}}}"
# Build service string
service_str = f"-service {{{service}}}" if service else ""
# Write Alfred task definition
# TODO : we can use tractor API to convert a Task into alf (asTcl)
task_def = f"""
Task -title {{{title}}} {service_str} {metadata_str} -cmds {{
RemoteCmd {{{cmd_str}}} {service_str} {tags_str} {envkey_str}
}}
"""
print(task_def)
sendTractorCmd(task_def)
log(f"Queued subtask: {title}")
def getChunks(chunkParams):
it = None
ignoreIterations = chunkParams.get("ignoreIterations", [])
if chunkParams:
start, end = chunkParams.get("start", -1), chunkParams.get("end", -2)
size = 1
frameRange = list(range(start, end+1, 1))
if frameRange:
it = [
Chunk(i, )
]
slices = [frameRange[i : i+1] for i in range(0, len(frameRange))]
it = [Chunk(i, item[0], item[-1]) for i, item in enumerate(slices)
if i not in ignoreIterations]
return it
def queueChunkTask(node, cmdArgs, service, tags=None, reqPackages=None, environment=None):
blockSize, fullSize, nbBlocks = node.nodeDesc.parallelization.getSizes(node)
if nbBlocks <= 0:
return
licenses = node.nodeDesc._licenses
for iteration in range(nbBlocks):
taskInfo = TaskInfo(
name=node.name,
cmdArgs=cmdArgs,
nodeUid=node._uid,
environment=environment,
reqPackages=reqPackages,
service=service,
licenses=licenses,
taskType=("chunk", iteration),
tags=tags.copy() if tags else None,
)
# title, argv, service, metadata
taskArgs = taskInfo.cook()
# limits, envkey
taskArgs['limits'] = taskInfo.limits
taskArgs['envkey'] = taskInfo.envkey
queueSubtask(**taskArgs)