diff --git a/metaflow/metaflow_config.py b/metaflow/metaflow_config.py index e5e03a401a6..832b14c6865 100644 --- a/metaflow/metaflow_config.py +++ b/metaflow/metaflow_config.py @@ -476,6 +476,13 @@ ARGO_WORKFLOWS_UI_URL = from_conf("ARGO_WORKFLOWS_UI_URL") +# How long (in seconds) to keep completed Argo Workflows before auto-deletion. +# Default: 7 days (same as Kubernetes jobs). Maps to Argo's ttlStrategy. +# Set to 0 or a negative value to disable TTL (workflows persist indefinitely). +ARGO_WORKFLOWS_TTL_SECONDS_AFTER_COMPLETION = from_conf( + "ARGO_WORKFLOWS_TTL_SECONDS_AFTER_COMPLETION", 7 * 24 * 60 * 60 +) + ## # Airflow Configuration ## diff --git a/metaflow/plugins/argo/argo_workflows.py b/metaflow/plugins/argo/argo_workflows.py index b7b25c8c69d..6fadb0f31f7 100644 --- a/metaflow/plugins/argo/argo_workflows.py +++ b/metaflow/plugins/argo/argo_workflows.py @@ -25,6 +25,7 @@ ARGO_WORKFLOWS_CAPTURE_ERROR_SCRIPT, ARGO_WORKFLOWS_ENV_VARS_TO_SKIP, ARGO_WORKFLOWS_KUBERNETES_SECRETS, + ARGO_WORKFLOWS_TTL_SECONDS_AFTER_COMPLETION, ARGO_WORKFLOWS_UI_URL, AWS_SECRETS_MANAGER_DEFAULT_REGION, AZURE_KEY_VAULT_PREFIX, @@ -897,6 +898,8 @@ def _compile_workflow_template(self): WorkflowSpec() # Set overall workflow timeout. .active_deadline_seconds(self.workflow_timeout) + # Set TTL for completed workflows (default: 7 days). + .ttl_strategy(ARGO_WORKFLOWS_TTL_SECONDS_AFTER_COMPLETION) # TODO: Allow Argo to optionally archive all workflow execution logs # It's disabled for now since it requires all Argo installations # to enable an artifactory repository. 
def ttl_strategy(self, seconds_after_completion):
    """Configure how long a completed workflow is kept before deletion.

    Writes ``ttlStrategy.secondsAfterCompletion`` into ``self.payload``.
    https://argoproj.github.io/argo-workflows/fields/#ttlstrategy

    Parameters
    ----------
    seconds_after_completion : int, str or None
        Retention time in seconds. Integer-like strings (e.g. ``"86400"``)
        are accepted. ``None``, ``0`` and negative values disable the TTL:
        no ``ttlStrategy`` key is left in the payload, including one written
        by an earlier call on the same spec.

    Returns
    -------
    The spec itself, so calls can be chained.

    Raises
    ------
    ValueError, TypeError
        If the value cannot be interpreted as an integer number of seconds.
        The exception type is the one ``int()`` raised; only the message is
        made more descriptive.
    """
    if seconds_after_completion is None:
        ttl_seconds = 0
    else:
        # Convert exactly once, and validate before touching the payload so a
        # bad value leaves the spec unchanged.
        try:
            ttl_seconds = int(seconds_after_completion)
        except (TypeError, ValueError) as err:
            raise type(err)(
                "Invalid Argo Workflows TTL %r: expected an integer number of "
                "seconds (0 or a negative value disables the TTL)."
                % (seconds_after_completion,)
            ) from err

    if ttl_seconds > 0:
        self.payload["ttlStrategy"] = {"secondsAfterCompletion": ttl_seconds}
    else:
        # Disabling must also clear a TTL set by a previous call; otherwise
        # the stale value would still be emitted in the workflow spec.
        self.payload.pop("ttlStrategy", None)
    return self