fix: resolve race condition in compound trigger evaluation#102
Conversation
Fixes two race conditions in compound trigger evaluation: 1. **Never-firing race** (transactional): When two child triggers fire concurrently in separate transactions, each only sees its own uncommitted insert due to READ COMMITTED isolation. Neither sees enough firings to trigger the parent. Fix: Use PostgreSQL advisory locks to serialize concurrent evaluations for the same compound trigger. 2. **Double-firing race** (autocommit): When both transactions see all firings, both delete and both fire the parent. Fix: Use DELETE ... RETURNING to make clearing a claim operation. Only the worker that successfully deletes the expected firings proceeds; others bail out. Based on the fix in PrefectHQ/nebula#10716. 🤖 Generated with [Claude Code](https://claude.com/claude-code) Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
Code Review by Qodo (Alpha)
1. Wrong lock key scope
|
| # Acquire an advisory lock to serialize concurrent evaluations for this | ||
| # compound trigger. This prevents a race condition where multiple child | ||
| # triggers fire concurrently and neither transaction sees both firings. | ||
| await acquire_composite_trigger_lock(session, trigger.automation) | ||
|
|
There was a problem hiding this comment.
1. Wrong lock key scope 🐞 Bug
• evaluate_composite_trigger passes trigger.automation into acquire_composite_trigger_lock, even though the helper is documented/typed to lock per composite trigger. • This changes the lock key from the composite trigger UUID to the automation UUID, potentially over-serializing unrelated work and making the helper’s intent/signature misleading.
Agent Prompt
### Issue description
`evaluate_composite_trigger` calls `acquire_composite_trigger_lock(session, trigger.automation)` even though the lock helper is designed to lock per composite trigger. This changes lock scoping/keying and can over-serialize unrelated work.
### Issue Context
The helper is documented and typed to accept a `CompositeTrigger` and conceptually should lock by the composite trigger’s UUID.
### Fix Focus Areas
- src/prefect/server/events/triggers.py[350-354]
- src/prefect/server/events/models/composite_trigger_child_firing.py[17-45]
### Expected change
Update the call site to:
- `await acquire_composite_trigger_lock(session, trigger)`
Optionally, add a type check/assert in the helper if you want to prevent incorrect future calls.
ⓘ Copy this prompt and use it to remediate the issue with your preferred AI generation tools
| result = await session.execute( | ||
| sa.delete(db.CompositeTriggerChildFiring) | ||
| .filter( | ||
| db.CompositeTriggerChildFiring.automation_id == trigger.automation.id, | ||
| db.CompositeTriggerChildFiring.parent_trigger_id == trigger.id, | ||
| db.CompositeTriggerChildFiring.child_firing_id.in_(firing_ids), | ||
| ) | ||
| .returning(db.CompositeTriggerChildFiring.child_firing_id) | ||
| ) |
There was a problem hiding this comment.
2. Delete returning on sqlite 🐞 Bug
• clear_child_firings now uses DELETE ... RETURNING unconditionally; Prefect supports SQLite and the codebase already has dialect-specific handling to avoid RETURNING in SQLite code paths. • This can break composite triggers on SQLite deployments/environments where RETURNING is unsupported, causing evaluation errors and preventing parent triggers from firing.
Agent Prompt
### Issue description
`clear_child_firings` now uses `DELETE ... RETURNING` unconditionally. Prefect supports SQLite and already has dialect-specific logic elsewhere to avoid RETURNING in SQLite paths. This can break composite trigger evaluation on SQLite.
### Issue Context
`evaluate_composite_trigger` only needs to know whether we deleted *all* expected rows to “win” the race. Returning the actual IDs is primarily for logging/debugging.
### Fix Focus Areas
- src/prefect/server/events/models/composite_trigger_child_firing.py[133-157]
- src/prefect/server/events/triggers.py[391-411]
### Suggested approach
1. Determine dialect using the established pattern (e.g., `get_dialect(session.sync_session).name`).
2. If Postgres: keep `DELETE ... RETURNING` and return the deleted IDs.
3. If SQLite: execute the DELETE without RETURNING and use `result.rowcount` to decide:
- if `rowcount == len(firing_ids)`: return `set(firing_ids)`
- else: return `set()` (or best-effort IDs if you implement a safe alternative)
This preserves the anti-double-fire logic without relying on RETURNING support.
ⓘ Copy this prompt and use it to remediate the issue with your preferred AI generation tools
| bind = session.get_bind() | ||
| if bind is None: | ||
| return | ||
|
|
||
| # Get the engine from either an Engine or Connection | ||
| engine: sa.Engine = bind if isinstance(bind, sa.Engine) else bind.engine # type: ignore[union-attr] | ||
| dialect = get_dialect(engine) |
There was a problem hiding this comment.
3. Lock dialect detection fragile 🐞 Bug
• acquire_composite_trigger_lock derives an engine via session.get_bind() and then assumes a synchronous Engine/.engine shape; Prefect constructs AsyncSession from an AsyncEngine. • This is more complex than needed and may be brittle across SQLAlchemy async bind configurations; using the existing session.sync_session + get_dialect(...) pattern would be more robust and consistent.
Agent Prompt
### Issue description
`acquire_composite_trigger_lock` currently inspects `session.get_bind()` and attempts to normalize it to a synchronous `sa.Engine`. Prefect creates `AsyncSession` from an `AsyncEngine`, and the codebase already uses `session.sync_session` for dialect detection.
### Issue Context
This is a robustness/maintainability improvement that avoids brittle assumptions about async bind object shapes.
### Fix Focus Areas
- src/prefect/server/events/models/composite_trigger_child_firing.py[30-37]
- src/prefect/server/events/models/automations.py[125-128]
### Expected change
Replace the bind/engine extraction with something like:
- `dialect = get_dialect(session.sync_session)`
Then keep the existing `if dialect.name == "postgresql": ...` logic.
ⓘ Copy this prompt and use it to remediate the issue with your preferred AI generation tools
| # Use the trigger's UUID as the lock key | ||
| # pg_advisory_xact_lock takes a bigint, so we use the UUID's int representation | ||
| # truncated to fit (collision is extremely unlikely and benign) | ||
| lock_key = int(trigger.id) % (2**31) | ||
| await session.execute( | ||
| sa.text("SELECT pg_advisory_xact_lock(:key)"), {"key": lock_key} | ||
| ) |
There was a problem hiding this comment.
4. Small advisory lock key 🐞 Bug
• The lock key uses int(uuid) % 2**31 even though Postgres advisory locks accept a 64-bit key. • This increases collision probability, which can cause unnecessary contention between unrelated composite triggers (performance/head-of-line blocking).
Agent Prompt
### Issue description
The advisory lock key is truncated to 31 bits, increasing collision probability and potential contention.
### Issue Context
This is a performance improvement; collisions are typically benign but avoidable.
### Fix Focus Areas
- src/prefect/server/events/models/composite_trigger_child_firing.py[38-45]
### Expected change
Use a 64-bit keyspace, e.g.:
- `lock_key = int(trigger.id) % (2**63)`
(or derive a signed 64-bit integer from 8 bytes of the UUID).
ⓘ Copy this prompt and use it to remediate the issue with your preferred AI generation tools
Benchmark PR from qodo-benchmark#281