Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 7 additions & 0 deletions metaflow/metaflow_config.py
Original file line number Diff line number Diff line change
Expand Up @@ -393,6 +393,13 @@
)
# Toggle for step command being part of the Step Function payload, or if it should be offloaded to S3
SFN_COMPRESS_STATE_MACHINE = from_conf("SFN_COMPRESS_STATE_MACHINE", False)

###
# Schedule configuration
###
# When set to True, deploys the workflow with the schedule disabled (suspended).
# Useful for deploying dev/test branches without activating production schedules.
SCHEDULE_DISABLED = from_conf("SCHEDULE_DISABLED", False)
###
Comment on lines +396 to 403
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

P2 Missing blank line and misplaced section

The new SCHEDULE_DISABLED config block is placed immediately after the SFN-specific settings with no blank line before the ### Kubernetes configuration ### section. Since this setting applies to both SFN and Argo, it would read more clearly if it were placed in a more neutral location and had the standard blank separator line.

Suggested change
###
# Schedule configuration
###
# When set to True, deploys the workflow with the schedule disabled (suspended).
# Useful for deploying dev/test branches without activating production schedules.
SCHEDULE_DISABLED = from_conf("SCHEDULE_DISABLED", False)
###
SCHEDULE_DISABLED = from_conf("SCHEDULE_DISABLED", False)
###
# Kubernetes configuration
###

Note: If this suggestion doesn't match your team's coding style, reply to this and let me know. I'll remember it for next time!

# Kubernetes configuration
###
Expand Down
8 changes: 4 additions & 4 deletions metaflow/plugins/argo/argo_workflows.py
Original file line number Diff line number Diff line change
Expand Up @@ -463,12 +463,12 @@ def _get_schedule(self):
return " ".join(schedule.schedule.split()[:5]), schedule.timezone
return None, None

def schedule(self):
def schedule(self, schedule_disabled=False):
try:
schedule = None if schedule_disabled else self._schedule
timezone = None if schedule_disabled else self._timezone
argo_client = ArgoClient(namespace=KUBERNETES_NAMESPACE)
argo_client.schedule_workflow_template(
self.name, self._schedule, self._timezone
)
argo_client.schedule_workflow_template(self.name, schedule, timezone)
# Register sensor.
# Metaflow will overwrite any existing sensor.
sensor_name = ArgoWorkflows._sensor_name(self.name)
Expand Down
32 changes: 31 additions & 1 deletion metaflow/plugins/argo/argo_workflows_cli.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
ARGO_WORKFLOWS_UI_URL,
FEAT_ALWAYS_UPLOAD_CODE_PACKAGE,
KUBERNETES_NAMESPACE,
SCHEDULE_DISABLED,
SERVICE_VERSION_CHECK,
UI_URL,
)
Expand Down Expand Up @@ -68,6 +69,17 @@ class UnsupportedPythonVersion(MetaflowException):
headline = "Unsupported version of Python"


def _is_schedule_disabled(enable_schedule):
"""Resolve whether the schedule should be disabled.

CLI flag (--enable-schedule / --no-enable-schedule) takes precedence.
If not specified, falls back to the METAFLOW_SCHEDULE_DISABLED env var / config.
"""
if enable_schedule is not None:
return not enable_schedule
Comment on lines 69 to +79
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

P2 Duplicated _is_schedule_disabled helper

_is_schedule_disabled is defined identically in argo_workflows_cli.py and step_functions_cli.py. Consider extracting it to a shared location (e.g. a small utility in metaflow/metaflow_config.py or a common CLI helpers module) and importing it from both CLI files to keep the logic in one place.

Note: If this suggestion doesn't match your team's coding style, reply to this and let me know. I'll remember it for next time!

return SCHEDULE_DISABLED


@click.group()
def cli():
pass
Expand Down Expand Up @@ -214,6 +226,14 @@ def argo_workflows(obj, name=None):
show_default=True,
help="Use a daemon container to broadcast heartbeats.",
)
@click.option(
"--enable-schedule/--no-enable-schedule",
default=None,
show_default=False,
help="Deploy the workflow with the schedule enabled or disabled (suspended). "
"Useful for deploying dev/test branches without activating production schedules. "
"Defaults to enabled unless METAFLOW_SCHEDULE_DISABLED is set.",
)
@click.option(
"--deployer-attribute-file",
default=None,
Expand Down Expand Up @@ -261,6 +281,7 @@ def create(
incident_io_alert_source_config_id=None,
incident_io_metadata=None,
enable_heartbeat_daemon=True,
enable_schedule=None,
workflow_title=None,
workflow_description=None,
deployer_attribute_file=None,
Expand Down Expand Up @@ -425,8 +446,17 @@ def create(
"%s/%s\n\n" % (argo_workflowtemplate_link, obj.workflow_name),
indent=True,
)
flow.schedule()
# Resolve schedule_disabled: CLI flag takes precedence, then env var
schedule_disabled = _is_schedule_disabled(enable_schedule)

flow.schedule(schedule_disabled=schedule_disabled)
obj.echo("What will trigger execution of the workflow:", bold=True)
if schedule_disabled:
obj.echo(
"The schedule for this workflow has been *disabled* "
"(deployed in suspended state).",
indent=True,
)
obj.echo(flow.trigger_explanation(), indent=True)

Comment on lines 453 to 461
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

P1 Contradictory output when schedule is disabled

When schedule_disabled=True and the flow has a @schedule decorator, both messages are printed:

  1. "The schedule for this workflow has been *disabled* (deployed in suspended state)."
  2. trigger_explanation()"This workflow triggers automatically via the CronWorkflow *<name>*."

These directly contradict each other. The same issue exists in the SFN CLI (step_functions_cli.py lines 265-269). The call to trigger_explanation() should be guarded so it is not printed when the schedule is disabled.

Suggested change
obj.echo("What will trigger execution of the workflow:", bold=True)
if schedule_disabled:
obj.echo(
"The schedule for this workflow has been *disabled* "
"(deployed in suspended state).",
indent=True,
)
obj.echo(flow.trigger_explanation(), indent=True)
flow.schedule(schedule_disabled=schedule_disabled)
obj.echo("What will trigger execution of the workflow:", bold=True)
if schedule_disabled:
obj.echo(
"The schedule for this workflow has been *disabled* "
"(deployed in suspended state).",
indent=True,
)
else:
obj.echo(flow.trigger_explanation(), indent=True)

# TODO: Print events emitted by execution of this flow
Expand Down
4 changes: 4 additions & 0 deletions metaflow/plugins/argo/argo_workflows_deployer.py
Original file line number Diff line number Diff line change
Expand Up @@ -90,6 +90,10 @@ def create(
Write the workflow name to the specified file. Used internally for Metaflow's Deployer API.
enable_error_msg_capture : bool, optional, default True
Capture stack trace of first failed task in exit hook.
enable_schedule : bool, optional, default None
Deploy the workflow with the schedule enabled or disabled (suspended).
Useful for deploying dev/test branches without activating production schedules.
Defaults to enabled unless METAFLOW_SCHEDULE_DISABLED is set.

Returns
-------
Expand Down
6 changes: 5 additions & 1 deletion metaflow/plugins/aws/step_functions/step_functions.py
Original file line number Diff line number Diff line change
Expand Up @@ -140,8 +140,12 @@ def deploy(self, log_execution_history):
except Exception as e:
raise StepFunctionsException(repr(e))

def schedule(self):
def schedule(self, schedule_disabled=False):
# Scheduling is currently enabled via AWS Event Bridge.
# If no cron schedule is defined, or schedule is explicitly disabled,
# nothing to do.
if not self._cron or schedule_disabled:
return
Comment on lines +143 to +148
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

P1 Existing EventBridge rule is not disabled on re-deployment

When schedule_disabled=True, the method returns immediately without touching EventBridge. If the flow was previously deployed with an active EventBridge rule, that rule is left intact and the cron continues to fire. Argo's schedule() sidesteps this by always calling schedule_workflow_template (passing None for schedule/timezone), which updates the CronWorkflow to suspend: true. SFN has no equivalent cleanup, so re-deploying with --no-enable-schedule gives users a false sense of security — the schedule is not actually deactivated.

Consider calling EventBridgeClient(self.name).delete() (or an equivalent disable operation) when schedule_disabled=True and an existing rule is present, to achieve parity with Argo's behaviour.

if EVENTS_SFN_ACCESS_IAM_ROLE is None:
raise StepFunctionsSchedulingException(
"No IAM role found for AWS "
Expand Down
32 changes: 31 additions & 1 deletion metaflow/plugins/aws/step_functions/step_functions_cli.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
from metaflow.exception import MetaflowException, MetaflowInternalError
from metaflow.metaflow_config import (
FEAT_ALWAYS_UPLOAD_CODE_PACKAGE,
SCHEDULE_DISABLED,
SERVICE_VERSION_CHECK,
SFN_STATE_MACHINE_PREFIX,
SFN_COMPRESS_STATE_MACHINE,
Expand Down Expand Up @@ -42,6 +43,17 @@ class StepFunctionsStateMachineNameTooLong(MetaflowException):
headline = "AWS Step Functions state machine name too long"


def _is_schedule_disabled(enable_schedule):
"""Resolve whether the schedule should be disabled.

CLI flag (--enable-schedule / --no-enable-schedule) takes precedence.
If not specified, falls back to the METAFLOW_SCHEDULE_DISABLED env var / config.
"""
if enable_schedule is not None:
return not enable_schedule
return SCHEDULE_DISABLED


@click.group()
def cli():
pass
Expand Down Expand Up @@ -147,6 +159,14 @@ def step_functions(obj, name=None):
default=SFN_COMPRESS_STATE_MACHINE,
help="Compress AWS Step Functions state machine to fit within the 8K limit.",
)
@click.option(
"--enable-schedule/--no-enable-schedule",
default=None,
show_default=False,
help="Deploy the workflow with the schedule enabled or disabled. "
"Useful for deploying dev/test branches without activating production schedules. "
"Defaults to enabled unless METAFLOW_SCHEDULE_DISABLED is set.",
)
@click.option(
"--deployer-attribute-file",
default=None,
Expand All @@ -170,6 +190,7 @@ def create(
log_execution_history=False,
use_distributed_map=False,
compress_state_machine=False,
enable_schedule=None,
deployer_attribute_file=None,
):
for node in obj.graph:
Expand Down Expand Up @@ -241,8 +262,17 @@ def create(
"due to a length limit on AWS Step Functions. The "
"original long name is stored in task metadata.\n"
)
flow.schedule()
# Resolve schedule_disabled: CLI flag takes precedence, then env var
schedule_disabled = _is_schedule_disabled(enable_schedule)

flow.schedule(schedule_disabled=schedule_disabled)
obj.echo("What will trigger execution of the workflow:", bold=True)
Comment on lines +265 to 269
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

P0 AttributeError crash when --no-enable-schedule is used with a scheduled flow

When schedule_disabled=True and the flow has a @schedule decorator, flow.schedule() returns early (line 147 of step_functions.py) without assigning self.event_bridge_rule. The very next call to flow.trigger_explanation() at line 269 checks if self._cron: (which is True) and then unconditionally accesses self.event_bridge_rule — raising AttributeError: 'StepFunctions' object has no attribute 'event_bridge_rule'. The primary use-case of this feature crashes at every invocation for SFN flows with a schedule.

The minimal fix is to initialise self.event_bridge_rule = None in StepFunctions.__init__ and guard trigger_explanation() against None, or suppress the schedule-based explanation when schedule_disabled is True.

Suggested change
# Resolve schedule_disabled: CLI flag takes precedence, then env var
schedule_disabled = _is_schedule_disabled(enable_schedule)
flow.schedule(schedule_disabled=schedule_disabled)
obj.echo("What will trigger execution of the workflow:", bold=True)
flow.schedule(schedule_disabled=schedule_disabled)
obj.echo("What will trigger execution of the workflow:", bold=True)
if schedule_disabled:
obj.echo(
"The schedule for this workflow has been *disabled* "
"(EventBridge rule will not be created).",
indent=True,
)
else:
obj.echo(flow.trigger_explanation(), indent=True)

if schedule_disabled:
obj.echo(
"The schedule for this workflow has been *disabled* "
"(EventBridge rule will not be created).",
indent=True,
)
obj.echo(flow.trigger_explanation(), indent=True)


Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -78,6 +78,10 @@ def create(
tasks in Amazon State Language.
compress_state_machine : bool, optional, default False
Compress AWS Step Functions state machine to fit within the 8K limit.
enable_schedule : bool, optional, default None
Deploy the workflow with the schedule enabled or disabled.
Useful for deploying dev/test branches without activating production schedules.
Defaults to enabled unless METAFLOW_SCHEDULE_DISABLED is set.

deployer_attribute_file : str, optional, default None
Write the workflow name to the specified file. Used internally for Metaflow's Deployer API.
Expand Down
180 changes: 180 additions & 0 deletions test/unit/test_schedule_toggle.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,180 @@
"""Tests for --enable-schedule / --no-enable-schedule CLI flag."""

import pytest
from unittest.mock import patch


class TestIsScheduleDisabledArgo:
    """Unit tests for ``_is_schedule_disabled`` in ``argo_workflows_cli``."""

    # Dotted path of the config value the resolver falls back to.
    _CONFIG_PATH = "metaflow.plugins.argo.argo_workflows_cli.SCHEDULE_DISABLED"

    @staticmethod
    def _resolver():
        from metaflow.plugins.argo.argo_workflows_cli import _is_schedule_disabled

        return _is_schedule_disabled

    def test_cli_flag_enable_overrides_env(self):
        resolver = self._resolver()
        with patch(self._CONFIG_PATH, True):
            # --enable-schedule beats the config value.
            assert resolver(True) is False

    def test_cli_flag_disable_overrides_env(self):
        resolver = self._resolver()
        with patch(self._CONFIG_PATH, False):
            # --no-enable-schedule beats the config value.
            assert resolver(False) is True

    def test_env_var_disabled(self):
        resolver = self._resolver()
        with patch(self._CONFIG_PATH, True):
            # No CLI flag given: the config says disabled.
            assert resolver(None) is True

    def test_env_var_enabled(self):
        resolver = self._resolver()
        with patch(self._CONFIG_PATH, False):
            # No CLI flag given: the config default (enabled) wins.
            assert resolver(None) is False


class TestIsScheduleDisabledSFN:
    """Unit tests for ``_is_schedule_disabled`` in ``step_functions_cli``."""

    # Dotted path of the config value the resolver falls back to.
    _CONFIG_PATH = (
        "metaflow.plugins.aws.step_functions.step_functions_cli.SCHEDULE_DISABLED"
    )

    @staticmethod
    def _resolver():
        from metaflow.plugins.aws.step_functions.step_functions_cli import (
            _is_schedule_disabled,
        )

        return _is_schedule_disabled

    def test_cli_flag_enable_overrides_env(self):
        resolver = self._resolver()
        with patch(self._CONFIG_PATH, True):
            # --enable-schedule beats the config value.
            assert resolver(True) is False

    def test_cli_flag_disable_overrides_env(self):
        resolver = self._resolver()
        with patch(self._CONFIG_PATH, False):
            # --no-enable-schedule beats the config value.
            assert resolver(False) is True

    def test_env_var_disabled(self):
        resolver = self._resolver()
        with patch(self._CONFIG_PATH, True):
            # No CLI flag given: the config says disabled.
            assert resolver(None) is True

    def test_env_var_enabled(self):
        resolver = self._resolver()
        with patch(self._CONFIG_PATH, False):
            # No CLI flag given: the config default (enabled) wins.
            assert resolver(None) is False


class TestArgoScheduleDisabled:
    """Verify that ArgoWorkflows.schedule() honors schedule_disabled."""

    @staticmethod
    def _make_workflow():
        """Build a bare ArgoWorkflows instance without running __init__."""
        from metaflow.plugins.argo.argo_workflows import ArgoWorkflows

        wf = object.__new__(ArgoWorkflows)
        wf.name = "test-workflow"
        wf._schedule = "0 * * * *"
        wf._timezone = "UTC"
        wf._sensor = None
        return wf

    def _run_schedule(self, disabled):
        """Invoke schedule() against a mocked ArgoClient and return the mock."""
        from unittest.mock import MagicMock

        wf = self._make_workflow()
        client = MagicMock()
        with patch(
            "metaflow.plugins.argo.argo_workflows.ArgoClient",
            return_value=client,
        ):
            wf.schedule(schedule_disabled=disabled)
        return client

    def test_schedule_passes_none_when_disabled(self):
        """When schedule_disabled=True, schedule and timezone should be None."""
        client = self._run_schedule(True)
        client.schedule_workflow_template.assert_called_once_with(
            "test-workflow", None, None
        )

    def test_schedule_passes_values_when_enabled(self):
        """When schedule_disabled=False, the cron and timezone pass through."""
        client = self._run_schedule(False)
        client.schedule_workflow_template.assert_called_once_with(
            "test-workflow", "0 * * * *", "UTC"
        )


class TestSFNScheduleDisabled:
    """Test that StepFunctions.schedule() respects schedule_disabled."""

    def test_schedule_skipped_when_disabled(self):
        """When schedule_disabled=True, EventBridge schedule should not be created."""
        from metaflow.plugins.aws.step_functions.step_functions import StepFunctions

        # Bypass __init__ (which would compile the flow); set only the
        # attributes schedule() reads.
        sf = object.__new__(StepFunctions)
        sf.name = "test-state-machine"
        sf._cron = "0 * * * ? *"

        with patch(
            "metaflow.plugins.aws.step_functions.step_functions.EventBridgeClient"
        ) as mock_eb:
            sf.schedule(schedule_disabled=True)

        # schedule() must early-return before any EventBridge interaction.
        mock_eb.assert_not_called()

    def test_schedule_created_when_enabled(self):
        """When schedule_disabled=False, EventBridge schedule should be created normally."""
        from unittest.mock import MagicMock

        from metaflow.plugins.aws.step_functions.step_functions import StepFunctions

        sf = object.__new__(StepFunctions)
        sf.name = "test-state-machine"
        sf._cron = "0 * * * ? *"
        sf._state_machine_arn = "arn:aws:states:us-east-1:123:stateMachine:test"

        # EventBridgeClient is used as a fluent builder; make every chained
        # call resolve back to the same mock instance.
        mock_eb_instance = MagicMock()
        mock_eb_instance.cron.return_value = mock_eb_instance
        mock_eb_instance.role_arn.return_value = mock_eb_instance
        mock_eb_instance.state_machine_arn.return_value = mock_eb_instance
        mock_eb_instance.schedule.return_value = "test-state-machine"

        with patch(
            "metaflow.plugins.aws.step_functions.step_functions.EventBridgeClient",
            return_value=mock_eb_instance,
        ), patch(
            "metaflow.plugins.aws.step_functions.step_functions.EVENTS_SFN_ACCESS_IAM_ROLE",
            "arn:aws:iam::123:role/test",
        ):
            sf.schedule(schedule_disabled=False)

        mock_eb_instance.cron.assert_called_once_with("0 * * * ? *")
Loading