Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
16 commits
Select commit Hold shift + click to select a range
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
54 changes: 44 additions & 10 deletions metaflow/client/core.py
Original file line number Diff line number Diff line change
Expand Up @@ -2121,8 +2121,9 @@ def parent_steps(self) -> Iterator["Step"]:
Parent step
"""
graph_info = self.task["_graph_info"].data
start_step = graph_info.get("start_step", "start")

if self.id != "start":
if self.id != start_step:
flow, run, _ = self.path_components
for node_name, attributes in graph_info["steps"].items():
if self.id in attributes["next"]:
Expand All @@ -2139,8 +2140,9 @@ def child_steps(self) -> Iterator["Step"]:
Child step
"""
graph_info = self.task["_graph_info"].data
end_step = graph_info.get("end_step", "end")

if self.id != "end":
if self.id != end_step:
flow, run, _ = self.path_components
for next_step in graph_info["steps"][self.id]["next"]:
yield Step(f"{flow}/{run}/{next_step}", _namespace_check=False)
Expand All @@ -2153,7 +2155,7 @@ class Run(MetaflowObject):
Attributes
----------
data : MetaflowData
a shortcut to run['end'].task.data, i.e. data produced by this run.
A shortcut to the terminal step's task data produced by this run.
successful : bool
True if the run completed successfully.
finished : bool
Expand All @@ -2165,7 +2167,7 @@ class Run(MetaflowObject):
trigger : MetaflowTrigger
Information about event(s) that triggered this run (if present). See `MetaflowTrigger`.
end_task : Task
`Task` for the end step (if it is present already).
`Task` for the terminal step (if it is present already).
"""

_NAME = "run"
Expand All @@ -2176,6 +2178,36 @@ def _iter_filter(self, x):
# exclude _parameters step
return x.id[0] != "_"

@property
def _graph_endpoints(self):
    """
    Return ``(start_step_name, end_step_name)`` for this run.

    The names come from the ``_parameters`` task metadata, which is
    written by ``persist_constants`` before any step executes -- the
    native runtime calls it directly, and orchestrators (Argo
    Workflows, Airflow, Step Functions) call it through the ``init``
    command inserted into their generated command line. Old runs that
    predate custom endpoint support fall back to the literal
    ``("start", "end")``.
    """
    # Fast path: endpoints already resolved for this object.
    try:
        return self._cached_endpoints
    except AttributeError:
        pass

    endpoints = ("start", "end")
    try:
        params_meta = self["_parameters"].task.metadata_dict
        endpoints = (
            params_meta.get("start_step", "start"),
            params_meta.get("end_step", "end"),
        )
    except (KeyError, MetaflowNotFound):
        # Expected for old runs without _parameters or metadata; the
        # fallback is authoritative, so it is safe to cache below.
        pass
    except Exception:
        # Transient error (network, metadata service) -- do NOT cache
        # the fallback so a subsequent access can retry.
        return endpoints
    self._cached_endpoints = endpoints
    return self._cached_endpoints
Comment thread
greptile-apps[bot] marked this conversation as resolved.

def steps(self, *tags: str) -> Iterator[Step]:
"""
[Legacy function - do not use]
Expand Down Expand Up @@ -2298,17 +2330,18 @@ def finished_at(self) -> Optional[datetime]:
@property
def end_task(self) -> Optional[Task]:
"""
Returns the Task corresponding to the 'end' step.
Returns the Task corresponding to the terminal step.

This returns None if the end step does not yet exist.
This returns None if the terminal step does not yet exist.

Returns
-------
Task, optional
The 'end' task
The terminal step's task
"""
try:
end_step = self["end"]
_, end_step_name = self._graph_endpoints
end_step = self[end_step_name]
except KeyError:
return None

Expand Down Expand Up @@ -2481,8 +2514,9 @@ def trigger(self) -> Optional[Trigger]:
Trigger, optional
Container of triggering events
"""
if "start" in self and self["start"].task:
meta = self["start"].task.metadata_dict.get("execution-triggers")
start_step, _ = self._graph_endpoints
if start_step in self and self[start_step].task:
meta = self[start_step].task.metadata_dict.get("execution-triggers")
if meta:
return Trigger(json.loads(meta))
return None
Expand Down
68 changes: 57 additions & 11 deletions metaflow/decorators.py
Original file line number Diff line number Diff line change
Expand Up @@ -588,6 +588,27 @@ def _base_step_decorator(decotype, *args, **kwargs):
func = args[0]
if isinstance(func, (StepMutator, UserStepDecoratorBase)):
func = func._my_step

# Step decorator applied to a class with a synthesized step method.
Comment thread
romain-intel marked this conversation as resolved.
# This branch exists to support an upcoming FunctionSpec feature
# (currently shipped as an out-of-tree extension): its metaclass
# (FunctionSpecMeta) creates a single `@step(start=True, end=True)`
# method on the class and sets `_function_spec_step_name` to its
# attribute name. That lets class-level step decorators like
# ``@retry``/``@resources`` forward to the synthetic step.
#
# The `_function_spec_step_name` attribute is not set anywhere else
# in this repo — it is deliberately an extension contract. See the
# DAGNode module comment in graph.py for the wider context. This
# hook may be removed once FunctionSpec is folded into core.
if isinstance(func, type) and hasattr(func, "_function_spec_step_name"):
step_func = getattr(func, func._function_spec_step_name)
if hasattr(step_func, "is_step"):
step_func.decorators.append(
decotype(attributes=kwargs, statically_defined=True)
)
return func

if not hasattr(func, "is_step"):
raise BadStepDecoratorException(decotype.name, func)

Expand Down Expand Up @@ -1009,7 +1030,11 @@ def step(


def step(
    f=None,
    *,
    start=False,
    end=False,
    node_info=None,
):
    """
    Marks a method in a FlowSpec as a Metaflow Step. Note that this
    decorator needs to be placed as the innermost decorator when the
    method carries several decorators.

    Supports both the bare form ``@step`` and the parameterized forms
    such as ``@step(start=True)`` or ``@step(end=True)``.

    Parameters
    ----------
    f : callable, optional
        Function to make into a Metaflow Step. When using keyword arguments
        (e.g. ``@step(start=True)``), this is ``None`` and a decorator
        function is returned instead.
    start : bool, default False
        Mark this step as the start (entry) step of the flow.
    end : bool, default False
        Mark this step as the end (terminal) step of the flow.
    node_info : dict, optional
        Extra metadata to attach to this step's DAGNode. Extensions can use
        this to store arbitrary information accessible via ``flow._graph``
        (live references) and ``_graph_info`` (serialized via ``to_pod``).

    Returns
    -------
    callable
        The decorated function, or a decorator if keyword arguments were used.
    """

    def _apply(func):
        # Tag the function so the FlowSpec machinery recognizes it as a
        # step and can attach step-level decorators to it later.
        func.is_step = True
        func.decorators = []
        func.config_decorators = []
        func.wrappers = []
        func.name = func.__name__
        func.is_start_step = start
        func.is_end_step = end
        # None means "no extra node metadata"; normalize to an empty dict
        # so consumers can always treat node_info as a mapping.
        func.node_info = node_info or {}
        return func

    if f is not None:
        # Called as @step (no parens): decorate immediately.
        return _apply(f)
    # Called as @step(start=True) etc.: return the decorator.
    return _apply


def _import_plugin_decorators(globals_dict):
Expand Down
37 changes: 29 additions & 8 deletions metaflow/events.py
Original file line number Diff line number Diff line change
Expand Up @@ -57,18 +57,39 @@ def __init__(self, _meta=None):
@classmethod
def from_runs(cls, run_objs: List["metaflow.Run"]):
    """
    Build a Trigger from a list of Run objects.

    Runs whose terminal task does not exist yet are skipped, since no
    pathspec can be derived for them. For each remaining run one event
    is emitted, named after the run's terminal step; when that step is
    not literally called "end", a second event with the well-known
    ".end" suffix is also emitted so downstream code that filters on
    "metaflow.<flow>.end" keeps matching. This mirrors the dual-emit in
    argo_workflows_decorator.py so the Argo publish path and the
    programmatic Trigger.from_runs path stay symmetric.
    """
    # Most recently finished runs first.
    run_objs.sort(key=lambda x: x.finished_at, reverse=True)
    valid_runs = []
    meta = []
    for run_obj in run_objs:
        end_task = run_obj.end_task
        if end_task is None:
            # Terminal step has not produced a task yet; skip the run.
            continue
        valid_runs.append(run_obj)
        end_step_name = end_task.parent.id
        meta.append(
            {
                "type": "run",
                "timestamp": run_obj.finished_at,
                "name": "metaflow.%s.%s" % (run_obj.parent.id, end_step_name),
                "id": end_task.pathspec,
            }
        )
        # For custom-named end steps, also emit the well-known ".end"
        # alias so downstream code that filters on "metaflow.<flow>.end"
        # keeps matching.
        if end_step_name != "end":
            meta.append(
                {
                    "type": "run",
                    "timestamp": run_obj.finished_at,
                    "name": "metaflow.%s.end" % run_obj.parent.id,
                    "id": end_task.pathspec,
                }
            )

    trigger = Trigger(meta)
    trigger._runs = valid_runs
    return trigger

@property
Expand Down
2 changes: 2 additions & 0 deletions metaflow/flowspec.py
Original file line number Diff line number Diff line change
Expand Up @@ -522,6 +522,8 @@ def _set_constants(self, graph, kwargs, config_options):

graph_info = {
"file": os.path.basename(os.path.abspath(sys.argv[0])),
"start_step": graph.start_step,
"end_step": graph.end_step,
"parameters": parameters_info,
"constants": constants_info,
"steps": steps_info,
Expand Down
Loading
Loading