Skip to content

Commit b877d47

Browse files
committed
return Run objects
1 parent cc17ade commit b877d47

File tree

2 files changed

+105
-38
lines changed

2 files changed

+105
-38
lines changed

metaflow/cli.py

+19-6
Original file line numberDiff line numberDiff line change
@@ -686,6 +686,13 @@ def common_run_options(func):
686686
type=str,
687687
help="Write the ID of this run to the file specified.",
688688
)
689+
@click.option(
690+
"--flow-name-file",
691+
default=None,
692+
show_default=True,
693+
type=str,
694+
help="Write the flow name of this run to the file specified.",
695+
)
689696
@wraps(func)
690697
def wrapper(*args, **kwargs):
691698
return func(*args, **kwargs)
@@ -736,6 +743,7 @@ def resume(
736743
max_log_size=None,
737744
decospecs=None,
738745
run_id_file=None,
746+
flow_name_file=None,
739747
):
740748
before_run(obj, tags, decospecs + obj.environment.decospecs())
741749

@@ -790,7 +798,9 @@ def resume(
790798
max_num_splits=max_num_splits,
791799
max_log_size=max_log_size * 1024 * 1024,
792800
)
793-
write_run_id(run_id_file, runtime.run_id)
801+
write_file(run_id_file, runtime.run_id)
802+
write_file(flow_name_file, obj.flow.name)
803+
794804
runtime.persist_constants()
795805
runtime.execute()
796806

@@ -819,6 +829,7 @@ def run(
819829
max_log_size=None,
820830
decospecs=None,
821831
run_id_file=None,
832+
flow_name_file=None,
822833
user_namespace=None,
823834
**kwargs
824835
):
@@ -842,17 +853,19 @@ def run(
842853
max_log_size=max_log_size * 1024 * 1024,
843854
)
844855
write_latest_run_id(obj, runtime.run_id)
845-
write_run_id(run_id_file, runtime.run_id)
856+
write_file(run_id_file, runtime.run_id)
857+
write_file(flow_name_file, obj.flow.name)
846858

847859
obj.flow._set_constants(obj.graph, kwargs)
848860
runtime.persist_constants()
849861
runtime.execute()
850862

851863

852-
def write_run_id(run_id_file, run_id):
853-
if run_id_file is not None:
854-
with open(run_id_file, "w") as f:
855-
f.write(str(run_id))
864+
def write_file(file_path, content):
865+
if file_path is not None:
866+
with open(file_path, "w") as f:
867+
f.write(str(content))
868+
f.close()
856869

857870

858871
def before_run(obj, tags, decospecs):

metaflow/metaflow_runner.py

+86-32
Original file line numberDiff line numberDiff line change
@@ -1,63 +1,117 @@
1+
import os
2+
import sys
3+
import time
14
import asyncio
5+
import tempfile
6+
import subprocess
7+
from metaflow import Run
8+
from typing import List, Dict
29
from concurrent.futures import ProcessPoolExecutor
3-
import subprocess, sys, os
4-
from typing import List, Dict, Optional
510

611

7-
def convert_params_to_cli_args(params: List[Dict]):
8-
converted_params = [item for pair in params.items() for item in pair]
9-
converted_params = [
10-
str(val) if idx % 2 else f"--{val}" for idx, val in enumerate(converted_params)
11-
]
12-
return converted_params
12+
def cli_runner(flow_file: str, command: str, args: List[str], env_vars: Dict):
13+
process = subprocess.Popen(
14+
[sys.executable, flow_file, command, *args],
15+
stdout=subprocess.PIPE,
16+
stderr=subprocess.PIPE,
17+
env=env_vars,
18+
)
19+
return process
1320

1421

15-
def cli_runner(flow_file: str, command: str, args: List[str], env_vars: Dict):
16-
result = subprocess.run([sys.executable, flow_file, command, *args], env=env_vars)
17-
return result.returncode
22+
def read_from_file_when_ready(file_pointer):
23+
content = file_pointer.read().decode()
24+
while not content:
25+
time.sleep(0.1)
26+
content = file_pointer.read().decode()
27+
return content
1828

1929

2030
class Pool(object):
2131
def __init__(
2232
self,
2333
flow_file: str,
2434
num_processes: int,
25-
profile: Optional[str] = None,
26-
with_context: Optional[str] = None,
2735
):
2836
self.flow_file = flow_file
2937
self.num_processes = num_processes
30-
self.profile = profile
31-
self.with_context = with_context
38+
self.runner = Runner(self.flow_file)
3239

33-
def map(self, params: List[Dict], command: str = "run"):
34-
return asyncio.run(self._map(params, command))
40+
def __enter__(self):
41+
return self
3542

36-
async def _map(self, params: List[Dict], command: str = "run"):
37-
self.command = command
43+
def map(self, paramater_space: List[Dict], blocking: bool = False):
44+
return asyncio.run(self._map(paramater_space, blocking))
45+
46+
async def _map(self, paramater_space: List[Dict], blocking: bool = False):
3847
loop = asyncio.get_running_loop()
3948
tasks, runs = [], []
4049
with ProcessPoolExecutor(max_workers=self.num_processes) as executor:
41-
for each_param in params:
42-
tasks.append(loop.run_in_executor(executor, self._execute, each_param))
50+
for set_of_params in paramater_space:
51+
tasks.append(
52+
loop.run_in_executor(
53+
executor, self.runner.run, set_of_params, blocking
54+
)
55+
)
4356
for done in asyncio.as_completed(tasks):
4457
runs.append(await done)
4558

4659
return runs
4760

48-
def _execute(self, params: List[Dict]):
49-
env_vars = os.environ.copy()
50-
if self.profile:
51-
env_vars.update({"METAFLOW_PROFILE": self.profile})
52-
params = convert_params_to_cli_args(params)
53-
print(f"Starting for {params}")
54-
result = cli_runner(self.flow_file, self.command, params, env_vars)
55-
print(f"Result ready for {params}")
56-
return {" ".join(params): result}
61+
def __exit__(self, exc_type, exc_value, traceback):
62+
pass
63+
64+
65+
# consider more args in constructor,
66+
# list of them is available using:
67+
# `python ../try.py --help`
68+
class Runner(object):
69+
def __init__(
70+
self,
71+
flow_file: str,
72+
):
73+
self.flow_file = flow_file
5774

5875
def __enter__(self):
59-
print("Start....")
6076
return self
6177

78+
# consider more args for run method,
79+
# list of them is available using:
80+
# `python ../try.py run --help`
81+
def run(
82+
self,
83+
params: Dict, # eventually, parse the file for parameters? contains stuff like {'alpha': 30}
84+
blocking: bool = False,
85+
):
86+
env_vars = os.environ.copy()
87+
88+
params_as_cli_args = []
89+
for (k, v) in params.items():
90+
params_as_cli_args.extend(["--" + str(k), str(v)])
91+
92+
with tempfile.TemporaryDirectory() as temp_dir:
93+
tfp_flow_name = tempfile.NamedTemporaryFile(dir=temp_dir, delete=False)
94+
tfp_run_id = tempfile.NamedTemporaryFile(dir=temp_dir, delete=False)
95+
96+
params_as_cli_args.extend(["--run-id-file", tfp_run_id.name])
97+
params_as_cli_args.extend(["--flow-name-file", tfp_flow_name.name])
98+
99+
process = cli_runner(
100+
self.flow_file,
101+
command="run",
102+
args=params_as_cli_args,
103+
env_vars=env_vars,
104+
)
105+
if blocking:
106+
process.wait()
107+
108+
flow_name = read_from_file_when_ready(tfp_flow_name)
109+
run_id = read_from_file_when_ready(tfp_run_id)
110+
111+
pathspec_components = (flow_name, run_id)
112+
run_object = Run("/".join(pathspec_components), _namespace_check=False)
113+
114+
return run_object
115+
62116
def __exit__(self, exc_type, exc_value, traceback):
63-
print("End...")
117+
pass

0 commit comments

Comments
 (0)