Skip to content

Commit d250359

Browse files
author
Nissan Pow
committed
add plugin-based send_event() API for backend-agnostic event triggering
Register event providers (ArgoEvent, SFNEvent, AirflowEvent) in the Metaflow plugin system via EVENT_PROVIDERS_DESC. The new send_event() function in metaflow.integrations auto-detects the configured backend or accepts an explicit backend= override. - Add "event_provider" category to _plugin_categories - Add EVENT_PROVIDERS_DESC + resolve_plugins("event_provider") - Create SFNEvent (EventBridge) and AirflowEvent (REST API) providers - Fix ArgoEvent.is_configured() to read env var at call time - Fix ArgoEvent.__init__ url resolution for late-bound env vars - Update Phase 2 trigger tests to use plugin-based send_event
1 parent 21adc5f commit d250359

8 files changed

Lines changed: 352 additions & 47 deletions

File tree

metaflow/extension_support/plugins.py

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -189,6 +189,7 @@ def resolve_plugins(category, path_only=False):
189189
"secrets_provider": lambda x: x.TYPE,
190190
"gcp_client_provider": lambda x: x.name,
191191
"deployer_impl_provider": lambda x: x.TYPE,
192+
"event_provider": lambda x: x.TYPE,
192193
"azure_client_provider": lambda x: x.name,
193194
"sidecar": None,
194195
"logging_sidecar": None,

metaflow/integrations.py

Lines changed: 70 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -17,7 +17,11 @@
1717
# - name: name of the integration alias
1818
# - obj: object it points to
1919
#
20-
ALIASES_DESC = [("ArgoEvent", ".plugins.argo.argo_events.ArgoEvent")]
20+
# Integration aliases re-exported from `metaflow.integrations`. Each entry
# maps the public alias name to the dotted path of the object it points to;
# the aliases are materialized by process_integration_aliases() below.
ALIASES_DESC = [
    ("ArgoEvent", ".plugins.argo.argo_events.ArgoEvent"),
    ("SFNEvent", ".plugins.aws.step_functions.sfn_event.SFNEvent"),
    ("AirflowEvent", ".plugins.airflow.airflow_event.AirflowEvent"),
]
2125

2226
# Aliases can be enabled or disabled through configuration or extensions:
2327
# - ENABLED_INTEGRATION_ALIAS: list of alias names to enable.
@@ -27,5 +31,70 @@
2731
# ENABLED_INTEGRATION_ALIAS from the concatenation of the names in
2832
# ALIASES_DESC (concatenation of the names here as well as in the extensions).
2933

34+
35+
def _resolve_event_providers():
    """Return the event provider classes registered with the plugin system.

    The import happens lazily, inside the call, so that merely importing
    ``metaflow.integrations`` does not pull in ``metaflow.plugins``.
    """
    from metaflow.plugins import EVENT_PROVIDERS as providers

    return providers
40+
41+
42+
def send_event(name, payload=None, backend=None, **kwargs):
    """Send a trigger event to wake a deployed @trigger flow.

    Event providers are discovered through the plugin system. Every provider
    class carries a ``TYPE`` string (e.g. ``"argo-workflows"``) and an
    ``is_configured()`` classmethod that reports whether its required
    environment variables are present.

    If *backend* is ``None``, the first provider reporting
    ``is_configured() == True`` wins. Otherwise *backend* is compared to each
    provider's ``TYPE``; underscores in *backend* are treated as dashes, so
    ``"argo_workflows"`` matches ``"argo-workflows"``.

    Adding a backend only requires registering an ``EVENT_PROVIDERS_DESC``
    entry in ``metaflow/plugins/__init__.py``; this function is untouched.

    Parameters
    ----------
    name : str
        Event name (must match the @trigger event name on the deployed flow).
    payload : dict, optional
        Key-value pairs delivered with the event, used to set parameters.
    backend : str, optional
        Override auto-detection. Matched against provider TYPE
        (e.g. ``"argo-workflows"``, ``"step-functions"``, ``"airflow"``).
        Underscores are accepted as well (``"argo_workflows"``).
    **kwargs
        Passed through to the provider class constructor.

    Returns
    -------
    str or None
        Event ID if published successfully.

    Raises
    ------
    ValueError
        If an explicit *backend* matches no registered provider TYPE.
    RuntimeError
        If auto-detection finds no configured provider.
    """
    providers = _resolve_event_providers()

    if backend is not None:
        wanted = backend.replace("_", "-")  # "argo_workflows" -> "argo-workflows"
        chosen = next((p for p in providers if p.TYPE == wanted), None)
        if chosen is None:
            raise ValueError(
                "Unknown event backend %r. Available: %s"
                % (backend, [p.TYPE for p in providers])
            )
        return chosen(name, payload=payload, **kwargs).publish()

    # Auto-detect: first provider that declares itself configured wins.
    chosen = next(
        (
            p
            for p in providers
            if hasattr(p, "is_configured") and p.is_configured()
        ),
        None,
    )
    if chosen is None:
        raise RuntimeError(
            "Cannot auto-detect event backend — no provider is configured. "
            "Set the 'backend' parameter explicitly or configure one of: %s"
            % [p.TYPE for p in providers]
        )
    return chosen(name, payload=payload, **kwargs).publish()
98+
3099
# Keep this line and make sure ALIASES_DESC is above this line.
31100
process_integration_aliases(globals())

metaflow/plugins/__init__.py

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -172,6 +172,12 @@
172172
("airflow", ".airflow.airflow_deployer.AirflowDeployer"),
173173
]
174174

175+
# Event provider plugins: each entry maps a backend TYPE string (the value
# matched by metaflow.integrations.send_event's `backend` argument) to the
# dotted path of the class that publishes trigger events for that backend.
EVENT_PROVIDERS_DESC = [
    ("argo-workflows", ".argo.argo_events.ArgoEvent"),
    ("step-functions", ".aws.step_functions.sfn_event.SFNEvent"),
    ("airflow", ".airflow.airflow_event.AirflowEvent"),
]
180+
175181
TL_PLUGINS_DESC = [
176182
("yaml_parser", ".parsers.yaml_parser"),
177183
("requirements_txt_parser", ".pypi.parsers.requirements_txt_parser"),
@@ -220,6 +226,8 @@ def get_runner_cli_path():
220226
if sys.version_info >= (3, 7):
221227
DEPLOYER_IMPL_PROVIDERS = resolve_plugins("deployer_impl_provider")
222228

229+
EVENT_PROVIDERS = resolve_plugins("event_provider")
230+
223231
TL_PLUGINS = resolve_plugins("tl_plugin")
224232

225233
from .cards.card_modules import MF_EXTERNAL_CARDS
Lines changed: 109 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,109 @@
1+
import json
2+
import sys
3+
import time
4+
import urllib
5+
import uuid
6+
from datetime import datetime
7+
8+
from metaflow.exception import MetaflowException
9+
10+
11+
class AirflowEventException(MetaflowException):
    """Raised when an Airflow trigger event cannot be published."""

    headline = "Airflow Event Exception"
13+
14+
15+
class AirflowEvent(object):
    """
    AirflowEvent triggers an Airflow DAG run via the Airflow REST API,
    used to start flows deployed with @trigger.

    Parameters
    ----------
    name : str
        Event name — maps to the DAG ID of the deployed flow.
    url : str, optional
        Airflow webserver base URL. Defaults to METAFLOW_AIRFLOW_WEBSERVER_URL env var.
    payload : dict, optional
        Key-value pairs delivered as DAG run conf parameters.
    """

    TYPE = "airflow"

    @classmethod
    def is_configured(cls):
        # Read the env var at call time (not import time) so variables
        # exported after `import metaflow` are still picked up.
        import os

        return bool(os.environ.get("METAFLOW_AIRFLOW_WEBSERVER_URL"))

    def __init__(self, name, url=None, payload=None):
        import os

        self._name = name
        # Resolution order: explicit argument > env var > local default.
        self._url = url or os.environ.get(
            "METAFLOW_AIRFLOW_WEBSERVER_URL", "http://localhost:8080"
        )
        self._payload = payload or {}

    def add_to_payload(self, key, value):
        """Add a key-value pair to the event payload; values are stringified.

        Returns self so calls can be chained.
        """
        self._payload[key] = str(value)
        return self

    def publish(self, payload=None, ignore_errors=True):
        """
        Trigger an Airflow DAG run via the REST API.

        Parameters
        ----------
        payload : dict, optional
            Additional key-value pairs to merge into the conf.
        ignore_errors : bool, default True
            If True, errors are printed to stderr and None is returned
            instead of raising.

        Returns
        -------
        str or None
            The DAG run ID if triggered successfully, None otherwise.
        """
        # BUGFIX: the module only does `import urllib`, which does not
        # guarantee the `urllib.request` submodule is loaded; import it
        # explicitly so the attribute accesses below cannot AttributeError.
        import urllib.request
        from datetime import timezone

        if payload is None:
            payload = {}

        try:
            dag_run_id = "metaflow__%s__%s" % (
                self._name,
                str(uuid.uuid4())[:8],
            )
            conf = {
                "name": self._name,
                "id": str(uuid.uuid4()),
                "timestamp": int(time.time()),
                # Timezone-aware replacement for the deprecated
                # datetime.utcnow(); yields the same UTC date string.
                "utc_date": datetime.now(timezone.utc).strftime("%Y%m%d"),
                "generated-by-metaflow": True,
                **self._payload,
                **payload,
            }

            api_url = "%s/api/v1/dags/%s/dagRuns" % (
                self._url.rstrip("/"),
                self._name,
            )
            data = json.dumps({"dag_run_id": dag_run_id, "conf": conf}).encode("utf-8")

            # NOTE(review): no auth header is attached — this assumes the
            # webserver accepts unauthenticated API calls; confirm against
            # the deployment's API auth configuration.
            request = urllib.request.Request(
                api_url,
                method="POST",
                headers={"Content-Type": "application/json"},
                data=data,
            )

            # Use a context manager so the HTTP response is always closed
            # (the original leaked the connection).
            with urllib.request.urlopen(request, timeout=60):
                pass
            print("Airflow Event (%s) published." % self._name, file=sys.stderr)
            return dag_run_id

        except Exception as e:
            msg = "Unable to publish Airflow Event (%s): %s" % (self._name, e)
            if ignore_errors:
                print(msg, file=sys.stderr)
                return None
            else:
                raise AirflowEventException(msg)

metaflow/plugins/argo/argo_events.py

Lines changed: 16 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -35,9 +35,18 @@ class ArgoEvent(object):
3535
A set of key-value pairs delivered in this event. Used to set parameters of triggered flows.
3636
"""
3737

38-
def __init__(
39-
self, name, url=ARGO_EVENTS_WEBHOOK_URL, payload=None, access_token=None
40-
):
38+
TYPE = "argo-workflows"
39+
40+
@classmethod
41+
def is_configured(cls):
42+
# Check the env var directly (not the module-level constant) so that
43+
# env vars set after import are picked up. Also fall back to the
44+
# already-loaded config value for non-env-var configuration sources.
45+
return bool(
46+
os.environ.get("ARGO_EVENTS_WEBHOOK_URL") or ARGO_EVENTS_WEBHOOK_URL
47+
)
48+
49+
def __init__(self, name, url=None, payload=None, access_token=None):
4150
# TODO: Introduce support for NATS
4251
if callable(name):
4352
name = name()
@@ -47,7 +56,10 @@ def __init__(
4756
% type(name).__name__
4857
)
4958
self._name = name
50-
self._url = url
59+
# Resolve URL: explicit arg > env var > config constant
60+
self._url = (
61+
url or os.environ.get("ARGO_EVENTS_WEBHOOK_URL") or ARGO_EVENTS_WEBHOOK_URL
62+
)
5163
self._payload = payload or {}
5264
self._access_token = access_token
5365

Lines changed: 112 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,112 @@
1+
import json
2+
import os
3+
import sys
4+
import time
5+
import uuid
6+
from datetime import datetime
7+
8+
from metaflow.exception import MetaflowException
9+
10+
11+
class SFNEventException(MetaflowException):
    """Raised when a Step Functions/EventBridge trigger event cannot be published."""

    headline = "SFN Event Exception"
13+
14+
15+
class SFNEvent(object):
    """
    SFNEvent sends a trigger event via AWS EventBridge to start Step Functions
    workflows deployed with @trigger.

    Parameters
    ----------
    name : str
        Event name (must match the @trigger event name on the deployed flow).
    payload : dict, optional
        Key-value pairs delivered with the event, used to set parameters of
        triggered flows.
    """

    TYPE = "step-functions"

    @classmethod
    def is_configured(cls):
        # Read the env vars at call time so values exported after import are
        # picked up. Uses the module-level `os` import — the original's local
        # `import os` redundantly shadowed it.
        return bool(
            os.environ.get("METAFLOW_SFN_IAM_ROLE")
            or os.environ.get("METAFLOW_SFN_STATE_MACHINE_PREFIX")
        )

    def __init__(self, name, payload=None):
        self._name = name
        self._payload = payload or {}

    def add_to_payload(self, key, value):
        """Add a key-value pair to the event payload; values are stringified.

        Returns self so calls can be chained.
        """
        self._payload[key] = str(value)
        return self

    def publish(self, payload=None, ignore_errors=True):
        """
        Publish an event to EventBridge that will trigger any Step Functions
        workflow listening for this event name.

        Parameters
        ----------
        payload : dict, optional
            Additional key-value pairs to merge into the payload.
        ignore_errors : bool, default True
            If True, errors are printed to stderr and None is returned
            instead of raising.

        Returns
        -------
        str or None
            The event ID if published successfully, None otherwise.
        """
        if payload is None:
            payload = {}

        try:
            import boto3

            # Local import: the module top only pulls in `datetime` itself.
            from datetime import timezone

            event_bus = os.environ.get("METAFLOW_SFN_EVENT_BUS_ARN", "default")

            event_id = str(uuid.uuid4())
            event_payload = {
                "name": self._name,
                "id": event_id,
                "timestamp": int(time.time()),
                # Timezone-aware replacement for the deprecated
                # datetime.utcnow(); yields the same UTC date string.
                "utc_date": datetime.now(timezone.utc).strftime("%Y%m%d"),
                "generated-by-metaflow": True,
                **self._payload,
                **payload,
            }

            client = boto3.client("events")
            response = client.put_events(
                Entries=[
                    {
                        "Source": "metaflow",
                        "DetailType": self._name,
                        "Detail": json.dumps(event_payload),
                        "EventBusName": event_bus,
                    }
                ]
            )

            if response.get("FailedEntryCount", 0) > 0:
                err = response["Entries"][0].get("ErrorMessage", "Unknown error")
                raise SFNEventException(
                    "Failed to publish event %s: %s" % (self._name, err)
                )

            print("SFN Event (%s) published." % self._name, file=sys.stderr)
            return event_id

        except Exception as e:
            msg = "Unable to publish SFN Event (%s): %s" % (self._name, e)
            if ignore_errors:
                print(msg, file=sys.stderr)
                return None
            else:
                # Chain the original exception for a complete traceback.
                raise SFNEventException(msg) from e

0 commit comments

Comments
 (0)