Skip to content

Commit cc17ade

Browse files
committed
initial runner map api
1 parent d95e7b6 commit cc17ade

File tree

1 file changed

+63
-0
lines changed

1 file changed

+63
-0
lines changed

metaflow/metaflow_runner.py

+63
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,63 @@
1+
import asyncio
2+
from concurrent.futures import ProcessPoolExecutor
3+
import subprocess, sys, os
4+
from typing import List, Dict, Optional
5+
6+
7+
def convert_params_to_cli_args(params: List[Dict]):
8+
converted_params = [item for pair in params.items() for item in pair]
9+
converted_params = [
10+
str(val) if idx % 2 else f"--{val}" for idx, val in enumerate(converted_params)
11+
]
12+
return converted_params
13+
14+
15+
def cli_runner(flow_file: str, command: str, args: List[str], env_vars: Dict):
16+
result = subprocess.run([sys.executable, flow_file, command, *args], env=env_vars)
17+
return result.returncode
18+
19+
20+
class Pool(object):
21+
def __init__(
22+
self,
23+
flow_file: str,
24+
num_processes: int,
25+
profile: Optional[str] = None,
26+
with_context: Optional[str] = None,
27+
):
28+
self.flow_file = flow_file
29+
self.num_processes = num_processes
30+
self.profile = profile
31+
self.with_context = with_context
32+
33+
def map(self, params: List[Dict], command: str = "run"):
34+
return asyncio.run(self._map(params, command))
35+
36+
async def _map(self, params: List[Dict], command: str = "run"):
37+
self.command = command
38+
loop = asyncio.get_running_loop()
39+
tasks, runs = [], []
40+
with ProcessPoolExecutor(max_workers=self.num_processes) as executor:
41+
for each_param in params:
42+
tasks.append(loop.run_in_executor(executor, self._execute, each_param))
43+
for done in asyncio.as_completed(tasks):
44+
runs.append(await done)
45+
46+
return runs
47+
48+
def _execute(self, params: List[Dict]):
49+
env_vars = os.environ.copy()
50+
if self.profile:
51+
env_vars.update({"METAFLOW_PROFILE": self.profile})
52+
params = convert_params_to_cli_args(params)
53+
print(f"Starting for {params}")
54+
result = cli_runner(self.flow_file, self.command, params, env_vars)
55+
print(f"Result ready for {params}")
56+
return {" ".join(params): result}
57+
58+
def __enter__(self):
59+
print("Start....")
60+
return self
61+
62+
def __exit__(self, exc_type, exc_value, traceback):
63+
print("End...")

0 commit comments

Comments
 (0)