Skip to content

Commit 8a12f03

Browse files
romain-inteltalsperreclaude
authored
feat: @step(start=True)/@step(end=True) annotations, single-step flows, and node_info metadata (#3120)
## PR Type - [ ] Bug fix - [X] New feature - [X] Core Runtime change (higher bar -- see [CONTRIBUTING.md](../CONTRIBUTING.md#core-runtime-contributions-higher-bar)) - [ ] Docs / tooling - [ ] Refactoring ## Summary Makes the FlowSpec graph more flexible in three ways, with backward compatibility preserved throughout: 1. **Explicit start/end annotations.** `@step` now accepts `start=True` and `end=True` kwargs. Steps no longer have to be named "start"/"end" — a flow can use any name as long as one step is annotated as the start and one as the end. When no annotations are present, the graph falls back to discovering steps by the names "start" and "end" (old behavior). 2. **Single-step flows.** A single step annotated `@step(start=True, end=True)` is now valid; previously a FlowSpec required at least two steps. 3. **Per-node `node_info` metadata.** `DAGNode` gains a `node_info` dict that extensions can populate to attach arbitrary per-step metadata. Live references are accessible via `flow._graph`; serialized values (via `to_pod`) flow into `_graph_info`. Callables are serialized as their qualified name. These are core-runtime changes because they touch `graph.py`, `lint.py`, `runtime.py`, `task.py`, and every orchestrator plugin (Argo Workflows, Step Functions, Airflow) that reads the graph or emits events tied to the terminal step. ### Key changes | Area | What changed | |------|-------------| | `decorators.py` | `@step(start=False, end=False, node_info=None)` | | `graph.py` | `DAGNode.is_start_step`, `is_end_step`, `node_info`; `_identify_start_end` uses annotations with name fallback; attribute name (not AST `def` name) is authoritative; tolerates sourceless single-step methods (for out-of-tree FunctionSpec extension) | | `lint.py` | `check_basic_steps` distinguishes "no start" from "multiple start annotations"; new `check_start_end_degree` validates in/out degrees; new `check_annotation_name_conflict` warns when `@step(start=True)` coexists with a step named "start" (same for end) | | `util.py` | `to_pod` serializes callables via `__qualname__` | | `runtime.py` | Persists `start_step`/`end_step` as metadata on the `_parameters` task | | `task.py`, `flowspec.py` | Use the graph's start/end step rather than literal "start"/"end" names | | `client/core.py` | `Run._graph_endpoints` reads endpoints from `_parameters` metadata with `_graph_info` fallback for orchestrated runs | | `plugins/argo/argo_workflows.py` | `_matching_conditional_join` falls back to `graph.end_step`; terminal-step detection through the DAG | | `plugins/argo/argo_workflows_decorator.py` | Auto-emitted event from the end step is also published with the well-known `.end` suffix so `@trigger_on_finish` subscribers — which don't know the publisher's end step name at deploy time — keep receiving triggers from custom-named end steps | | `plugins/aws/step_functions/step_functions.py` | Reads `StartAt` from the definition JSON instead of hardcoded `["States"]["start"]` | | `plugins/airflow/airflow.py`, `airflow_cli.py` | Use `graph.start_step`/`end_step` instead of hardcoded names | | `plugins/cards/card_cli.py` | `graph` kwarg keeps the old dict-of-steps shape for backward compatibility with third-party card modules; new opt-in `graph_info` kwarg carries the full payload (steps + start/end). Card_cli probes the constructor signature and only passes `graph_info` to cards that accept it | | `plugins/cards/card_modules/basic.py` | `DefaultCard`, `DefaultCardJSON`, `ErrorCard` accept both `graph` and `graph_info` (preferred when provided) | | `plugins/cards/ui/` | Svelte DAG renderer reads start/end dynamically instead of assuming "start"/"end" | ## Tests - [X] Unit tests added/updated - [ ] Reproduction script provided (required for Core Runtime) - [X] CI passes - [ ] If tests are impractical: explain why below and provide manual evidence above New and updated tests cover: - Graph structure inference: standard flows, custom-named flows, single-step flows, branch flows, foreach flows, split-as-entry-step flows - `@step(start=True)`/`@step(end=True)` annotation mechanics and backward-compat with name-based discovery - Lint validation: degree checks, annotation-name conflicts, negative-path cases for malformed annotations - End-to-end execution of custom-named flows via Runner with card rendering - `Trigger.from_runs` on flows with a custom-named terminal step - Step Functions `StartAt` lookup from deployment JSON - Card graph transform accepting both legacy and new payload shapes ## Non-Goals - A `StepSpec`/`FunctionSpec` construct (single-step FlowSpec-like class with `init()`+`call()` lifecycle) — hooks for it are present in `DAGNode` and `_base_step_decorator` (see module comment in `graph.py`), but the class itself is kept out-of-tree for now. - Multi-flow file support (a `FlowSpec.main()` classmethod to route between multiple FlowSpec subclasses in one file) — deferred. - Direct non-CLI invocation of FlowSpec — deferred. ## AI Tool Usage - [X] AI tools were used (describe below) AI coding assistance was used during development. All generated code was reviewed and tested. --------- Co-authored-by: Shashank Srikanth <ssrikanth@netflix.com> Co-authored-by: Claude Sonnet 4.6 <noreply@anthropic.com>
1 parent 73db930 commit 8a12f03

44 files changed

Lines changed: 2294 additions & 210 deletions

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

metaflow/client/core.py

Lines changed: 44 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -2121,8 +2121,9 @@ def parent_steps(self) -> Iterator["Step"]:
21212121
Parent step
21222122
"""
21232123
graph_info = self.task["_graph_info"].data
2124+
start_step = graph_info.get("start_step", "start")
21242125

2125-
if self.id != "start":
2126+
if self.id != start_step:
21262127
flow, run, _ = self.path_components
21272128
for node_name, attributes in graph_info["steps"].items():
21282129
if self.id in attributes["next"]:
@@ -2139,8 +2140,9 @@ def child_steps(self) -> Iterator["Step"]:
21392140
Child step
21402141
"""
21412142
graph_info = self.task["_graph_info"].data
2143+
end_step = graph_info.get("end_step", "end")
21422144

2143-
if self.id != "end":
2145+
if self.id != end_step:
21442146
flow, run, _ = self.path_components
21452147
for next_step in graph_info["steps"][self.id]["next"]:
21462148
yield Step(f"{flow}/{run}/{next_step}", _namespace_check=False)
@@ -2153,7 +2155,7 @@ class Run(MetaflowObject):
21532155
Attributes
21542156
----------
21552157
data : MetaflowData
2156-
a shortcut to run['end'].task.data, i.e. data produced by this run.
2158+
A shortcut to the terminal step's task data produced by this run.
21572159
successful : bool
21582160
True if the run completed successfully.
21592161
finished : bool
@@ -2165,7 +2167,7 @@ class Run(MetaflowObject):
21652167
trigger : MetaflowTrigger
21662168
Information about event(s) that triggered this run (if present). See `MetaflowTrigger`.
21672169
end_task : Task
2168-
`Task` for the end step (if it is present already).
2170+
`Task` for the terminal step (if it is present already).
21692171
"""
21702172

21712173
_NAME = "run"
@@ -2176,6 +2178,36 @@ def _iter_filter(self, x):
21762178
# exclude _parameters step
21772179
return x.id[0] != "_"
21782180

2181+
@property
2182+
def _graph_endpoints(self):
2183+
"""
2184+
Returns (start_step_name, end_step_name) from ``_parameters`` task
2185+
metadata.
2186+
2187+
The metadata is written by ``persist_constants``, which every
2188+
runtime path calls before any step executes — the native runtime
2189+
directly, and orchestrators (Argo Workflows, Airflow, Step
2190+
Functions) through the ``init`` command they insert into their
2191+
generated command line. Falls back to the literal
2192+
``("start", "end")`` for old runs that predate custom endpoint
2193+
support.
2194+
"""
2195+
if not hasattr(self, "_cached_endpoints"):
2196+
start, end = "start", "end"
2197+
try:
2198+
params_meta = self["_parameters"].task.metadata_dict
2199+
start = params_meta.get("start_step", "start")
2200+
end = params_meta.get("end_step", "end")
2201+
except (KeyError, MetaflowNotFound):
2202+
# Expected for old runs without _parameters or metadata.
2203+
pass
2204+
except Exception:
2205+
# Transient error (network, metadata service) -- do NOT cache
2206+
# the fallback so a subsequent access can retry.
2207+
return (start, end)
2208+
self._cached_endpoints = (start, end)
2209+
return self._cached_endpoints
2210+
21792211
def steps(self, *tags: str) -> Iterator[Step]:
21802212
"""
21812213
[Legacy function - do not use]
@@ -2298,17 +2330,18 @@ def finished_at(self) -> Optional[datetime]:
22982330
@property
22992331
def end_task(self) -> Optional[Task]:
23002332
"""
2301-
Returns the Task corresponding to the 'end' step.
2333+
Returns the Task corresponding to the terminal step.
23022334
2303-
This returns None if the end step does not yet exist.
2335+
This returns None if the terminal step does not yet exist.
23042336
23052337
Returns
23062338
-------
23072339
Task, optional
2308-
The 'end' task
2340+
The terminal step's task
23092341
"""
23102342
try:
2311-
end_step = self["end"]
2343+
_, end_step_name = self._graph_endpoints
2344+
end_step = self[end_step_name]
23122345
except KeyError:
23132346
return None
23142347

@@ -2481,8 +2514,9 @@ def trigger(self) -> Optional[Trigger]:
24812514
Trigger, optional
24822515
Container of triggering events
24832516
"""
2484-
if "start" in self and self["start"].task:
2485-
meta = self["start"].task.metadata_dict.get("execution-triggers")
2517+
start_step, _ = self._graph_endpoints
2518+
if start_step in self and self[start_step].task:
2519+
meta = self[start_step].task.metadata_dict.get("execution-triggers")
24862520
if meta:
24872521
return Trigger(json.loads(meta))
24882522
return None

metaflow/decorators.py

Lines changed: 57 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -588,6 +588,27 @@ def _base_step_decorator(decotype, *args, **kwargs):
588588
func = args[0]
589589
if isinstance(func, (StepMutator, UserStepDecoratorBase)):
590590
func = func._my_step
591+
592+
# Step decorator applied to a class with a synthesized step method.
593+
# This branch exists to support an upcoming FunctionSpec feature
594+
# (currently shipped as an out-of-tree extension): its metaclass
595+
# (FunctionSpecMeta) creates a single `@step(start=True, end=True)`
596+
# method on the class and sets `_function_spec_step_name` to its
597+
# attribute name. That lets class-level step decorators like
598+
# ``@retry``/``@resources`` forward to the synthetic step.
599+
#
600+
# The `_function_spec_step_name` attribute is not set anywhere else
601+
# in this repo — it is deliberately an extension contract. See the
602+
# DAGNode module comment in graph.py for the wider context. This
603+
# hook may be removed once FunctionSpec is folded into core.
604+
if isinstance(func, type) and hasattr(func, "_function_spec_step_name"):
605+
step_func = getattr(func, func._function_spec_step_name)
606+
if hasattr(step_func, "is_step"):
607+
step_func.decorators.append(
608+
decotype(attributes=kwargs, statically_defined=True)
609+
)
610+
return func
611+
591612
if not hasattr(func, "is_step"):
592613
raise BadStepDecoratorException(decotype.name, func)
593614

@@ -1009,7 +1030,11 @@ def step(
10091030

10101031

10111032
def step(
1012-
f: Union[Callable[[FlowSpecDerived], None], Callable[[FlowSpecDerived, Any], None]],
1033+
f=None,
1034+
*,
1035+
start=False,
1036+
end=False,
1037+
node_info=None,
10131038
):
10141039
"""
10151040
Marks a method in a FlowSpec as a Metaflow Step. Note that this
@@ -1034,20 +1059,41 @@ def foo(self):
10341059
10351060
Parameters
10361061
----------
1037-
f : Union[Callable[[FlowSpecDerived], None], Callable[[FlowSpecDerived, Any], None]]
1038-
Function to make into a Metaflow Step
1062+
f : callable, optional
1063+
Function to make into a Metaflow Step. When using keyword arguments
1064+
(e.g. ``@step(start=True)``), this is ``None`` and a decorator
1065+
function is returned instead.
1066+
start : bool, default False
1067+
Mark this step as the start (entry) step of the flow.
1068+
end : bool, default False
1069+
Mark this step as the end (terminal) step of the flow.
1070+
node_info : dict, optional
1071+
Extra metadata to attach to this step's DAGNode. Extensions can use
1072+
this to store arbitrary information accessible via ``flow._graph``
1073+
(live references) and ``_graph_info`` (serialized via ``to_pod``).
10391074
10401075
Returns
10411076
-------
1042-
Union[Callable[[FlowSpecDerived, StepFlag], None], Callable[[FlowSpecDerived, Any, StepFlag], None]]
1043-
Function that is a Metaflow Step
1077+
callable
1078+
The decorated function, or a decorator if keyword arguments were used.
10441079
"""
1045-
f.is_step = True
1046-
f.decorators = []
1047-
f.config_decorators = []
1048-
f.wrappers = []
1049-
f.name = f.__name__
1050-
return f
1080+
1081+
def _apply(func):
1082+
func.is_step = True
1083+
func.decorators = []
1084+
func.config_decorators = []
1085+
func.wrappers = []
1086+
func.name = func.__name__
1087+
func.is_start_step = start
1088+
func.is_end_step = end
1089+
func.node_info = node_info or {}
1090+
return func
1091+
1092+
if f is not None:
1093+
# Called as @step (no parens)
1094+
return _apply(f)
1095+
# Called as @step(start=True) etc.
1096+
return _apply
10511097

10521098

10531099
def _import_plugin_decorators(globals_dict):

metaflow/events.py

Lines changed: 29 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -57,18 +57,39 @@ def __init__(self, _meta=None):
5757
@classmethod
5858
def from_runs(cls, run_objs: List["metaflow.Run"]):
5959
run_objs.sort(key=lambda x: x.finished_at, reverse=True)
60-
trigger = Trigger(
61-
[
60+
valid_runs = []
61+
meta = []
62+
for run_obj in run_objs:
63+
end_task = run_obj.end_task
64+
if end_task is None:
65+
continue
66+
valid_runs.append(run_obj)
67+
end_step_name = end_task.parent.id
68+
meta.append(
6269
{
6370
"type": "run",
6471
"timestamp": run_obj.finished_at,
65-
"name": "metaflow.%s.%s" % (run_obj.parent.id, run_obj["end"].id),
66-
"id": run_obj.end_task.pathspec,
72+
"name": "metaflow.%s.%s" % (run_obj.parent.id, end_step_name),
73+
"id": end_task.pathspec,
6774
}
68-
for run_obj in run_objs
69-
]
70-
)
71-
trigger._runs = run_objs
75+
)
76+
# For custom-named end steps, also emit the well-known ".end"
77+
# alias so downstream code that filters on "metaflow.<flow>.end"
78+
# keeps matching. This mirrors the dual-emit in
79+
# argo_workflows_decorator.py so the Argo publish path and the
80+
# programmatic Trigger.from_runs path stay symmetric.
81+
if end_step_name != "end":
82+
meta.append(
83+
{
84+
"type": "run",
85+
"timestamp": run_obj.finished_at,
86+
"name": "metaflow.%s.end" % run_obj.parent.id,
87+
"id": end_task.pathspec,
88+
}
89+
)
90+
91+
trigger = Trigger(meta)
92+
trigger._runs = valid_runs
7293
return trigger
7394

7495
@property

metaflow/flowspec.py

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -522,6 +522,8 @@ def _set_constants(self, graph, kwargs, config_options):
522522

523523
graph_info = {
524524
"file": os.path.basename(os.path.abspath(sys.argv[0])),
525+
"start_step": graph.start_step,
526+
"end_step": graph.end_step,
525527
"parameters": parameters_info,
526528
"constants": constants_info,
527529
"steps": steps_info,

0 commit comments

Comments
 (0)