fix: resolve race condition in compound trigger evaluation #148
Fixes two race conditions in compound trigger evaluation:

1. **Never-firing race** (transactional): When two child triggers fire concurrently in separate transactions, each sees only its own uncommitted insert because of READ COMMITTED isolation, so neither sees enough firings to trigger the parent. Fix: use PostgreSQL advisory locks to serialize concurrent evaluations for the same compound trigger.
2. **Double-firing race** (autocommit): When both transactions see all firings, both delete them and both fire the parent. Fix: use `DELETE ... RETURNING` to make clearing a claim operation. Only the worker that successfully deletes the expected firings proceeds; the others bail out.

Based on the fix in PrefectHQ/nebula#10716.

🤖 Generated with [Claude Code](https://claude.com/claude-code)

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
Code Review by Qodo
1. Nonstandard logger initialization
import logging

logger: "logging.Logger" = get_logger(__name__)
logger = logging.getLogger(__name__)
1. Nonstandard logger initialization 📘 Rule violation ✧ Quality
• src/prefect/server/events/triggers.py initializes logger via logging.getLogger(__name__) without the standardized type-annotated get_logger("module_name") pattern.
• This breaks consistency and type safety expectations for logging setup across the codebase.
Agent prompt
## Issue description
`src/prefect/server/events/triggers.py` initializes `logger` with `logging.getLogger(__name__)`, which violates the required standardized initialization pattern.
## Issue Context
Compliance requires all logger instances to follow `logger: "logging.Logger" = get_logger("module_name")` for consistent configuration and type safety.
## Fix Focus Areas
- src/prefect/server/events/triggers.py[27-74]
async def acquire_composite_trigger_lock(
    session: AsyncSession,
    trigger: CompositeTrigger,
) -> None:
2. Missing future annotations import 📘 Rule violation ✓ Correctness
• src/prefect/server/events/models/composite_trigger_child_firing.py contains type annotations but does not include `from __future__ import annotations` as the first import.
• This violates the required convention for forward references and consistent type-checking performance.
Agent prompt
## Issue description
The file uses type annotations but is missing `from __future__ import annotations` as the first import.
## Issue Context
This import is required for all annotated Python files under `src/`.
## Fix Focus Areas
- src/prefect/server/events/models/composite_trigger_child_firing.py[1-21]
import logging

logger: "logging.Logger" = get_logger(__name__)
logger = logging.getLogger(__name__)
3. triggers.py missing future annotations 📘 Rule violation ✓ Correctness
• src/prefect/server/events/triggers.py contains type annotations but does not include `from __future__ import annotations` as the first import.
• This violates the required convention for annotated src/ Python files.
Agent prompt
## Issue description
The file contains type annotations but lacks `from __future__ import annotations` as required.
## Issue Context
All annotated Python files in `src/` must include this import as the first import statement.
## Fix Focus Areas
- src/prefect/server/events/triggers.py[1-30]
) -> set[UUID]:
    """
    Delete the specified child firings and return the IDs that were actually deleted.

    Returns the set of child_firing_ids that were successfully deleted. Callers can
    compare this to the expected firing_ids to detect races and avoid double-firing
    composite triggers.
    """
    result = await session.execute(
        sa.delete(db.CompositeTriggerChildFiring)
        .filter(
            db.CompositeTriggerChildFiring.automation_id == trigger.automation.id,
            db.CompositeTriggerChildFiring.parent_trigger_id == trigger.id,
            db.CompositeTriggerChildFiring.child_firing_id.in_(firing_ids),
        )
        .returning(db.CompositeTriggerChildFiring.child_trigger_id)
    )
4. clear_child_firings returns wrong ids 📘 Rule violation ✓ Correctness
• clear_child_firings documents that it returns deleted child_firing_id values, but the SQL RETURNING clause returns child_trigger_id instead.
• This makes the function misleading and can cause downstream logic/logging to reason about the wrong identifiers.
Agent prompt
## Issue description
`clear_child_firings` claims to return deleted `child_firing_id` values, but the query returns `child_trigger_id`, making the function misleading.
## Issue Context
Callers compare the returned IDs to the expected `firing_ids` to detect races; returning a different identifier type undermines that intent.
## Fix Focus Areas
- src/prefect/server/events/models/composite_trigger_child_firing.py[133-157]
- src/prefect/server/events/triggers.py[390-410]
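To illustrate why the returned column matters, here is a small sketch of the caller-side check, assuming the caller compares the set of deleted IDs to the set it expected to claim. The helper name `claimed_all` is hypothetical and does not appear in the PR.

```python
from uuid import UUID, uuid4


def claimed_all(expected_firing_ids: set[UUID], deleted_ids: set[UUID]) -> bool:
    """Return True only if this worker deleted every firing it expected to claim."""
    return deleted_ids == expected_firing_ids


a, b = uuid4(), uuid4()

# This worker deleted both rows, so it owns the claim and may fire the parent.
won = claimed_all({a, b}, {a, b})

# Another worker deleted `b` first, so this worker should bail out.
lost = claimed_all({a, b}, {a})

# If RETURNING yields child_trigger_id values, the IDs are unrelated to the
# firing IDs and the comparison can never succeed.
wrong_column = claimed_all({a, b}, {uuid4(), uuid4()})
```

With the wrong column returned, a comparison like this would reject every claim, so the parent would never fire even when there is no race.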
bind = session.get_bind()
if bind is None:
    return
5. Lock acquisition silently skipped 📘 Rule violation ⛯ Reliability
• acquire_composite_trigger_lock returns early when session.get_bind() is None, with no logging or error signaling.
• This can silently disable the race-condition protection and make concurrency issues harder to diagnose.
Agent prompt
## Issue description
Lock acquisition is skipped silently when `session.get_bind()` is `None`, which can undermine the intended race-condition mitigation.
## Issue Context
If this path occurs in production, concurrent evaluations may proceed without serialization and there will be no logs explaining why.
## Fix Focus Areas
- src/prefect/server/events/models/composite_trigger_child_firing.py[17-48]
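One way to make the skipped path visible is to log a warning before returning. The sketch below models only the bind check; `FakeSession` and `should_acquire_lock` are hypothetical names used for illustration, not part of the PR or of SQLAlchemy.

```python
import logging
from typing import Any, Optional

logger = logging.getLogger("composite_trigger_lock_sketch")


class FakeSession:
    """Hypothetical stand-in for a database session; only get_bind() is modeled."""

    def __init__(self, bind: Optional[Any]) -> None:
        self._bind = bind

    def get_bind(self) -> Optional[Any]:
        return self._bind


def should_acquire_lock(session: FakeSession, trigger_id: str) -> bool:
    """Return False, and say so in the logs, when no bind is available."""
    if session.get_bind() is None:
        logger.warning(
            "No database bind for session; skipping advisory lock for "
            "composite trigger %s",
            trigger_id,
        )
        return False
    return True
```

Whether a warning or a raised exception is the better signal depends on whether a missing bind is ever expected in practice, which the review does not settle.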
if dialect.name == "postgresql":
    # Use the trigger's UUID as the lock key
    # pg_advisory_xact_lock takes a bigint, so we use the UUID's int representation
    # truncated to fit (collision is extremely unlikely and benign)
    lock_key = hash(str(trigger.id)) % (2**63)
    await session.execute(
        sa.text("SELECT pg_advisory_xact_lock(:key)"), {"key": lock_key}
    )
6. Unstable lock key 🐞 Bug ✓ Correctness
• The advisory lock key is computed with Python’s hash(str(trigger.id)), which can differ across worker processes.
• Since pg_advisory_xact_lock requires all contenders to use the same key, different keys mean no actual serialization and the “missed parent firing” race can still occur in multi-worker deployments.
• This undermines the stated purpose of the new lock in evaluate_composite_trigger.
Agent prompt
## Issue description
`acquire_composite_trigger_lock` computes the advisory lock key using Python’s `hash()`, which can vary across worker processes. This defeats the goal of serializing concurrent evaluations across multiple workers.
## Issue Context
The lock is intended to prevent a race where multiple transactions each see only their own child firing and neither fires the parent.
## Fix Focus Areas
- src/prefect/server/events/models/composite_trigger_child_firing.py[17-46]
## Suggested approach
- Replace `hash(str(trigger.id))` with a deterministic value derived from the UUID, e.g.:
- `uuid_int = trigger.id.int`
- Convert to a signed 64-bit integer (Postgres bigint) deterministically, e.g. mask to 63 bits or map into `[-2**63, 2**63-1]`.
- Add a small unit test or update the concurrency regression test to validate behavior across processes if feasible (or at least validate the key derivation is deterministic).
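The suggested approach could look roughly like the sketch below. Python randomizes `str` hashes per process unless `PYTHONHASHSEED` is fixed, so `hash(str(...))` is not stable across workers, whereas `UUID.int` is. The function name `advisory_lock_key` is hypothetical, and keeping the low 64 bits is one of several reasonable mappings.

```python
from uuid import UUID


def advisory_lock_key(trigger_id: UUID) -> int:
    """Map a UUID to a signed 64-bit integer, identically in every process."""
    low64 = trigger_id.int & 0xFFFFFFFFFFFFFFFF  # keep the low 64 bits
    # Reinterpret as two's-complement so the value fits a Postgres bigint.
    return low64 - (1 << 64) if low64 >= (1 << 63) else low64


key = advisory_lock_key(UUID("12345678-1234-5678-1234-567812345678"))
negative_key = advisory_lock_key(UUID("00000000-0000-0000-ffff-ffffffffffff"))
```

Two distinct UUIDs can still map to the same key. In that case two unrelated triggers would merely serialize against each other, which costs some concurrency but not correctness.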
result = await session.execute(
    sa.delete(db.CompositeTriggerChildFiring)
    .filter(
        db.CompositeTriggerChildFiring.automation_id == trigger.automation.id,
        db.CompositeTriggerChildFiring.parent_trigger_id == trigger.id,
        db.CompositeTriggerChildFiring.child_firing_id.in_(firing_ids),
    )
    .returning(db.CompositeTriggerChildFiring.child_trigger_id)
)
7. Sqlite returning usage 🐞 Bug ⛯ Reliability
• clear_child_firings now uses DELETE ... RETURNING without checking the database dialect.
• The codebase documents that SQLite may not support RETURNING in supported configurations, and it already uses dialect-specific logic elsewhere to avoid RETURNING on SQLite.
• This can cause composite trigger evaluation to fail at runtime on SQLite deployments.
Agent prompt
## Issue description
`clear_child_firings` uses `DELETE ... RETURNING` unconditionally. The codebase already treats SQLite as potentially lacking `RETURNING` support, so this can break composite trigger evaluation on SQLite.
## Issue Context
Prefect supports SQLite with minimum version 3.24.0, and elsewhere explicitly avoids `RETURNING` on SQLite.
## Fix Focus Areas
- src/prefect/server/events/models/composite_trigger_child_firing.py[133-157]
## Suggested approach
- Determine dialect (similar to other code paths) and:
- For Postgres: keep `RETURNING` if you need deleted IDs.
- For SQLite: avoid `RETURNING` and instead rely on `result.rowcount` to detect whether all expected rows were deleted.
- Alternatively, remove `RETURNING` entirely and use `rowcount` on all dialects if you only need the count for race-detection.
- Ensure callers in `evaluate_composite_trigger` continue to correctly detect partial deletes without needing returned IDs.
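The `rowcount` alternative can be sketched with the standard-library `sqlite3` module. The table name `child_firing` and the helper `claim_firings` are hypothetical simplifications of the real model, and the sketch assumes `firing_ids` is non-empty and contains no duplicates.

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE child_firing (child_firing_id TEXT PRIMARY KEY)")
conn.executemany("INSERT INTO child_firing VALUES (?)", [("a",), ("b",)])


def claim_firings(conn: sqlite3.Connection, firing_ids: list[str]) -> bool:
    """Delete the given firings; True only if every expected row was deleted."""
    placeholders = ",".join("?" for _ in firing_ids)
    cursor = conn.execute(
        f"DELETE FROM child_firing WHERE child_firing_id IN ({placeholders})",
        firing_ids,
    )
    # rowcount reports how many rows this DELETE removed, without RETURNING.
    return cursor.rowcount == len(firing_ids)


first = claim_firings(conn, ["a", "b"])   # both rows exist, so the claim succeeds
second = claim_firings(conn, ["a", "b"])  # rows are already gone, so it fails
```

A count is enough to detect a partial delete, but it cannot say which rows were missed. If callers need the specific IDs, the dialect-specific `RETURNING` path remains necessary on Postgres.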
Benchmark PR from qodo-benchmark#563