Skip to content

Commit 0c063ee

Browse files
talsperreclaude
andcommitted
Add composition tests for single-step flows
Cover the scenarios raised in PR review for @step(start=True, end=True) single-step flows: Config descriptors, stacked step decorators, and FlowMutators. Unit tests (test/unit/test_graph_structure.py): - Config descriptor is registered via _get_parameters. - @Retry + @resources stack correctly on the only step. - FlowMutator registers at class-definition time (pre_mutate only fires via the CLI layer, so execution is covered by the integration test). Integration tests (test/unit/graph_inference/, new flow files + fixtures): - Config-bearing single-step flow runs end-to-end; config value flows through to the end task's artifact. - Stacked @retry/@resources flow runs; _graph_info records both decorators on the only step. - FlowMutator-decorated flow runs; pre_mutate lands @Retry on the sole step. Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
1 parent 983691a commit 0c063ee

6 files changed

Lines changed: 199 additions & 1 deletion

File tree

test/unit/graph_inference/conftest.py

Lines changed: 20 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -35,3 +35,23 @@ def flow_fixture(request):
3535
custom_named_card_run = pytest.fixture(scope="session")(
3636
create_flow_fixture("CustomNamedCardFlow", "custom_named_card_flow.py")
3737
)
38+
39+
single_step_with_config_run = pytest.fixture(scope="session")(
40+
create_flow_fixture(
41+
"SingleStepWithConfigFlow", "single_step_with_config_flow.py"
42+
)
43+
)
44+
45+
single_step_with_stacked_decos_run = pytest.fixture(scope="session")(
46+
create_flow_fixture(
47+
"SingleStepWithStackedDecosFlow",
48+
"single_step_with_stacked_decos_flow.py",
49+
)
50+
)
51+
52+
single_step_with_flow_mutator_run = pytest.fixture(scope="session")(
53+
create_flow_fixture(
54+
"SingleStepWithFlowMutatorFlow",
55+
"single_step_with_flow_mutator_flow.py",
56+
)
57+
)
Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,15 @@
1+
from metaflow import FlowSpec, step, Config
2+
3+
4+
class SingleStepWithConfigFlow(FlowSpec):
5+
"""Single-step flow wired to a Config, verifies Configs resolve end-to-end."""
6+
7+
cfg = Config("cfg", default_value={"x": 7})
8+
9+
@step(start=True, end=True)
10+
def only(self):
11+
self.v = self.cfg["x"]
12+
13+
14+
if __name__ == "__main__":
15+
SingleStepWithConfigFlow()
Lines changed: 22 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,22 @@
1+
from metaflow import FlowSpec, step, retry, FlowMutator
2+
3+
4+
class AddRetryToOnly(FlowMutator):
5+
"""Adds @retry to every step. Verifies mutators reach the sole step."""
6+
7+
def pre_mutate(self, mutable_flow):
8+
for _, s in mutable_flow.steps:
9+
s.add_decorator(retry, deco_kwargs={"times": 1})
10+
11+
12+
@AddRetryToOnly
13+
class SingleStepWithFlowMutatorFlow(FlowSpec):
14+
"""Single-step flow with a FlowMutator applied at the class level."""
15+
16+
@step(start=True, end=True)
17+
def only(self):
18+
self.v = 1
19+
20+
21+
if __name__ == "__main__":
22+
SingleStepWithFlowMutatorFlow()
Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,15 @@
1+
from metaflow import FlowSpec, step, retry, resources
2+
3+
4+
class SingleStepWithStackedDecosFlow(FlowSpec):
5+
"""Single-step flow with @retry and @resources stacked on top of @step."""
6+
7+
@retry(times=1)
8+
@resources(cpu=1, memory=256)
9+
@step(start=True, end=True)
10+
def only(self):
11+
self.v = 42
12+
13+
14+
if __name__ == "__main__":
15+
SingleStepWithStackedDecosFlow()

test/unit/graph_inference/test_graph_inference.py

Lines changed: 47 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -246,3 +246,50 @@ def test_default_card_includes_custom_graph_endpoints(custom_named_card_run):
246246
assert set(dag_data["steps"]) == {"begin", "middle", "finish"}
247247
assert "start" not in dag_data["steps"]
248248
assert "end" not in dag_data["steps"]
249+
250+
251+
# ---------------------------------------------------------------------------
252+
# Composition: single-step flows with Config, stacked decorators, FlowMutator
253+
# ---------------------------------------------------------------------------
254+
255+
256+
def test_single_step_with_config_completes(single_step_with_config_run):
257+
"""Config-bearing single-step flow runs to completion."""
258+
assert single_step_with_config_run.successful
259+
assert single_step_with_config_run.finished
260+
261+
262+
def test_single_step_with_config_value_flows_to_artifact(single_step_with_config_run):
263+
"""Config descriptor value is readable from the end task's artifact."""
264+
end_task = single_step_with_config_run.end_task
265+
assert end_task["v"].data == 7
266+
267+
268+
def test_single_step_with_stacked_decos_completes(single_step_with_stacked_decos_run):
269+
"""Single-step flow with stacked @retry/@resources runs end-to-end."""
270+
assert single_step_with_stacked_decos_run.successful
271+
assert single_step_with_stacked_decos_run.finished
272+
273+
274+
def test_single_step_with_stacked_decos_graph_info(single_step_with_stacked_decos_run):
275+
"""_graph_info records all stacked decorators on the only step."""
276+
graph_info = (
277+
single_step_with_stacked_decos_run["_parameters"].task["_graph_info"].data
278+
)
279+
names = {d["name"] for d in graph_info["steps"]["only"]["decorators"]}
280+
assert {"retry", "resources"}.issubset(names)
281+
282+
283+
def test_single_step_with_flow_mutator_completes(single_step_with_flow_mutator_run):
284+
"""FlowMutator-decorated single-step flow runs end-to-end."""
285+
assert single_step_with_flow_mutator_run.successful
286+
assert single_step_with_flow_mutator_run.finished
287+
288+
289+
def test_single_step_with_flow_mutator_applied(single_step_with_flow_mutator_run):
290+
"""FlowMutator.add_decorator landed @retry on the only step."""
291+
graph_info = (
292+
single_step_with_flow_mutator_run["_parameters"].task["_graph_info"].data
293+
)
294+
names = {d["name"] for d in graph_info["steps"]["only"]["decorators"]}
295+
assert "retry" in names

test/unit/test_graph_structure.py

Lines changed: 80 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -12,7 +12,8 @@
1212
"""
1313

1414
import pytest
15-
from metaflow import FlowSpec, step, Parameter
15+
from metaflow import Config, FlowMutator, FlowSpec, step, Parameter, retry, resources
16+
from metaflow.flowspec import FlowStateItems
1617
from metaflow.lint import linter, LintWarn
1718

1819
# ---------------------------------------------------------------------------
@@ -133,6 +134,48 @@ def _make_dynamic_single_step_flow():
133134
)
134135

135136

137+
# ---------------------------------------------------------------------------
138+
# Flow classes: single-step flows composed with configs, decorators, mutators
139+
# ---------------------------------------------------------------------------
140+
141+
142+
class _SingleStepWithConfig(FlowSpec):
143+
"""Single-step flow with a Config descriptor."""
144+
145+
cfg = Config("cfg", default_value={"x": 7})
146+
147+
@step(start=True, end=True)
148+
def only(self):
149+
self.v = self.cfg["x"]
150+
151+
152+
class _SingleStepWithStackedDecos(FlowSpec):
153+
"""Single-step flow with multiple step decorators stacked."""
154+
155+
@retry(times=3)
156+
@resources(cpu=2, memory=1024)
157+
@step(start=True, end=True)
158+
def only(self):
159+
self.v = 1
160+
161+
162+
class _AddRetryMutator(FlowMutator):
163+
"""Adds @retry to every step. Used to verify mutators reach a single-step flow."""
164+
165+
def pre_mutate(self, mutable_flow):
166+
for _, s in mutable_flow.steps:
167+
s.add_decorator(retry, deco_kwargs={"times": 1})
168+
169+
170+
@_AddRetryMutator
171+
class _SingleStepWithFlowMutator(FlowSpec):
172+
"""Single-step flow with a FlowMutator applied at the class level."""
173+
174+
@step(start=True, end=True)
175+
def only(self):
176+
pass
177+
178+
136179
# ---------------------------------------------------------------------------
137180
# Fixtures
138181
# ---------------------------------------------------------------------------
@@ -487,6 +530,42 @@ def test_backward_compat_name_based():
487530
assert graph["end"].is_end_step is False
488531

489532

533+
# ---------------------------------------------------------------------------
534+
# Tests: composition with configs, stacked decorators, and flow mutators
535+
# ---------------------------------------------------------------------------
536+
537+
538+
def test_single_step_with_config_descriptor_registered():
539+
"""Config descriptor is registered on a single-step flow."""
540+
graph = _SingleStepWithConfig._graph
541+
assert graph.start_step == "only" == graph.end_step
542+
names = {name for name, _ in _SingleStepWithConfig._get_parameters()}
543+
assert "cfg" in names
544+
545+
546+
def test_single_step_with_multiple_step_decorators():
547+
"""Multiple step decorators stack correctly on a single-step flow."""
548+
graph = _SingleStepWithStackedDecos._graph
549+
deco_names = {d.name for d in graph["only"].decorators}
550+
assert {"retry", "resources"}.issubset(deco_names)
551+
552+
553+
def test_single_step_with_flow_mutator_registered():
554+
"""FlowMutator is registered on a single-step flow at class-definition time.
555+
556+
pre_mutate only fires when the flow is processed via the CLI layer, so
557+
the decorator it adds won't appear on the graph in a unit test. What we
558+
can verify here is that the mutator syntax is accepted by a single-step
559+
FlowSpec and that it's registered as a flow mutator. End-to-end execution
560+
is covered by the matching integration test.
561+
"""
562+
flow_cls = _SingleStepWithFlowMutator._flow_cls
563+
graph = flow_cls._graph
564+
assert graph.start_step == "only" == graph.end_step
565+
mutators = flow_cls._flow_state[FlowStateItems.FLOW_MUTATORS]
566+
assert any(isinstance(m, _AddRetryMutator) for m in mutators)
567+
568+
490569
# ---------------------------------------------------------------------------
491570
# Negative-path tests: malformed annotation patterns caught by lint
492571
# ---------------------------------------------------------------------------

0 commit comments

Comments
 (0)