Skip to content

Commit 2d1520d

Browse files
committed
synchronous run with logs
1 parent d661bbc commit 2d1520d

File tree

2 files changed

+138
-19
lines changed

2 files changed

+138
-19
lines changed

metaflow/runner/metaflow_runner.py

+22-13
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,6 @@
11
import os
22
import sys
33
import time
4-
import asyncio
54
import tempfile
65
from typing import Dict, Iterator, Optional, Tuple
76
from metaflow import Run
@@ -219,7 +218,6 @@ def __init__(
219218
self.env_vars["METAFLOW_PROFILE"] = profile
220219
self.spm = SubprocessManager()
221220
self.api = MetaflowAPI.from_cli(self.flow_file, start)
222-
self.runner = self.api(**kwargs).run
223221

224222
def __enter__(self) -> "Runner":
225223
return self
@@ -249,15 +247,27 @@ def run(self, **kwargs) -> ExecutingRun:
249247
ExecutingRun
250248
ExecutingRun object for this run.
251249
"""
252-
loop = asyncio.new_event_loop()
253-
asyncio.set_event_loop(loop)
250+
with tempfile.TemporaryDirectory() as temp_dir:
251+
tfp_pathspec = tempfile.NamedTemporaryFile(dir=temp_dir, delete=False)
252+
command = self.api.run(pathspec_file=tfp_pathspec.name, **kwargs)
254253

255-
try:
256-
result = loop.run_until_complete(self.async_run(**kwargs))
257-
result = loop.run_until_complete(result.wait())
258-
return result
259-
finally:
260-
loop.close()
254+
pid = self.spm.run_command([sys.executable, *command], env=self.env_vars)
255+
command_obj = self.spm.get(pid)
256+
257+
try:
258+
pathspec = read_from_file_when_ready(tfp_pathspec.name, timeout=5)
259+
run_object = Run(pathspec, _namespace_check=False)
260+
return ExecutingRun(self, command_obj, run_object)
261+
except TimeoutError as e:
262+
stdout_log = open(command_obj.log_files["stdout"]).read()
263+
stderr_log = open(command_obj.log_files["stderr"]).read()
264+
command = " ".join(command_obj.command)
265+
error_message = "Error executing: '%s':\n" % command
266+
if stdout_log.strip():
267+
error_message += "\nStdout:\n%s\n" % stdout_log
268+
if stderr_log.strip():
269+
error_message += "\nStderr:\n%s\n" % stderr_log
270+
raise RuntimeError(error_message) from e
261271

262272
async def async_run(self, **kwargs) -> ExecutingRun:
263273
"""
@@ -277,10 +287,9 @@ async def async_run(self, **kwargs) -> ExecutingRun:
277287
"""
278288
with tempfile.TemporaryDirectory() as temp_dir:
279289
tfp_pathspec = tempfile.NamedTemporaryFile(dir=temp_dir, delete=False)
290+
command = self.api.run(pathspec_file=tfp_pathspec.name, **kwargs)
280291

281-
command = self.runner(pathspec_file=tfp_pathspec.name, **kwargs)
282-
283-
pid = await self.spm.run_command(
292+
pid = await self.spm.async_run_command(
284293
[sys.executable, *command], env=self.env_vars
285294
)
286295
command_obj = self.spm.get(pid)

metaflow/runner/subprocess_manager.py

+116-6
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@
55
import shutil
66
import asyncio
77
import tempfile
8+
import threading
89
import subprocess
910
from typing import List, Dict, Optional, Callable, Iterator, Tuple
1011

@@ -42,7 +43,44 @@ async def __aenter__(self) -> "SubprocessManager":
4243
async def __aexit__(self, exc_type, exc_value, traceback):
4344
self.cleanup()
4445

45-
async def run_command(
46+
def run_command(
47+
self,
48+
command: List[str],
49+
env: Optional[Dict[str, str]] = None,
50+
cwd: Optional[str] = None,
51+
show_output: bool = False,
52+
) -> int:
53+
"""
54+
Run a command synchronously and return its process ID.
55+
56+
Parameters
57+
----------
58+
command : List[str]
59+
The command to run in List form.
60+
env : Optional[Dict[str, str]], default None
61+
Environment variables to set for the subprocess; if not specified,
62+
the current enviornment variables are used.
63+
cwd : Optional[str], default None
64+
The directory to run the subprocess in; if not specified, the current
65+
directory is used.
66+
show_output : bool, default False
67+
Suppress the 'stdout' and 'stderr' to the console by default.
68+
They can be accessed later by reading the files present in the
69+
CommandManager object:
70+
- command_obj.log_files["stdout"]
71+
- command_obj.log_files["stderr"]
72+
Returns
73+
-------
74+
int
75+
The process ID of the subprocess.
76+
"""
77+
78+
command_obj = CommandManager(command, env, cwd)
79+
pid = command_obj.run(show_output=show_output)
80+
self.commands[pid] = command_obj
81+
return pid
82+
83+
async def async_run_command(
4684
self,
4785
command: List[str],
4886
env: Optional[Dict[str, str]] = None,
@@ -69,7 +107,7 @@ async def run_command(
69107
"""
70108

71109
command_obj = CommandManager(command, env, cwd)
72-
pid = await command_obj.run()
110+
pid = await command_obj.async_run()
73111
self.commands[pid] = command_obj
74112
return pid
75113

@@ -80,7 +118,7 @@ def get(self, pid: int) -> Optional["CommandManager"]:
80118
Parameters
81119
----------
82120
pid : int
83-
The process ID of the subprocess (returned by run_command).
121+
The process ID of the subprocess (returned by run_command or async_run_command).
84122
85123
Returns
86124
-------
@@ -144,7 +182,7 @@ async def wait(
144182
Wait for the subprocess to finish, optionally with a timeout
145183
and optionally streaming its output.
146184
147-
You can only call `wait` if `run` has already been called.
185+
You can only call `wait` if `async_run` has already been called.
148186
149187
Parameters
150188
----------
@@ -178,7 +216,79 @@ async def wait(
178216
"within %s seconds." % (self.process.pid, command_string, timeout)
179217
)
180218

181-
async def run(self):
219+
def run(self, show_output: bool = False):
220+
"""
221+
Run the subprocess synchronously. This can only be called once.
222+
223+
This also waits on the process implicitly.
224+
225+
Parameters
226+
----------
227+
show_output : bool, default False
228+
Suppress the 'stdout' and 'stderr' to the console by default.
229+
They can be accessed later by reading the files present in:
230+
- self.log_files["stdout"]
231+
- self.log_files["stderr"]
232+
"""
233+
234+
if not self.run_called:
235+
self.temp_dir = tempfile.mkdtemp()
236+
stdout_logfile = os.path.join(self.temp_dir, "stdout.log")
237+
stderr_logfile = os.path.join(self.temp_dir, "stderr.log")
238+
239+
def stream_to_stdout_and_file(pipe, log_file):
240+
with open(log_file, "w") as file:
241+
for line in iter(pipe.readline, ""):
242+
if show_output:
243+
sys.stdout.write(line)
244+
file.write(line)
245+
pipe.close()
246+
247+
try:
248+
self.process = subprocess.Popen(
249+
self.command,
250+
cwd=self.cwd,
251+
env=self.env,
252+
stdout=subprocess.PIPE,
253+
stderr=subprocess.PIPE,
254+
bufsize=1,
255+
universal_newlines=True,
256+
)
257+
258+
self.log_files["stdout"] = stdout_logfile
259+
self.log_files["stderr"] = stderr_logfile
260+
261+
self.run_called = True
262+
263+
stdout_thread = threading.Thread(
264+
target=stream_to_stdout_and_file,
265+
args=(self.process.stdout, stdout_logfile),
266+
)
267+
stderr_thread = threading.Thread(
268+
target=stream_to_stdout_and_file,
269+
args=(self.process.stderr, stderr_logfile),
270+
)
271+
272+
stdout_thread.start()
273+
stderr_thread.start()
274+
275+
self.process.wait()
276+
277+
stdout_thread.join()
278+
stderr_thread.join()
279+
280+
return self.process.pid
281+
except Exception as e:
282+
print("Error starting subprocess: %s" % e)
283+
self.cleanup()
284+
else:
285+
command_string = " ".join(self.command)
286+
print(
287+
"Command '%s' has already been called. Please create another "
288+
"CommandManager object." % command_string
289+
)
290+
291+
async def async_run(self):
182292
"""
183293
Run the subprocess asynchronously. This can only be called once.
184294
@@ -357,7 +467,7 @@ async def main():
357467

358468
async with SubprocessManager() as spm:
359469
# returns immediately
360-
pid = await spm.run_command(cmd)
470+
pid = await spm.async_run_command(cmd)
361471
command_obj = spm.get(pid)
362472

363473
print(pid)

0 commit comments

Comments
 (0)