Skip to content

Commit 125a018

Browse files
authored
synchronous run and resume functionality + nbrun for runner API (#1845)
* synchronous run with resume and async resume * add nbrun * remove runtime error if not in jupyter env * fix env vars update * suggested changes to nbrun
1 parent 9dd168c commit 125a018

File tree

4 files changed

+370
-45
lines changed

4 files changed

+370
-45
lines changed

metaflow/__init__.py

+1
Original file line numberDiff line numberDiff line change
@@ -149,6 +149,7 @@ class and related decorators.
149149
# Runner API
150150
if sys.version_info >= (3, 7):
151151
from .runner.metaflow_runner import Runner
152+
from .runner.nbrun import NBRunner
152153

153154
__version_addl__ = []
154155
_ext_debug("Loading top-level modules")

metaflow/runner/metaflow_runner.py

+122-39
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,6 @@
11
import os
22
import sys
33
import time
4-
import asyncio
54
import tempfile
65
from typing import Dict, Iterator, Optional, Tuple
76
from metaflow import Run
@@ -86,6 +85,20 @@ async def wait(
8685
await self.command_obj.wait(timeout, stream)
8786
return self
8887

88+
@property
89+
def returncode(self) -> Optional[int]:
90+
"""
91+
Gets the returncode of the underlying subprocess that is responsible
92+
for executing the run.
93+
94+
Returns
95+
-------
96+
Optional[int]
97+
The returncode for the subprocess that executes the run.
98+
(None if process is still running)
99+
"""
100+
return self.command_obj.process.returncode
101+
89102
@property
90103
def status(self) -> str:
91104
"""
@@ -214,32 +227,49 @@ def __init__(
214227
from metaflow.runner.click_api import MetaflowAPI
215228

216229
self.flow_file = flow_file
217-
self.env_vars = os.environ.copy().update(env or {})
230+
self.env_vars = os.environ.copy()
231+
self.env_vars.update(env or {})
218232
if profile:
219233
self.env_vars["METAFLOW_PROFILE"] = profile
220234
self.spm = SubprocessManager()
235+
self.top_level_kwargs = kwargs
221236
self.api = MetaflowAPI.from_cli(self.flow_file, start)
222-
self.runner = self.api(**kwargs).run
223237

224238
def __enter__(self) -> "Runner":
225239
return self
226240

227241
async def __aenter__(self) -> "Runner":
228242
return self
229243

230-
def __exit__(self, exc_type, exc_value, traceback):
231-
self.spm.cleanup()
232-
233-
async def __aexit__(self, exc_type, exc_value, traceback):
234-
self.spm.cleanup()
235-
236-
def run(self, **kwargs) -> ExecutingRun:
244+
def __get_executing_run(self, tfp_pathspec, command_obj):
245+
try:
246+
pathspec = read_from_file_when_ready(tfp_pathspec.name, timeout=10)
247+
run_object = Run(pathspec, _namespace_check=False)
248+
return ExecutingRun(self, command_obj, run_object)
249+
except TimeoutError as e:
250+
stdout_log = open(command_obj.log_files["stdout"]).read()
251+
stderr_log = open(command_obj.log_files["stderr"]).read()
252+
command = " ".join(command_obj.command)
253+
error_message = "Error executing: '%s':\n" % command
254+
if stdout_log.strip():
255+
error_message += "\nStdout:\n%s\n" % stdout_log
256+
if stderr_log.strip():
257+
error_message += "\nStderr:\n%s\n" % stderr_log
258+
raise RuntimeError(error_message) from e
259+
260+
def run(self, show_output: bool = False, **kwargs) -> ExecutingRun:
237261
"""
238262
Synchronous execution of the run. This method will *block* until
239263
the run has completed execution.
240264
241265
Parameters
242266
----------
267+
show_output : bool, default False
268+
Suppress the 'stdout' and 'stderr' to the console by default.
269+
They can be accessed later by reading the files present in the
270+
ExecutingRun object (referenced as 'result' below) returned:
271+
- result.stdout
272+
- result.stderr
243273
**kwargs : Any
244274
Additional arguments that you would pass to `python ./myflow.py` after
245275
the `run` command.
@@ -249,15 +279,53 @@ def run(self, **kwargs) -> ExecutingRun:
249279
ExecutingRun
250280
ExecutingRun object for this run.
251281
"""
252-
loop = asyncio.new_event_loop()
253-
asyncio.set_event_loop(loop)
282+
with tempfile.TemporaryDirectory() as temp_dir:
283+
tfp_pathspec = tempfile.NamedTemporaryFile(dir=temp_dir, delete=False)
284+
command = self.api(**self.top_level_kwargs).run(
285+
pathspec_file=tfp_pathspec.name, **kwargs
286+
)
254287

255-
try:
256-
result = loop.run_until_complete(self.async_run(**kwargs))
257-
result = loop.run_until_complete(result.wait())
258-
return result
259-
finally:
260-
loop.close()
288+
pid = self.spm.run_command(
289+
[sys.executable, *command], env=self.env_vars, show_output=show_output
290+
)
291+
command_obj = self.spm.get(pid)
292+
293+
return self.__get_executing_run(tfp_pathspec, command_obj)
294+
295+
def resume(self, show_output: bool = False, **kwargs):
296+
"""
297+
Synchronous resume execution of the run.
298+
This method will *block* until the resumed run has completed execution.
299+
300+
Parameters
301+
----------
302+
show_output : bool, default False
303+
Suppress the 'stdout' and 'stderr' to the console by default.
304+
They can be accessed later by reading the files present in the
305+
ExecutingRun object (referenced as 'result' below) returned:
306+
- result.stdout
307+
- result.stderr
308+
**kwargs : Any
309+
Additional arguments that you would pass to `python ./myflow.py` after
310+
the `resume` command.
311+
312+
Returns
313+
-------
314+
ExecutingRun
315+
ExecutingRun object for this resumed run.
316+
"""
317+
with tempfile.TemporaryDirectory() as temp_dir:
318+
tfp_pathspec = tempfile.NamedTemporaryFile(dir=temp_dir, delete=False)
319+
command = self.api(**self.top_level_kwargs).resume(
320+
pathspec_file=tfp_pathspec.name, **kwargs
321+
)
322+
323+
pid = self.spm.run_command(
324+
[sys.executable, *command], env=self.env_vars, show_output=show_output
325+
)
326+
command_obj = self.spm.get(pid)
327+
328+
return self.__get_executing_run(tfp_pathspec, command_obj)
261329

262330
async def async_run(self, **kwargs) -> ExecutingRun:
263331
"""
@@ -277,33 +345,48 @@ async def async_run(self, **kwargs) -> ExecutingRun:
277345
"""
278346
with tempfile.TemporaryDirectory() as temp_dir:
279347
tfp_pathspec = tempfile.NamedTemporaryFile(dir=temp_dir, delete=False)
348+
command = self.api(**self.top_level_kwargs).run(
349+
pathspec_file=tfp_pathspec.name, **kwargs
350+
)
280351

281-
command = self.runner(pathspec_file=tfp_pathspec.name, **kwargs)
282-
283-
pid = await self.spm.run_command(
352+
pid = await self.spm.async_run_command(
284353
[sys.executable, *command], env=self.env_vars
285354
)
286355
command_obj = self.spm.get(pid)
287356

288-
try:
289-
pathspec = read_from_file_when_ready(tfp_pathspec.name, timeout=5)
290-
run_object = Run(pathspec, _namespace_check=False)
291-
return ExecutingRun(self, command_obj, run_object)
292-
except TimeoutError as e:
293-
stdout_log = open(
294-
command_obj.log_files["stdout"], encoding="utf-8"
295-
).read()
296-
stderr_log = open(
297-
command_obj.log_files["stderr"], encoding="utf-8"
298-
).read()
299-
command = " ".join(command_obj.command)
357+
return self.__get_executing_run(tfp_pathspec, command_obj)
358+
359+
async def async_resume(self, **kwargs):
360+
"""
361+
Asynchronous resume execution of the run.
362+
This method will return as soon as the resume has launched.
363+
364+
Parameters
365+
----------
366+
**kwargs : Any
367+
Additional arguments that you would pass to `python ./myflow.py` after
368+
the `resume` command.
369+
370+
Returns
371+
-------
372+
ExecutingRun
373+
ExecutingRun object for this resumed run.
374+
"""
375+
with tempfile.TemporaryDirectory() as temp_dir:
376+
tfp_pathspec = tempfile.NamedTemporaryFile(dir=temp_dir, delete=False)
377+
command = self.api(**self.top_level_kwargs).resume(
378+
pathspec_file=tfp_pathspec.name, **kwargs
379+
)
300380

301-
error_message = "Error executing: '%s':\n" % command
381+
pid = await self.spm.async_run_command(
382+
[sys.executable, *command], env=self.env_vars
383+
)
384+
command_obj = self.spm.get(pid)
302385

303-
if stdout_log.strip():
304-
error_message += "\nStdout:\n%s\n" % stdout_log
386+
return self.__get_executing_run(tfp_pathspec, command_obj)
305387

306-
if stderr_log.strip():
307-
error_message += "\nStderr:\n%s\n" % stderr_log
388+
def __exit__(self, exc_type, exc_value, traceback):
389+
self.spm.cleanup()
308390

309-
raise RuntimeError(error_message) from e
391+
async def __aexit__(self, exc_type, exc_value, traceback):
392+
self.spm.cleanup()

metaflow/runner/nbrun.py

+131
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,131 @@
1+
import os
2+
import ast
3+
import shutil
4+
import tempfile
5+
from typing import Optional, Dict
6+
from metaflow import Runner
7+
8+
try:
9+
from IPython import get_ipython
10+
11+
ipython = get_ipython()
12+
except ModuleNotFoundError:
13+
print("'nbrun' requires an interactive python environment (such as Jupyter)")
14+
15+
16+
def get_current_cell():
17+
if ipython:
18+
return ipython.history_manager.input_hist_raw[-1]
19+
return None
20+
21+
22+
def format_flowfile(cell):
23+
"""
24+
Formats the given cell content to create a valid Python script that can be executed as a Metaflow flow.
25+
"""
26+
flowspec = [
27+
x
28+
for x in ast.parse(cell).body
29+
if isinstance(x, ast.ClassDef) and any(b.id == "FlowSpec" for b in x.bases)
30+
]
31+
32+
if not flowspec:
33+
raise ModuleNotFoundError(
34+
"The cell doesn't contain any class that inherits from 'FlowSpec'"
35+
)
36+
37+
lines = cell.splitlines()[: flowspec[0].end_lineno]
38+
lines += ["if __name__ == '__main__':", f" {flowspec[0].name}()"]
39+
return "\n".join(lines)
40+
41+
42+
class NBRunner(object):
43+
"""
44+
A class to run Metaflow flows from Jupyter notebook cells.
45+
"""
46+
47+
def __init__(
48+
self,
49+
flow,
50+
profile: Optional[str] = None,
51+
env: Optional[Dict] = None,
52+
base_dir: Optional[str] = None,
53+
**kwargs
54+
):
55+
self.cell = get_current_cell()
56+
self.flow = flow
57+
58+
self.env_vars = os.environ.copy()
59+
self.env_vars.update(env or {})
60+
self.env_vars.update({"JPY_PARENT_PID": ""})
61+
if profile:
62+
self.env_vars["METAFLOW_PROFILE"] = profile
63+
64+
self.base_dir = base_dir
65+
66+
if not self.cell:
67+
raise ValueError("Couldn't find a cell.")
68+
69+
if self.base_dir is None:
70+
# for some reason, using this is much faster
71+
self.tempdir = tempfile.mkdtemp()
72+
else:
73+
self.tempdir = self.base_dir
74+
75+
self.tmp_flow_file = tempfile.NamedTemporaryFile(
76+
prefix=self.flow.__name__,
77+
suffix=".py",
78+
mode="w",
79+
dir=self.tempdir,
80+
delete=False,
81+
)
82+
83+
self.tmp_flow_file.write(format_flowfile(self.cell))
84+
self.tmp_flow_file.flush()
85+
self.tmp_flow_file.close()
86+
87+
self.runner = Runner(
88+
flow_file=self.tmp_flow_file.name,
89+
profile=profile,
90+
env=self.env_vars,
91+
**kwargs
92+
)
93+
94+
def nbrun(self, **kwargs):
95+
result = self.runner.run(show_output=True, **kwargs)
96+
self.runner.spm.cleanup()
97+
return result.run
98+
99+
def nbresume(self, **kwargs):
100+
result = self.runner.resume(show_output=True, **kwargs)
101+
self.runner.spm.cleanup()
102+
return result.run
103+
104+
def cleanup(self):
105+
"""Cleans up the temporary directory used to store the flow script"""
106+
if self.base_dir is None:
107+
shutil.rmtree(self.tempdir, ignore_errors=True)
108+
109+
def run(self, **kwargs):
110+
"""
111+
Runs the flow.
112+
"""
113+
return self.runner.run(**kwargs)
114+
115+
def resume(self, **kwargs):
116+
"""
117+
Resumes the flow.
118+
"""
119+
return self.runner.resume(**kwargs)
120+
121+
async def async_run(self, **kwargs):
122+
"""
123+
Asynchronously runs the flow.
124+
"""
125+
return await self.runner.async_run(**kwargs)
126+
127+
async def async_resume(self, **kwargs):
128+
"""
129+
Asynchronously resumes the flow.
130+
"""
131+
return await self.runner.async_resume(**kwargs)

0 commit comments

Comments
 (0)