Skip to content

Commit 85cdb74

Browse files
author
Nissan Pow
committed
add Phase 2 @trigger tests for OSS backends
5 tests using ArgoEvent.publish() to send events that wake deployed trigger flows: - test_static_triggers: static @trigger with multiple events and parameter mappings - test_deploy_time_trigger_toplevel_list: deploy-time trigger with callable returning list - test_deploy_time_trigger_toplevel_name: deploy-time trigger with config_expr event name - test_deploy_time_trigger_events_nested: deploy-time trigger with nested callables/config_expr - test_deploy_time_trigger_event_nested: deploy-time single event with nested callable parameters Tests skip gracefully on non-Argo backends (sfn, airflow) via _require_argo_events(). Flow files copied verbatim from internal test suite.
1 parent 1820867 commit 85cdb74

6 files changed

Lines changed: 712 additions & 0 deletions
Lines changed: 63 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,63 @@
1+
from metaflow import (
2+
FlowSpec,
3+
step,
4+
Parameter,
5+
trigger,
6+
current,
7+
config_expr,
8+
Config,
9+
project,
10+
)
11+
12+
13+
def myfunction1(context):
14+
return f"{current.branch_name}.HelloDeployTimeTriggerEventNestedFlow"
15+
16+
17+
def myfunction2(context):
18+
return [
19+
"param",
20+
(
21+
"param_field",
22+
"events_field",
23+
),
24+
]
25+
26+
27+
@trigger(event={"name": myfunction1, "parameters": myfunction2})
28+
@project(
29+
name=config_expr("config.project"),
30+
branch=config_expr("config.branch"),
31+
)
32+
class HelloDeployTimeTriggerEventNestedFlow(FlowSpec):
33+
config = Config("config")
34+
35+
param = Parameter(
36+
"param",
37+
help="flow parameter",
38+
default="default_value",
39+
)
40+
param_field = Parameter(
41+
"param_field",
42+
help="flow parameter",
43+
default="default_value",
44+
)
45+
46+
@step
47+
def start(self):
48+
from metaflow import metaflow_version
49+
50+
print(f"In start step and using metaflow: {metaflow_version.get_version()}")
51+
print("In HelloDeployTimeTriggerEventNestedFlow")
52+
print(f"param: {self.param}")
53+
print(f"param_field: {self.param_field}")
54+
self.trigger = current.trigger
55+
self.next(self.end)
56+
57+
@step
58+
def end(self):
59+
pass
60+
61+
62+
if __name__ == "__main__":
63+
HelloDeployTimeTriggerEventNestedFlow()
Lines changed: 86 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,86 @@
1+
from metaflow import (
2+
FlowSpec,
3+
step,
4+
Parameter,
5+
trigger,
6+
current,
7+
config_expr,
8+
Config,
9+
project,
10+
)
11+
12+
13+
def myfunc1(context):
14+
return f"{current.branch_name}.HelloDeployTimeTriggerEventsNestedFlow1"
15+
16+
17+
def myfunc2(context):
18+
return ["param1"]
19+
20+
21+
def myfunc3(context):
22+
return ["param2"]
23+
24+
25+
def myfunc4(context):
26+
return {
27+
"name": config_expr(
28+
"config.branch + '.HelloDeployTimeTriggerEventsNestedFlow3'"
29+
),
30+
"parameters": ["param3"],
31+
}
32+
33+
34+
@trigger(
35+
events=[
36+
config_expr("config.branch + '.HelloDeployTimeTriggerEventsNestedFlow'"),
37+
{
38+
"name": myfunc1,
39+
"parameters": myfunc2,
40+
},
41+
{
42+
"name": config_expr(
43+
"config.branch + '.HelloDeployTimeTriggerEventsNestedFlow2'"
44+
),
45+
"parameters": myfunc3,
46+
},
47+
myfunc4,
48+
{
49+
"name": config_expr(
50+
"config.branch + '.HelloDeployTimeTriggerEventsNestedFlow4'"
51+
),
52+
"parameters": {"param4": "param_from_trigger"},
53+
},
54+
]
55+
)
56+
@project(
57+
name=config_expr("config.project"),
58+
branch=config_expr("config.branch"),
59+
)
60+
class HelloDeployTimeTriggerEventsNestedFlow(FlowSpec):
61+
config = Config("config")
62+
param1 = Parameter("param1", type=str, default="default_value")
63+
param2 = Parameter("param2", type=str, default="default_value")
64+
param3 = Parameter("param3", type=str, default="default_value")
65+
param4 = Parameter("param4", type=str, default="default_value")
66+
67+
@step
68+
def start(self):
69+
from metaflow import metaflow_version
70+
71+
print(f"In start step and using metaflow: {metaflow_version.get_version()}")
72+
print("In HelloDeployTimeTriggerEventsNestedFlow")
73+
print(f"param1: {self.param1}")
74+
print(f"param2: {self.param2}")
75+
print(f"param3: {self.param3}")
76+
print(f"param4: {self.param4}")
77+
self.trigger = current.trigger
78+
self.next(self.end)
79+
80+
@step
81+
def end(self):
82+
pass
83+
84+
85+
if __name__ == "__main__":
86+
HelloDeployTimeTriggerEventsNestedFlow()
Lines changed: 58 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,58 @@
1+
from metaflow import (
2+
FlowSpec,
3+
step,
4+
project,
5+
current,
6+
trigger,
7+
config_expr,
8+
Config,
9+
Parameter,
10+
)
11+
12+
13+
def myfunction(context):
14+
return [
15+
{
16+
"name": f"{current.branch_name}.HelloDeployTimeTriggerTopLevelListFlow1",
17+
"parameters": ["param1"],
18+
},
19+
{
20+
"name": f"{current.branch_name}.HelloDeployTimeTriggerTopLevelListFlow2",
21+
"parameters": ["param2"],
22+
},
23+
]
24+
25+
26+
@trigger(events=myfunction)
27+
@project(
28+
name=config_expr("config.project"),
29+
branch=config_expr("config.branch"),
30+
)
31+
class HelloDeployTimeTriggerTopLevelListFlow(FlowSpec):
32+
config = Config("config")
33+
34+
param1 = Parameter(
35+
"param1", type=str, default="default_value", help="flow parameter"
36+
)
37+
param2 = Parameter(
38+
"param2", type=str, default="default_value", help="flow parameter"
39+
)
40+
41+
@step
42+
def start(self):
43+
from metaflow import metaflow_version
44+
45+
print(f"In start step and using metaflow: {metaflow_version.get_version()}")
46+
print("In HelloDeployTimeTriggerTopLevelListFlow")
47+
print("param1: ", self.param1)
48+
print("param2: ", self.param2)
49+
self.trigger = current.trigger
50+
self.next(self.end)
51+
52+
@step
53+
def end(self):
54+
pass
55+
56+
57+
if __name__ == "__main__":
58+
HelloDeployTimeTriggerTopLevelListFlow()
Lines changed: 35 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,35 @@
1+
from metaflow import (
2+
FlowSpec,
3+
step,
4+
Parameter,
5+
trigger,
6+
current,
7+
config_expr,
8+
Config,
9+
project,
10+
)
11+
12+
13+
@trigger(event=config_expr("config.branch + '.HelloDeployTimeTriggerTopLevelNameFlow'"))
14+
@project(name="dummy_project")
15+
class HelloDeployTimeTriggerTopLevelNameFlow(FlowSpec):
16+
config = Config("config")
17+
18+
param1 = Parameter("param1", type=str, default="default_value")
19+
20+
@step
21+
def start(self):
22+
from metaflow import metaflow_version
23+
24+
print(f"In start step and using metaflow: {metaflow_version.get_version()}")
25+
print("In HelloDeployTimeTriggerTopLevelNameFlow")
26+
self.trigger = current.trigger
27+
self.next(self.end)
28+
29+
@step
30+
def end(self):
31+
pass
32+
33+
34+
if __name__ == "__main__":
35+
HelloDeployTimeTriggerTopLevelNameFlow()
Lines changed: 55 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,55 @@
1+
from metaflow import (
2+
FlowSpec,
3+
step,
4+
Parameter,
5+
trigger,
6+
current,
7+
config_expr,
8+
Config,
9+
project,
10+
)
11+
12+
13+
@trigger(
14+
events=[
15+
config_expr("config.branch + '.HelloStaticTriggerName'"),
16+
{
17+
"name": config_expr("config.branch + '.HelloStaticTriggerName1'"),
18+
"parameters": ["param1"],
19+
},
20+
{
21+
"name": config_expr("config.branch + '.HelloStaticTriggerName2'"),
22+
"parameters": [("param2", "param2_events_field")],
23+
},
24+
{
25+
"name": config_expr("config.branch + '.HelloStaticTriggerName3'"),
26+
"parameters": {"param3": "param3_events_field"},
27+
},
28+
]
29+
)
30+
@project(name=config_expr("config.project"))
31+
class HelloStaticTriggerFlow(FlowSpec):
32+
config = Config("config")
33+
param1 = Parameter("param1", type=str, default="default_value")
34+
param2 = Parameter("param2", type=str, default="default_value")
35+
param3 = Parameter("param3", type=str, default="default_value")
36+
37+
@step
38+
def start(self):
39+
from metaflow import metaflow_version
40+
41+
print(f"In start step and using metaflow: {metaflow_version.get_version()}")
42+
print("In HelloStaticTriggerFlow")
43+
print(f"param1: {self.param1}")
44+
print(f"param2: {self.param2}")
45+
print(f"param3: {self.param3}")
46+
self.trigger = current.trigger
47+
self.next(self.end)
48+
49+
@step
50+
def end(self):
51+
pass
52+
53+
54+
if __name__ == "__main__":
55+
HelloStaticTriggerFlow()

0 commit comments

Comments
 (0)