Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 3 additions & 2 deletions src/prefect/deployments/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,18 +3,19 @@


if TYPE_CHECKING:
from .flow_runs import run_deployment
from .flow_runs import arun_deployment, run_deployment
from .base import initialize_project
from .runner import deploy

_public_api: dict[str, tuple[str, str]] = {
"initialize_project": (__spec__.parent, ".base"),
"arun_deployment": (__spec__.parent, ".flow_runs"),
"run_deployment": (__spec__.parent, ".flow_runs"),
"deploy": (__spec__.parent, ".runner"),
}

# Declare API for type-checkers
__all__ = ["initialize_project", "deploy", "run_deployment"]
__all__ = ["initialize_project", "deploy", "arun_deployment", "run_deployment"]


def __getattr__(attr_name: str) -> object:
Expand Down
35 changes: 26 additions & 9 deletions src/prefect/deployments/flow_runs.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,14 +8,14 @@
import prefect
from prefect._result_records import ResultRecordMetadata
from prefect.client.schemas import FlowRun, TaskRunResult
from prefect.client.utilities import inject_client
from prefect.client.utilities import get_or_create_client
from prefect.context import FlowRunContext, TaskRunContext
from prefect.logging import get_logger
from prefect.states import Pending, Scheduled
from prefect.tasks import Task
from prefect.telemetry.run_telemetry import LABELS_TRACEPARENT_KEY, RunTelemetry
from prefect.types._datetime import now
from prefect.utilities.asyncutils import sync_compatible
from prefect.utilities._engine import dynamic_key_for_task_run
Comment on lines +11 to +18

Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Action required

1. flow_runs.py missing future annotations 📘 Rule violation ✓ Correctness

src/prefect/deployments/flow_runs.py contains type annotations but does not include `from
__future__ import annotations` as the first import. This can break forward references and violates
the required typing import standard.
Agent Prompt
## Issue description
`src/prefect/deployments/flow_runs.py` uses type annotations but does not start with `from __future__ import annotations`, which is required for consistent forward-reference behavior.

## Issue Context
This file contains many annotations (e.g., `Optional["PrefectClient"]`, return type `"FlowRun"`). The compliance rule requires the future import as the first import in annotated `src/` files.

## Fix Focus Areas
- src/prefect/deployments/flow_runs.py[1-3]

ⓘ Copy this prompt and use it to remediate the issue with your preferred AI generation tools

from prefect.utilities.slugify import slugify


Expand Down Expand Up @@ -45,9 +45,7 @@ def _is_instrumentation_enabled() -> bool:
logger: "logging.Logger" = get_logger(__name__)


@sync_compatible
@inject_client
async def run_deployment(
async def arun_deployment(
name: Union[str, UUID],
client: Optional["PrefectClient"] = None,
parameters: Optional[dict[str, Any]] = None,
Expand All @@ -62,7 +60,7 @@ async def run_deployment(
job_variables: Optional[dict[str, Any]] = None,
) -> "FlowRun":
"""
Create a flow run for a deployment and return it after completion or a timeout.
Asynchronously create a flow run for a deployment and return it after completion or a timeout.

By default, this function blocks until the flow run finishes executing.
Specify a timeout (in seconds) to wait for the flow run to execute before
Expand All @@ -79,6 +77,7 @@ async def run_deployment(
Args:
name: The deployment id or deployment name in the form:
`"flow name/deployment name"`
client: An optional PrefectClient to use for API requests.
parameters: Parameter overrides for this flow run. Merged with the deployment
defaults.
scheduled_time: The time to schedule the flow run for, defaults to scheduling
Expand All @@ -100,6 +99,18 @@ async def run_deployment(
job_variables: A dictionary of dot delimited infrastructure overrides that
will be applied at runtime; for example `env.CONFIG_KEY=config_value` or
`namespace='prefect'`

Example:
```python
import asyncio
from prefect.deployments import arun_deployment

async def main():
flow_run = await arun_deployment("my-flow/my-deployment")
print(flow_run.state)

asyncio.run(main())
```
"""
if timeout is not None and timeout < 0:
raise ValueError("`timeout` cannot be negative")
Expand All @@ -119,6 +130,8 @@ async def run_deployment(
except ValueError:
pass

client, _ = get_or_create_client(client)

Comment on lines +133 to +134

Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Remediation recommended

6. Unclosed created client 🐞 Bug ⛯ Reliability

arun_deployment uses get_or_create_client but ignores the returned "inferred" flag and never
context-manages/close the client when it had to create a new one.
Agent Prompt
### Issue description
When `client` is not provided and no context client exists, `get_or_create_client` creates a new client that should be closed. `arun_deployment` currently discards the inferred/created flag and never closes the created client.

### Issue Context
The previous `@inject_client` decorator handled this via `async with` when `inferred` is False.

### Fix Focus Areas
- src/prefect/deployments/flow_runs.py[48-139]
- src/prefect/client/utilities.py[31-61]
- src/prefect/client/utilities.py[74-100]

ⓘ Copy this prompt and use it to remediate the issue with your preferred AI generation tools

if deployment_id:
deployment = await client.read_deployment(deployment_id=deployment_id)
else:
Expand All @@ -133,7 +146,7 @@ async def run_deployment(

# This was called from a flow. Link the flow run as a subflow.
task_inputs = {
k: await collect_task_run_inputs(v) for k, v in parameters.items()
k: collect_task_run_inputs(v) for k, v in parameters.items()
}
Comment on lines 146 to 150

Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Action required

5. Missing await task inputs 🐞 Bug ✓ Correctness

When linking a subflow, task_inputs uses collect_task_run_inputs(v) without await even though
collect_task_run_inputs is async, resulting in coroutine objects being passed as task inputs.
Agent Prompt
### Issue description
The subflow-linking path populates `task_inputs` with coroutine objects because `collect_task_run_inputs` is async but is called without `await`.

### Issue Context
This code runs inside `arun_deployment` (async), so `await` is the correct way to obtain the expected `set[...]` values.

### Fix Focus Areas
- src/prefect/deployments/flow_runs.py[143-196]
- src/prefect/utilities/engine.py[66-115]

ⓘ Copy this prompt and use it to remediate the issue with your preferred AI generation tools


# Track parent task if this is being called from within a task
Expand Down Expand Up @@ -196,7 +209,7 @@ async def run_deployment(
trace_labels = {LABELS_TRACEPARENT_KEY: traceparent} if traceparent else {}

flow_run = await client.create_flow_run_from_deployment(
deployment.id,
deployment_id,
parameters=parameters,
Comment on lines 211 to 213

Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Action required

4. None deployment_id passed 🐞 Bug ✓ Correctness

arun_deployment calls client.create_flow_run_from_deployment(deployment_id, ...) even when
name is a non-UUID deployment name, leaving deployment_id=None and causing an invalid required
UUID argument at runtime.
Agent Prompt
### Issue description
`arun_deployment` passes `deployment_id` into `create_flow_run_from_deployment`, but `deployment_id` remains `None` when `name` is a deployment name like `"flow/deployment"`. This breaks the primary documented use-case.

### Issue Context
You already load `deployment` via `read_deployment_by_name(name)` when `deployment_id` is not provided; the `deployment.id` should be used for creating the flow run.

### Fix Focus Areas
- src/prefect/deployments/flow_runs.py[123-139]
- src/prefect/deployments/flow_runs.py[211-222]

ⓘ Copy this prompt and use it to remediate the issue with your preferred AI generation tools

state=Scheduled(scheduled_time=scheduled_time),
name=flow_run_name,
Expand All @@ -215,10 +228,14 @@ async def run_deployment(

with anyio.move_on_after(timeout):
while True:
await anyio.sleep(poll_interval)
flow_run = await client.read_flow_run(flow_run_id)
flow_state = flow_run.state
if flow_state and flow_state.is_final():
return flow_run
await anyio.sleep(poll_interval)

return flow_run


# Alias for backwards compatibility
run_deployment = arun_deployment
Comment on lines +240 to +241

Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Action required

2. run_deployment lacks sync wrapper 📘 Rule violation ⛯ Reliability

run_deployment is now just an alias to the async arun_deployment, so there is no true
sync-compatible wrapper. This violates the requirement that public async user-facing APIs provide
sync compatibility.
Agent Prompt
## Issue description
`run_deployment` is currently just an alias to `arun_deployment` (`run_deployment = arun_deployment`), so it does not provide sync compatibility for user-facing callers.

## Issue Context
The compliance requirement expects public async APIs to offer sync wrappers (either via `@sync_compatible` or an explicit sync function). The codebase already uses `@async_dispatch(async_impl)` for this purpose.

## Fix Focus Areas
- src/prefect/deployments/flow_runs.py[48-61]
- src/prefect/deployments/flow_runs.py[240-241]

ⓘ Copy this prompt and use it to remediate the issue with your preferred AI generation tools

Loading
Loading