Skip to content

Commit 5db8715

Browse files
authored
[namespaced events] (#2742)
- Make @trigger and ArgoEvent support namespaced eventing. This means users can do the following: Have an upstream frow raise an ArgoEvent which is caught by a downstream flow under the same project and branch. For example, below flow raises the event. ```python # python pub.py --branch foo run from metaflow import FlowSpec, step, project, namespaced_event_name from metaflow.integrations import ArgoEvent @project(name="myproject") class MyFlow(FlowSpec): @step def start(self): # Publish a namespaced event: mfns.myproject.test.foo.data_ready ArgoEvent(namespaced_event_name("data_ready")).publish() self.next(self.end) @step def end(self): pass ``` And this one catches that event: ```python # python pub.py --branch foo argo-workflows create @project(name="myproject") @trigger(event=namespaced_event_name("data_ready")) class DownstreamFlow(FlowSpec): ... ``` - Each event name returned by `namespaced_event_name` is blindly prefixed with `mfns.` to distinguish namespaced events (this is configurable via `METAFLOW_NAMESPACED_EVENTS_PREFIX`). this acts as a proxy for knowing that "metaflow" raised the event. - The intention of this change is to allow users to enforce catching/raising events only within the same project/branch namespace to make experimentation much easier. - We have a [custom decorator implementation](https://github.com/outerbounds/custom-decorator-examples/tree/main/namespaced_events) of this but it is not as natural as a core change. - We rely on the same machinery introduced in #2157 which allowed @trigger event/s parameters to be DeployTimeFields. We just lift this information / functionality into the docstring too and make it a little more public.
1 parent ecced08 commit 5db8715

5 files changed

Lines changed: 134 additions & 7 deletions

File tree

metaflow/metaflow_config.py

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -455,6 +455,9 @@
455455
"ARGO_EVENTS_SENSOR_NAMESPACE", KUBERNETES_NAMESPACE
456456
)
457457

458+
# Prefix for namespaced events (used by @trigger with namespaced=True)
459+
NAMESPACED_EVENTS_PREFIX = from_conf("NAMESPACED_EVENTS_PREFIX", "mfns")
460+
458461
ARGO_WORKFLOWS_UI_URL = from_conf("ARGO_WORKFLOWS_UI_URL")
459462

460463
##

metaflow/plugins/__init__.py

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -174,6 +174,7 @@
174174
TL_PLUGINS_DESC = [
175175
("yaml_parser", ".parsers.yaml_parser"),
176176
("requirements_txt_parser", ".pypi.parsers.requirements_txt_parser"),
177+
("namespaced_event_name", ".namespaced_events.namespaced_event_name"),
177178
("pyproject_toml_parser", ".pypi.parsers.pyproject_toml_parser"),
178179
("conda_environment_yml_parser", ".pypi.parsers.conda_environment_yml_parser"),
179180
]

metaflow/plugins/argo/argo_events.py

Lines changed: 9 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -27,8 +27,8 @@ class ArgoEvent(object):
2727
2828
Parameters
2929
----------
30-
name : str,
31-
Name of the event
30+
name : Union[str, Callable[[], str]]
31+
Name of the event, or a callable (invoked with no arguments) that returns the event name (e.g., `namespaced_event_name('foo')`).
3232
url : str, optional
3333
Override the event endpoint from `ARGO_EVENTS_WEBHOOK_URL`.
3434
payload : Dict, optional
@@ -39,6 +39,13 @@ def __init__(
3939
self, name, url=ARGO_EVENTS_WEBHOOK_URL, payload=None, access_token=None
4040
):
4141
# TODO: Introduce support for NATS
42+
if callable(name):
43+
name = name()
44+
if not isinstance(name, str):
45+
raise ArgoEventException(
46+
"Callable for 'name' must return a string, got %s"
47+
% type(name).__name__
48+
)
4249
self._name = name
4350
self._url = url
4451
self._payload = payload or {}

metaflow/plugins/events_decorator.py

Lines changed: 16 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -5,7 +5,7 @@
55
from metaflow.decorators import FlowDecorator
66
from metaflow.exception import MetaflowException
77
from metaflow.util import is_stringish
8-
from metaflow.parameters import DeployTimeField, deploy_time_eval
8+
from metaflow.parameters import DeployTimeField, deploy_time_eval, ParameterContext
99

1010
# TODO: Support dynamic parameter mapping through a context object that exposes
1111
# flow name and user name similar to parameter context
@@ -43,12 +43,23 @@ class TriggerDecorator(FlowDecorator):
4343
@trigger(event={'name':'foo', 'parameters':{'common_name': 'common_name', 'flow_param': 'event_field'}})
4444
```
4545
46+
For namespaced events, you can use `namespaced_event_name` which resolves the
47+
full event name at deploy time based on @project settings:
48+
```
49+
from metaflow import namespaced_event_name
50+
51+
@trigger(event=namespaced_event_name('foo'))
52+
53+
@trigger(event={'name': namespaced_event_name('foo'), 'parameters': {'x': 'y'}})
54+
```
55+
4656
Parameters
4757
----------
48-
event : Union[str, Dict[str, Any]], optional, default None
49-
Event dependency for this flow.
50-
events : List[Union[str, Dict[str, Any]]], default []
51-
Events dependency for this flow.
58+
event : Union[str, Dict[str, Any], Callable[[ParameterContext], Union[str, Dict[str, Any]]]], optional, default None
59+
Event dependency for this flow. Can be a string, dict, or a callable that
60+
returns a string or dict at deploy time.
61+
events : List[Union[str, Dict[str, Any],Callable[[ParameterContext], Union[str, Dict[str, Any]]]]], default []
62+
Events dependency for this flow. Each element can be a string, dict, or callable.
5263
options : Dict[str, Any], default {}
5364
Backend-specific configuration for tuning eventing behavior.
5465
Lines changed: 105 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,105 @@
1+
from collections import namedtuple
2+
import functools
3+
import re
4+
5+
from metaflow.metaflow_config import NAMESPACED_EVENTS_PREFIX
6+
from metaflow.util import is_stringish
7+
8+
ParsedEvent = namedtuple(
9+
"ParsedEvent",
10+
["full_name", "project", "branch", "logical_name", "is_namespaced"],
11+
)
12+
13+
14+
def namespaced_event_name(event_name):
15+
"""
16+
Creates a project-namespaced event name based on @project settings.
17+
18+
Use this to automatically prefix event names with your @project
19+
namespace, ensuring events are isolated per project and branch.
20+
21+
The resolved name follows the format:
22+
mfns.<project>.<branch>.<event_name>
23+
24+
If no @project decorator is present, resolves to:
25+
mfns.<event_name>
26+
27+
Parameters
28+
----------
29+
event_name : str
30+
The logical event name (e.g., 'data_ready')
31+
32+
Returns
33+
-------
34+
Callable[..., str]
35+
A callable that returns the fully namespaced event name (str) when invoked
36+
37+
Examples
38+
--------
39+
With @trigger decorator:
40+
41+
```
42+
from metaflow import namespaced_event_name, project
43+
44+
@project(name="foo")
45+
@trigger(event=namespaced_event_name('data_ready'))
46+
class MyFlow(FlowSpec):
47+
...
48+
49+
@project(name="foo")
50+
@trigger(event={'name': namespaced_event_name('data_ready'), 'parameters': {'x': 'y'}})
51+
class MyFlow(FlowSpec):
52+
```
53+
54+
With ArgoEvent:
55+
56+
```
57+
from metaflow import namespaced_event_name
58+
from metaflow.integrations import ArgoEvent
59+
60+
ArgoEvent(namespaced_event_name('data_ready')).publish()
61+
```
62+
"""
63+
if event_name is None or not is_stringish(event_name):
64+
raise ValueError(
65+
"event_name should be a string not %s" % (str(type(event_name)))
66+
)
67+
68+
def _delayed_name_generator(context=None, event_name=None):
69+
from metaflow import current
70+
71+
project_name = current.get("project_name", None)
72+
branch_name = current.get("branch_name", None)
73+
return _make_namespaced_event_name(event_name, project_name, branch_name)
74+
75+
return functools.partial(_delayed_name_generator, event_name=event_name)
76+
77+
78+
def _make_namespaced_event_name(logical_name, project=None, branch=None):
79+
"""
80+
Construct a namespaced event name from components.
81+
82+
Parameters
83+
----------
84+
logical_name : str
85+
The base event name
86+
project : str, optional
87+
Project name from @project decorator
88+
branch : str, optional
89+
Branch name from @project decorator
90+
91+
Returns
92+
-------
93+
str
94+
Namespaced event name in format mfns.<project>.<branch>.<logical_name>
95+
or mfns.<logical_name> if no project/branch
96+
"""
97+
if project and branch:
98+
# When it comes to project and branches we want to make
99+
# sure that we are not having any non-acceptable characters.
100+
# allow dot (.) in project and branch names
101+
branch = re.sub(r"[^a-zA-Z0-9_\-\.]", "", branch)
102+
project = re.sub(r"[^a-zA-Z0-9_\-\.]", "", project)
103+
return f"{NAMESPACED_EVENTS_PREFIX}.{project}.{branch}.{logical_name}"
104+
else:
105+
return f"{NAMESPACED_EVENTS_PREFIX}.{logical_name}"

0 commit comments

Comments
 (0)