Skip to content

Commit 8cb6d8b

Browse files
committed
return ExecutingRun encapsulating CommandManager and metaflow.Run
1 parent 7db25ee commit 8cb6d8b

File tree

2 files changed

+38
-19
lines changed

2 files changed

+38
-19
lines changed

metaflow/metaflow_runner.py

+33-17
Original file line numberDiff line numberDiff line change
@@ -1,14 +1,13 @@
11
import os
22
import sys
3-
import shutil
43
import asyncio
54
import tempfile
65
import aiofiles
76
from typing import Dict
87
from metaflow import Run
98
from metaflow.cli import start
109
from metaflow.click_api import MetaflowAPI
11-
from metaflow.subprocess_manager import SubprocessManager
10+
from metaflow.subprocess_manager import SubprocessManager, CommandManager
1211

1312

1413
async def read_from_file_when_ready(file_path):
@@ -20,6 +19,28 @@ async def read_from_file_when_ready(file_path):
2019
return content
2120

2221

22+
class ExecutingRun(object):
23+
def __init__(self, command_obj: CommandManager, run_obj: Run) -> None:
24+
self.command_obj = command_obj
25+
self.run_obj = run_obj
26+
27+
def __getattr__(self, name: str):
28+
if hasattr(self.run_obj, name):
29+
run_attr = getattr(self.run_obj, name)
30+
if callable(run_attr):
31+
return lambda *args, **kwargs: run_attr(*args, **kwargs)
32+
else:
33+
return run_attr
34+
elif hasattr(self.command_obj, name):
35+
command_attr = getattr(self.command_obj, name)
36+
if callable(command_attr):
37+
return lambda *args, **kwargs: command_attr(*args, **kwargs)
38+
else:
39+
return command_attr
40+
else:
41+
raise AttributeError(f"Invalid attribute {name}")
42+
43+
2344
class Runner(object):
2445
def __init__(
2546
self,
@@ -29,17 +50,14 @@ def __init__(
2950
):
3051
self.flow_file = flow_file
3152
self.env_vars = os.environ.copy().update(env)
32-
self.spm = SubprocessManager(env=self.env_vars)
53+
self.spm = SubprocessManager()
3354
self.api = MetaflowAPI.from_cli(self.flow_file, start)
3455
self.runner = self.api(**kwargs).run
3556

36-
def __enter__(self):
57+
async def __aenter__(self):
3758
return self
3859

39-
async def tail_logs(self, stream="stdout"):
40-
await self.spm.get_logs(stream)
41-
42-
async def run(self, blocking: bool = False, **kwargs):
60+
async def run(self, **kwargs):
4361
with tempfile.TemporaryDirectory() as temp_dir:
4462
tfp_flow = tempfile.NamedTemporaryFile(dir=temp_dir, delete=False)
4563
tfp_run_id = tempfile.NamedTemporaryFile(dir=temp_dir, delete=False)
@@ -48,20 +66,18 @@ async def run(self, blocking: bool = False, **kwargs):
4866
run_id_file=tfp_run_id.name, flow_name_file=tfp_flow.name, **kwargs
4967
)
5068

51-
process = await self.spm.run_command([sys.executable, *command.split()])
52-
53-
if blocking:
54-
await process.wait()
69+
command_id = await self.spm.run_command(
70+
[sys.executable, *command], env=self.env_vars
71+
)
72+
command_obj = self.spm.get(command_id)
5573

5674
flow_name = await read_from_file_when_ready(tfp_flow.name)
5775
run_id = await read_from_file_when_ready(tfp_run_id.name)
5876

5977
pathspec_components = (flow_name, run_id)
6078
run_object = Run("/".join(pathspec_components), _namespace_check=False)
6179

62-
self.run = run_object
63-
64-
return run_object
80+
return ExecutingRun(command_obj, run_object)
6581

66-
def __exit__(self, exc_type, exc_value, traceback):
67-
shutil.rmtree(self.spm.temp_dir, ignore_errors=True)
82+
async def __aexit__(self, exc_type, exc_value, traceback):
83+
await self.spm.cleanup()

metaflow/subprocess_manager.py

+5-2
Original file line numberDiff line numberDiff line change
@@ -29,8 +29,7 @@ async def __aenter__(self):
2929
return self
3030

3131
async def __aexit__(self, exc_type, exc_value, traceback):
32-
for _, v in self.commands.items():
33-
await v.cleanup()
32+
await self.cleanup()
3433

3534
async def run_command(self, command: List[str], env=None, cwd=None):
3635
command_id = hash_command_invocation(command)
@@ -41,6 +40,10 @@ async def run_command(self, command: List[str], env=None, cwd=None):
4140
def get(self, command_id: str) -> "CommandManager":
4241
return self.commands.get(command_id, None)
4342

43+
async def cleanup(self):
44+
for _, v in self.commands.items():
45+
await v.cleanup()
46+
4447

4548
class CommandManager(object):
4649
def __init__(self, command: List[str], env=None, cwd=None):

0 commit comments

Comments
 (0)