Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
51 changes: 48 additions & 3 deletions src/prefect/server/events/models/composite_trigger_child_firing.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,12 +7,46 @@

from prefect.server.database import PrefectDBInterface, db_injector
from prefect.server.events.schemas.automations import CompositeTrigger, Firing
from prefect.server.utilities.database import get_dialect
from prefect.types._datetime import DateTime, now

if TYPE_CHECKING:
from prefect.server.database.orm_models import ORMCompositeTriggerChildFiring


async def acquire_composite_trigger_lock(
session: AsyncSession,
trigger: CompositeTrigger,
) -> None:
"""
Acquire a transaction-scoped advisory lock for the given composite trigger.

This serializes concurrent child trigger evaluations for the same compound
trigger, preventing a race condition where multiple transactions each see
only their own child firing and neither fires the parent.

The lock is automatically released when the transaction commits or rolls back.
"""
bind = session.get_bind()
if bind is None:
return

# Get the engine from either an Engine or Connection
engine: sa.Engine = bind if isinstance(bind, sa.Engine) else bind.engine # type: ignore[union-attr]
dialect = get_dialect(engine)
Comment on lines +30 to +36
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Remediation Recommended

3. Lock dialect detection fragile 🐞 Bug

acquire_composite_trigger_lock derives an engine via session.get_bind() and then assumes a
  synchronous Engine/.engine shape; Prefect constructs AsyncSession from an AsyncEngine.
• This is more complex than needed and may be brittle across SQLAlchemy async bind configurations;
  using the existing session.sync_session + get_dialect(...) pattern would be more robust and
  consistent.
Agent Prompt
### Issue description
`acquire_composite_trigger_lock` currently inspects `session.get_bind()` and attempts to normalize it to a synchronous `sa.Engine`. Prefect creates `AsyncSession` from an `AsyncEngine`, and the codebase already uses `session.sync_session` for dialect detection.

### Issue Context
This is a robustness/maintainability improvement that avoids brittle assumptions about async bind object shapes.

### Fix Focus Areas
- src/prefect/server/events/models/composite_trigger_child_firing.py[30-37]
- src/prefect/server/events/models/automations.py[125-128]

### Expected change
Replace the bind/engine extraction with something like:
- `dialect = get_dialect(session.sync_session)`
Then keep the existing `if dialect.name == "postgresql": ...` logic.

ⓘ Copy this prompt and use it to remediate the issue with your preferred AI generation tools


if dialect.name == "postgresql":
# Use the trigger's UUID as the lock key
# pg_advisory_xact_lock takes a bigint, so we use the UUID's int representation
# truncated to fit (collision is extremely unlikely and benign)
lock_key = int(trigger.id) % (2**31)
await session.execute(
sa.text("SELECT pg_advisory_xact_lock(:key)"), {"key": lock_key}
)
Comment on lines +39 to +45
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Advisory Comments

4. Small advisory lock key 🐞 Bug

• The lock key uses int(uuid) % 2**31 even though Postgres advisory locks accept a 64-bit key.
• This increases collision probability, which can cause unnecessary contention between unrelated
  composite triggers (performance/head-of-line blocking).
Agent Prompt
### Issue description
The advisory lock key is truncated to 31 bits, increasing collision probability and potential contention.

### Issue Context
This is a performance improvement; collisions are typically benign but avoidable.

### Fix Focus Areas
- src/prefect/server/events/models/composite_trigger_child_firing.py[38-45]

### Expected change
Use a 64-bit keyspace, e.g.:
- `lock_key = int(trigger.id) % (2**63)`
(or derive a signed 64-bit integer from 8 bytes of the UUID).

ⓘ Copy this prompt and use it to remediate the issue with your preferred AI generation tools

# SQLite doesn't support advisory locks, but SQLite also serializes writes
# at the database level, so the race condition is less likely to occur


@db_injector
async def upsert_child_firing(
db: PrefectDBInterface,
Expand Down Expand Up @@ -102,11 +136,22 @@ async def clear_child_firings(
session: AsyncSession,
trigger: CompositeTrigger,
firing_ids: Sequence[UUID],
) -> None:
await session.execute(
sa.delete(db.CompositeTriggerChildFiring).filter(
) -> set[UUID]:
"""
Delete the specified child firings and return the IDs that were actually deleted.

Returns the set of child_firing_ids that were successfully deleted. Callers can
compare this to the expected firing_ids to detect races and avoid double-firing
composite triggers.
"""
result = await session.execute(
sa.delete(db.CompositeTriggerChildFiring)
.filter(
db.CompositeTriggerChildFiring.automation_id == trigger.automation.id,
db.CompositeTriggerChildFiring.parent_trigger_id == trigger.id,
db.CompositeTriggerChildFiring.child_firing_id.in_(firing_ids),
)
.returning(db.CompositeTriggerChildFiring.child_firing_id)
)
Comment on lines +147 to 155
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Action Required

2. Delete returning on sqlite 🐞 Bug

clear_child_firings now uses DELETE ... RETURNING unconditionally; Prefect supports SQLite and
  the codebase already has dialect-specific handling to avoid RETURNING in SQLite code paths.
• This can break composite triggers on SQLite deployments/environments where RETURNING is
  unsupported, causing evaluation errors and preventing parent triggers from firing.
Agent Prompt
### Issue description
`clear_child_firings` now uses `DELETE ... RETURNING` unconditionally. Prefect supports SQLite and already has dialect-specific logic elsewhere to avoid RETURNING in SQLite paths. This can break composite trigger evaluation on SQLite.

### Issue Context
`evaluate_composite_trigger` only needs to know whether we deleted *all* expected rows to “win” the race. Returning the actual IDs is primarily for logging/debugging.

### Fix Focus Areas
- src/prefect/server/events/models/composite_trigger_child_firing.py[133-157]
- src/prefect/server/events/triggers.py[391-411]

### Suggested approach
1. Determine dialect using the established pattern (e.g., `get_dialect(session.sync_session).name`).
2. If Postgres: keep `DELETE ... RETURNING` and return the deleted IDs.
3. If SQLite: execute the DELETE without RETURNING and use `result.rowcount` to decide:
   - if `rowcount == len(firing_ids)`: return `set(firing_ids)`
   - else: return `set()` (or best-effort IDs if you implement a safe alternative)

This preserves the anti-double-fire logic without relying on RETURNING support.

ⓘ Copy this prompt and use it to remediate the issue with your preferred AI generation tools


return set(result.scalars().all())
29 changes: 27 additions & 2 deletions src/prefect/server/events/triggers.py
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@
read_automation,
)
from prefect.server.events.models.composite_trigger_child_firing import (
acquire_composite_trigger_lock,
clear_child_firings,
clear_old_child_firings,
get_child_firings,
Expand Down Expand Up @@ -346,6 +347,11 @@ async def evaluate_composite_trigger(session: AsyncSession, firing: Firing) -> N
)
return

# Acquire an advisory lock to serialize concurrent evaluations for this
# compound trigger. This prevents a race condition where multiple child
# triggers fire concurrently and neither transaction sees both firings.
await acquire_composite_trigger_lock(session, trigger.automation)

Comment on lines +350 to +354
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Action Required

1. Wrong lock key scope 🐞 Bug

• evaluate_composite_trigger passes trigger.automation into acquire_composite_trigger_lock, even
  though the helper is documented/typed to lock per composite trigger.
• This changes the lock key from the composite trigger UUID to the automation UUID, potentially
  over-serializing unrelated work and making the helper’s intent/signature misleading.
Agent Prompt
### Issue description
`evaluate_composite_trigger` calls `acquire_composite_trigger_lock(session, trigger.automation)` even though the lock helper is designed to lock per composite trigger. This changes lock scoping/keying and can over-serialize unrelated work.

### Issue Context
The helper is documented and typed to accept a `CompositeTrigger` and conceptually should lock by the composite trigger’s UUID.

### Fix Focus Areas
- src/prefect/server/events/triggers.py[350-354]
- src/prefect/server/events/models/composite_trigger_child_firing.py[17-45]

### Expected change
Update the call site to:
- `await acquire_composite_trigger_lock(session, trigger)`

Optionally, add a type check/assert in the helper if you want to prevent incorrect future calls.

ⓘ Copy this prompt and use it to remediate the issue with your preferred AI generation tools

# If we're only looking within a certain time horizon, remove any older firings that
# should no longer be considered as satisfying this trigger
if trigger.within is not None:
Expand Down Expand Up @@ -382,8 +388,27 @@ async def evaluate_composite_trigger(session: AsyncSession, firing: Firing) -> N
},
)

# clear by firing id
await clear_child_firings(session, trigger, firing_ids=list(firing_ids))
# Clear by firing id, and only proceed if we won the race to claim them.
# This prevents double-firing when multiple workers evaluate concurrently.
deleted_ids = await clear_child_firings(
session, trigger, firing_ids=list(firing_ids)
)

if len(deleted_ids) != len(firing_ids):
logger.debug(
"Composite trigger %s skipped fire; expected to delete %s firings, "
"actually deleted %s (another worker likely claimed them)",
trigger.id,
len(firing_ids),
len(deleted_ids),
extra={
"automation": automation.id,
"trigger": trigger.id,
"expected_firing_ids": sorted(str(f) for f in firing_ids),
"deleted_firing_ids": sorted(str(f) for f in deleted_ids),
},
)
return

await fire(
session,
Expand Down
121 changes: 121 additions & 0 deletions tests/events/server/triggers/test_composite_triggers.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
import asyncio
import datetime
from datetime import timedelta
from typing import List
Expand Down Expand Up @@ -1624,3 +1625,123 @@ async def test_sequence_trigger_identical_event_triggers_only_one_fired_does_not
await triggers.reactive_evaluation(ingredients_buy)

act.assert_not_called()


class TestCompoundTriggerConcurrency:
"""Tests for concurrent child trigger evaluation race condition fix."""

@pytest.fixture
async def compound_automation_concurrent(
self,
automations_session: AsyncSession,
cleared_buckets: None,
cleared_automations: None,
) -> Automation:
"""Compound trigger requiring all child triggers to fire."""
compound_automation = Automation(
name="Compound Automation Concurrency Test",
trigger=CompoundTrigger(
require="all",
within=timedelta(minutes=5),
triggers=[
EventTrigger(
expect={"event.A"},
match={"prefect.resource.id": "*"},
posture=Posture.Reactive,
threshold=1,
),
EventTrigger(
expect={"event.B"},
match={"prefect.resource.id": "*"},
posture=Posture.Reactive,
threshold=1,
),
],
),
actions=[actions.DoNothing()],
)

persisted = await automations.create_automation(
session=automations_session, automation=compound_automation
)
compound_automation.created = persisted.created
compound_automation.updated = persisted.updated
triggers.load_automation(persisted)
await automations_session.commit()

return compound_automation

async def test_compound_trigger_does_not_double_fire_when_children_race(
self,
act: mock.AsyncMock,
compound_automation_concurrent: Automation,
start_of_test: DateTime,
):
"""
Regression test for compound trigger double-firing when child firings race.

Verifies that when two child trigger events are processed concurrently,
the compound trigger fires exactly once. The DELETE ... RETURNING fix
ensures only one worker proceeds to fire the parent trigger.
"""
event_a = ReceivedEvent(
occurred=start_of_test + timedelta(microseconds=1),
event="event.A",
resource={"prefect.resource.id": "test.resource"},
id=uuid4(),
)
event_b = ReceivedEvent(
occurred=start_of_test + timedelta(microseconds=2),
event="event.B",
resource={"prefect.resource.id": "test.resource"},
id=uuid4(),
)

# Process both events concurrently
await asyncio.gather(
triggers.reactive_evaluation(event_a),
triggers.reactive_evaluation(event_b),
)

# The compound trigger should fire exactly once
act.assert_called_once()

firing: Firing = act.call_args.args[0]
assert isinstance(firing.trigger, CompoundTrigger)
assert firing.trigger.id == compound_automation_concurrent.trigger.id

async def test_concurrent_child_firings_still_triggers_parent(
self,
act: mock.AsyncMock,
compound_automation_concurrent: Automation,
start_of_test: DateTime,
):
"""
Verify that when two child trigger events arrive nearly simultaneously,
the compound trigger still fires. This tests that the race condition fix
doesn't prevent legitimate firings.
"""
event_a = ReceivedEvent(
occurred=start_of_test + timedelta(microseconds=1),
event="event.A",
resource={"prefect.resource.id": "test.resource"},
id=uuid4(),
)
event_b = ReceivedEvent(
occurred=start_of_test + timedelta(microseconds=2),
event="event.B",
resource={"prefect.resource.id": "test.resource"},
id=uuid4(),
)

# Process both events concurrently to simulate the race condition
await asyncio.gather(
triggers.reactive_evaluation(event_a),
triggers.reactive_evaluation(event_b),
)

# The compound trigger should fire exactly once
act.assert_called_once()

firing: Firing = act.call_args.args[0]
assert firing.trigger.id == compound_automation_concurrent.trigger.id
Loading