feat: propagate tags on trigger and resume#3105

Open
npow wants to merge 5 commits into master from npow/tag-improvements-clean
Conversation


@npow npow commented Apr 13, 2026

Summary

  • Propagate trigger-time tags through SFN and Argo workflow executions via metaflow.trigger_tags parameter and METAFLOW_TRIGGER_TAGS env var
  • Tags set at trigger time are now available to all steps in the workflow, not just the start step
  • Resume operations preserve and propagate the original run's tags
  • Fix JsonPath syntax for SFN parameter references (remove extra . before [)
  • Fix join state parent access to use step names instead of array indexing for sfn-local compatibility
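The step-side half of the mechanism can be sketched as follows. This is a minimal illustration of the behavior the summary describes (a JSON list in the `METAFLOW_TRIGGER_TAGS` env var, parsed gracefully), not the PR's exact code; the helper name `read_trigger_tags` is hypothetical.

```python
import json
import os


def read_trigger_tags(environ=os.environ):
    """Parse trigger-time tags from METAFLOW_TRIGGER_TAGS.

    The env var is expected to hold a JSON-encoded list of tag
    strings. Invalid JSON or a non-list value yields no tags
    rather than an error, so a malformed input cannot fail a task.
    """
    raw = environ.get("METAFLOW_TRIGGER_TAGS", "")
    if not raw:
        return []
    try:
        tags = json.loads(raw)
    except ValueError:
        # Malformed JSON: proceed without trigger tags.
        return []
    if not isinstance(tags, list):
        # Unexpected shape (e.g. a dict): ignore it.
        return []
    return [str(t) for t in tags]


print(read_trigger_tags({"METAFLOW_TRIGGER_TAGS": '["team:ml", "env:prod"]'}))
# → ['team:ml', 'env:prod']
```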

Test plan

  • test/unit/test_tag_improvements.py — unit tests for trigger-time tag propagation and resume tag preservation

🤖 Generated with Claude Code

Nissan Pow and others added 5 commits April 13, 2026 15:04
- Propagate trigger-time tags through SFN and Argo workflow executions
  via metaflow.trigger_tags parameter and METAFLOW_TRIGGER_TAGS env var
- Tags set at trigger time are available to all steps in the workflow
- Resume operations preserve and propagate the original run's tags

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
Tests cover METAFLOW_TRIGGER_TAGS env var parsing, Argo/SFN trigger CLI
--tag option, SFN execution input format, and resume tag merge logic.
npow force-pushed the npow/tag-improvements-clean branch from ef75451 to 04dcd2b on April 13, 2026 at 22:05

greptile-apps Bot commented Apr 13, 2026

Greptile Summary

This PR propagates trigger-time tags through SFN and Argo workflow executions via METAFLOW_TRIGGER_TAGS, and carries origin-run tags forward on resume. The Argo implementation is solid — the workflow template parameter defaults to "[]", so non-tagged executions are safe. The SFN implementation has one gap: event_bridge_client.py was not updated to include TriggerTags in the scheduled-execution input, so re-deploying any flow with @schedule will cause all EventBridge-triggered executions to fail with a States.Runtime error immediately at the start step.

Confidence Score: 3/5

Not safe to merge for flows using @schedule with SFN — EventBridge-triggered executions will fail at the start step after redeployment.

The P1 finding (missing TriggerTags in EventBridge input) is a definite current breakage on the changed path for any SFN flow with @schedule. All other findings are P2. The Argo path is safe.

metaflow/plugins/aws/step_functions/event_bridge_client.py — must be updated to include TriggerTags in the EventBridge execution input.

Important Files Changed

  • metaflow/plugins/aws/step_functions/step_functions.py: Adds TriggerTags propagation through all SFN step types, but EventBridge-scheduled executions will break because the input format is not updated to include TriggerTags.
  • metaflow/plugins/aws/step_functions/step_functions_cli.py: Adds --tag option to the trigger command; removes duplicate validate_tags import (still available via the line 18 import); calls validate_tags on provided tags correctly.
  • metaflow/plugins/argo/argo_workflows.py: Adds metaflow-trigger-tags parameter with default [] to all Argo WorkflowTemplates and sets the METAFLOW_TRIGGER_TAGS env var referencing the workflow parameter.
  • metaflow/plugins/argo/argo_client.py: Adds tags support to trigger_workflow_template: passes tags as a workflow parameter and annotation; safely omits the parameter when tags=None.
  • metaflow/plugins/argo/argo_workflows_cli.py: Adds --tag multiple option to the Argo trigger CLI command with a validate_tags call and passes tags to ArgoWorkflows.trigger().
  • metaflow/cli_components/run_cmds.py: Moves the before_run() call after origin_run_id resolution so origin run tags can be merged into CLI tags before sticky tags are registered.
  • metaflow/cli_components/step_cmd.py: Reads the METAFLOW_TRIGGER_TAGS env var and calls add_sticky_tags with the parsed JSON list of tags; gracefully handles invalid JSON and non-list values.
  • test/unit/test_tag_improvements.py: New unit test file covering trigger-time tags, resume tag propagation, and CLI option presence; some tests verify trivial properties rather than real integration.
  • metaflow/plugins/aws/step_functions/step_functions_deployer_objects.py: Documentation-only change: updates docstring to mention the tag usage pattern.
  • metaflow/plugins/argo/argo_workflows_deployer_objects.py: Documentation-only change: updates docstring to mention the tag usage pattern.
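The resume-side merge described for run_cmds.py (origin run tags merged into CLI tags before sticky tags are registered) can be sketched like this. `merge_tags` is a hypothetical helper, and order-preserving deduplication is an assumption about the merge semantics, not something the PR text specifies.

```python
def merge_tags(origin_tags, cli_tags):
    """Merge origin-run tags with tags passed on the resume CLI.

    Preserves order and drops duplicates; the first occurrence of
    a tag wins, with origin-run tags considered first.
    """
    merged = []
    seen = set()
    for tag in list(origin_tags) + list(cli_tags):
        if tag not in seen:
            seen.add(tag)
            merged.append(tag)
    return merged


print(merge_tags(["team:ml", "env:prod"], ["env:prod", "retry:1"]))
# → ['team:ml', 'env:prod', 'retry:1']
```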

Reviews (1): Last reviewed commit: "cleanup: remove duplicate import, unused..."

Comment on lines +628 to +631
# Propagate trigger-time tags from execution input to all steps.
# The trigger command always includes TriggerTags in the input.
attrs["metaflow.trigger_tags.$"] = "$.TriggerTags"
env["METAFLOW_TRIGGER_TAGS"] = "$.TriggerTags"

P1 EventBridge-scheduled executions will fail after redeployment

After redeploying a state machine with these changes, any execution that doesn't include TriggerTags in the input will throw a States.Runtime error when Step Functions tries to resolve $.TriggerTags. This includes all EventBridge-scheduled runs: event_bridge_client.py (line 57) passes {"Parameters": json.dumps({})} as the execution input — no TriggerTags key — so the JsonPath lookup fails immediately.

The Argo path avoids this by setting .value("[]") as a default on the workflow template parameter. SFN needs the same defensive default, which means updating EventBridgeClient._set() to include "TriggerTags": json.dumps([]) in its Input:

"Input": json.dumps({"Parameters": json.dumps({}), "TriggerTags": json.dumps([])}),
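The defensive default the reviewer asks for can be sketched as a small input-builder; assuming EventBridgeClient constructs its rule input roughly this way, with `build_eventbridge_input` as an illustrative stand-in for the real `_set()` logic.

```python
import json


def build_eventbridge_input(parameters, tags=None):
    """Build the scheduled-execution input for an EventBridge target.

    Always including TriggerTags (even when empty) means the state
    machine's $.TriggerTags JsonPath lookup always resolves, mirroring
    the "[]" default on the Argo workflow-template parameter. Without
    this key, SFN raises States.Runtime at the start step.
    """
    return json.dumps(
        {
            "Parameters": json.dumps(parameters),
            "TriggerTags": json.dumps(tags if tags else []),
        }
    )


payload = json.loads(build_eventbridge_input({}))
print(payload["TriggerTags"])  # → [] (a JSON string, not a list)
```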

Comment on lines +192 to +198

    try:
        from ..client.core import Run

        origin_run = Run("%s/%s" % (flow_name, origin_run_id), _namespace_check=False)
        return list(origin_run.user_tags)
    except Exception:
        # If we can't read the origin run's tags (e.g. metadata service

P2 Overly broad exception suppression

The bare except Exception swallows everything — including AttributeError, ImportError, or other programming errors that signal a real bug. Consider catching only the expected failure modes (e.g. MetaflowNotFound, network errors) and letting unexpected exceptions surface, or at minimum logging a warning so callers know tags were silently dropped.

Suggested change

     try:
         from ..client.core import Run

         origin_run = Run("%s/%s" % (flow_name, origin_run_id), _namespace_check=False)
         return list(origin_run.user_tags)
-    except Exception:
-        # If we can't read the origin run's tags (e.g. metadata service
+    except Exception as e:
+        # If we can't read the origin run's tags (e.g. metadata service
+        # unavailable), we proceed without propagating tags.
+        import logging
+
+        logging.getLogger(__name__).debug(
+            "Could not retrieve tags for origin run %s/%s: %s",
+            flow_name,
+            origin_run_id,
+            e,
+        )
+        return []

Comment on lines 204 to +211

-    # Dump parameters into `Parameters` input field.
-    input = json.dumps({"Parameters": json.dumps(parameters)})
+    # Always include TriggerTags (defaulting to empty list) in the
+    # execution input. The state machine propagates this field through
+    # every step so that trigger-time tags are applied to all tasks.
+    input = json.dumps(
+        {
+            "Parameters": json.dumps(parameters),
+            "TriggerTags": json.dumps(tags if tags else []),

P2 TriggerTags double-JSON-encodes the list

tags if tags else [] is a list, and json.dumps(...) of that list yields a string like '["t1","t2"]'. That string is then embedded in the outer json.dumps(...), so what Step Functions receives for $.TriggerTags is a JSON-encoded string, not a list. When a step reads METAFLOW_TRIGGER_TAGS it gets '["t1","t2"]' and must call json.loads to recover the list, which the step-cmd code does, so this works. However, it is an unusual convention that must stay consistent with the Argo side, where the parameter value is also json.dumps(tags); a brief comment here would make the intentional double-encoding clear.
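The double-encoding can be seen in a quick round trip (a standalone illustration of the convention, not the PR's code):

```python
import json

tags = ["t1", "t2"]

# Producer side: the tag list is JSON-encoded, then embedded inside
# the outer execution-input document, so it travels as a string.
execution_input = json.dumps({"TriggerTags": json.dumps(tags)})

# What Step Functions hands to $.TriggerTags is the inner string...
inner = json.loads(execution_input)["TriggerTags"]
print(repr(inner))  # → '["t1", "t2"]'

# ...so the consumer (step-cmd) must call json.loads once more
# to recover the actual list.
recovered = json.loads(inner)
print(recovered)  # → ['t1', 't2']
```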
