Skip to content

Commit 5433d72

Browse files
author
Nissan Pow
committed
feat: add --enable-schedule/--no-enable-schedule flag to disable schedules during deployment
When deploying dev/test branches of production flows that use @schedule, the schedule gets deployed too, potentially interfering with production. This adds a --no-enable-schedule flag to both `argo-workflows create` and `step-functions create` that deploys the flow with the schedule suspended (Argo) or skipped (SFN). Also supports METAFLOW_SCHEDULE_DISABLED=1 env var for CI/CD use. CLI flag takes precedence over the env var. Closes #1169
1 parent ddf5d68 commit 5433d72

8 files changed

Lines changed: 265 additions & 9 deletions

File tree

metaflow/metaflow_config.py

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -390,6 +390,13 @@
390390
)
391391
# Toggle for step command being part of the Step Function payload, or if it should be offloaded to S3
392392
SFN_COMPRESS_STATE_MACHINE = from_conf("SFN_COMPRESS_STATE_MACHINE", False)
393+
394+
###
395+
# Schedule configuration
396+
###
397+
# When set to True, deploys the workflow with the schedule disabled (suspended).
398+
# Useful for deploying dev/test branches without activating production schedules.
399+
SCHEDULE_DISABLED = from_conf("SCHEDULE_DISABLED", False)
393400
###
394401
# Kubernetes configuration
395402
###

metaflow/plugins/argo/argo_workflows.py

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -463,12 +463,12 @@ def _get_schedule(self):
463463
return " ".join(schedule.schedule.split()[:5]), schedule.timezone
464464
return None, None
465465

466-
def schedule(self):
466+
def schedule(self, schedule_disabled=False):
467467
try:
468+
schedule = None if schedule_disabled else self._schedule
469+
timezone = None if schedule_disabled else self._timezone
468470
argo_client = ArgoClient(namespace=KUBERNETES_NAMESPACE)
469-
argo_client.schedule_workflow_template(
470-
self.name, self._schedule, self._timezone
471-
)
471+
argo_client.schedule_workflow_template(self.name, schedule, timezone)
472472
# Register sensor.
473473
# Metaflow will overwrite any existing sensor.
474474
sensor_name = ArgoWorkflows._sensor_name(self.name)

metaflow/plugins/argo/argo_workflows_cli.py

Lines changed: 31 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@
1717
ARGO_WORKFLOWS_UI_URL,
1818
FEAT_ALWAYS_UPLOAD_CODE_PACKAGE,
1919
KUBERNETES_NAMESPACE,
20+
SCHEDULE_DISABLED,
2021
SERVICE_VERSION_CHECK,
2122
UI_URL,
2223
)
@@ -68,6 +69,17 @@ class UnsupportedPythonVersion(MetaflowException):
6869
headline = "Unsupported version of Python"
6970

7071

72+
def _is_schedule_disabled(enable_schedule):
73+
"""Resolve whether the schedule should be disabled.
74+
75+
CLI flag (--enable-schedule / --no-enable-schedule) takes precedence.
76+
If not specified, falls back to the METAFLOW_SCHEDULE_DISABLED env var / config.
77+
"""
78+
if enable_schedule is not None:
79+
return not enable_schedule
80+
return SCHEDULE_DISABLED
81+
82+
7183
@click.group()
7284
def cli():
7385
pass
@@ -214,6 +226,14 @@ def argo_workflows(obj, name=None):
214226
show_default=True,
215227
help="Use a daemon container to broadcast heartbeats.",
216228
)
229+
@click.option(
230+
"--enable-schedule/--no-enable-schedule",
231+
default=None,
232+
show_default=False,
233+
help="Deploy the workflow with the schedule enabled or disabled (suspended). "
234+
"Useful for deploying dev/test branches without activating production schedules. "
235+
"Defaults to enabled unless METAFLOW_SCHEDULE_DISABLED is set.",
236+
)
217237
@click.option(
218238
"--deployer-attribute-file",
219239
default=None,
@@ -261,6 +281,7 @@ def create(
261281
incident_io_alert_source_config_id=None,
262282
incident_io_metadata=None,
263283
enable_heartbeat_daemon=True,
284+
enable_schedule=None,
264285
workflow_title=None,
265286
workflow_description=None,
266287
deployer_attribute_file=None,
@@ -425,8 +446,17 @@ def create(
425446
"%s/%s\n\n" % (argo_workflowtemplate_link, obj.workflow_name),
426447
indent=True,
427448
)
428-
flow.schedule()
449+
# Resolve schedule_disabled: CLI flag takes precedence, then env var
450+
schedule_disabled = _is_schedule_disabled(enable_schedule)
451+
452+
flow.schedule(schedule_disabled=schedule_disabled)
429453
obj.echo("What will trigger execution of the workflow:", bold=True)
454+
if schedule_disabled:
455+
obj.echo(
456+
"The schedule for this workflow has been *disabled* "
457+
"(deployed in suspended state).",
458+
indent=True,
459+
)
430460
obj.echo(flow.trigger_explanation(), indent=True)
431461

432462
# TODO: Print events emitted by execution of this flow

metaflow/plugins/argo/argo_workflows_deployer.py

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -90,6 +90,10 @@ def create(
9090
Write the workflow name to the specified file. Used internally for Metaflow's Deployer API.
9191
enable_error_msg_capture : bool, optional, default True
9292
Capture stack trace of first failed task in exit hook.
93+
enable_schedule : bool, optional, default None
94+
Deploy the workflow with the schedule enabled or disabled (suspended).
95+
Useful for deploying dev/test branches without activating production schedules.
96+
Defaults to enabled unless METAFLOW_SCHEDULE_DISABLED is set.
9397
9498
Returns
9599
-------

metaflow/plugins/aws/step_functions/step_functions.py

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -147,10 +147,11 @@ def deploy(self, log_execution_history):
147147
except Exception as e:
148148
raise StepFunctionsException(repr(e))
149149

150-
def schedule(self):
150+
def schedule(self, schedule_disabled=False):
151151
# Scheduling is currently enabled via AWS Event Bridge.
152-
# If no cron schedule is defined, nothing to do.
153-
if not self._cron:
152+
# If no cron schedule is defined, or schedule is explicitly disabled,
153+
# nothing to do.
154+
if not self._cron or schedule_disabled:
154155
return
155156
if EVENTS_SFN_ACCESS_IAM_ROLE is None:
156157
raise StepFunctionsSchedulingException(

metaflow/plugins/aws/step_functions/step_functions_cli.py

Lines changed: 31 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,7 @@
88
from metaflow.exception import MetaflowException, MetaflowInternalError
99
from metaflow.metaflow_config import (
1010
FEAT_ALWAYS_UPLOAD_CODE_PACKAGE,
11+
SCHEDULE_DISABLED,
1112
SERVICE_VERSION_CHECK,
1213
SFN_STATE_MACHINE_PREFIX,
1314
SFN_COMPRESS_STATE_MACHINE,
@@ -42,6 +43,17 @@ class StepFunctionsStateMachineNameTooLong(MetaflowException):
4243
headline = "AWS Step Functions state machine name too long"
4344

4445

46+
def _is_schedule_disabled(enable_schedule):
47+
"""Resolve whether the schedule should be disabled.
48+
49+
CLI flag (--enable-schedule / --no-enable-schedule) takes precedence.
50+
If not specified, falls back to the METAFLOW_SCHEDULE_DISABLED env var / config.
51+
"""
52+
if enable_schedule is not None:
53+
return not enable_schedule
54+
return SCHEDULE_DISABLED
55+
56+
4557
@click.group()
4658
def cli():
4759
pass
@@ -147,6 +159,14 @@ def step_functions(obj, name=None):
147159
default=SFN_COMPRESS_STATE_MACHINE,
148160
help="Compress AWS Step Functions state machine to fit within the 8K limit.",
149161
)
162+
@click.option(
163+
"--enable-schedule/--no-enable-schedule",
164+
default=None,
165+
show_default=False,
166+
help="Deploy the workflow with the schedule enabled or disabled. "
167+
"Useful for deploying dev/test branches without activating production schedules. "
168+
"Defaults to enabled unless METAFLOW_SCHEDULE_DISABLED is set.",
169+
)
150170
@click.option(
151171
"--deployer-attribute-file",
152172
default=None,
@@ -170,6 +190,7 @@ def create(
170190
log_execution_history=False,
171191
use_distributed_map=False,
172192
compress_state_machine=False,
193+
enable_schedule=None,
173194
deployer_attribute_file=None,
174195
):
175196
for node in obj.graph:
@@ -241,8 +262,17 @@ def create(
241262
"due to a length limit on AWS Step Functions. The "
242263
"original long name is stored in task metadata.\n"
243264
)
244-
flow.schedule()
265+
# Resolve schedule_disabled: CLI flag takes precedence, then env var
266+
schedule_disabled = _is_schedule_disabled(enable_schedule)
267+
268+
flow.schedule(schedule_disabled=schedule_disabled)
245269
obj.echo("What will trigger execution of the workflow:", bold=True)
270+
if schedule_disabled:
271+
obj.echo(
272+
"The schedule for this workflow has been *disabled* "
273+
"(EventBridge rule will not be created).",
274+
indent=True,
275+
)
246276
obj.echo(flow.trigger_explanation(), indent=True)
247277

248278

metaflow/plugins/aws/step_functions/step_functions_deployer.py

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -78,6 +78,10 @@ def create(
7878
tasks in Amazon State Language.
7979
compress_state_machine : bool, optional, default False
8080
Compress AWS Step Functions state machine to fit within the 8K limit.
81+
enable_schedule : bool, optional, default None
82+
Deploy the workflow with the schedule enabled or disabled.
83+
Useful for deploying dev/test branches without activating production schedules.
84+
Defaults to enabled unless METAFLOW_SCHEDULE_DISABLED is set.
8185
8286
deployer_attribute_file : str, optional, default None
8387
Write the workflow name to the specified file. Used internally for Metaflow's Deployer API.

test/unit/test_schedule_toggle.py

Lines changed: 180 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,180 @@
1+
"""Tests for --enable-schedule / --no-enable-schedule CLI flag."""
2+
3+
import pytest
4+
from unittest.mock import patch
5+
6+
7+
class TestIsScheduleDisabledArgo:
8+
"""Test _is_schedule_disabled in argo_workflows_cli."""
9+
10+
def _get_fn(self):
11+
from metaflow.plugins.argo.argo_workflows_cli import _is_schedule_disabled
12+
13+
return _is_schedule_disabled
14+
15+
def test_cli_flag_enable_overrides_env(self):
16+
fn = self._get_fn()
17+
with patch("metaflow.plugins.argo.argo_workflows_cli.SCHEDULE_DISABLED", True):
18+
# --enable-schedule should override env var
19+
assert fn(True) is False
20+
21+
def test_cli_flag_disable_overrides_env(self):
22+
fn = self._get_fn()
23+
with patch("metaflow.plugins.argo.argo_workflows_cli.SCHEDULE_DISABLED", False):
24+
# --no-enable-schedule should override env var
25+
assert fn(False) is True
26+
27+
def test_env_var_disabled(self):
28+
fn = self._get_fn()
29+
with patch("metaflow.plugins.argo.argo_workflows_cli.SCHEDULE_DISABLED", True):
30+
# No CLI flag, env var says disabled
31+
assert fn(None) is True
32+
33+
def test_env_var_enabled(self):
34+
fn = self._get_fn()
35+
with patch("metaflow.plugins.argo.argo_workflows_cli.SCHEDULE_DISABLED", False):
36+
# No CLI flag, env var says enabled (default)
37+
assert fn(None) is False
38+
39+
40+
class TestIsScheduleDisabledSFN:
41+
"""Test _is_schedule_disabled in step_functions_cli."""
42+
43+
def _get_fn(self):
44+
from metaflow.plugins.aws.step_functions.step_functions_cli import (
45+
_is_schedule_disabled,
46+
)
47+
48+
return _is_schedule_disabled
49+
50+
def test_cli_flag_enable_overrides_env(self):
51+
fn = self._get_fn()
52+
with patch(
53+
"metaflow.plugins.aws.step_functions.step_functions_cli.SCHEDULE_DISABLED",
54+
True,
55+
):
56+
assert fn(True) is False
57+
58+
def test_cli_flag_disable_overrides_env(self):
59+
fn = self._get_fn()
60+
with patch(
61+
"metaflow.plugins.aws.step_functions.step_functions_cli.SCHEDULE_DISABLED",
62+
False,
63+
):
64+
assert fn(False) is True
65+
66+
def test_env_var_disabled(self):
67+
fn = self._get_fn()
68+
with patch(
69+
"metaflow.plugins.aws.step_functions.step_functions_cli.SCHEDULE_DISABLED",
70+
True,
71+
):
72+
assert fn(None) is True
73+
74+
def test_env_var_enabled(self):
75+
fn = self._get_fn()
76+
with patch(
77+
"metaflow.plugins.aws.step_functions.step_functions_cli.SCHEDULE_DISABLED",
78+
False,
79+
):
80+
assert fn(None) is False
81+
82+
83+
class TestArgoScheduleDisabled:
84+
"""Test that ArgoWorkflows.schedule() respects schedule_disabled."""
85+
86+
def test_schedule_passes_none_when_disabled(self):
87+
"""When schedule_disabled=True, schedule and timezone should be None."""
88+
from unittest.mock import MagicMock
89+
90+
from metaflow.plugins.argo.argo_workflows import ArgoWorkflows
91+
92+
aw = object.__new__(ArgoWorkflows)
93+
aw.name = "test-workflow"
94+
aw._schedule = "0 * * * *"
95+
aw._timezone = "UTC"
96+
aw._sensor = None
97+
98+
mock_client = MagicMock()
99+
with patch(
100+
"metaflow.plugins.argo.argo_workflows.ArgoClient",
101+
return_value=mock_client,
102+
):
103+
aw.schedule(schedule_disabled=True)
104+
105+
mock_client.schedule_workflow_template.assert_called_once_with(
106+
"test-workflow", None, None
107+
)
108+
109+
def test_schedule_passes_values_when_enabled(self):
110+
"""When schedule_disabled=False, schedule and timezone should pass through."""
111+
from unittest.mock import MagicMock
112+
113+
from metaflow.plugins.argo.argo_workflows import ArgoWorkflows
114+
115+
aw = object.__new__(ArgoWorkflows)
116+
aw.name = "test-workflow"
117+
aw._schedule = "0 * * * *"
118+
aw._timezone = "UTC"
119+
aw._sensor = None
120+
121+
mock_client = MagicMock()
122+
with patch(
123+
"metaflow.plugins.argo.argo_workflows.ArgoClient",
124+
return_value=mock_client,
125+
):
126+
aw.schedule(schedule_disabled=False)
127+
128+
mock_client.schedule_workflow_template.assert_called_once_with(
129+
"test-workflow", "0 * * * *", "UTC"
130+
)
131+
132+
133+
class TestSFNScheduleDisabled:
134+
"""Test that StepFunctions.schedule() respects schedule_disabled."""
135+
136+
def test_schedule_skipped_when_disabled(self):
137+
"""When schedule_disabled=True, EventBridge schedule should not be created."""
138+
from unittest.mock import MagicMock
139+
140+
from metaflow.plugins.aws.step_functions.step_functions import StepFunctions
141+
142+
sf = object.__new__(StepFunctions)
143+
sf.name = "test-state-machine"
144+
sf._cron = "0 * * * ? *"
145+
146+
with patch(
147+
"metaflow.plugins.aws.step_functions.step_functions.EventBridgeClient"
148+
) as mock_eb:
149+
sf.schedule(schedule_disabled=True)
150+
151+
# EventBridgeClient should not be called at all
152+
mock_eb.assert_not_called()
153+
154+
def test_schedule_created_when_enabled(self):
155+
"""When schedule_disabled=False, EventBridge schedule should be created normally."""
156+
from unittest.mock import MagicMock
157+
158+
from metaflow.plugins.aws.step_functions.step_functions import StepFunctions
159+
160+
sf = object.__new__(StepFunctions)
161+
sf.name = "test-state-machine"
162+
sf._cron = "0 * * * ? *"
163+
sf._state_machine_arn = "arn:aws:states:us-east-1:123:stateMachine:test"
164+
165+
mock_eb_instance = MagicMock()
166+
mock_eb_instance.cron.return_value = mock_eb_instance
167+
mock_eb_instance.role_arn.return_value = mock_eb_instance
168+
mock_eb_instance.state_machine_arn.return_value = mock_eb_instance
169+
mock_eb_instance.schedule.return_value = "test-state-machine"
170+
171+
with patch(
172+
"metaflow.plugins.aws.step_functions.step_functions.EventBridgeClient",
173+
return_value=mock_eb_instance,
174+
), patch(
175+
"metaflow.plugins.aws.step_functions.step_functions.EVENTS_SFN_ACCESS_IAM_ROLE",
176+
"arn:aws:iam::123:role/test",
177+
):
178+
sf.schedule(schedule_disabled=False)
179+
180+
mock_eb_instance.cron.assert_called_once_with("0 * * * ? *")

0 commit comments

Comments
 (0)