Skip to content

Commit 5cabd10

Browse files
author
Nissan Pow
committed
port 5 NFLX tests to OSS: build_env_conda, resources_cpu/gpu, config_from_deployment, config_parser
Adapted from mf-6 (branch npow/ux-test-compute-env-refactor): - test_build_env_conda: conda env built at runtime within a step - test_resources_cpu: @resources(cpu=N, memory=N) on scheduler backends - test_resources_gpu: @resources(gpu=1) deployment (skipped, no GPU nodes) - test_config_from_deployment: Config + DeployedFlow.from_deployment() - test_config_parser_flow_default: Config with requirements_txt_parser Adaptations: @titus -> @resources, MaestroDeployedFlow -> DeployedFlow, send_signals -> standard test harness patterns.
1 parent be18a0c commit 5cabd10

8 files changed

Lines changed: 383 additions & 1 deletion
Lines changed: 45 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,45 @@
1+
from metaflow import FlowSpec, step, resources, project
2+
3+
4+
@project(name="hello_resources_cpu")
5+
class ResourcesCpuFlow(FlowSpec):
6+
7+
@step
8+
def start(self):
9+
from metaflow import metaflow_version
10+
11+
print(f"In start step and using metaflow: {metaflow_version.get_version()}")
12+
print("ResourcesCpuFlow is starting.")
13+
self.next(
14+
self.default,
15+
self.cpu2,
16+
self.cpu4,
17+
)
18+
19+
@resources()
20+
@step
21+
def default(self):
22+
self.next(self.join)
23+
24+
@resources(cpu=2)
25+
@step
26+
def cpu2(self):
27+
self.next(self.join)
28+
29+
@resources(cpu=4, memory=8000)
30+
@step
31+
def cpu4(self):
32+
self.next(self.join)
33+
34+
@step
35+
def join(self, inputs):
36+
self.next(self.end)
37+
38+
@step
39+
def end(self):
40+
self.message = "Metaflow says: Hi Resources CPU!"
41+
print("ResourcesCpuFlow is all done.")
42+
43+
44+
if __name__ == "__main__":
45+
ResourcesCpuFlow()
Lines changed: 31 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,31 @@
1+
from metaflow import FlowSpec, step, resources, project
2+
3+
4+
@project(name="hello_resources_gpu")
5+
class ResourcesGpuFlow(FlowSpec):
6+
7+
@step
8+
def start(self):
9+
from metaflow import metaflow_version
10+
11+
print(f"In start step and using metaflow: {metaflow_version.get_version()}")
12+
print("ResourcesGpuFlow is starting.")
13+
self.next(self.gpu_step)
14+
15+
@resources(gpu=1)
16+
@step
17+
def gpu_step(self):
18+
# No actual GPU validation — devstack has no GPU nodes.
19+
# This test verifies the @resources(gpu=N) decorator compiles
20+
# and deploys successfully on each scheduler backend.
21+
self.gpu_requested = True
22+
self.next(self.end)
23+
24+
@step
25+
def end(self):
26+
self.message = "Metaflow says: Hi Resources GPU!"
27+
print("ResourcesGpuFlow is all done.")
28+
29+
30+
if __name__ == "__main__":
31+
ResourcesGpuFlow()
Lines changed: 90 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,90 @@
1+
import subprocess
2+
import sys
3+
import tempfile
4+
5+
from metaflow import (
6+
FlowSpec,
7+
Parameter,
8+
conda,
9+
current,
10+
named_env,
11+
project,
12+
step,
13+
)
14+
15+
16+
def trigger_name_func(context):
17+
return [current.project_flow_name + "Trigger"]
18+
19+
20+
@project(name="selfbuild")
21+
class BuildCondaEnvInStep(FlowSpec):
22+
my_var = Parameter(
23+
"my_var",
24+
default=123,
25+
external_artifact=trigger_name_func,
26+
external_trigger=True,
27+
)
28+
29+
@conda(disabled=True)
30+
@step
31+
def start(self):
32+
from metaflow import metaflow_version
33+
34+
print(f"In start step and using metaflow: {metaflow_version.get_version()}")
35+
full_run_id = current.run_id
36+
if "-" in full_run_id:
37+
base_run_id = full_run_id.split("-")[1]
38+
else:
39+
base_run_id = full_run_id
40+
with tempfile.NamedTemporaryFile(mode="w", encoding="utf-8") as req_file:
41+
req_file.write("itsdangerous==2.1.2")
42+
req_file.flush()
43+
subprocess.check_call(
44+
[
45+
sys.executable,
46+
"-m",
47+
"metaflow.cmd.main_cli",
48+
"environment",
49+
"resolve",
50+
"--alias",
51+
"mlp/metaflow/test/build_in_step/id_%s_%s_%s"
52+
% (current.run_id, base_run_id, str(self.my_var)),
53+
"-r",
54+
req_file.name,
55+
"--python",
56+
"3.8.*",
57+
]
58+
)
59+
print(
60+
"Build environment and aliased using mlp/metaflow/test/build_in_step/id_%s_%s_%s"
61+
% (current.run_id, base_run_id, str(self.my_var))
62+
)
63+
self.next(self.fetch_old)
64+
65+
@conda(
66+
name="mlp/metaflow/test/build_in_step/id_@{METAFLOW_RUN_ID}_@{METAFLOW_RUN_ID_BASE}_@{METAFLOW_INIT_MY_VAR}",
67+
fetch_at_exec=True,
68+
)
69+
@step
70+
def fetch_old(self):
71+
import itsdangerous
72+
73+
print("Imported itsdangerous and found version %s" % itsdangerous.__version__)
74+
self.found_version_old = itsdangerous.__version__
75+
self.next(self.end)
76+
77+
@named_env(
78+
name="mlp/metaflow/test/build_in_step/id_@{METAFLOW_RUN_ID}_@{METAFLOW_RUN_ID_BASE}_@{METAFLOW_INIT_MY_VAR}",
79+
fetch_at_exec=True,
80+
)
81+
@step
82+
def end(self):
83+
import itsdangerous
84+
85+
print("Imported itsdangerous and found version %s" % itsdangerous.__version__)
86+
self.found_version = itsdangerous.__version__
87+
88+
89+
if __name__ == "__main__":
90+
BuildCondaEnvInStep()
Lines changed: 54 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,54 @@
1+
from metaflow import (
2+
Config,
3+
FlowSpec,
4+
Parameter,
5+
config_expr,
6+
current,
7+
project,
8+
pypi_base,
9+
requirements_txt_parser,
10+
step,
11+
)
12+
13+
default_config = {"project_name": "config_parser"}
14+
15+
16+
def trigger_name_func(ctx):
17+
return [current.project_flow_name + "Trigger"]
18+
19+
20+
@project(name=config_expr("cfg.project_name"))
21+
@pypi_base(**config_expr("req_config"))
22+
class ConfigParser(FlowSpec):
23+
24+
trigger_param = Parameter(
25+
"trigger_param",
26+
default="",
27+
external_trigger=True,
28+
external_artifact=trigger_name_func,
29+
)
30+
cfg = Config("cfg", default_value=default_config)
31+
32+
req_config = Config(
33+
"req_config",
34+
default="flows/config/config_parser_requirements.txt",
35+
parser=requirements_txt_parser,
36+
)
37+
38+
@step
39+
def start(self):
40+
from metaflow import metaflow_version
41+
42+
print(f"In start step and using metaflow: {metaflow_version.get_version()}")
43+
import regex
44+
45+
self.lib_version = regex.__version__
46+
self.next(self.end)
47+
48+
@step
49+
def end(self):
50+
pass
51+
52+
53+
if __name__ == "__main__":
54+
ConfigParser()
Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,2 @@
1+
python==3.10.*
2+
regex==2024.11.6
Lines changed: 26 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,26 @@
1+
from metaflow import FlowSpec, Config, step, config_expr, project
2+
3+
default_config = {"batch_size": 32, "packages": {"pandas": "2.2.3"}}
4+
5+
6+
@project(name="hello_from_deployment_config")
7+
class HelloFromDeploymentFlowConfig(FlowSpec):
8+
"""Simple flow combining config, config_expr, and from_deployment testing."""
9+
10+
simple_config = Config("simple_config", default_value=default_config)
11+
12+
@step
13+
def start(self):
14+
from metaflow import metaflow_version
15+
16+
print(f"In start step and using metaflow: {metaflow_version.get_version()}")
17+
self.batch_size = self.simple_config.batch_size
18+
self.next(self.end)
19+
20+
@step
21+
def end(self):
22+
pass
23+
24+
25+
if __name__ == "__main__":
26+
HelloFromDeploymentFlowConfig()

test/ux/core/test_basic.py

Lines changed: 61 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@
66
execute_test_flow,
77
deploy_flow_to_scheduler,
88
wait_for_deployed_run,
9+
wait_for_deployed_run_allow_failure,
910
verify_run_provenance,
1011
)
1112

@@ -172,3 +173,63 @@ def test_hello_conda(exec_mode, decospecs, compute_env, tag, scheduler_config):
172173
assert (
173174
run["combo"].task.data.itsdangerous_version == "2.2.0"
174175
), "itsdangerous version incorrect"
176+
177+
178+
@pytest.mark.conda
179+
def test_build_env_conda(exec_mode, decospecs, compute_env, tag, scheduler_config):
180+
"""Verify that a conda environment can be built at runtime within a step."""
181+
run = execute_test_flow(
182+
flow_name="basic/self_building_env.py",
183+
exec_mode=exec_mode,
184+
decospecs=decospecs,
185+
tag=tag,
186+
scheduler_config=scheduler_config,
187+
test_name="build_env_conda",
188+
tl_args_extra={
189+
"environment": "conda",
190+
"env": compute_env,
191+
},
192+
)
193+
194+
assert run.successful, "Run was not successful"
195+
assert (
196+
run["fetch_old"].task.data.found_version_old == "2.1.2"
197+
), "fetch_old version incorrect"
198+
assert run["end"].task.data.found_version == "2.1.2", "end version incorrect"
199+
200+
201+
@pytest.mark.scheduler_only
202+
def test_resources_cpu(exec_mode, decospecs, compute_env, tag, scheduler_config):
203+
"""Verify @resources(cpu=N, memory=N) deploys and runs on each scheduler backend."""
204+
run = execute_test_flow(
205+
flow_name="basic/resources_cpu_flow.py",
206+
exec_mode=exec_mode,
207+
decospecs=decospecs,
208+
tag=tag,
209+
scheduler_config=scheduler_config,
210+
test_name="resources_cpu",
211+
tl_args_extra={"env": compute_env},
212+
)
213+
214+
assert run.successful, "Run was not successful"
215+
assert (
216+
run["end"].task.data.message == "Metaflow says: Hi Resources CPU!"
217+
), "Message didn't match"
218+
219+
220+
@pytest.mark.scheduler_only
221+
@pytest.mark.skip(reason="devstack has no GPU nodes")
222+
def test_resources_gpu(exec_mode, decospecs, compute_env, tag, scheduler_config):
223+
"""Verify @resources(gpu=1) deploys successfully (no actual GPU validation)."""
224+
run = execute_test_flow(
225+
flow_name="basic/resources_gpu_flow.py",
226+
exec_mode=exec_mode,
227+
decospecs=decospecs,
228+
tag=tag,
229+
scheduler_config=scheduler_config,
230+
test_name="resources_gpu",
231+
tl_args_extra={"env": compute_env},
232+
)
233+
234+
assert run.successful, "Run was not successful"
235+
assert run["gpu_step"].task.data.gpu_requested is True

0 commit comments

Comments
 (0)