Skip to content

Commit 9ad1c0a

Browse files
committed
resume and async resume
1 parent 2d1520d commit 9ad1c0a

File tree

1 file changed

+101
-42
lines changed

1 file changed

+101
-42
lines changed

metaflow/runner/metaflow_runner.py

+101-42
Original file line numberDiff line numberDiff line change
@@ -217,6 +217,7 @@ def __init__(
217217
if profile:
218218
self.env_vars["METAFLOW_PROFILE"] = profile
219219
self.spm = SubprocessManager()
220+
self.top_level_kwargs = kwargs
220221
self.api = MetaflowAPI.from_cli(self.flow_file, start)
221222

222223
def __enter__(self) -> "Runner":
@@ -225,19 +226,35 @@ def __enter__(self) -> "Runner":
225226
async def __aenter__(self) -> "Runner":
226227
return self
227228

228-
def __exit__(self, exc_type, exc_value, traceback):
229-
self.spm.cleanup()
230-
231-
async def __aexit__(self, exc_type, exc_value, traceback):
232-
self.spm.cleanup()
233-
234-
def run(self, **kwargs) -> ExecutingRun:
229+
def __get_executing_run(self, tfp_pathspec, command_obj):
230+
try:
231+
pathspec = read_from_file_when_ready(tfp_pathspec.name, timeout=5)
232+
run_object = Run(pathspec, _namespace_check=False)
233+
return ExecutingRun(self, command_obj, run_object)
234+
except TimeoutError as e:
235+
stdout_log = open(command_obj.log_files["stdout"]).read()
236+
stderr_log = open(command_obj.log_files["stderr"]).read()
237+
command = " ".join(command_obj.command)
238+
error_message = "Error executing: '%s':\n" % command
239+
if stdout_log.strip():
240+
error_message += "\nStdout:\n%s\n" % stdout_log
241+
if stderr_log.strip():
242+
error_message += "\nStderr:\n%s\n" % stderr_log
243+
raise RuntimeError(error_message) from e
244+
245+
def run(self, show_output: bool = False, **kwargs) -> ExecutingRun:
235246
"""
236247
Synchronous execution of the run. This method will *block* until
237248
the run has completed execution.
238249
239250
Parameters
240251
----------
252+
show_output : bool, default False
253+
Suppress the 'stdout' and 'stderr' to the console by default.
254+
They can be accessed later by reading the files present in the
255+
ExecutingRun object (referenced as 'result' below) returned:
256+
- result.stdout
257+
- result.stderr
241258
**kwargs : Any
242259
Additional arguments that you would pass to `python ./myflow.py` after
243260
the `run` command.
@@ -249,25 +266,51 @@ def run(self, **kwargs) -> ExecutingRun:
249266
"""
250267
with tempfile.TemporaryDirectory() as temp_dir:
251268
tfp_pathspec = tempfile.NamedTemporaryFile(dir=temp_dir, delete=False)
252-
command = self.api.run(pathspec_file=tfp_pathspec.name, **kwargs)
269+
command = self.api(**self.top_level_kwargs).run(
270+
pathspec_file=tfp_pathspec.name, **kwargs
271+
)
253272

254-
pid = self.spm.run_command([sys.executable, *command], env=self.env_vars)
273+
pid = self.spm.run_command(
274+
[sys.executable, *command], env=self.env_vars, show_output=show_output
275+
)
255276
command_obj = self.spm.get(pid)
256277

257-
try:
258-
pathspec = read_from_file_when_ready(tfp_pathspec.name, timeout=5)
259-
run_object = Run(pathspec, _namespace_check=False)
260-
return ExecutingRun(self, command_obj, run_object)
261-
except TimeoutError as e:
262-
stdout_log = open(command_obj.log_files["stdout"]).read()
263-
stderr_log = open(command_obj.log_files["stderr"]).read()
264-
command = " ".join(command_obj.command)
265-
error_message = "Error executing: '%s':\n" % command
266-
if stdout_log.strip():
267-
error_message += "\nStdout:\n%s\n" % stdout_log
268-
if stderr_log.strip():
269-
error_message += "\nStderr:\n%s\n" % stderr_log
270-
raise RuntimeError(error_message) from e
278+
return self.__get_executing_run(tfp_pathspec, command_obj)
279+
280+
def resume(self, show_output: bool = False, **kwargs):
281+
"""
282+
Synchronous resume execution of the run.
283+
This method will *block* until the resumed run has completed execution.
284+
285+
Parameters
286+
----------
287+
show_output : bool, default False
288+
Suppress the 'stdout' and 'stderr' to the console by default.
289+
They can be accessed later by reading the files present in the
290+
ExecutingRun object (referenced as 'result' below) returned:
291+
- result.stdout
292+
- result.stderr
293+
**kwargs : Any
294+
Additional arguments that you would pass to `python ./myflow.py` after
295+
the `resume` command.
296+
297+
Returns
298+
-------
299+
ExecutingRun
300+
ExecutingRun object for this resumed run.
301+
"""
302+
with tempfile.TemporaryDirectory() as temp_dir:
303+
tfp_pathspec = tempfile.NamedTemporaryFile(dir=temp_dir, delete=False)
304+
command = self.api(**self.top_level_kwargs).resume(
305+
pathspec_file=tfp_pathspec.name, **kwargs
306+
)
307+
308+
pid = self.spm.run_command(
309+
[sys.executable, *command], env=self.env_vars, show_output=show_output
310+
)
311+
command_obj = self.spm.get(pid)
312+
313+
return self.__get_executing_run(tfp_pathspec, command_obj)
271314

272315
async def async_run(self, **kwargs) -> ExecutingRun:
273316
"""
@@ -287,32 +330,48 @@ async def async_run(self, **kwargs) -> ExecutingRun:
287330
"""
288331
with tempfile.TemporaryDirectory() as temp_dir:
289332
tfp_pathspec = tempfile.NamedTemporaryFile(dir=temp_dir, delete=False)
290-
command = self.api.run(pathspec_file=tfp_pathspec.name, **kwargs)
333+
command = self.api(**self.top_level_kwargs).run(
334+
pathspec_file=tfp_pathspec.name, **kwargs
335+
)
291336

292337
pid = await self.spm.async_run_command(
293338
[sys.executable, *command], env=self.env_vars
294339
)
295340
command_obj = self.spm.get(pid)
296341

297-
try:
298-
pathspec = read_from_file_when_ready(tfp_pathspec.name, timeout=5)
299-
run_object = Run(pathspec, _namespace_check=False)
300-
return ExecutingRun(self, command_obj, run_object)
301-
except TimeoutError as e:
302-
stdout_log = open(
303-
command_obj.log_files["stdout"], encoding="utf-8"
304-
).read()
305-
stderr_log = open(
306-
command_obj.log_files["stderr"], encoding="utf-8"
307-
).read()
308-
command = " ".join(command_obj.command)
342+
return self.__get_executing_run(tfp_pathspec, command_obj)
309343

310-
error_message = "Error executing: '%s':\n" % command
344+
async def async_resume(self, **kwargs):
345+
"""
346+
Asynchronous resume execution of the run.
347+
This method will return as soon as the resume has launched.
311348
312-
if stdout_log.strip():
313-
error_message += "\nStdout:\n%s\n" % stdout_log
349+
Parameters
350+
----------
351+
**kwargs : Any
352+
Additional arguments that you would pass to `python ./myflow.py` after
353+
the `resume` command.
314354
315-
if stderr_log.strip():
316-
error_message += "\nStderr:\n%s\n" % stderr_log
355+
Returns
356+
-------
357+
ExecutingRun
358+
ExecutingRun object for this resumed run.
359+
"""
360+
with tempfile.TemporaryDirectory() as temp_dir:
361+
tfp_pathspec = tempfile.NamedTemporaryFile(dir=temp_dir, delete=False)
362+
command = self.api(**self.top_level_kwargs).resume(
363+
pathspec_file=tfp_pathspec.name, **kwargs
364+
)
365+
366+
pid = await self.spm.async_run_command(
367+
[sys.executable, *command], env=self.env_vars
368+
)
369+
command_obj = self.spm.get(pid)
317370

318-
raise RuntimeError(error_message) from e
371+
return self.__get_executing_run(tfp_pathspec, command_obj)
372+
373+
def __exit__(self, exc_type, exc_value, traceback):
374+
self.spm.cleanup()
375+
376+
async def __aexit__(self, exc_type, exc_value, traceback):
377+
self.spm.cleanup()

0 commit comments

Comments
 (0)