
Add arun_deployment and replace @sync_compatible with @async_dispatch#139

Open
tomerqodo wants to merge 3 commits into qodo_combined_20260121_demo_10_base_add_arun_deployment_and_replace_sync_compatible_with_async_dispatch_pr545 from qodo_combined_20260121_demo_10_head_add_arun_deployment_and_replace_sync_compatible_with_async_dispatch_pr545

Conversation

@tomerqodo

Benchmark PR from qodo-benchmark#545

desertaxle and others added 3 commits January 21, 2026 13:58
This change follows the intent of issue PrefectHQ#15008 to replace implicit
sync/async conversion with explicit, type-safe alternatives.

Changes:
- Add `arun_deployment` as an explicit async function for running deployments
- Replace `@sync_compatible` with `@async_dispatch` on `run_deployment`
- `run_deployment` now dispatches to `arun_deployment` in async context
- Sync context uses `SyncPrefectClient` directly (no event loop magic)
- Export `arun_deployment` from `prefect.deployments`
- Add comprehensive tests for both sync and async behavior

The `run_deployment.aio` attribute is preserved for backward compatibility.

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
@qodo-code-review

Code Review by Qodo

🐞 Bugs (4) 📘 Rule violations (5) 📎 Requirement gaps (0)



Action required

1. __init__.py missing future import 📘 Rule violation ✓ Correctness
Description
• src/prefect/deployments/__init__.py contains type annotations but does not include `from __future__ import annotations` as the first import.
• This violates the requirement for forward-reference resolution and consistent type-checking
  behavior across annotated src/ files.
Code

src/prefect/deployments/__init__.py[10]

_public_api: dict[str, tuple[str, str]] = {
Evidence
The checklist requires from __future__ import annotations as the first import in any src/ Python
file that contains type annotations. The file shows annotated declarations (e.g., `_public_api:
dict[...]`) but begins with a non-future import.

AGENTS.md
src/prefect/deployments/__init__.py[1-2]
src/prefect/deployments/__init__.py[10-10]

Agent prompt
The issue below was found during a code review. Follow the provided context and guidance below and implement a solution

## Issue description
`src/prefect/deployments/__init__.py` uses type annotations but is missing `from __future__ import annotations` as the first import.

## Issue Context
Compliance requires this import for all annotated Python files under `src/`.

## Fix Focus Areas
- src/prefect/deployments/__init__.py[1-12]



2. flow_runs.py missing future import 📘 Rule violation ✓ Correctness
Description
• src/prefect/deployments/flow_runs.py contains extensive type annotations but does not include `from __future__ import annotations` as the first import.
• This violates the typing/import standard required for annotated src/ Python modules.
Code

src/prefect/deployments/flow_runs.py[R48-51]

+async def arun_deployment(
    name: Union[str, UUID],
    client: Optional["PrefectClient"] = None,
    parameters: Optional[dict[str, Any]] = None,
Evidence
The checklist mandates from __future__ import annotations as the first import for annotated src/
Python files. The module begins with standard imports and defines an annotated public async API
(arun_deployment(...) -> "FlowRun") without the required future import.

AGENTS.md
src/prefect/deployments/flow_runs.py[1-3]
src/prefect/deployments/flow_runs.py[48-61]

Agent prompt
The issue below was found during a code review. Follow the provided context and guidance below and implement a solution

## Issue description
`src/prefect/deployments/flow_runs.py` uses type annotations but is missing `from __future__ import annotations` as the first import.

## Issue Context
Compliance requires this import for all annotated Python files under `src/`.

## Fix Focus Areas
- src/prefect/deployments/flow_runs.py[1-12]



3. run_deployment lacks sync wrapper 📘 Rule violation ✓ Correctness
Description
• run_deployment is now just an alias to the async arun_deployment, which removes a dedicated sync-compatibility wrapper for a public async API.
• This forces synchronous users to manage event loops manually (or breaks existing sync usage),
  contrary to the requirement that user-facing async APIs provide sync compatibility.
Code

src/prefect/deployments/flow_runs.py[R240-241]

+# Alias for backwards compatibility
+run_deployment = arun_deployment
Evidence
The compliance rule requires that public async APIs provide sync wrappers (e.g., via
@sync_compatible or an explicit sync function). The code defines arun_deployment as async and
then sets run_deployment = arun_deployment, providing no separate sync-compatible wrapper.

AGENTS.md
src/prefect/deployments/flow_runs.py[48-49]
src/prefect/deployments/flow_runs.py[240-241]

Agent prompt
The issue below was found during a code review. Follow the provided context and guidance below and implement a solution

## Issue description
`run_deployment` is currently an alias to `arun_deployment` (async), so there is no dedicated sync-compatibility wrapper for this public API.

## Issue Context
The compliance requirement mandates that public async APIs offer sync-friendly wrappers so users do not need to manage event loops.

## Fix Focus Areas
- src/prefect/deployments/flow_runs.py[48-61]
- src/prefect/deployments/flow_runs.py[240-241]

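One way to satisfy this without reintroducing `@sync_compatible` is a dispatching wrapper. The sketch below is a simplified stand-in for the dispatch idea, not Prefect's actual `@async_dispatch` implementation:

```python
import asyncio

def async_dispatch(async_impl):
    """Hypothetical sketch: wrap an async function so sync callers get a
    plain result while async callers get an awaitable."""
    def wrapper(*args, **kwargs):
        try:
            asyncio.get_running_loop()
        except RuntimeError:
            # No running loop: sync caller, drive the coroutine to completion.
            return asyncio.run(async_impl(*args, **kwargs))
        # Running loop: async caller, hand back the coroutine to await.
        return async_impl(*args, **kwargs)
    wrapper.aio = async_impl  # preserve explicit async access, as the PR does
    return wrapper

async def arun_deployment(name):
    # Stand-in body; the real function creates a flow run via the client.
    return f"ran {name}"

run_deployment = async_dispatch(arun_deployment)
```

With this shape, `run_deployment("flow/deployment")` works from plain sync code, while `await run_deployment(...)` and `await run_deployment.aio(...)` both work in async code.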


4. FlowRun import unguarded 📘 Rule violation ⛯ Reliability
Description
• FlowRun is imported at module import-time but appears to be used only for typing (the return annotation is quoted as "FlowRun").
• Type-only imports must be moved under if TYPE_CHECKING: (and kept quoted in annotations) to
  avoid runtime overhead and circular-import risk.
Code

src/prefect/deployments/flow_runs.py[10]

from prefect.client.schemas import FlowRun, TaskRunResult
-from prefect.client.utilities import inject_client
Evidence
The rule requires type-only imports to be guarded by TYPE_CHECKING and referenced via quoted
annotations. Here, FlowRun is imported at module scope but the return type uses a quoted
annotation, indicating it can/should be a type-only dependency and thus should not be imported at
runtime.

AGENTS.md
src/prefect/deployments/flow_runs.py[10-10]
src/prefect/deployments/flow_runs.py[61-61]

Agent prompt
The issue below was found during a code review. Follow the provided context and guidance below and implement a solution

## Issue description
`FlowRun` is imported at module scope even though it is only used for typing (return type is quoted). This violates the type-only import guarding rule.

## Issue Context
Type-only imports should be placed under `if TYPE_CHECKING:` to reduce runtime overhead and avoid circular imports.

## Fix Focus Areas
- src/prefect/deployments/flow_runs.py[10-13]
- src/prefect/deployments/flow_runs.py[31-34]
- src/prefect/deployments/flow_runs.py[48-61]

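A standard shape for the guarded import, sketched with a stdlib class standing in for `FlowRun`:

```python
from typing import TYPE_CHECKING

if TYPE_CHECKING:
    # Type-only dependency: visible to the type checker, never imported
    # at runtime, which avoids import cost and circular-import risk.
    from fractions import Fraction  # stand-in for prefect.client.schemas.FlowRun

def half(n: int) -> "Fraction":
    # The quoted annotation resolves only during static analysis; a runtime
    # import happens where the object is actually constructed.
    from fractions import Fraction
    return Fraction(n, 2)
```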


5. tests/deployment structure mismatch 📘 Rule violation ⛯ Reliability
Description
• The tests for src/prefect/deployments/... are placed under tests/deployment/..., which does
  not mirror the src/ directory structure.
• This violates the required src/prefect/<module>/... → tests/<module>/... mapping and makes
  tests harder to discover and maintain.
Code

tests/deployment/test_flow_runs.py[18]

+from prefect.deployments import arun_deployment, run_deployment
Evidence
The checklist requires test files to mirror the src/ directory hierarchy. The added/updated test
module imports prefect.deployments APIs while living in tests/deployment/ (singular), not
tests/deployments/ (mirrored).

AGENTS.md
tests/deployment/test_flow_runs.py[13-19]

Agent prompt
The issue below was found during a code review. Follow the provided context and guidance below and implement a solution

## Issue description
Deployment-related tests are located under `tests/deployment/`, which does not mirror the `src/prefect/deployments/` path.

## Issue Context
Compliance requires a mirrored test directory structure for discoverability.

## Fix Focus Areas
- tests/deployment/test_flow_runs.py[1-30]



6. None deployment_id passed 🐞 Bug ✓ Correctness
Description
• When name is not a UUID, deployment_id remains None but is still passed into
  create_flow_run_from_deployment, which requires a UUID.
• This breaks the primary documented usage of calling deployments by name (e.g. "flow/deployment").
• Impact is an immediate runtime error (or a malformed request) before any flow run is created.
Code

src/prefect/deployments/flow_runs.py[R211-213]

    flow_run = await client.create_flow_run_from_deployment(
-        deployment.id,
+        deployment_id,
        parameters=parameters,
Evidence
arun_deployment only populates deployment_id if name is (or parses as) a UUID, but it does not
set deployment_id = deployment.id after resolving a deployment by name. The subsequent call to
create_flow_run_from_deployment requires a UUID deployment_id, so passing None will fail.

src/prefect/deployments/flow_runs.py[123-150]
src/prefect/deployments/flow_runs.py[211-222]
src/prefect/client/orchestration/_deployments/client.py[1297-1316]

Agent prompt
The issue below was found during a code review. Follow the provided context and guidance below and implement a solution

### Issue description
`arun_deployment` can resolve a deployment by name but still passes `deployment_id` (which remains `None`) into `create_flow_run_from_deployment`, causing runtime failure.

### Issue Context
The client API requires a UUID for `deployment_id`. When the user calls `arun_deployment("flow/deployment")`, `deployment_id` does not get set.

### Fix Focus Areas
- src/prefect/deployments/flow_runs.py[123-139]
- src/prefect/deployments/flow_runs.py[211-222]

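A sketch of the missing step (the helper name and the `read_deployment_by_name` callable are hypothetical, shown only to illustrate the fix):

```python
from uuid import UUID

def resolve_deployment_id(name, read_deployment_by_name):
    """Hypothetical helper: return a UUID for either input form.

    The review's point: when ``name`` is not a UUID, the id must be taken
    from the deployment object resolved by name, rather than leaving the
    earlier ``deployment_id`` variable as None.
    """
    if isinstance(name, UUID):
        return name
    try:
        return UUID(str(name))
    except ValueError:
        deployment = read_deployment_by_name(name)  # e.g. "flow/deployment"
        return deployment.id  # the assignment the current code skips
```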


7. Unawaited task input collection 🐞 Bug ✓ Correctness
Description
• collect_task_run_inputs is async, but arun_deployment now calls it without await.
• This stores coroutine objects in task_inputs and will break placeholder task-run creation and/or
  input serialization.
• Subflow linkage will be incorrect or fail whenever arun_deployment is called from inside a
  flow/task context.
Code

src/prefect/deployments/flow_runs.py[R148-150]

        task_inputs = {
-            k: await collect_task_run_inputs(v) for k, v in parameters.items()
+            k: collect_task_run_inputs(v) for k, v in parameters.items()
        }
Evidence
collect_task_run_inputs is defined as async def and its own documented example requires awaiting
it. The current code calls it directly inside a dict comprehension, producing coroutine objects
instead of a set[...] of inputs.

src/prefect/deployments/flow_runs.py[142-150]
src/prefect/utilities/engine.py[66-80]

Agent prompt
The issue below was found during a code review. Follow the provided context and guidance below and implement a solution

### Issue description
`collect_task_run_inputs` is async but is called without awaiting, resulting in coroutine objects being placed into `task_inputs`.

### Issue Context
This path runs when `arun_deployment` is called from within a flow/task and attempts to create a placeholder parent task run.

### Fix Focus Areas
- src/prefect/deployments/flow_runs.py[142-150]
- src/prefect/utilities/engine.py[66-80]

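The corrected call site, with a stub standing in for the real collector:

```python
import asyncio

async def collect_task_run_inputs(value):
    # Stub for prefect.utilities.engine.collect_task_run_inputs, which is
    # declared `async def` and must be awaited.
    return {("literal", repr(value))}

async def build_task_inputs(parameters):
    # Awaiting inside the dict comprehension yields resolved sets; without
    # the await, the dict would hold coroutine objects instead.
    return {k: await collect_task_run_inputs(v) for k, v in parameters.items()}
```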


8. Client not context-managed 🐞 Bug ⛯ Reliability
Description
• get_or_create_client may create a new client via get_client(), and get_client() explicitly requires context management.
• arun_deployment does not use async with client and no longer uses @inject_client, so newly
  created clients may leak connections.
• This can surface as unclosed client warnings, resource leaks, or degraded stability in long-lived
  processes.
Code

src/prefect/deployments/flow_runs.py[R133-134]

+    client, _ = get_or_create_client(client)
+
Evidence
When no client is provided or found in context, get_or_create_client returns get_client() (which
must be context-managed). inject_client exists specifically to ensure newly created clients are
used in an async with block. arun_deployment now bypasses inject_client and uses the client
without context management.

src/prefect/client/utilities.py[31-60]
src/prefect/client/orchestration/__init__.py[189-208]
src/prefect/client/utilities.py[74-100]
src/prefect/deployments/flow_runs.py[133-138]

Agent prompt
The issue below was found during a code review. Follow the provided context and guidance below and implement a solution

### Issue description
`arun_deployment` obtains a possibly-new PrefectClient but does not context-manage it, even though `get_client()` requires context management.

### Issue Context
Previously, `inject_client` handled this by wrapping newly created clients in an `async with` block.

### Fix Focus Areas
- src/prefect/deployments/flow_runs.py[133-138]
- src/prefect/client/utilities.py[31-100]
- src/prefect/client/orchestration/__init__.py[189-208]

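A minimal sketch of the ownership rule that `@inject_client` used to enforce, written here with a hypothetical helper and `AsyncExitStack` (this is the pattern, not Prefect's code):

```python
import contextlib

async def with_managed_client(client, make_client, work):
    """Hypothetical sketch: a caller-supplied client is used as-is, while a
    client we create ourselves is entered with ``async with`` so its
    connections are released on exit."""
    async with contextlib.AsyncExitStack() as stack:
        if client is None:
            # Only the newly created client is context-managed here;
            # a provided client stays under the caller's control.
            client = await stack.enter_async_context(make_client())
        return await work(client)
```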



Remediation recommended

9. Polling adds extra delay 🐞 Bug ➹ Performance
Description
• The polling loop now sleeps before the first read_flow_run call, introducing an avoidable
  initial delay.
• If timeout is smaller than poll_interval, the move-on-after scope may cancel during the
  initial sleep, returning without ever reading updated state.
• This can make timeout behavior less intuitive and increase tail latency.
Code

src/prefect/deployments/flow_runs.py[R229-233]

    with anyio.move_on_after(timeout):
        while True:
+            await anyio.sleep(poll_interval)
            flow_run = await client.read_flow_run(flow_run_id)
            flow_state = flow_run.state
Evidence
The code sleeps before the first read inside a move_on_after(timeout) scope. Therefore, if the
scope times out during the initial sleep, the loop body never performs a read_flow_run and returns
the initial flow_run object.

src/prefect/deployments/flow_runs.py[229-237]

Agent prompt
The issue below was found during a code review. Follow the provided context and guidance below and implement a solution

### Issue description
Polling sleeps before the first status read, adding latency and potentially skipping all reads when `timeout < poll_interval`.

### Issue Context
The polling loop is wrapped in `anyio.move_on_after(timeout)`; cancellation during the initial sleep results in no API read.

### Fix Focus Areas
- src/prefect/deployments/flow_runs.py[229-237]

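A read-first loop avoids both problems. This sketch uses `asyncio.wait_for` in place of `anyio.move_on_after`, with hypothetical `read_state`/`is_final` callables:

```python
import asyncio

async def poll_until_final(read_state, is_final, timeout, poll_interval):
    """Hypothetical sketch of the fix: read first, then sleep, so even a
    timeout shorter than poll_interval performs at least one fresh read."""
    last = {"state": None}

    async def loop():
        while True:
            last["state"] = await read_state()  # read before sleeping
            if is_final(last["state"]):
                return
            await asyncio.sleep(poll_interval)

    try:
        await asyncio.wait_for(loop(), timeout)
    except asyncio.TimeoutError:
        pass  # timed out: fall through with the last state that was read
    return last["state"]
```

Because the read precedes the sleep, `timeout < poll_interval` still yields one up-to-date read instead of returning the stale initial object.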


