Skip to content

Commit 036ac35

Browse files
talsperre and claude committed
Fix start/end step regressions: events, SFN, cards, argo, graph, lint
Core fixes:
- events.py: use end_task.parent.id instead of hardcoded run_obj["end"]
- step_functions.py: use StartAt from definition JSON (both get_existing_deployment and get_execution) instead of hardcoded ["States"]["start"]
- argo_workflows.py: _matching_conditional_join uses self.graph.end_step instead of hardcoded "end" fallback
- graph.py: None guard in output_steps() raises clear ValueError
- client/core.py: narrow _graph_endpoints exception caching to (KeyError, MetaflowNotFound); transient errors return uncached fallback
- runtime.py: guard metadata registration with is_cloned check + try/except

Cards:
- card_cli.py: wrap graph_dict in payload with start_step/end_step metadata
- basic.py: accept new payload format, pass through start/end
- dag.svelte, step-wrapper.svelte, types.ts: dynamic start/end props

Lint:
- Improved check_basic_steps to distinguish "no start step" from "multiple @step(start=True) annotations"
- New check_annotation_name_conflict: warns when @step(start=True) coexists with a step named "start" (and likewise for end)

Tests:
- Negative-path tests for malformed annotation patterns (8 cases)
- Card rendering with custom endpoints
- Trigger.from_runs() with custom terminal step
- Step Functions StartAt lookup

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
1 parent 021c5a8 commit 036ac35

23 files changed

Lines changed: 1097 additions & 464 deletions

metaflow/client/core.py

Lines changed: 6 additions & 1 deletion
Original file line number | Diff line number | Diff line change
@@ -2192,8 +2192,13 @@ def _graph_endpoints(self):
21922192
params_meta = self["_parameters"].task.metadata_dict
21932193
start = params_meta.get("start_step", "start")
21942194
end = params_meta.get("end_step", "end")
2195-
except Exception:
2195+
except (KeyError, MetaflowNotFound):
2196+
# Expected for old runs without _parameters or metadata.
21962197
pass
2198+
except Exception:
2199+
# Transient error (network, metadata service) -- do NOT cache
2200+
# the fallback so a subsequent access can retry.
2201+
return (start, end)
21972202
self._cached_endpoints = (start, end)
21982203
return self._cached_endpoints
21992204

metaflow/decorators.py

Lines changed: 4 additions & 4 deletions
Original file line number | Diff line number | Diff line change
@@ -591,10 +591,10 @@ def _base_step_decorator(decotype, *args, **kwargs):
591591

592592
# Step decorator applied to a class with a synthesized step method.
593593
# Used by extensions that create a single synthetic @step on a
594-
# FlowSpec subclass. _step_spec_step_name is set by the
595-
# extension's metaclass.
596-
if isinstance(func, type) and hasattr(func, "_step_spec_step_name"):
597-
step_func = getattr(func, func._step_spec_step_name)
594+
# FlowSpec subclass. _function_spec_step_name is set by the
595+
# extension's metaclass (e.g. FunctionSpecMeta).
596+
if isinstance(func, type) and hasattr(func, "_function_spec_step_name"):
597+
step_func = getattr(func, func._function_spec_step_name)
598598
if hasattr(step_func, "is_step"):
599599
step_func.decorators.append(
600600
decotype(attributes=kwargs, statically_defined=True)

metaflow/events.py

Lines changed: 14 additions & 8 deletions
Original file line number | Diff line number | Diff line change
@@ -57,18 +57,24 @@ def __init__(self, _meta=None):
5757
@classmethod
5858
def from_runs(cls, run_objs: List["metaflow.Run"]):
5959
run_objs.sort(key=lambda x: x.finished_at, reverse=True)
60-
trigger = Trigger(
61-
[
60+
valid_runs = []
61+
meta = []
62+
for run_obj in run_objs:
63+
end_task = run_obj.end_task
64+
if end_task is None:
65+
continue
66+
valid_runs.append(run_obj)
67+
meta.append(
6268
{
6369
"type": "run",
6470
"timestamp": run_obj.finished_at,
65-
"name": "metaflow.%s.%s" % (run_obj.parent.id, run_obj["end"].id),
66-
"id": run_obj.end_task.pathspec,
71+
"name": "metaflow.%s.%s" % (run_obj.parent.id, end_task.parent.id),
72+
"id": end_task.pathspec,
6773
}
68-
for run_obj in run_objs
69-
]
70-
)
71-
trigger._runs = run_objs
74+
)
75+
76+
trigger = Trigger(meta)
77+
trigger._runs = valid_runs
7278
return trigger
7379

7480
@property

metaflow/flowspec.py

Lines changed: 1 addition & 11 deletions
Original file line number | Diff line number | Diff line change
@@ -164,21 +164,11 @@ def _merge_value(self, inherited_value, self_value):
164164

165165

166166
class FlowSpecMeta(type):
167-
_registry = {} # {class_name: class} for all FlowSpec/StepSpec subclasses
168-
169-
# Names of base classes that should NOT be registered or have their
170-
# graph/attrs initialized. Subclasses of FlowSpecMeta (like StepSpecMeta)
171-
# can extend this set.
172-
_base_class_names = frozenset({"FlowSpec"})
173-
174167
def __init__(cls, name, bases, attrs):
175168
super().__init__(name, bases, attrs)
176-
# Check against the metaclass's own _base_class_names — this
177-
# allows StepSpecMeta to add "StepSpec" to the set.
178-
if name in type(cls)._base_class_names:
169+
if name == "FlowSpec":
179170
return
180171

181-
type(cls)._registry[name] = cls
182172
cls._init_attrs()
183173

184174
def _init_attrs(cls):

metaflow/graph.py

Lines changed: 112 additions & 58 deletions
Original file line number | Diff line number | Diff line change
@@ -59,12 +59,20 @@ def __init__(
5959
is_start_step=False,
6060
is_end_step=False,
6161
node_info=None,
62+
name=None,
63+
num_args=None,
6264
):
63-
self.name = func_ast.name
65+
if func_ast is None and name is None:
66+
raise ValueError("name is required when func_ast is None")
67+
68+
self.name = func_ast.name if func_ast is not None else name
6469
self.source_file = source_file
6570
# lineno is the start line of decorators in source_file
6671
# func_ast.lineno is lines from decorators start to def of function
67-
self.func_lineno = lineno + func_ast.lineno - 1
72+
if func_ast is not None:
73+
self.func_lineno = lineno + func_ast.lineno - 1
74+
else:
75+
self.func_lineno = lineno
6876
self.decorators = decos
6977
self.wrappers = wrappers
7078
self.config_decorators = config_decorators
@@ -83,13 +91,14 @@ def __init__(
8391
self.out_funcs = []
8492
self.has_tail_next = False
8593
self.invalid_tail_next = False
86-
self.num_args = 0
94+
self.num_args = 0 if num_args is None else num_args
8795
self.switch_cases = {}
8896
self.condition = None
8997
self.foreach_param = None
9098
self.num_parallel = 0
9199
self.parallel_foreach = False
92-
self._parse(func_ast, lineno)
100+
if func_ast is not None:
101+
self._parse(func_ast, lineno)
93102

94103
# these attributes are populated by _traverse_graph
95104
self.in_funcs = set()
@@ -282,48 +291,34 @@ def _identify_start_end(self):
282291
"""
283292
Determine the start and end steps.
284293
285-
Uses explicit ``@step(start=True)`` / ``@step(end=True)`` annotations
286-
if present. Falls back to looking for steps named ``"start"`` /
287-
``"end"`` for backward compatibility.
288-
289-
Sets ``self.start_step`` and ``self.end_step`` to step name strings,
290-
or ``None`` if the graph is malformed (validated later by lint).
291-
Also assigns the ``"start"`` and ``"end"`` node types.
294+
Uses explicit @step(start=True) / @step(end=True) annotations if present.
295+
Falls back to steps named "start" / "end" for backward compatibility.
296+
Sets self.start_step and self.end_step to step name strings, or None
297+
if the graph is malformed (validated later by lint).
292298
"""
293-
# 1. Look for explicit annotations
294-
annotated_start = [
295-
name
296-
for name, node in self.nodes.items()
297-
if node.is_start_step and not name.startswith("_")
298-
]
299-
annotated_end = [
300-
name
301-
for name, node in self.nodes.items()
302-
if node.is_end_step and not name.startswith("_")
303-
]
304-
305-
# 2. Determine start step (annotation first, then name fallback)
306-
if len(annotated_start) == 1:
307-
self.start_step = annotated_start[0]
308-
elif len(annotated_start) == 0:
309-
self.start_step = "start" if "start" in self.nodes else None
310-
else:
311-
self.start_step = None # Multiple annotated — lint will catch
312299

313-
# 3. Determine end step (annotation first, then name fallback)
314-
if len(annotated_end) == 1:
315-
self.end_step = annotated_end[0]
316-
elif len(annotated_end) == 0:
317-
self.end_step = "end" if "end" in self.nodes else None
318-
else:
319-
self.end_step = None # Multiple annotated — lint will catch
300+
def _resolve(attr, fallback_name):
301+
"""Find the unique annotated step, or fall back to a named step."""
302+
annotated = [
303+
name
304+
for name, node in self.nodes.items()
305+
if getattr(node, attr) and not name.startswith("_")
306+
]
307+
if len(annotated) == 1:
308+
return annotated[0]
309+
if len(annotated) == 0:
310+
return fallback_name if fallback_name in self.nodes else None
311+
return None # Multiple annotated — lint will catch
312+
313+
self.start_step = _resolve("is_start_step", "start")
314+
self.end_step = _resolve("is_end_step", "end")
320315

321-
# 4. Assign types based on identified start/end.
322-
# Only upgrade "linear" "start" for the entry point; do NOT override
316+
# Assign node types for graph traversal.
317+
# Only upgrade "linear" -> "start" for the entry point; do NOT override
323318
# "split", "foreach", etc. since those types are needed for
324319
# split/join balance checking.
325320
if self.start_step and self.start_step == self.end_step:
326-
# Single-step flow: terminal node that is also the entry point
321+
# Single-step flow: terminal node that is also the entry point.
327322
self.nodes[self.start_step].type = "end"
328323
else:
329324
if self.start_step:
@@ -333,29 +328,76 @@ def _identify_start_end(self):
333328
if self.end_step:
334329
self.nodes[self.end_step].type = "end"
335330

331+
def _create_sourceless_single_step_node(
332+
self, name, func, is_start_step, is_end_step
333+
):
334+
"""Create a DAGNode for a dynamically-generated single-step method.
335+
336+
When ``inspect.getsourcelines()`` fails (e.g. for steps synthesized
337+
via ``compile()`` + ``exec()`` by extension metaclasses like
338+
``FunctionSpecMeta``), this method builds a DAGNode without AST
339+
parsing. This is safe because single-step flows (``start=True,
340+
end=True``) have no ``self.next()`` transitions to analyze.
341+
"""
342+
code = getattr(func, "__code__", None)
343+
source_file = inspect.getsourcefile(func) or inspect.getfile(func)
344+
lineno = getattr(code, "co_firstlineno", 0)
345+
346+
try:
347+
num_args = len(inspect.signature(func).parameters)
348+
except (TypeError, ValueError):
349+
num_args = getattr(code, "co_argcount", 0)
350+
351+
return DAGNode(
352+
None,
353+
func.decorators,
354+
func.wrappers,
355+
func.config_decorators,
356+
func.__doc__,
357+
source_file,
358+
lineno,
359+
is_start_step=is_start_step,
360+
is_end_step=is_end_step,
361+
node_info=getattr(func, "node_info", None),
362+
name=name,
363+
num_args=num_args,
364+
)
365+
336366
def _create_nodes(self, flow):
337367
nodes = {}
338368
for element in dir(flow):
339369
func = getattr(flow, element)
340370
if callable(func) and hasattr(func, "is_step"):
341-
source_file = inspect.getsourcefile(func)
342-
source_lines, lineno = inspect.getsourcelines(func)
343-
# This also works for code (strips out leading whitspace based on
344-
# first line)
345-
source_code = deindent_docstring("".join(source_lines))
346-
function_ast = ast.parse(source_code).body[0]
347-
node = DAGNode(
348-
function_ast,
349-
func.decorators,
350-
func.wrappers,
351-
func.config_decorators,
352-
func.__doc__,
353-
source_file,
354-
lineno,
355-
is_start_step=getattr(func, "is_start_step", False),
356-
is_end_step=getattr(func, "is_end_step", False),
357-
node_info=getattr(func, "node_info", None),
358-
)
371+
is_start = getattr(func, "is_start_step", False)
372+
is_end = getattr(func, "is_end_step", False)
373+
374+
try:
375+
source_file = inspect.getsourcefile(func) or inspect.getfile(func)
376+
source_lines, lineno = inspect.getsourcelines(func)
377+
except OSError:
378+
if is_start and is_end:
379+
node = self._create_sourceless_single_step_node(
380+
element, func, is_start, is_end
381+
)
382+
else:
383+
raise
384+
else:
385+
# This also works for code (strips out leading whitespace based on
386+
# first line)
387+
source_code = deindent_docstring("".join(source_lines))
388+
function_ast = ast.parse(source_code).body[0]
389+
node = DAGNode(
390+
function_ast,
391+
func.decorators,
392+
func.wrappers,
393+
func.config_decorators,
394+
func.__doc__,
395+
source_file,
396+
lineno,
397+
is_start_step=is_start,
398+
is_end_step=is_end,
399+
node_info=getattr(func, "node_info", None),
400+
)
359401
nodes[element] = node
360402
return nodes
361403

@@ -568,6 +610,18 @@ def populate_block(start_name, end_name):
568610
break
569611
return resulting_list
570612

613+
if self.start_step is None or self.end_step is None:
614+
missing = []
615+
if self.start_step is None:
616+
missing.append("start")
617+
if self.end_step is None:
618+
missing.append("end")
619+
raise ValueError(
620+
"Cannot compute graph structure: no %s step identified. "
621+
"Use @step(start=True)/@step(end=True) or name your steps "
622+
"'start'/'end'." % " or ".join(missing)
623+
)
624+
571625
if self.start_step == self.end_step:
572626
# Single-step flow
573627
graph_structure = []

metaflow/lint.py

Lines changed: 43 additions & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -59,17 +59,60 @@ def check_reserved_words(graph):
5959
@linter.check
6060
def check_basic_steps(graph):
6161
if graph.start_step is None:
62+
annotated = [
63+
name
64+
for name, node in graph.nodes.items()
65+
if node.is_start_step and not name.startswith("_")
66+
]
67+
if len(annotated) > 1:
68+
raise LintWarn(
69+
"Multiple steps annotated with @step(start=True): %s. "
70+
"Exactly one is allowed." % ", ".join(sorted(annotated))
71+
)
6272
raise LintWarn(
6373
"Your flow must have exactly one start step. Either name a step "
6474
"'start' or use @step(start=True)."
6575
)
6676
if graph.end_step is None:
77+
annotated = [
78+
name
79+
for name, node in graph.nodes.items()
80+
if node.is_end_step and not name.startswith("_")
81+
]
82+
if len(annotated) > 1:
83+
raise LintWarn(
84+
"Multiple steps annotated with @step(end=True): %s. "
85+
"Exactly one is allowed." % ", ".join(sorted(annotated))
86+
)
6787
raise LintWarn(
6888
"Your flow must have exactly one end step. Either name a step "
6989
"'end' or use @step(end=True)."
7090
)
7191

7292

93+
@linter.ensure_fundamentals
94+
@linter.check
95+
def check_annotation_name_conflict(graph):
96+
"""Detect conflict between @step(start/end=True) and legacy step names."""
97+
if (
98+
graph.start_step is not None
99+
and graph.start_step != "start"
100+
and "start" in graph.nodes
101+
):
102+
raise LintWarn(
103+
"Ambiguous start step: step '%s' is annotated with @step(start=True) "
104+
"but a step named 'start' also exists. Remove the 'start' name or "
105+
"the @step(start=True) annotation." % graph.start_step
106+
)
107+
108+
if graph.end_step is not None and graph.end_step != "end" and "end" in graph.nodes:
109+
raise LintWarn(
110+
"Ambiguous end step: step '%s' is annotated with @step(end=True) "
111+
"but a step named 'end' also exists. Remove the 'end' name or "
112+
"the @step(end=True) annotation." % graph.end_step
113+
)
114+
115+
73116
@linter.ensure_static_graph
74117
@linter.check
75118
def check_start_end_degree(graph):

metaflow/plugins/argo/argo_workflows.py

Lines changed: 3 additions & 2 deletions
Original file line number | Diff line number | Diff line change
@@ -1201,8 +1201,9 @@ def _is_recursive_node(self, node):
12011201
return node.name in self.recursive_nodes
12021202

12031203
def _matching_conditional_join(self, node):
1204-
# If no earlier conditional join step is found during parsing, then 'end' is always one.
1205-
return self.matching_conditional_join_dict.get(node.name, "end")
1204+
# If no earlier conditional join step is found during parsing,
1205+
# fall back to the graph's terminal step.
1206+
return self.matching_conditional_join_dict.get(node.name, self.graph.end_step)
12061207

12071208
# Visit every node and yield the uber DAGTemplate(s).
12081209
def _dag_templates(self):

0 commit comments

Comments
 (0)