Skip to content

Add arun_deployment and replace @sync_compatible with @async_dispatch#173

Open
tomerqodo wants to merge 3 commits intoqodo_claude_vs_qodo_base_add_arun_deployment_and_replace_sync_compatible_with_async_dispatch_pr6from
qodo_claude_vs_qodo_head_add_arun_deployment_and_replace_sync_compatible_with_async_dispatch_pr6
Open

Add arun_deployment and replace @sync_compatible with @async_dispatch#173
tomerqodo wants to merge 3 commits intoqodo_claude_vs_qodo_base_add_arun_deployment_and_replace_sync_compatible_with_async_dispatch_pr6from
qodo_claude_vs_qodo_head_add_arun_deployment_and_replace_sync_compatible_with_async_dispatch_pr6

Conversation

@tomerqodo
Copy link
Copy Markdown

Benchmark PR from agentic-review-benchmarks#6

desertaxle and others added 3 commits January 25, 2026 12:10
This change follows the intent of issue PrefectHQ#15008 to replace implicit
sync/async conversion with explicit, type-safe alternatives.

Changes:
- Add `arun_deployment` as an explicit async function for running deployments
- Replace `@sync_compatible` with `@async_dispatch` on `run_deployment`
- `run_deployment` now dispatches to `arun_deployment` in async context
- Sync context uses `SyncPrefectClient` directly (no event loop magic)
- Export `arun_deployment` from `prefect.deployments`
- Add comprehensive tests for both sync and async behavior

The `run_deployment.aio` attribute is preserved for backward compatibility.

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
@qodo-code-review
Copy link
Copy Markdown

Review Summary by Qodo

Replace @sync_compatible with @async_dispatch for explicit async deployment handling

✨ Enhancement

Grey Divider

Walkthroughs

Description
• Replace @sync_compatible with explicit @async_dispatch decorator on run_deployment
• Add new arun_deployment async function for explicit async deployment execution
• Implement proper client initialization using get_or_create_client instead of inject_client
• Add comprehensive test suite for async, sync, and dispatch behavior
• Create backward compatibility alias and .aio attribute for existing code
Diagram
flowchart LR
  A["run_deployment<br/>@async_dispatch"] -->|"async context"| B["arun_deployment<br/>explicit async"]
  A -->|"sync context"| C["SyncPrefectClient<br/>direct execution"]
  D["Backward<br/>Compatibility"] -->|".aio attribute"| B
  E["Client<br/>Initialization"] -->|"get_or_create_client"| F["PrefectClient<br/>instance"]
Loading

Grey Divider

File Changes

1. src/prefect/deployments/__init__.py ✨ Enhancement +3/-2

Export arun_deployment from deployments module

• Export arun_deployment from the deployments module public API
• Add arun_deployment to TYPE_CHECKING imports and __all__ list
• Register arun_deployment in _public_api dictionary for dynamic imports

src/prefect/deployments/init.py


2. src/prefect/deployments/flow_runs.py ✨ Enhancement +26/-9

Refactor to explicit async with @async_dispatch pattern

• Rename run_deployment to arun_deployment and remove @sync_compatible decorator
• Replace @inject_client with get_or_create_client for explicit client handling
• Update docstring to clarify async behavior and add usage example
• Fix bug: move await anyio.sleep(poll_interval) before flow run status check
• Fix bug: use deployment_id instead of deployment.id in API call
• Remove unnecessary await on collect_task_run_inputs call
• Create run_deployment as backward-compatible alias to arun_deployment

src/prefect/deployments/flow_runs.py


3. tests/deployment/test_flow_runs.py 🧪 Tests +334/-1

Add comprehensive tests for async and sync dispatch behavior

• Add TestArunDeployment class with 10 comprehensive async tests covering basic calls, deployment
 IDs, parameters, tags, custom names, job variables, timeout validation, idempotency keys, and parent
 flow linking
• Add TestRunDeploymentSyncContext class with 5 sync context tests using _sync=True parameter
• Add TestAsyncDispatchBehavior class with 2 tests verifying async dispatch routing and .aio
 attribute backward compatibility
• Import arun_deployment alongside run_deployment and add inspect module import

tests/deployment/test_flow_runs.py


Grey Divider

Qodo Logo

@qodo-code-review
Copy link
Copy Markdown

qodo-code-review Bot commented Mar 10, 2026

Code Review by Qodo

🐞 Bugs (3) 📘 Rule violations (3) 📎 Requirement gaps (0)

Grey Divider


Action required

1. flow_runs.py missing future annotations 📘 Rule violation ✓ Correctness
Description
src/prefect/deployments/flow_runs.py contains type annotations but does not include `from
__future__ import annotations` as the first import. This can break forward references and violates
the required typing import standard.
Code

src/prefect/deployments/flow_runs.py[R11-18]

+from prefect.client.utilities import get_or_create_client
from prefect.context import FlowRunContext, TaskRunContext
from prefect.logging import get_logger
from prefect.states import Pending, Scheduled
from prefect.tasks import Task
from prefect.telemetry.run_telemetry import LABELS_TRACEPARENT_KEY, RunTelemetry
from prefect.types._datetime import now
-from prefect.utilities.asyncutils import sync_compatible
+from prefect.utilities._engine import dynamic_key_for_task_run
Evidence
The checklist requires from __future__ import annotations as the first import in src/ Python
files that use annotations. In flow_runs.py, the first import is from datetime import datetime
and the file defines annotated signatures like async def arun_deployment(...).

AGENTS.md
src/prefect/deployments/flow_runs.py[1-3]
src/prefect/deployments/flow_runs.py[48-62]

Agent prompt
The issue below was found during a code review. Follow the provided context and guidance below and implement a solution

## Issue description
`src/prefect/deployments/flow_runs.py` uses type annotations but does not start with `from __future__ import annotations`, which is required for consistent forward-reference behavior.

## Issue Context
This file contains many annotations (e.g., `Optional[&quot;PrefectClient&quot;]`, return type `&quot;FlowRun&quot;`). The compliance rule requires the future import as the first import in annotated `src/` files.

## Fix Focus Areas
- src/prefect/deployments/flow_runs.py[1-3]

ⓘ Copy this prompt and use it to remediate the issue with your preferred AI generation tools


2. run_deployment lacks sync wrapper 📘 Rule violation ⛯ Reliability
Description
run_deployment is now just an alias to the async arun_deployment, so there is no true
sync-compatible wrapper. This violates the requirement that public async user-facing APIs provide
sync compatibility.
Code

src/prefect/deployments/flow_runs.py[R240-241]

+# Alias for backwards compatibility
+run_deployment = arun_deployment
Evidence
The compliance rule requires a sync compatibility wrapper for user-facing async functions. The code
defines arun_deployment as async and then sets run_deployment = arun_deployment, which does not
provide a sync wrapper (and will return a coroutine in sync contexts).

AGENTS.md
src/prefect/deployments/flow_runs.py[48-61]
src/prefect/deployments/flow_runs.py[240-241]

Agent prompt
The issue below was found during a code review. Follow the provided context and guidance below and implement a solution

## Issue description
`run_deployment` is currently just an alias to `arun_deployment` (`run_deployment = arun_deployment`), so it does not provide sync compatibility for user-facing callers.

## Issue Context
The compliance requirement expects public async APIs to offer sync wrappers (either via `@sync_compatible` or an explicit sync function). The codebase already uses `@async_dispatch(async_impl)` for this purpose.

## Fix Focus Areas
- src/prefect/deployments/flow_runs.py[48-61]
- src/prefect/deployments/flow_runs.py[240-241]

ⓘ Copy this prompt and use it to remediate the issue with your preferred AI generation tools


3. Tests not in deployments/ 📘 Rule violation ⛯ Reliability
Description
The tests for src/prefect/deployments/flow_runs.py are located under tests/deployment/ instead
of mirroring the deployments/ module path. This violates the required test directory mirroring
convention.
Code

tests/deployment/test_flow_runs.py[18]

+from prefect.deployments import arun_deployment, run_deployment
Evidence
The rule requires tests to mirror the source directory structure
(src/prefect/deployments/flow_runs.py -> tests/deployments/test_flow_runs.py). This PR
adds/updates tests in tests/deployment/test_flow_runs.py, which does not match the deployments
source module path.

AGENTS.md
src/prefect/deployments/flow_runs.py[1-3]
tests/deployment/test_flow_runs.py[1-4]

Agent prompt
The issue below was found during a code review. Follow the provided context and guidance below and implement a solution

## Issue description
Tests for the `deployments/flow_runs.py` source module live under `tests/deployment/` instead of `tests/deployments/`, violating the required mirrored structure.

## Issue Context
The compliance rule expects a mirrored path for discoverability and consistency.

## Fix Focus Areas
- tests/deployment/test_flow_runs.py[1-40]
- src/prefect/deployments/flow_runs.py[1-5]

ⓘ Copy this prompt and use it to remediate the issue with your preferred AI generation tools


View more (2)
4. None deployment_id passed 🐞 Bug ✓ Correctness
Description
arun_deployment calls client.create_flow_run_from_deployment(deployment_id, ...) even when
name is a non-UUID deployment name, leaving deployment_id=None and causing an invalid required
UUID argument at runtime.
Code

src/prefect/deployments/flow_runs.py[R211-213]

    flow_run = await client.create_flow_run_from_deployment(
-        deployment.id,
+        deployment_id,
        parameters=parameters,
Evidence
deployment_id is initialized to None and only set when name is a UUID/UUID-string; when name
is a deployment name, the code reads the deployment by name but never assigns deployment.id back
to deployment_id, then passes the still-None variable into create_flow_run_from_deployment,
whose signature requires a UUID.

src/prefect/deployments/flow_runs.py[123-139]
src/prefect/deployments/flow_runs.py[211-222]
src/prefect/client/orchestration/_deployments/client.py[1297-1317]

Agent prompt
The issue below was found during a code review. Follow the provided context and guidance below and implement a solution

### Issue description
`arun_deployment` passes `deployment_id` into `create_flow_run_from_deployment`, but `deployment_id` remains `None` when `name` is a deployment name like `&quot;flow/deployment&quot;`. This breaks the primary documented use-case.

### Issue Context
You already load `deployment` via `read_deployment_by_name(name)` when `deployment_id` is not provided; the `deployment.id` should be used for creating the flow run.

### Fix Focus Areas
- src/prefect/deployments/flow_runs.py[123-139]
- src/prefect/deployments/flow_runs.py[211-222]

ⓘ Copy this prompt and use it to remediate the issue with your preferred AI generation tools


5. Missing await task inputs 🐞 Bug ✓ Correctness
Description
When linking a subflow, task_inputs uses collect_task_run_inputs(v) without await even though
collect_task_run_inputs is async, resulting in coroutine objects being passed as task inputs.
Code

src/prefect/deployments/flow_runs.py[R146-150]

        # This was called from a flow. Link the flow run as a subflow.
        task_inputs = {
-            k: await collect_task_run_inputs(v) for k, v in parameters.items()
+            k: collect_task_run_inputs(v) for k, v in parameters.items()
        }
Evidence
collect_task_run_inputs is declared as async def and even documents `await
collect_task_run_inputs(v) usage; the updated comprehension in arun_deployment removed await`,
so the dict values become coroutine objects instead of the expected
set[TaskRunResult|FlowRunResult].

src/prefect/deployments/flow_runs.py[145-150]
src/prefect/utilities/engine.py[66-81]

Agent prompt
The issue below was found during a code review. Follow the provided context and guidance below and implement a solution

### Issue description
The subflow-linking path populates `task_inputs` with coroutine objects because `collect_task_run_inputs` is async but is called without `await`.

### Issue Context
This code runs inside `arun_deployment` (async), so `await` is the correct way to obtain the expected `set[...]` values.

### Fix Focus Areas
- src/prefect/deployments/flow_runs.py[143-196]
- src/prefect/utilities/engine.py[66-115]

ⓘ Copy this prompt and use it to remediate the issue with your preferred AI generation tools



Remediation recommended

6. Unclosed created client 🐞 Bug ⛯ Reliability
Description
arun_deployment uses get_or_create_client but ignores the returned "inferred" flag and never
context-manages/close the client when it had to create a new one.
Code

src/prefect/deployments/flow_runs.py[R133-134]

+    client, _ = get_or_create_client(client)
+
Evidence
get_or_create_client returns (client, inferred) where inferred=False means a new client was
created; the prior inject_client pattern explicitly uses async with to close created clients.
The new code drops @inject_client and does not replicate its context-management behavior, so
created clients can leak connections/resources.

src/prefect/deployments/flow_runs.py[133-134]
src/prefect/client/utilities.py[31-61]
src/prefect/client/utilities.py[74-100]

Agent prompt
The issue below was found during a code review. Follow the provided context and guidance below and implement a solution

### Issue description
When `client` is not provided and no context client exists, `get_or_create_client` creates a new client that should be closed. `arun_deployment` currently discards the inferred/created flag and never closes the created client.

### Issue Context
The previous `@inject_client` decorator handled this via `async with` when `inferred` is False.

### Fix Focus Areas
- src/prefect/deployments/flow_runs.py[48-139]
- src/prefect/client/utilities.py[31-61]
- src/prefect/client/utilities.py[74-100]

ⓘ Copy this prompt and use it to remediate the issue with your preferred AI generation tools


Grey Divider

ⓘ The new review experience is currently in Beta. Learn more

Grey Divider

Qodo Logo

Comment on lines +11 to +18
from prefect.client.utilities import get_or_create_client
from prefect.context import FlowRunContext, TaskRunContext
from prefect.logging import get_logger
from prefect.states import Pending, Scheduled
from prefect.tasks import Task
from prefect.telemetry.run_telemetry import LABELS_TRACEPARENT_KEY, RunTelemetry
from prefect.types._datetime import now
from prefect.utilities.asyncutils import sync_compatible
from prefect.utilities._engine import dynamic_key_for_task_run
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Action required

1. flow_runs.py missing future annotations 📘 Rule violation ✓ Correctness

src/prefect/deployments/flow_runs.py contains type annotations but does not include `from
__future__ import annotations` as the first import. This can break forward references and violates
the required typing import standard.
Agent Prompt
## Issue description
`src/prefect/deployments/flow_runs.py` uses type annotations but does not start with `from __future__ import annotations`, which is required for consistent forward-reference behavior.

## Issue Context
This file contains many annotations (e.g., `Optional["PrefectClient"]`, return type `"FlowRun"`). The compliance rule requires the future import as the first import in annotated `src/` files.

## Fix Focus Areas
- src/prefect/deployments/flow_runs.py[1-3]

ⓘ Copy this prompt and use it to remediate the issue with your preferred AI generation tools

Comment on lines +240 to +241
# Alias for backwards compatibility
run_deployment = arun_deployment
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Action required

2. run_deployment lacks sync wrapper 📘 Rule violation ⛯ Reliability

run_deployment is now just an alias to the async arun_deployment, so there is no true
sync-compatible wrapper. This violates the requirement that public async user-facing APIs provide
sync compatibility.
Agent Prompt
## Issue description
`run_deployment` is currently just an alias to `arun_deployment` (`run_deployment = arun_deployment`), so it does not provide sync compatibility for user-facing callers.

## Issue Context
The compliance requirement expects public async APIs to offer sync wrappers (either via `@sync_compatible` or an explicit sync function). The codebase already uses `@async_dispatch(async_impl)` for this purpose.

## Fix Focus Areas
- src/prefect/deployments/flow_runs.py[48-61]
- src/prefect/deployments/flow_runs.py[240-241]

ⓘ Copy this prompt and use it to remediate the issue with your preferred AI generation tools

from prefect.client.schemas.responses import DeploymentResponse
from prefect.context import FlowRunContext
from prefect.deployments import run_deployment
from prefect.deployments import arun_deployment, run_deployment
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Action required

3. Tests not in deployments/ 📘 Rule violation ⛯ Reliability

The tests for src/prefect/deployments/flow_runs.py are located under tests/deployment/ instead
of mirroring the deployments/ module path. This violates the required test directory mirroring
convention.
Agent Prompt
## Issue description
Tests for the `deployments/flow_runs.py` source module live under `tests/deployment/` instead of `tests/deployments/`, violating the required mirrored structure.

## Issue Context
The compliance rule expects a mirrored path for discoverability and consistency.

## Fix Focus Areas
- tests/deployment/test_flow_runs.py[1-40]
- src/prefect/deployments/flow_runs.py[1-5]

ⓘ Copy this prompt and use it to remediate the issue with your preferred AI generation tools

Comment on lines 211 to 213
flow_run = await client.create_flow_run_from_deployment(
deployment.id,
deployment_id,
parameters=parameters,
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Action required

4. None deployment_id passed 🐞 Bug ✓ Correctness

arun_deployment calls client.create_flow_run_from_deployment(deployment_id, ...) even when
name is a non-UUID deployment name, leaving deployment_id=None and causing an invalid required
UUID argument at runtime.
Agent Prompt
### Issue description
`arun_deployment` passes `deployment_id` into `create_flow_run_from_deployment`, but `deployment_id` remains `None` when `name` is a deployment name like `"flow/deployment"`. This breaks the primary documented use-case.

### Issue Context
You already load `deployment` via `read_deployment_by_name(name)` when `deployment_id` is not provided; the `deployment.id` should be used for creating the flow run.

### Fix Focus Areas
- src/prefect/deployments/flow_runs.py[123-139]
- src/prefect/deployments/flow_runs.py[211-222]

ⓘ Copy this prompt and use it to remediate the issue with your preferred AI generation tools

Comment on lines 146 to 150

# This was called from a flow. Link the flow run as a subflow.
task_inputs = {
k: await collect_task_run_inputs(v) for k, v in parameters.items()
k: collect_task_run_inputs(v) for k, v in parameters.items()
}
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Action required

5. Missing await task inputs 🐞 Bug ✓ Correctness

When linking a subflow, task_inputs uses collect_task_run_inputs(v) without await even though
collect_task_run_inputs is async, resulting in coroutine objects being passed as task inputs.
Agent Prompt
### Issue description
The subflow-linking path populates `task_inputs` with coroutine objects because `collect_task_run_inputs` is async but is called without `await`.

### Issue Context
This code runs inside `arun_deployment` (async), so `await` is the correct way to obtain the expected `set[...]` values.

### Fix Focus Areas
- src/prefect/deployments/flow_runs.py[143-196]
- src/prefect/utilities/engine.py[66-115]

ⓘ Copy this prompt and use it to remediate the issue with your preferred AI generation tools

Comment on lines +133 to +134
client, _ = get_or_create_client(client)

Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Remediation recommended

6. Unclosed created client 🐞 Bug ⛯ Reliability

arun_deployment uses get_or_create_client but ignores the returned "inferred" flag and never
context-manages/close the client when it had to create a new one.
Agent Prompt
### Issue description
When `client` is not provided and no context client exists, `get_or_create_client` creates a new client that should be closed. `arun_deployment` currently discards the inferred/created flag and never closes the created client.

### Issue Context
The previous `@inject_client` decorator handled this via `async with` when `inferred` is False.

### Fix Focus Areas
- src/prefect/deployments/flow_runs.py[48-139]
- src/prefect/client/utilities.py[31-61]
- src/prefect/client/utilities.py[74-100]

ⓘ Copy this prompt and use it to remediate the issue with your preferred AI generation tools

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

2 participants