Skip to content

Commit 1180f0d

Browse files
author
Nissan Pow
committed
Add TTL strategy to Argo WorkflowTemplates and update CronWorkflow to use schedules
Add a configurable ttlStrategy to Argo WorkflowTemplates so completed workflows are automatically cleaned up after 7 days (matching the default Kubernetes job TTL). Configurable via METAFLOW_ARGO_WORKFLOWS_TTL_SECONDS_AFTER_COMPLETION env var; set to 0 to disable. Also update CronWorkflow creation to use the `schedules` array field instead of the deprecated singular `schedule` field, per Argo Workflows deprecation notice. Closes #1231, closes #2351
1 parent 961b844 commit 1180f0d

3 files changed

Lines changed: 19 additions & 1 deletion

File tree

metaflow/metaflow_config.py

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -473,6 +473,13 @@
473473

474474
ARGO_WORKFLOWS_UI_URL = from_conf("ARGO_WORKFLOWS_UI_URL")
475475

476+
# How long (in seconds) to keep completed Argo Workflows before auto-deletion.
477+
# Default: 7 days (same as Kubernetes jobs). Maps to Argo's ttlStrategy.
478+
# Set to 0 to disable TTL (workflows persist indefinitely).
479+
ARGO_WORKFLOWS_TTL_SECONDS_AFTER_COMPLETION = from_conf(
480+
"ARGO_WORKFLOWS_TTL_SECONDS_AFTER_COMPLETION", 7 * 24 * 60 * 60
481+
)
482+
476483
##
477484
# Airflow Configuration
478485
##

metaflow/plugins/argo/argo_client.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -325,7 +325,7 @@ def schedule_workflow_template(self, name, schedule=None, timezone=None):
325325
"metadata": {"name": name},
326326
"spec": {
327327
"suspend": schedule is None,
328-
"schedule": schedule,
328+
"schedules": [schedule] if schedule is not None else [],
329329
"timezone": timezone,
330330
"failedJobsHistoryLimit": 10000, # default is unfortunately 1
331331
"successfulJobsHistoryLimit": 10000, # default is unfortunately 3

metaflow/plugins/argo/argo_workflows.py

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,7 @@
2525
ARGO_WORKFLOWS_CAPTURE_ERROR_SCRIPT,
2626
ARGO_WORKFLOWS_ENV_VARS_TO_SKIP,
2727
ARGO_WORKFLOWS_KUBERNETES_SECRETS,
28+
ARGO_WORKFLOWS_TTL_SECONDS_AFTER_COMPLETION,
2829
ARGO_WORKFLOWS_UI_URL,
2930
AWS_SECRETS_MANAGER_DEFAULT_REGION,
3031
AZURE_KEY_VAULT_PREFIX,
@@ -897,6 +898,8 @@ def _compile_workflow_template(self):
897898
WorkflowSpec()
898899
# Set overall workflow timeout.
899900
.active_deadline_seconds(self.workflow_timeout)
901+
# Set TTL for completed workflows (default: 7 days).
902+
.ttl_strategy(ARGO_WORKFLOWS_TTL_SECONDS_AFTER_COMPLETION)
900903
# TODO: Allow Argo to optionally archive all workflow execution logs
901904
# It's disabled for now since it requires all Argo installations
902905
# to enable an artifactory repository. If log archival is
@@ -4211,6 +4214,14 @@ def hooks(self, hooks):
42114214
self.payload["hooks"].update({k: v.to_json()})
42124215
return self
42134216

4217+
def ttl_strategy(self, seconds_after_completion):
4218+
# https://argoproj.github.io/argo-workflows/fields/#ttlstrategy
4219+
if seconds_after_completion is not None and int(seconds_after_completion) > 0:
4220+
self.payload["ttlStrategy"] = {
4221+
"secondsAfterCompletion": int(seconds_after_completion)
4222+
}
4223+
return self
4224+
42144225
def to_json(self):
42154226
return self.payload
42164227

0 commit comments

Comments
 (0)