Skip to content

fix: resolve race condition in compound trigger evaluation#138

Open
tomerqodo wants to merge 2 commits intoqodo_combined_20260121_demo_10_base_fix_resolve_race_condition_in_compound_trigger_evaluation_pr543from
qodo_combined_20260121_demo_10_head_fix_resolve_race_condition_in_compound_trigger_evaluation_pr543
Open

fix: resolve race condition in compound trigger evaluation#138
tomerqodo wants to merge 2 commits intoqodo_combined_20260121_demo_10_base_fix_resolve_race_condition_in_compound_trigger_evaluation_pr543from
qodo_combined_20260121_demo_10_head_fix_resolve_race_condition_in_compound_trigger_evaluation_pr543

Conversation

@tomerqodo
Copy link
Copy Markdown

Benchmark PR from qodo-benchmark#543

desertaxle and others added 2 commits January 21, 2026 13:58
Fixes two race conditions in compound trigger evaluation:

1. **Never-firing race** (transactional): When two child triggers fire
   concurrently in separate transactions, each only sees its own
   uncommitted insert due to READ COMMITTED isolation. Neither sees
   enough firings to trigger the parent.

   Fix: Use PostgreSQL advisory locks to serialize concurrent evaluations
   for the same compound trigger.

2. **Double-firing race** (autocommit): When both transactions see all
   firings, both delete and both fire the parent.

   Fix: Use DELETE ... RETURNING to make clearing a claim operation.
   Only the worker that successfully deletes the expected firings
   proceeds; others bail out.

Based on the fix in PrefectHQ/nebula#10716.

🤖 Generated with [Claude Code](https://claude.com/claude-code)

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
@qodo-code-review
Copy link
Copy Markdown

Code Review by Qodo

🐞 Bugs (2) 📘 Rule violations (2) 📎 Requirement gaps (0)

Grey Divider


Action required

1. Missing future annotations import 📘 Rule violation ✓ Correctness
Description
src/prefect/server/events/models/composite_trigger_child_firing.py contains type annotations but
  does not include from __future__ import annotations as the first import.
• This violates the typing import standard required for src/ files and can lead to poorer
  type-checking performance and forward-reference issues.
• Add the future import at the top of the file before any other imports.
Code

src/prefect/server/events/models/composite_trigger_child_firing.py[R17-20]

+async def acquire_composite_trigger_lock(
+    session: AsyncSession,
+    trigger: CompositeTrigger,
+) -> None:
Evidence
The checklist requires from __future__ import annotations as the first import for any src/
Python file with type annotations. The file uses annotations (e.g., session: AsyncSession,
trigger: CompositeTrigger) but starts with other imports and lacks the future import.

AGENTS.md
src/prefect/server/events/models/composite_trigger_child_firing.py[1-6]
src/prefect/server/events/models/composite_trigger_child_firing.py[17-20]

Agent prompt
The issue below was found during a code review. Follow the provided context and guidance below and implement a solution

## Issue description
`src/prefect/server/events/models/composite_trigger_child_firing.py` uses type annotations but is missing `from __future__ import annotations` as the first import, violating the required typing standard.

## Issue Context
The file contains annotated function signatures (e.g., `session: AsyncSession`, `trigger: CompositeTrigger`) and return types, but it begins with regular imports.

## Fix Focus Areas
- src/prefect/server/events/models/composite_trigger_child_firing.py[1-20]

ⓘ Copy this prompt and use it to remediate the issue with your preferred AI generation tools


2. Nonstandard logger initialization 📘 Rule violation ✧ Quality
Description
src/prefect/server/events/triggers.py initializes logger with logging.getLogger(__name__)
  and without the required type annotation.
• This violates the standardized logger initialization pattern and can break consistency/type-safety
  expectations across the codebase.
• Replace it with the required logger: "logging.Logger" = get_logger("module_name") pattern.
Code

src/prefect/server/events/triggers.py[R71-73]

+import logging

-logger: "logging.Logger" = get_logger(__name__)
+logger = logging.getLogger(__name__)
Evidence
The checklist requires logger instances to use `logger: "logging.Logger" =
get_logger("module_name"). The new code uses logging.getLogger(__name__)` instead and omits the
required type annotation.

AGENTS.md
src/prefect/server/events/triggers.py[71-74]

Agent prompt
The issue below was found during a code review. Follow the provided context and guidance below and implement a solution

## Issue description
`src/prefect/server/events/triggers.py` uses `logging.getLogger(__name__)` for `logger`, violating the required standardized logger initialization and typing pattern.

## Issue Context
The module already imports `get_logger` but does not use it for `logger` initialization.

## Fix Focus Areas
- src/prefect/server/events/triggers.py[27-27]
- src/prefect/server/events/triggers.py[71-76]

ⓘ Copy this prompt and use it to remediate the issue with your preferred AI generation tools


3. Unstable advisory lock key 🐞 Bug ⛯ Reliability
Description
• The advisory lock key is computed with Python’s built-in hash(), which is process-randomized;
  different workers/processes will compute different keys for the same trigger id.
• In multi-worker mode this undermines cross-process serialization, so the original race can still
  occur (only intra-process serialization is achieved).
• This is especially important because Prefect explicitly supports multi-worker server mode against
  Postgres.
Code

src/prefect/server/events/models/composite_trigger_child_firing.py[R38-45]

+    if dialect.name == "postgresql":
+        # Use the trigger's UUID as the lock key
+        # pg_advisory_xact_lock takes a bigint, so we use the UUID's int representation
+        # truncated to fit (collision is extremely unlikely and benign)
+        lock_key = hash(str(trigger.id)) % (2**63)
+        await session.execute(
+            sa.text("SELECT pg_advisory_xact_lock(:key)"), {"key": lock_key}
+        )
Evidence
The lock key is derived from hash(str(trigger.id)), which is not a stable mapping across processes.
Prefect supports multi-worker mode (multiple workers/processes) on Postgres, so the advisory lock
must use a deterministic key for cross-process serialization.

src/prefect/server/events/models/composite_trigger_child_firing.py[38-45]
tests/cli/test_start_server.py[132-177]

Agent prompt
The issue below was found during a code review. Follow the provided context and guidance below and implement a solution

### Issue description
`acquire_composite_trigger_lock` uses `hash(str(trigger.id))` to derive a Postgres advisory lock key. Python’s `hash()` is process-randomized, so different workers will compute different keys for the same trigger UUID, defeating cross-process serialization.

### Issue Context
This lock is intended to serialize concurrent composite-trigger evaluations across workers.

### Fix Focus Areas
- src/prefect/server/events/models/composite_trigger_child_firing.py[38-45]

ⓘ Copy this prompt and use it to remediate the issue with your preferred AI generation tools



Remediation recommended

4. Wrong RETURNING column 🐞 Bug ✧ Quality
Description
• clear_child_firings claims to return deleted child_firing_ids, but the DELETE..RETURNING currently
  returns child_trigger_id.
• This makes the returned IDs (and the debug log field deleted_firing_ids) misleading, reducing
  debuggability and risking future misuse if callers rely on the returned values.
• The current race-detection length check may “work by accident” (one row per child trigger) but the
  values are still wrong for the API promised by the docstring/log fields.
Code

src/prefect/server/events/models/composite_trigger_child_firing.py[R139-155]

+) -> set[UUID]:
+    """
+    Delete the specified child firings and return the IDs that were actually deleted.
+
+    Returns the set of child_firing_ids that were successfully deleted. Callers can
+    compare this to the expected firing_ids to detect races and avoid double-firing
+    composite triggers.
+    """
+    result = await session.execute(
+        sa.delete(db.CompositeTriggerChildFiring)
+        .filter(
            db.CompositeTriggerChildFiring.automation_id == trigger.automation.id,
            db.CompositeTriggerChildFiring.parent_trigger_id == trigger.id,
            db.CompositeTriggerChildFiring.child_firing_id.in_(firing_ids),
        )
+        .returning(db.CompositeTriggerChildFiring.child_trigger_id)
    )
Evidence
The function documents returning child_firing_ids but returns child_trigger_id from the ORM. The ORM
model includes both columns, and the caller logs these returned values as deleted_firing_ids, so
logs will be incorrect.

src/prefect/server/events/models/composite_trigger_child_firing.py[140-155]
src/prefect/server/database/orm_models.py[1368-1371]
src/prefect/server/events/triggers.py[390-408]

Agent prompt
The issue below was found during a code review. Follow the provided context and guidance below and implement a solution

### Issue description
`clear_child_firings` documents returning `child_firing_ids` and the caller logs them as such, but the DELETE statement returns `child_trigger_id`.

### Issue Context
The returned IDs are used for debug logging and may be used by future callers.

### Fix Focus Areas
- src/prefect/server/events/models/composite_trigger_child_firing.py[140-155]
- src/prefect/server/events/triggers.py[390-408]

ⓘ Copy this prompt and use it to remediate the issue with your preferred AI generation tools



ⓘ The new review experience is currently in Beta. Learn more

Qodo Logo

Comment on lines +17 to +20
async def acquire_composite_trigger_lock(
session: AsyncSession,
trigger: CompositeTrigger,
) -> None:
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Action required

1. Missing future annotations import 📘 Rule violation ✓ Correctness

src/prefect/server/events/models/composite_trigger_child_firing.py contains type annotations but
  does not include from __future__ import annotations as the first import.
• This violates the typing import standard required for src/ files and can lead to poorer
  type-checking performance and forward-reference issues.
• Add the future import at the top of the file before any other imports.
Agent prompt
## Issue description
`src/prefect/server/events/models/composite_trigger_child_firing.py` uses type annotations but is missing `from __future__ import annotations` as the first import, violating the required typing standard.

## Issue Context
The file contains annotated function signatures (e.g., `session: AsyncSession`, `trigger: CompositeTrigger`) and return types, but it begins with regular imports.

## Fix Focus Areas
- src/prefect/server/events/models/composite_trigger_child_firing.py[1-20]

ⓘ Copy this prompt and use it to remediate the issue with your preferred AI generation tools

Comment on lines +71 to +73
import logging

logger: "logging.Logger" = get_logger(__name__)
logger = logging.getLogger(__name__)
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Action required

2. Nonstandard logger initialization 📘 Rule violation ✧ Quality

src/prefect/server/events/triggers.py initializes logger with logging.getLogger(__name__)
  and without the required type annotation.
• This violates the standardized logger initialization pattern and can break consistency/type-safety
  expectations across the codebase.
• Replace it with the required logger: "logging.Logger" = get_logger("module_name") pattern.
Agent prompt
## Issue description
`src/prefect/server/events/triggers.py` uses `logging.getLogger(__name__)` for `logger`, violating the required standardized logger initialization and typing pattern.

## Issue Context
The module already imports `get_logger` but does not use it for `logger` initialization.

## Fix Focus Areas
- src/prefect/server/events/triggers.py[27-27]
- src/prefect/server/events/triggers.py[71-76]

ⓘ Copy this prompt and use it to remediate the issue with your preferred AI generation tools

Comment on lines +38 to +45
if dialect.name == "postgresql":
# Use the trigger's UUID as the lock key
# pg_advisory_xact_lock takes a bigint, so we use the UUID's int representation
# truncated to fit (collision is extremely unlikely and benign)
lock_key = hash(str(trigger.id)) % (2**63)
await session.execute(
sa.text("SELECT pg_advisory_xact_lock(:key)"), {"key": lock_key}
)
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Action required

3. Unstable advisory lock key 🐞 Bug ⛯ Reliability

• The advisory lock key is computed with Python’s built-in hash(), which is process-randomized;
  different workers/processes will compute different keys for the same trigger id.
• In multi-worker mode this undermines cross-process serialization, so the original race can still
  occur (only intra-process serialization is achieved).
• This is especially important because Prefect explicitly supports multi-worker server mode against
  Postgres.
Agent prompt
### Issue description
`acquire_composite_trigger_lock` uses `hash(str(trigger.id))` to derive a Postgres advisory lock key. Python’s `hash()` is process-randomized, so different workers will compute different keys for the same trigger UUID, defeating cross-process serialization.

### Issue Context
This lock is intended to serialize concurrent composite-trigger evaluations across workers.

### Fix Focus Areas
- src/prefect/server/events/models/composite_trigger_child_firing.py[38-45]

ⓘ Copy this prompt and use it to remediate the issue with your preferred AI generation tools

Comment on lines +139 to 155
) -> set[UUID]:
"""
Delete the specified child firings and return the IDs that were actually deleted.

Returns the set of child_firing_ids that were successfully deleted. Callers can
compare this to the expected firing_ids to detect races and avoid double-firing
composite triggers.
"""
result = await session.execute(
sa.delete(db.CompositeTriggerChildFiring)
.filter(
db.CompositeTriggerChildFiring.automation_id == trigger.automation.id,
db.CompositeTriggerChildFiring.parent_trigger_id == trigger.id,
db.CompositeTriggerChildFiring.child_firing_id.in_(firing_ids),
)
.returning(db.CompositeTriggerChildFiring.child_trigger_id)
)
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Remediation recommended

4. Wrong returning column 🐞 Bug ✧ Quality

• clear_child_firings claims to return deleted child_firing_ids, but the DELETE..RETURNING currently
  returns child_trigger_id.
• This makes the returned IDs (and the debug log field deleted_firing_ids) misleading, reducing
  debuggability and risking future misuse if callers rely on the returned values.
• The current race-detection length check may “work by accident” (one row per child trigger) but the
  values are still wrong for the API promised by the docstring/log fields.
Agent prompt
### Issue description
`clear_child_firings` documents returning `child_firing_ids` and the caller logs them as such, but the DELETE statement returns `child_trigger_id`.

### Issue Context
The returned IDs are used for debug logging and may be used by future callers.

### Fix Focus Areas
- src/prefect/server/events/models/composite_trigger_child_firing.py[140-155]
- src/prefect/server/events/triggers.py[390-408]

ⓘ Copy this prompt and use it to remediate the issue with your preferred AI generation tools

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

2 participants