Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 7 additions & 0 deletions metaflow/metaflow_config.py
Original file line number Diff line number Diff line change
Expand Up @@ -476,6 +476,13 @@

ARGO_WORKFLOWS_UI_URL = from_conf("ARGO_WORKFLOWS_UI_URL")

# How long (in seconds) to keep completed Argo Workflows before auto-deletion.
# The value is handed to WorkflowSpec.ttl_strategy(), which emits it as Argo's
# ttlStrategy.secondsAfterCompletion.
# Default: 7 days (7 * 24 * 60 * 60 = 604800 seconds; same as Kubernetes jobs).
# Set to 0 (or any non-positive value) to disable TTL: no ttlStrategy is
# emitted and workflows persist indefinitely.
ARGO_WORKFLOWS_TTL_SECONDS_AFTER_COMPLETION = from_conf(
"ARGO_WORKFLOWS_TTL_SECONDS_AFTER_COMPLETION", 7 * 24 * 60 * 60
)
Comment on lines +482 to +484
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

P2 Enabled-by-default TTL is a behavior change for existing users

The default of 7 days means every existing Metaflow user who upgrades will silently start having their completed Argo Workflow resources auto-deleted after 7 days. Users who rely on workflow history beyond that window (e.g. for audit, debugging, or custom tooling that queries old workflow objects) will lose it without any explicit action. A safer default might be None or 0 (disable TTL) to preserve the existing behavior, with opt-in for cleanup.


##
# Airflow Configuration
##
Expand Down
11 changes: 11 additions & 0 deletions metaflow/plugins/argo/argo_workflows.py
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
ARGO_WORKFLOWS_CAPTURE_ERROR_SCRIPT,
ARGO_WORKFLOWS_ENV_VARS_TO_SKIP,
ARGO_WORKFLOWS_KUBERNETES_SECRETS,
ARGO_WORKFLOWS_TTL_SECONDS_AFTER_COMPLETION,
ARGO_WORKFLOWS_UI_URL,
AWS_SECRETS_MANAGER_DEFAULT_REGION,
AZURE_KEY_VAULT_PREFIX,
Expand Down Expand Up @@ -897,6 +898,8 @@ def _compile_workflow_template(self):
WorkflowSpec()
# Set overall workflow timeout.
.active_deadline_seconds(self.workflow_timeout)
# Set TTL for completed workflows (default: 7 days).
.ttl_strategy(ARGO_WORKFLOWS_TTL_SECONDS_AFTER_COMPLETION)
# TODO: Allow Argo to optionally archive all workflow execution logs
# It's disabled for now since it requires all Argo installations
# to enable an artifactory repository. If log archival is
Expand Down Expand Up @@ -4295,6 +4298,14 @@ def hooks(self, hooks):
self.payload["hooks"].update({k: v.to_json()})
return self

def ttl_strategy(self, seconds_after_completion):
    """
    Configure how long Argo keeps this workflow after it completes.

    Writes ``ttlStrategy.secondsAfterCompletion`` into the payload when a
    positive TTL is given; otherwise leaves the payload untouched.

    Parameters
    ----------
    seconds_after_completion : int or str, optional
        TTL in seconds. Anything accepted by ``int()`` is allowed (e.g. the
        string "86400"). ``None``, zero and negative values disable the TTL,
        i.e. no ``ttlStrategy`` entry is added.

    Returns
    -------
    self, to allow method chaining.

    Raises
    ------
    ValueError
        If the value cannot be interpreted as an integer number of seconds.
    """
    # https://argoproj.github.io/argo-workflows/fields/#ttlstrategy
    if seconds_after_completion is None:
        return self
    # Convert exactly once so the validated value is the one that is emitted,
    # and so a malformed setting produces an actionable message instead of a
    # bare int() error at deploy time.
    try:
        seconds = int(seconds_after_completion)
    except (TypeError, ValueError) as err:
        raise ValueError(
            "Invalid TTL for completed Argo Workflows: %r. Expected an "
            "integer number of seconds (0 or a negative value disables "
            "the TTL)." % (seconds_after_completion,)
        ) from err
    if seconds > 0:
        self.payload["ttlStrategy"] = {"secondsAfterCompletion": seconds}
    return self
Comment on lines +4301 to +4307
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

P2 int() called twice and raises unhandled ValueError on bad env-var input

int(seconds_after_completion) is evaluated twice (once in the condition, once in the payload). More importantly, if a user sets METAFLOW_ARGO_WORKFLOWS_TTL_SECONDS_AFTER_COMPLETION=invalid in their environment, int("invalid") will raise an unhandled ValueError at deploy time with no helpful message. Converting and validating once at the top is cleaner:

Suggested change
def ttl_strategy(self, seconds_after_completion):
# https://argoproj.github.io/argo-workflows/fields/#ttlstrategy
if seconds_after_completion is not None and int(seconds_after_completion) > 0:
self.payload["ttlStrategy"] = {
"secondsAfterCompletion": int(seconds_after_completion)
}
return self
def ttl_strategy(self, seconds_after_completion):
# https://argoproj.github.io/argo-workflows/fields/#ttlstrategy
if seconds_after_completion is not None:
try:
seconds = int(seconds_after_completion)
except (TypeError, ValueError):
seconds = 0
if seconds > 0:
self.payload["ttlStrategy"] = {
"secondsAfterCompletion": seconds
}
return self


def to_json(self):
    """Return the payload dict that the builder methods have populated."""
    payload = self.payload
    return payload

Expand Down
40 changes: 40 additions & 0 deletions test/unit/test_argo_ttl_strategy.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,40 @@
"""Tests for WorkflowSpec.ttl_strategy()."""

from metaflow.plugins.argo.argo_workflows import WorkflowSpec


def test_positive_value_sets_ttl():
    """A positive number of seconds is stored in the payload as ttlStrategy."""
    spec = WorkflowSpec()
    spec.ttl_strategy(604800)
    expected = {"secondsAfterCompletion": 604800}
    assert spec.payload["ttlStrategy"] == expected


def test_zero_disables_ttl():
    """Zero leaves the payload without a ttlStrategy entry."""
    spec = WorkflowSpec()
    ttl = 0
    spec.ttl_strategy(ttl)
    assert "ttlStrategy" not in spec.payload


def test_none_disables_ttl():
    """None leaves the payload without a ttlStrategy entry."""
    spec = WorkflowSpec()
    ttl = None
    spec.ttl_strategy(ttl)
    assert "ttlStrategy" not in spec.payload


def test_negative_disables_ttl():
    """A negative value leaves the payload without a ttlStrategy entry."""
    spec = WorkflowSpec()
    ttl = -1
    spec.ttl_strategy(ttl)
    assert "ttlStrategy" not in spec.payload


def test_string_value_converted_to_int():
    """A numeric string is emitted as an int, not as the original string."""
    spec = WorkflowSpec()
    spec.ttl_strategy("86400")
    expected = {"secondsAfterCompletion": 86400}
    assert spec.payload["ttlStrategy"] == expected


def test_chaining():
    """ttl_strategy returns self for method chaining, with or without a TTL."""
    ws = WorkflowSpec()
    result = ws.ttl_strategy(3600)
    assert result is ws

    # The no-op paths (TTL disabled) must keep the builder chain intact too;
    # otherwise the chained calls in the workflow template would break.
    for disabled in (0, None, -1):
        spec = WorkflowSpec()
        assert spec.ttl_strategy(disabled) is spec
Loading