
Commit 953ce56

Commit message: suggested changes
Parent commit: d1cbae9

File tree: 4 files changed (+104, -72 lines)

metaflow/cli.py  (+6, -6)

@@ -678,11 +678,11 @@ def common_run_options(func):
         help="Write the ID of this run to the file specified.",
     )
     @click.option(
-        "--flow-name-file",
+        "--pathspec-file",
         default=None,
         show_default=True,
         type=str,
-        help="Write the flow name of this run to the file specified.",
+        help="Write the pathspec of this run to the file specified.",
     )
     @wraps(func)
     def wrapper(*args, **kwargs):
@@ -742,7 +742,7 @@ def resume(
     decospecs=None,
     run_id_file=None,
     resume_identifier=None,
-    flow_name_file=None,
+    pathspec_file=None,
 ):
     before_run(obj, tags, decospecs + obj.environment.decospecs())
 
@@ -799,7 +799,7 @@ def resume(
         resume_identifier=resume_identifier,
     )
     write_file(run_id_file, runtime.run_id)
-    write_file(flow_name_file, obj.flow.name)
+    write_file(pathspec_file, "/".join((obj.flow.name, runtime.run_id)))
     runtime.print_workflow_info()
 
     runtime.persist_constants()
@@ -833,7 +833,7 @@ def run(
     max_log_size=None,
     decospecs=None,
     run_id_file=None,
-    flow_name_file=None,
+    pathspec_file=None,
     user_namespace=None,
     **kwargs
 ):
@@ -858,7 +858,7 @@ def run(
     )
     write_latest_run_id(obj, runtime.run_id)
     write_file(run_id_file, runtime.run_id)
-    write_file(flow_name_file, obj.flow.name)
+    write_file(pathspec_file, "/".join((obj.flow.name, runtime.run_id)))
 
     obj.flow._set_constants(obj.graph, kwargs)
     runtime.print_workflow_info()
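
With this change, both run and resume accept --pathspec-file and write "<flow name>/<run id>" to it. A minimal sketch (not part of this commit) of consuming that file with Metaflow's client API; the flow script, output path, and flow name below are hypothetical:

# Assumed invocation (hypothetical flow script and path):
#   python helloflow.py run --pathspec-file /tmp/pathspec.txt
# Once the run starts, the file contains "<flow name>/<run id>", e.g. "HelloFlow/1699".
from metaflow import Run

with open("/tmp/pathspec.txt") as f:
    pathspec = f.read().strip()  # e.g. "HelloFlow/1699"

run = Run(pathspec)  # look up the run via the client API
print(run.finished)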

metaflow/click_api.py  (+1, -1)

@@ -75,7 +75,7 @@ def _method_sanity_check(
         if supplied_k not in possible_params:
             raise ValueError(
                 "Unknown argument: '%s', possible args are: %s"
-                % (supplied_k, list(possible_params.keys()))
+                % (supplied_k, ", ".join(possible_params.keys()))
             )
 
     try:
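
The only change here is to the error message: valid argument names are now rendered as a comma-separated string rather than a Python list repr. A tiny illustrative sketch with a made-up parameter set:

possible_params = {"alpha": None, "tags": None, "max_workers": None}  # made-up example
# before: "... possible args are: ['alpha', 'tags', 'max_workers']"
# after:  "... possible args are: alpha, tags, max_workers"
print("Unknown argument: '%s', possible args are: %s"
      % ("beta", ", ".join(possible_params.keys())))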

metaflow/metaflow_runner.py  (+53, -27)

@@ -8,16 +8,33 @@
 from metaflow.subprocess_manager import SubprocessManager, CommandManager
 
 
-def read_from_file_when_ready(file_path):
+def read_from_file_when_ready(file_path: str, timeout: float = 5):
+    start_time = time.time()
     with open(file_path, "r") as file_pointer:
         content = file_pointer.read()
         while not content:
+            if time.time() - start_time > timeout:
+                raise TimeoutError(
+                    "Timeout while waiting for file content from '%s'" % file_path
+                )
             time.sleep(0.1)
             content = file_pointer.read()
         return content
 
 
 class ExecutingRun(object):
+    """
+    An object that encapsulates both the:
+    - CommandManager Object (that has ability to stream logs, kill the underlying process, etc.)
+    - Metaflow's Run object
+
+    This is a user facing object exposing methods and properties for:
+    - waiting (with an optional timeout and optional streaming of logs)
+    - current snapshot of stdout
+    - current snapshot of stderr
+    - ability to stream logs
+    """
+
     def __init__(
         self, runner: "Runner", command_obj: CommandManager, run_obj: Run
     ) -> None:
@@ -32,21 +49,25 @@ def __exit__(self, exc_type, exc_value, traceback):
         self.runner.__exit__(exc_type, exc_value, traceback)
 
     async def wait(self, timeout: Optional[float] = None, stream: Optional[str] = None):
+        """Wait for the run to finish, optionally with a timeout and optionally streaming its output."""
         await self.command_obj.wait(timeout, stream)
         return self
 
     @property
     def stdout(self):
+        """Get the current snapshot of stdout from the log file."""
         with open(self.command_obj.log_files.get("stdout"), "r") as fp:
             return fp.read()
 
     @property
     def stderr(self):
+        """Get the current snapshot of stderr from the log file."""
         with open(self.command_obj.log_files.get("stderr"), "r") as fp:
             return fp.read()
 
-    async def stream_logs(self, stream: str, position: Optional[int] = None):
-        async for position, line in self.command_obj.stream_logs(stream, position):
+    async def stream_log(self, stream: str, position: Optional[int] = None):
+        """Stream logs from the run using the log files. Used with an async for loop"""
+        async for position, line in self.command_obj.stream_log(stream, position):
             yield position, line
 
 
@@ -58,6 +79,22 @@ def __init__(
         env: Dict = {},
         **kwargs,
     ):
+        """
+        Metaflow's Runner API that presents a programmatic interface
+        to run flows either synchronously or asynchronously. The class expects
+        a path to the flow file along with optional top level parameters such as:
+        metadata, environment, datastore, etc.
+
+        The run() method expects run-level parameters. The object returned by the
+        'run()' method has access to Metaflow's Run object using `.run` along with
+        other abilities such as streaming logs, etc.
+
+        Example:
+            with metaflow_runner.Runner('../try.py', metadata="local") as runner:
+                result = runner.run(alpha=5, tags=["abc", "def"], max_workers=5)
+                print(result.run.finished)
+        """
+
         # these imports are required here and not at the top
         # since they interfere with the user defined Parameters
         # in the flow file, this is related to the ability of
@@ -95,40 +132,29 @@ def run(self, **kwargs):
 
     async def async_run(self, **kwargs):
         with tempfile.TemporaryDirectory() as temp_dir:
-            tfp_flow = tempfile.NamedTemporaryFile(dir=temp_dir, delete=False)
-            tfp_run_id = tempfile.NamedTemporaryFile(dir=temp_dir, delete=False)
+            tfp_pathspec = tempfile.NamedTemporaryFile(dir=temp_dir, delete=False)
 
-            command = self.runner(
-                run_id_file=tfp_run_id.name, flow_name_file=tfp_flow.name, **kwargs
-            )
+            command = self.runner(pathspec_file=tfp_pathspec.name, **kwargs)
 
             pid = await self.spm.run_command(
                 [sys.executable, *command], env=self.env_vars
             )
             command_obj = self.spm.get(pid)
 
-            # detect failures even before writing to the run_id and flow_name files
-            # the error (if any) must happen within the first 0.5 seconds
             try:
-                await asyncio.wait_for(command_obj.process.wait(), timeout=0.5)
-            except asyncio.TimeoutError:
-                pass
-
-            # if the returncode is None, the process has encountered no error within the
-            # initial 0.5 seconds and we proceed to run it in the background
-            # during which it would have written to the run_id and flow_name files
-            if command_obj.process.returncode is not None:
+                pathspec = read_from_file_when_ready(tfp_pathspec.name, timeout=5)
+                run_object = Run(pathspec, _namespace_check=False)
+                return ExecutingRun(self, command_obj, run_object)
+            except TimeoutError as e:
+                stdout_log = open(command_obj.log_files["stdout"]).read()
                 stderr_log = open(command_obj.log_files["stderr"]).read()
                 command = " ".join(command_obj.command)
-                raise RuntimeError(f"Error executing: '{command}':\n\n{stderr_log}")
-            else:
-                flow_name = read_from_file_when_ready(tfp_flow.name)
-                run_id = read_from_file_when_ready(tfp_run_id.name)
-
-                pathspec_components = (flow_name, run_id)
-                run_object = Run("/".join(pathspec_components), _namespace_check=False)
-
-                return ExecutingRun(self, command_obj, run_object)
+                error_message = "Error executing: '%s':\n" % command
+                if stdout_log.strip():
+                    error_message += "\nStdout:\n%s\n" % stdout_log
+                if stderr_log.strip():
+                    error_message += "\nStderr:\n%s\n" % stderr_log
+                raise RuntimeError(error_message) from e
 
     def __exit__(self, exc_type, exc_value, traceback):
         self.spm.cleanup()
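
Taken together, the runner now writes and reads a single pathspec file and surfaces both stdout and stderr when startup times out. A sketch of the asynchronous path, assuming the module is importable as metaflow.metaflow_runner and that '../try.py' defines a flow with an alpha parameter, mirroring the class docstring above:

import asyncio
from metaflow.metaflow_runner import Runner  # module touched by this commit

async def main():
    # Runner doubles as a context manager, as shown in the class docstring.
    with Runner("../try.py", metadata="local") as runner:
        executing_run = await runner.async_run(alpha=5)
        # stream stdout while the flow executes (stream_logs was renamed to stream_log)
        async for _, line in executing_run.stream_log("stdout"):
            print(line)
        await executing_run.wait()
        print(executing_run.run.finished)

asyncio.run(main())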

metaflow/subprocess_manager.py  (+44, -38)

@@ -53,9 +53,9 @@ async def run_command(
         """Run a command asynchronously and return its process ID."""
 
         command_obj = CommandManager(command, env, cwd)
-        process = await command_obj.run()
-        self.commands[process.pid] = command_obj
-        return process.pid
+        pid = await command_obj.run()
+        self.commands[pid] = command_obj
+        return pid
 
     def get(self, pid: int) -> "CommandManager":
         """Get the CommandManager object for a given process ID."""
@@ -98,7 +98,6 @@ async def __aexit__(self, exc_type, exc_value, traceback):
     def handle_sigint(self, signum, frame):
         """Handle the SIGINT signal."""
 
-        print("SIGINT received.")
         asyncio.create_task(self.kill())
 
     async def wait(
@@ -121,38 +120,45 @@ async def wait(
             command_string = " ".join(self.command)
             await self.kill()
             print(
-                "Timeout: The process: %s with command: '%s' didn't complete within %s seconds."
+                "Timeout: The process (PID %d; command: '%s') did not complete within %s seconds."
                 % (self.process.pid, command_string, timeout)
             )
 
     async def run(self):
         """Run the subprocess, streaming the logs to temporary files"""
 
-        self.temp_dir = tempfile.mkdtemp()
-        stdout_logfile = os.path.join(self.temp_dir, "stdout.log")
-        stderr_logfile = os.path.join(self.temp_dir, "stderr.log")
-
-        try:
-            # returns when process has been started,
-            # not when it is finished...
-            self.process = await asyncio.create_subprocess_exec(
-                *self.command,
-                cwd=self.cwd,
-                env=self.env,
-                stdout=open(stdout_logfile, "w"),
-                stderr=open(stderr_logfile, "w"),
-            )
+        if not self.run_called:
+            self.temp_dir = tempfile.mkdtemp()
+            stdout_logfile = os.path.join(self.temp_dir, "stdout.log")
+            stderr_logfile = os.path.join(self.temp_dir, "stderr.log")
+
+            try:
+                # returns when process has been started,
+                # not when it is finished...
+                self.process = await asyncio.create_subprocess_exec(
+                    *self.command,
+                    cwd=self.cwd,
+                    env=self.env,
+                    stdout=open(stdout_logfile, "w"),
+                    stderr=open(stderr_logfile, "w"),
+                )
 
-            self.log_files["stdout"] = stdout_logfile
-            self.log_files["stderr"] = stderr_logfile
+                self.log_files["stdout"] = stdout_logfile
+                self.log_files["stderr"] = stderr_logfile
 
-            self.run_called = True
-            return self.process
-        except Exception as e:
-            print("Error starting subprocess: %s" % e)
-            self.cleanup()
+                self.run_called = True
+                return self.process.pid
+            except Exception as e:
+                print("Error starting subprocess: %s" % e)
+                self.cleanup()
+        else:
+            command_string = " ".join(self.command)
+            print(
+                "Command '%s' has already been called. Please create another CommandManager object."
+                % command_string
+            )
 
-    async def stream_logs(
+    async def stream_log(
         self,
         stream: str,
         position: Optional[int] = None,
@@ -161,13 +167,13 @@ async def stream_logs(
     ):
         """Stream logs from the subprocess using the log files"""
 
-        if self.run_called is False:
-            raise ValueError("No command run yet to get the logs for...")
+        if not self.run_called:
+            raise RuntimeError("No command run yet to get the logs for...")
 
         if stream not in self.log_files:
             raise ValueError(
                 "No log file found for '%s', valid values are: %s"
-                % (stream, list(self.log_files.keys()))
+                % (stream, ", ".join(self.log_files.keys()))
             )
 
         log_file = self.log_files[stream]
@@ -206,18 +212,18 @@ async def stream_logs(
                 continue
 
             position = f.tell()
-            yield position, line.strip()
+            yield position, line.rstrip()
 
     async def emit_logs(self, stream: str = "stdout", custom_logger: Callable = print):
-        """Helper function to iterate over stream_logs"""
+        """Helper function to iterate over stream_log"""
 
-        async for _, line in self.stream_logs(stream):
+        async for _, line in self.stream_log(stream):
             custom_logger(line)
 
     def cleanup(self):
         """Clean up log files for a running subprocesses."""
 
-        if hasattr(self, "temp_dir"):
+        if self.run_called:
             shutil.rmtree(self.temp_dir, ignore_errors=True)
 
     async def kill(self, termination_timeout: float = 5):
@@ -272,7 +278,7 @@ async def main():
 
     # stream logs line by line and check for existence of a string, noting down the position
     interesting_position = 0
-    async for position, line in command_obj.stream_logs(stream="stdout"):
+    async for position, line in command_obj.stream_log(stream="stdout"):
         print(line)
         if "alpha is" in line:
             interesting_position = position
@@ -290,21 +296,21 @@ async def main():
         "resuming streaming from: %s while process is still running..."
         % interesting_position
     )
-    async for position, line in command_obj.stream_logs(
+    async for position, line in command_obj.stream_log(
         stream="stdout", position=interesting_position
     ):
         print(line)
 
     # this will be instantaneous since the process has finished and we just read from the log file
     print("process has ended by now... streaming again from scratch..")
-    async for position, line in command_obj.stream_logs(stream="stdout"):
+    async for position, line in command_obj.stream_log(stream="stdout"):
         print(line)
 
     # this will be instantaneous since the process has finished and we just read from the log file
     print(
         "process has ended by now... streaming again but from position of choice.."
    )
-    async for position, line in command_obj.stream_logs(
+    async for position, line in command_obj.stream_log(
         stream="stdout", position=interesting_position
     ):
         print(line)
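
For the subprocess manager, run_command now returns the pid rather than the process object, CommandManager.run guards against being called twice, and stream_logs is renamed to stream_log. A condensed sketch of the resulting calling convention, modeled on the main() example in this diff; the zero-argument SubprocessManager constructor and the echoed command are assumptions for illustration:

import asyncio
import os
import sys

from metaflow.subprocess_manager import SubprocessManager

async def main():
    spm = SubprocessManager()  # assumed zero-argument constructor
    # run_command returns the pid; look up the CommandManager to interact with the process
    pid = await spm.run_command(
        [sys.executable, "-c", "print('hello from a subprocess')"],
        env=dict(os.environ),
    )
    command_obj = spm.get(pid)
    await command_obj.wait()
    # stream the captured stdout (stream_logs was renamed to stream_log in this commit)
    async for _, line in command_obj.stream_log(stream="stdout"):
        print(line)
    command_obj.cleanup()

asyncio.run(main())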
