Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
17 changes: 17 additions & 0 deletions docs/v3/concepts/automations.mdx
Original file line number Diff line number Diff line change
Expand Up @@ -85,6 +85,23 @@ Prefect infers the relevant event whenever possible, but sometimes one does not

Specify a name and, optionally, a description for the automation.

### Tracing automation actions

When an automation fires, it emits events that you can use to trace what happened:

- `prefect.automation.triggered` or `prefect.automation.resolved` - emitted when the trigger condition is met
- `prefect.automation.action.triggered` - emitted when an action starts
- `prefect.automation.action.executed` or `prefect.automation.action.failed` - emitted when an action completes

The action events include related resources that link back to their source events:

| Related resource role | Description |
| --------------------- | ----------- |
| `triggering-event` | The original event that caused the automation to fire |
| `automation-triggered-event` | The `automation.triggered` or `automation.resolved` event that prompted the action |

These links help you trace from an action failure back to the specific trigger and original event that caused it.


## Sending notifications with automations

Expand Down
44 changes: 25 additions & 19 deletions src/prefect/server/events/actions.py
Original file line number Diff line number Diff line change
Expand Up @@ -174,8 +174,17 @@ async def fail(self, triggered_action: "TriggeredAction", reason: str) -> None:
if abs(time_since_trigger) < TIGHT_TIMING:
follows_id = triggered_action.triggering_event.id

# Build related resources including triggering event reference
# Build related resources including automation.triggered and triggering event
related_resources = list(self._resulting_related_resources)
if triggered_action.automation_triggered_event_id:
related_resources.append(
RelatedResource(
{
"prefect.resource.id": f"prefect.event.{triggered_action.automation_triggered_event_id}",
"prefect.resource.role": "automation-triggered-event",
}
)
)
if triggered_action.triggering_event:
related_resources.append(
RelatedResource(
Expand All @@ -190,7 +199,7 @@ async def fail(self, triggered_action: "TriggeredAction", reason: str) -> None:
occurred=triggered_action.triggered,
event="prefect.automation.action.triggered",
resource=resource,
related=related_resources,
related=self._resulting_related_resources,
Copy link

Copilot AI Jan 26, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The action.triggered event in the fail method is using self._resulting_related_resources instead of related_resources. This means the action.triggered event will not include the automation-triggered-event and triggering-event related resources that were just constructed.

This is inconsistent with the succeed method where both the action.triggered and action.executed events correctly use related_resources. The action.triggered event should include these links so users can trace from the triggered event back to the automation.triggered event and original triggering event.

Suggested change
related=self._resulting_related_resources,
related=related_resources,

Copilot uses AI. Check for mistakes.
payload=action_details,
id=triggered_event_id,
follows=follows_id,
Expand All @@ -201,7 +210,7 @@ async def fail(self, triggered_action: "TriggeredAction", reason: str) -> None:
occurred=now("UTC"),
event="prefect.automation.action.failed",
resource=resource,
related=self._resulting_related_resources,
related=related_resources,
payload={
**action_details,
"reason": reason,
Expand Down Expand Up @@ -254,8 +263,17 @@ async def succeed(self, triggered_action: "TriggeredAction") -> None:
if abs(time_since_trigger) < TIGHT_TIMING:
follows_id = triggered_action.triggering_event.id

# Build related resources including triggering event reference
# Build related resources including automation.triggered and triggering event
related_resources = list(self._resulting_related_resources)
if triggered_action.automation_triggered_event_id:
related_resources.append(
RelatedResource(
{
"prefect.resource.id": f"prefect.event.{triggered_action.automation_triggered_event_id}",
"prefect.resource.role": "automation-triggered-event",
}
)
)
if triggered_action.triggering_event:
related_resources.append(
RelatedResource(
Expand All @@ -269,13 +287,7 @@ async def succeed(self, triggered_action: "TriggeredAction") -> None:
Event(
occurred=triggered_action.triggered,
event="prefect.automation.action.triggered",
resource=Resource(
{
"prefect.resource.id": automation_resource_id,
"prefect.resource.name": automation.name,
"prefect.trigger-type": automation.trigger.type,
}
),
resource=resource,
related=related_resources,
payload=action_details,
id=triggered_event_id,
Expand All @@ -286,14 +298,8 @@ async def succeed(self, triggered_action: "TriggeredAction") -> None:
Event(
occurred=now("UTC"),
event="prefect.automation.action.executed",
resource=Resource(
{
"prefect.resource.id": automation_resource_id,
"prefect.resource.name": automation.name,
"prefect.trigger-type": automation.trigger.type,
}
),
related=self._resulting_related_resources,
resource=resource,
related=related_resources,
payload={
**action_details,
**self._result_details,
Expand Down
18 changes: 18 additions & 0 deletions src/prefect/server/events/schemas/automations.py
Original file line number Diff line number Diff line change
Expand Up @@ -760,6 +760,24 @@ class TriggeredAction(PrefectBaseModel):
default=0,
description="The index of the action within the automation",
)
automation_triggered_event_id: UUID | None = Field(
default=None,
description=(
"The ID of the automation.triggered or automation.resolved event that "
"prompted this action, used to link automation.action.* events back to "
"the state change event"
),
)

@field_validator("automation_triggered_event_id")
@classmethod
def validate_automation_triggered_event_id(cls, v, info):
"""Ensure automation_triggered_event_id is only set when triggering_event exists."""
if v is not None and info.data.get("triggering_event") is None:
raise ValueError(
"automation_triggered_event_id can only be set when triggering_event is provided"
)
Comment on lines +775 to +779
Copy link

Copilot AI Jan 26, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The validator logic is incorrect. It prevents automation_triggered_event_id from being set when triggering_event is None. However, proactive triggers (like metric thresholds or time-based triggers) don't have an original triggering event, but they still emit automation.triggered events and should be able to link action events back to those automation state change events.

The automation_triggered_event_id field refers to the automation.triggered or automation.resolved event, not the original triggering event, so it should be allowed to be set independently of whether triggering_event exists.

Suggested change
"""Ensure automation_triggered_event_id is only set when triggering_event exists."""
if v is not None and info.data.get("triggering_event") is None:
raise ValueError(
"automation_triggered_event_id can only be set when triggering_event is provided"
)
"""Allow automation_triggered_event_id to be set independently of triggering_event."""

Copilot uses AI. Check for mistakes.
return v

def idempotency_key(self) -> str:
"""Produce a human-friendly idempotency key for this action"""
Expand Down
33 changes: 27 additions & 6 deletions src/prefect/server/events/triggers.py
Original file line number Diff line number Diff line change
Expand Up @@ -436,21 +436,37 @@ async def act(firing: Firing) -> None:
}
await messaging.publish(state_change_events.values())

# By default, all `automation.actions` are fired
source_actions: List[Tuple[Optional[ReceivedEvent], ServerActionTypes]] = [
(firing.triggering_event, action) for action in automation.actions
# Determine the primary state change event ID for linking action events back to
# the automation.triggered or automation.resolved event. Prefer Triggered over
# Resolved when both are present.
primary_state_change_event = state_change_events.get(
TriggerState.Triggered
) or state_change_events.get(TriggerState.Resolved)
primary_state_change_event_id = (
primary_state_change_event.id if primary_state_change_event else None
)

# By default, all `automation.actions` are fired. Each tuple contains:
# (triggering_event, action, automation_triggered_event_id)
source_actions: List[
Tuple[Optional[ReceivedEvent], ServerActionTypes, UUID | None]
] = [
(firing.triggering_event, action, primary_state_change_event_id)
for action in automation.actions
]

# Conditionally add in actions that fire on specific trigger states
if TriggerState.Triggered in firing.trigger_states:
triggered_event = state_change_events[TriggerState.Triggered]
source_actions += [
(state_change_events[TriggerState.Triggered], action)
(triggered_event, action, primary_state_change_event_id)
for action in automation.actions_on_trigger
]

if TriggerState.Resolved in firing.trigger_states:
resolved_event = state_change_events[TriggerState.Resolved]
source_actions += [
(state_change_events[TriggerState.Resolved], action)
(resolved_event, action, primary_state_change_event_id)
for action in automation.actions_on_resolve
]

Expand All @@ -463,8 +479,13 @@ async def act(firing: Firing) -> None:
triggering_event=action_triggering_event,
action=action,
action_index=index,
automation_triggered_event_id=automation_triggered_event_id,
)
for index, (action_triggering_event, action) in enumerate(source_actions)
for index, (
action_triggering_event,
action,
automation_triggered_event_id,
) in enumerate(source_actions)
]

async with messaging.create_actions_publisher() as publisher:
Expand Down
Loading
Loading