Skip to content

Commit d6c7d63

Browse files
author
Nissan Pow
committed
fix: export all Argo manifests (cron workflow, sensor) in --only-json
Previously, `argo-workflows create --only-json` only exported the WorkflowTemplate JSON, silently dropping CronWorkflow config (for @schedule flows) and Sensor config (for @trigger flows). Now the output is a JSON object with up to three keys: - "workflow_template": always present - "cron_workflow": present when @schedule is used - "sensor": present when @trigger/@trigger_on_finish is used Fixes #1730, fixes #1940.
1 parent 5bc105a commit d6c7d63

3 files changed

Lines changed: 217 additions & 2 deletions

File tree

metaflow/plugins/argo/argo_workflows.py

Lines changed: 31 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -194,6 +194,37 @@ def __init__(
194194
def __str__(self):
195195
return str(self._workflow_template)
196196

197+
def _cron_workflow_json(self):
198+
"""Build the CronWorkflow manifest as a dict (without deploying it)."""
199+
if self._schedule is None:
200+
return None
201+
return {
202+
"apiVersion": "argoproj.io/v1alpha1",
203+
"kind": "CronWorkflow",
204+
"metadata": {"name": self.name},
205+
"spec": {
206+
"suspend": False,
207+
"schedule": self._schedule,
208+
"timezone": self._timezone,
209+
"failedJobsHistoryLimit": 10000,
210+
"successfulJobsHistoryLimit": 10000,
211+
"workflowSpec": {"workflowTemplateRef": {"name": self.name}},
212+
"startingDeadlineSeconds": 3540,
213+
},
214+
}
215+
216+
def export_all_json(self):
217+
"""Return a JSON string with all Argo manifests for this flow."""
218+
result = {
219+
"workflow_template": self._workflow_template.to_json(),
220+
}
221+
cron = self._cron_workflow_json()
222+
if cron is not None:
223+
result["cron_workflow"] = cron
224+
if self._sensor is not None:
225+
result["sensor"] = self._sensor.to_json()
226+
return json.dumps(result, indent=4)
227+
197228
def deploy(self):
198229
self.cleanup_previous_sensors()
199230
try:

metaflow/plugins/argo/argo_workflows_cli.py

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -364,8 +364,7 @@ def create(
364364
)
365365

366366
if only_json:
367-
obj.echo_always(str(flow), err=False, no_bold=True)
368-
# TODO: Support echo-ing Argo Events Sensor template
367+
obj.echo_always(flow.export_all_json(), err=False, no_bold=True)
369368
else:
370369
flow.deploy()
371370
obj.echo(
Lines changed: 185 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,185 @@
1+
"""
2+
Tests for `argo-workflows create --only-json` manifest export.
3+
4+
Verifies that export_all_json() outputs all Argo manifests (workflow template,
5+
cron workflow, and sensor) as a single JSON object, not just the workflow
6+
template.
7+
8+
These tests exercise the ArgoWorkflows.export_all_json() method directly,
9+
bypassing the CLI and cluster access requirements.
10+
"""
11+
12+
import json
13+
from unittest.mock import MagicMock
14+
15+
import pytest
16+
17+
from metaflow.plugins.argo.argo_workflows import (
18+
ArgoWorkflows,
19+
Sensor,
20+
WorkflowTemplate,
21+
)
22+
23+
24+
def _make_argo_workflows(schedule=None, timezone=None, sensor=None):
25+
"""Create an ArgoWorkflows instance with pre-set internal state via mocking.
26+
27+
We bypass __init__ to avoid the many dependencies (graph, flow, datastore, etc.)
28+
and directly set the three internal attributes that export_all_json() reads.
29+
"""
30+
obj = object.__new__(ArgoWorkflows)
31+
32+
# Build a minimal WorkflowTemplate
33+
wt = WorkflowTemplate()
34+
wt.payload = {
35+
"apiVersion": "argoproj.io/v1alpha1",
36+
"kind": "WorkflowTemplate",
37+
"metadata": {"name": "test-flow"},
38+
"spec": {},
39+
}
40+
obj._workflow_template = wt
41+
obj.name = "test-flow"
42+
obj._schedule = schedule
43+
obj._timezone = timezone
44+
obj._sensor = sensor
45+
46+
return obj
47+
48+
49+
class TestExportAllJsonScheduled:
50+
"""A flow with @schedule should export both workflow_template and cron_workflow."""
51+
52+
def test_contains_workflow_template(self):
53+
argo = _make_argo_workflows(schedule="0 10 * * *", timezone="US/Pacific")
54+
data = json.loads(argo.export_all_json())
55+
assert "workflow_template" in data
56+
assert data["workflow_template"]["kind"] == "WorkflowTemplate"
57+
58+
def test_contains_cron_workflow(self):
59+
argo = _make_argo_workflows(schedule="0 10 * * *", timezone="US/Pacific")
60+
data = json.loads(argo.export_all_json())
61+
assert "cron_workflow" in data
62+
cron = data["cron_workflow"]
63+
assert cron["kind"] == "CronWorkflow"
64+
assert cron["apiVersion"] == "argoproj.io/v1alpha1"
65+
assert cron["spec"]["schedule"] == "0 10 * * *"
66+
assert cron["spec"]["timezone"] == "US/Pacific"
67+
assert cron["spec"]["suspend"] is False
68+
assert cron["metadata"]["name"] == "test-flow"
69+
assert (
70+
cron["spec"]["workflowSpec"]["workflowTemplateRef"]["name"] == "test-flow"
71+
)
72+
73+
def test_no_sensor_without_trigger(self):
74+
argo = _make_argo_workflows(schedule="0 10 * * *", timezone="US/Pacific")
75+
data = json.loads(argo.export_all_json())
76+
assert "sensor" not in data
77+
78+
79+
class TestExportAllJsonTriggered:
80+
"""A flow with @trigger should export both workflow_template and sensor."""
81+
82+
def _make_sensor(self):
83+
sensor = Sensor()
84+
sensor.payload = {
85+
"apiVersion": "argoproj.io/v1alpha1",
86+
"kind": "Sensor",
87+
"metadata": {"name": "test-flow"},
88+
"spec": {"dependencies": [], "triggers": []},
89+
}
90+
return sensor
91+
92+
def test_contains_workflow_template(self):
93+
argo = _make_argo_workflows(sensor=self._make_sensor())
94+
data = json.loads(argo.export_all_json())
95+
assert "workflow_template" in data
96+
assert data["workflow_template"]["kind"] == "WorkflowTemplate"
97+
98+
def test_contains_sensor(self):
99+
argo = _make_argo_workflows(sensor=self._make_sensor())
100+
data = json.loads(argo.export_all_json())
101+
assert "sensor" in data
102+
assert data["sensor"]["kind"] == "Sensor"
103+
104+
def test_no_cron_without_schedule(self):
105+
argo = _make_argo_workflows(sensor=self._make_sensor())
106+
data = json.loads(argo.export_all_json())
107+
assert "cron_workflow" not in data
108+
109+
110+
class TestExportAllJsonPlain:
111+
"""A plain flow (no @schedule, no @trigger) exports only the workflow template."""
112+
113+
def test_only_workflow_template(self):
114+
argo = _make_argo_workflows()
115+
data = json.loads(argo.export_all_json())
116+
assert "workflow_template" in data
117+
assert "cron_workflow" not in data
118+
assert "sensor" not in data
119+
120+
def test_output_is_valid_json(self):
121+
argo = _make_argo_workflows()
122+
raw = argo.export_all_json()
123+
# Should parse without error
124+
data = json.loads(raw)
125+
assert isinstance(data, dict)
126+
127+
128+
class TestExportAllJsonBothScheduleAndTrigger:
129+
"""A flow with both @schedule and @trigger exports all three manifests."""
130+
131+
def _make_sensor(self):
132+
sensor = Sensor()
133+
sensor.payload = {
134+
"apiVersion": "argoproj.io/v1alpha1",
135+
"kind": "Sensor",
136+
"metadata": {"name": "test-flow"},
137+
"spec": {"dependencies": [], "triggers": []},
138+
}
139+
return sensor
140+
141+
def test_all_three_present(self):
142+
argo = _make_argo_workflows(
143+
schedule="*/5 * * * *",
144+
timezone="UTC",
145+
sensor=self._make_sensor(),
146+
)
147+
data = json.loads(argo.export_all_json())
148+
assert "workflow_template" in data
149+
assert "cron_workflow" in data
150+
assert "sensor" in data
151+
152+
def test_cron_schedule_value(self):
153+
argo = _make_argo_workflows(
154+
schedule="*/5 * * * *",
155+
timezone="UTC",
156+
sensor=self._make_sensor(),
157+
)
158+
data = json.loads(argo.export_all_json())
159+
assert data["cron_workflow"]["spec"]["schedule"] == "*/5 * * * *"
160+
assert data["cron_workflow"]["spec"]["timezone"] == "UTC"
161+
162+
163+
class TestCronWorkflowJson:
164+
"""Tests for the _cron_workflow_json() helper method."""
165+
166+
def test_returns_none_when_no_schedule(self):
167+
argo = _make_argo_workflows()
168+
assert argo._cron_workflow_json() is None
169+
170+
def test_returns_dict_when_schedule_set(self):
171+
argo = _make_argo_workflows(schedule="0 0 * * *", timezone="UTC")
172+
cron = argo._cron_workflow_json()
173+
assert isinstance(cron, dict)
174+
assert cron["kind"] == "CronWorkflow"
175+
176+
def test_cron_spec_matches_argo_client(self):
177+
"""Verify the cron body matches what ArgoClient.schedule_workflow_template builds."""
178+
argo = _make_argo_workflows(schedule="0 10 * * *", timezone="US/Pacific")
179+
cron = argo._cron_workflow_json()
180+
assert cron["spec"]["failedJobsHistoryLimit"] == 10000
181+
assert cron["spec"]["successfulJobsHistoryLimit"] == 10000
182+
assert cron["spec"]["startingDeadlineSeconds"] == 3540
183+
assert cron["spec"]["workflowSpec"] == {
184+
"workflowTemplateRef": {"name": "test-flow"}
185+
}

0 commit comments

Comments
 (0)