fix: resolve race condition in compound trigger evaluation #174
Fixes two race conditions in compound trigger evaluation:

1. **Never-firing race** (transactional): When two child triggers fire concurrently in separate transactions, each only sees its own uncommitted insert due to READ COMMITTED isolation. Neither sees enough firings to trigger the parent. Fix: Use PostgreSQL advisory locks to serialize concurrent evaluations for the same compound trigger.
2. **Double-firing race** (autocommit): When both transactions see all firings, both delete and both fire the parent. Fix: Use `DELETE ... RETURNING` to make clearing a claim operation. Only the worker that successfully deletes the expected firings proceeds; others bail out.

Based on the fix in PrefectHQ/nebula#10716.

🤖 Generated with [Claude Code](https://claude.com/claude-code)

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
Review Summary by Qodo
Fix race conditions in compound trigger evaluation
Walkthroughs
Description
• Resolve race condition preventing compound triggers from firing
  - Serialize concurrent child trigger evaluations with advisory locks
  - Prevent double-firing by making deletion a claim operation
• Add comprehensive concurrency tests for compound trigger evaluation
• Improve logging for race condition detection and debugging
Diagram
flowchart LR
A["Child Trigger Events"] -->|"concurrent processing"| B["Acquire Advisory Lock"]
B -->|"serialized evaluation"| C["Check Child Firings"]
C -->|"all conditions met"| D["DELETE...RETURNING"]
D -->|"claim operation"| E{"Deleted == Expected?"}
E -->|"yes"| F["Fire Parent Trigger"]
E -->|"no"| G["Skip - Another Worker Won"]
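The claim step in the diagram can be sketched in plain Python. This is a minimal in-memory illustration, not the PR's implementation: `FiringStore`, `delete_returning`, and `try_claim` are hypothetical names, and a `threading.Lock` stands in for the atomicity the database gives a single `DELETE ... RETURNING` statement.

```python
import threading
from uuid import UUID, uuid4


class FiringStore:
    """Hypothetical in-memory stand-in for the child-firing table."""

    def __init__(self, firing_ids: set[UUID]) -> None:
        self._rows = set(firing_ids)
        # Models the atomicity of one DELETE statement (an assumption of this sketch).
        self._lock = threading.Lock()

    def delete_returning(self, firing_ids: set[UUID]) -> set[UUID]:
        """Delete the requested rows and return only the IDs actually removed."""
        with self._lock:
            deleted = self._rows & firing_ids
            self._rows -= deleted
            return deleted


def try_claim(store: FiringStore, expected: set[UUID]) -> bool:
    """Return True only for the worker whose delete removed every expected firing."""
    return store.delete_returning(expected) == expected


expected_firings = {uuid4(), uuid4()}
store = FiringStore(expected_firings)

# Two workers both saw all child firings and both try to claim them.
outcomes = [try_claim(store, expected_firings) for _ in range(2)]
```

Only the first claim finds the rows still present, so only that worker would go on to fire the parent; the second gets back an empty set and skips.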
File Changes
1. src/prefect/server/events/models/composite_trigger_child_firing.py
Code Review by Qodo
from prefect.server.database import PrefectDBInterface, db_injector
from prefect.server.events.schemas.automations import CompositeTrigger, Firing
from prefect.server.utilities.database import get_dialect
1. composite_trigger_child_firing missing future 📘 Rule violation ✓ Correctness
This `src/` file contains type annotations but does not include `from __future__ import annotations` as the first import. This can break forward references and violates the repository typing/import standard.
Agent Prompt
## Issue description
`src/prefect/server/events/models/composite_trigger_child_firing.py` contains type annotations but is missing `from __future__ import annotations` as the first import.
## Issue Context
Compliance requires this import for typed `src/` files to support forward references and improve type-checking behavior.
## Fix Focus Areas
- src/prefect/server/events/models/composite_trigger_child_firing.py[1-11]
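As a hedged illustration of what the import changes, the sketch below compiles a small hypothetical module from a string (so the future import can sit at the top of that module regardless of where the snippet is pasted). `Node` and `example_module.py` are invented for the example and are not part of the PR.

```python
import textwrap

# Hypothetical module source; the future import directly follows the docstring.
MODULE_SOURCE = textwrap.dedent(
    '''
    """Example module with a forward reference in an annotation."""
    from __future__ import annotations


    class Node:
        def __init__(self, parent: Node | None = None) -> None:
            self.parent = parent

        def root(self) -> Node:
            return self if self.parent is None else self.parent.root()
    '''
)

namespace: dict = {}
exec(compile(MODULE_SOURCE, "example_module.py", "exec"), namespace)

Node = namespace["Node"]
# With postponed evaluation, annotations are stored as strings instead of
# being evaluated while the class body is still being defined.
return_annotation = Node.root.__annotations__["return"]
```

On interpreters that evaluate annotations eagerly, the reference to `Node` inside its own class body would otherwise need to be quoted.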
from prefect.server.database.orm_models import ORMAutomationBucket

import logging
2. triggers.py missing future 📘 Rule violation ✓ Correctness
This `src/` file contains type annotations but does not include `from __future__ import annotations` as the first import. This violates the required typing/import convention for typed Python modules.
Agent Prompt
## Issue description
`src/prefect/server/events/triggers.py` includes type annotations but lacks `from __future__ import annotations` as the first import.
## Issue Context
The module begins with a docstring; the `__future__` import should come right after that docstring and before other imports.
## Fix Focus Areas
- src/prefect/server/events/triggers.py[1-24]
import logging

logger: "logging.Logger" = get_logger(__name__)
logger = logging.getLogger(__name__)
3. logger uses logging.getLogger 📘 Rule violation ✧ Quality
The logger is now instantiated via `logging.getLogger(__name__)` and lacks the standardized type-annotated `get_logger(...)` pattern. This can lead to inconsistent logging configuration and violates the required logger initialization standard.
Agent Prompt
## Issue description
`logger` is initialized with `logging.getLogger(__name__)` and is missing the standardized `get_logger` initialization pattern and type annotation.
## Issue Context
The required pattern is `logger: "logging.Logger" = get_logger("module_name")` to ensure consistent configuration.
## Fix Focus Areas
- src/prefect/server/events/triggers.py[71-73]
class TestCompoundTriggerConcurrency:
    """Tests for concurrent child trigger evaluation race condition fix."""

    @pytest.fixture
    async def compound_automation_concurrent(
        self,
        automations_session: AsyncSession,
        cleared_buckets: None,
        cleared_automations: None,
    ) -> Automation:
        """Compound trigger requiring all child triggers to fire."""
        compound_automation = Automation(
            name="Compound Automation Concurrency Test",
            trigger=CompoundTrigger(
                require="all",
                within=timedelta(minutes=5),
                triggers=[
                    EventTrigger(
                        expect={"event.A"},
                        match={"prefect.resource.id": "*"},
                        posture=Posture.Reactive,
                        threshold=1,
                    ),
                    EventTrigger(
                        expect={"event.B"},
                        match={"prefect.resource.id": "*"},
                        posture=Posture.Reactive,
                        threshold=1,
                    ),
                ],
            ),
            actions=[actions.DoNothing()],
        )

        persisted = await automations.create_automation(
            session=automations_session, automation=compound_automation
        )
        compound_automation.created = persisted.created
        compound_automation.updated = persisted.updated
        triggers.load_automation(persisted)
        await automations_session.commit()

        return compound_automation

    async def test_compound_trigger_does_not_double_fire_when_children_race(
        self,
        act: mock.AsyncMock,
        compound_automation_concurrent: Automation,
        start_of_test: DateTime,
    ):
        """
        Regression test for compound trigger double-firing when child firings race.

        Verifies that when two child trigger events are processed concurrently,
        the compound trigger fires exactly once. The DELETE ... RETURNING fix
        ensures only one worker proceeds to fire the parent trigger.
        """
        event_a = ReceivedEvent(
            occurred=start_of_test + timedelta(microseconds=1),
            event="event.A",
            resource={"prefect.resource.id": "test.resource"},
            id=uuid4(),
        )
        event_b = ReceivedEvent(
            occurred=start_of_test + timedelta(microseconds=2),
            event="event.B",
            resource={"prefect.resource.id": "test.resource"},
            id=uuid4(),
        )

        # Process both events concurrently
        await asyncio.gather(
            triggers.reactive_evaluation(event_a),
            triggers.reactive_evaluation(event_b),
        )

        # The compound trigger should fire exactly once
        act.assert_called_once()

        firing: Firing = act.call_args.args[0]
        assert isinstance(firing.trigger, CompoundTrigger)
        assert firing.trigger.id == compound_automation_concurrent.trigger.id

    async def test_concurrent_child_firings_still_triggers_parent(
        self,
        act: mock.AsyncMock,
        compound_automation_concurrent: Automation,
        start_of_test: DateTime,
    ):
        """
        Verify that when two child trigger events arrive nearly simultaneously,
        the compound trigger still fires. This tests that the race condition fix
        doesn't prevent legitimate firings.
        """
        event_a = ReceivedEvent(
            occurred=start_of_test + timedelta(microseconds=1),
            event="event.A",
            resource={"prefect.resource.id": "test.resource"},
            id=uuid4(),
        )
        event_b = ReceivedEvent(
            occurred=start_of_test + timedelta(microseconds=2),
            event="event.B",
            resource={"prefect.resource.id": "test.resource"},
            id=uuid4(),
        )

        # Process both events concurrently to simulate the race condition
        await asyncio.gather(
            triggers.reactive_evaluation(event_a),
            triggers.reactive_evaluation(event_b),
        )

        # The compound trigger should fire exactly once
        act.assert_called_once()

        firing: Firing = act.call_args.args[0]
        assert firing.trigger.id == compound_automation_concurrent.trigger.id
4. New tests not mirrored 📘 Rule violation ⛯ Reliability
The newly added tests for `src/prefect/server/events/triggers.py` are placed under `tests/events/server/triggers/`, which does not mirror the source directory structure required by the checklist. This reduces discoverability and violates the test layout convention.
Agent Prompt
## Issue description
New tests for `src/prefect/server/events/triggers.py` were added in a test file path that does not mirror the `src/` directory structure.
## Issue Context
The checklist requires test organization to mirror the source hierarchy to keep tests discoverable.
## Fix Focus Areas
- tests/events/server/triggers/test_composite_triggers.py[1630-1747]
if dialect.name == "postgresql":
    # Use the trigger's UUID as the lock key
    # pg_advisory_xact_lock takes a bigint, so we use the UUID's int representation
    # truncated to fit (collision is extremely unlikely and benign)
    lock_key = hash(str(trigger.id)) % (2**63)
    await session.execute(
        sa.text("SELECT pg_advisory_xact_lock(:key)"), {"key": lock_key}
    )
5. Unstable lock key 🐞 Bug ✓ Correctness
`acquire_composite_trigger_lock` uses `hash(str(trigger.id))` to derive the Postgres advisory lock key, which is not stable across processes, so concurrent workers may take different locks and the composite-trigger race can still occur. This contradicts the function's own comment/docstring that it serializes evaluations for the same trigger.
Agent Prompt
### Issue description
`acquire_composite_trigger_lock` uses Python `hash()` to compute the Postgres advisory lock key. Python hash values are not stable across processes, so different workers can compute different lock keys for the same trigger UUID and fail to serialize evaluations.
### Issue Context
The function’s docstring/comment state that it serializes concurrent child trigger evaluations for the same composite trigger.
### Fix Focus Areas
- src/prefect/server/events/models/composite_trigger_child_firing.py[17-45]
"""
Delete the specified child firings and return the IDs that were actually deleted.

Returns the set of child_firing_ids that were successfully deleted. Callers can
compare this to the expected firing_ids to detect races and avoid double-firing
composite triggers.
"""
result = await session.execute(
    sa.delete(db.CompositeTriggerChildFiring)
    .filter(
        db.CompositeTriggerChildFiring.automation_id == trigger.automation.id,
        db.CompositeTriggerChildFiring.parent_trigger_id == trigger.id,
        db.CompositeTriggerChildFiring.child_firing_id.in_(firing_ids),
    )
    .returning(db.CompositeTriggerChildFiring.child_trigger_id)
)
6. Wrong IDs returned 🐞 Bug ✓ Correctness
`clear_child_firings` claims to return the deleted `child_firing_id` values, but its `RETURNING` clause yields `child_trigger_id`, while `evaluate_composite_trigger` treats the returned IDs as firing IDs (including in logs). This breaks the function contract and produces misleading race-diagnosis output.
Agent Prompt
### Issue description
`clear_child_firings` is documented/used as returning deleted firing IDs, but it returns trigger IDs instead, violating its contract and corrupting caller logging/diagnostics.
### Issue Context
`evaluate_composite_trigger` builds `firing_ids` from `Firing.id` and expects `clear_child_firings` to return those IDs.
### Fix Focus Areas
- src/prefect/server/events/models/composite_trigger_child_firing.py[133-157]
- src/prefect/server/events/triggers.py[365-410]
result = await session.execute(
    sa.delete(db.CompositeTriggerChildFiring)
    .filter(
        db.CompositeTriggerChildFiring.automation_id == trigger.automation.id,
        db.CompositeTriggerChildFiring.parent_trigger_id == trigger.id,
        db.CompositeTriggerChildFiring.child_firing_id.in_(firing_ids),
    )
    .returning(db.CompositeTriggerChildFiring.child_trigger_id)
)
7. SQLite RETURNING breakage 🐞 Bug ⛯ Reliability
`clear_child_firings` uses `DELETE ... RETURNING` without checking the DB dialect, but Prefect explicitly supports SQLite and elsewhere treats `.returning()` as Postgres-only. On SQLite configurations where `RETURNING` is unavailable or unsupported, composite trigger evaluation will raise SQL errors at runtime.
Agent Prompt
### Issue description
`clear_child_firings` uses `.returning()` unconditionally even though the codebase supports SQLite and already treats RETURNING as a Postgres-only feature in other write paths.
### Issue Context
Composite trigger evaluation runs on the main automation DB session, which can be configured for SQLite or Postgres.
### Fix Focus Areas
- src/prefect/server/events/models/composite_trigger_child_firing.py[133-157]
- src/prefect/server/events/storage/database.py[252-286]
- src/prefect/server/database/configurations.py[411-435]
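A fallback for backends without `RETURNING` might look like the sketch below. It uses the standard-library `sqlite3` module rather than the PR's SQLAlchemy session, and the `child_firing` table, its columns, and the `use_returning` parameter are assumptions made for illustration. It also returns `child_firing_id`, which is what the docstring under review promises.

```python
import sqlite3

# RETURNING was added to SQLite in version 3.35.0.
SUPPORTS_RETURNING = sqlite3.sqlite_version_info >= (3, 35, 0)


def clear_child_firings(
    conn: sqlite3.Connection,
    firing_ids: list[str],
    use_returning: bool = SUPPORTS_RETURNING,
) -> set[str]:
    """Delete the given child firings; return the child_firing_ids actually deleted."""
    if not firing_ids:
        return set()
    where = "child_firing_id IN (%s)" % ", ".join("?" for _ in firing_ids)

    if use_returning:
        # One statement both deletes and reports what it deleted.
        rows = conn.execute(
            f"DELETE FROM child_firing WHERE {where} RETURNING child_firing_id",
            firing_ids,
        ).fetchall()
        return {row[0] for row in rows}

    # Fallback: take the write lock first so the SELECT and DELETE see the same rows.
    conn.execute("BEGIN IMMEDIATE")
    try:
        rows = conn.execute(
            f"SELECT child_firing_id FROM child_firing WHERE {where}", firing_ids
        ).fetchall()
        conn.execute(f"DELETE FROM child_firing WHERE {where}", firing_ids)
        conn.execute("COMMIT")
    except Exception:
        conn.execute("ROLLBACK")
        raise
    return {row[0] for row in rows}


# isolation_level=None puts the connection in autocommit mode, so the explicit
# BEGIN/COMMIT above is the only transaction control in play.
conn = sqlite3.connect(":memory:", isolation_level=None)
conn.execute(
    "CREATE TABLE child_firing (child_firing_id TEXT PRIMARY KEY, child_trigger_id TEXT)"
)
conn.executemany(
    "INSERT INTO child_firing VALUES (?, ?)",
    [("firing-a", "trigger-a"), ("firing-b", "trigger-b")],
)

expected = ["firing-a", "firing-b"]
first = clear_child_firings(conn, expected)
second = clear_child_firings(conn, expected)
```

Either branch gives the caller the same contract: the first call returns both firing IDs and the second returns an empty set, so a comparison against the expected IDs still identifies the single winner.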
Benchmark PR from agentic-review-benchmarks#7