Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 3 additions & 2 deletions src/prefect/deployments/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,18 +3,19 @@


if TYPE_CHECKING:
from .flow_runs import run_deployment
from .flow_runs import arun_deployment, run_deployment
from .base import initialize_project
from .runner import deploy

_public_api: dict[str, tuple[str, str]] = {
"initialize_project": (__spec__.parent, ".base"),
"arun_deployment": (__spec__.parent, ".flow_runs"),
"run_deployment": (__spec__.parent, ".flow_runs"),
"deploy": (__spec__.parent, ".runner"),
}

# Declare API for type-checkers
__all__ = ["initialize_project", "deploy", "run_deployment"]
__all__ = ["initialize_project", "deploy", "arun_deployment", "run_deployment"]


def __getattr__(attr_name: str) -> object:
Expand Down
35 changes: 26 additions & 9 deletions src/prefect/deployments/flow_runs.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,14 +8,14 @@
import prefect
from prefect._result_records import ResultRecordMetadata
from prefect.client.schemas import FlowRun, TaskRunResult
from prefect.client.utilities import inject_client
from prefect.client.utilities import get_or_create_client
from prefect.context import FlowRunContext, TaskRunContext
from prefect.logging import get_logger
from prefect.states import Pending, Scheduled
from prefect.tasks import Task
from prefect.telemetry.run_telemetry import LABELS_TRACEPARENT_KEY, RunTelemetry
from prefect.types._datetime import now
from prefect.utilities.asyncutils import sync_compatible
from prefect.utilities._engine import dynamic_key_for_task_run
from prefect.utilities.slugify import slugify


Expand Down Expand Up @@ -45,9 +45,7 @@ def _is_instrumentation_enabled() -> bool:
logger: "logging.Logger" = get_logger(__name__)


@sync_compatible
@inject_client
async def run_deployment(
async def arun_deployment(
name: Union[str, UUID],
client: Optional["PrefectClient"] = None,
parameters: Optional[dict[str, Any]] = None,
Expand All @@ -62,7 +60,7 @@ async def run_deployment(
job_variables: Optional[dict[str, Any]] = None,
) -> "FlowRun":
"""
Create a flow run for a deployment and return it after completion or a timeout.
Asynchronously create a flow run for a deployment and return it after completion or a timeout.

By default, this function blocks until the flow run finishes executing.
Specify a timeout (in seconds) to wait for the flow run to execute before
Expand All @@ -79,6 +77,7 @@ async def run_deployment(
Args:
name: The deployment id or deployment name in the form:
`"flow name/deployment name"`
client: An optional PrefectClient to use for API requests.
parameters: Parameter overrides for this flow run. Merged with the deployment
defaults.
scheduled_time: The time to schedule the flow run for, defaults to scheduling
Expand All @@ -100,6 +99,18 @@ async def run_deployment(
job_variables: A dictionary of dot delimited infrastructure overrides that
will be applied at runtime; for example `env.CONFIG_KEY=config_value` or
`namespace='prefect'`

Example:
```python
import asyncio
from prefect.deployments import arun_deployment

async def main():
flow_run = await arun_deployment("my-flow/my-deployment")
print(flow_run.state)

asyncio.run(main())
```
"""
if timeout is not None and timeout < 0:
raise ValueError("`timeout` cannot be negative")
Expand All @@ -119,6 +130,8 @@ async def run_deployment(
except ValueError:
pass

client, _ = get_or_create_client(client)

if deployment_id:
deployment = await client.read_deployment(deployment_id=deployment_id)
else:
Expand All @@ -133,7 +146,7 @@ async def run_deployment(

# This was called from a flow. Link the flow run as a subflow.
task_inputs = {
k: await collect_task_run_inputs(v) for k, v in parameters.items()
k: collect_task_run_inputs(v) for k, v in parameters.items()
}
Comment on lines 148 to 150
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🔴 Critical

🧩 Analysis chain

🏁 Script executed:

#!/bin/bash
# Check if collect_task_run_inputs is async or sync
rg -n "def collect_task_run_inputs" --type py

Repository: agentic-review-benchmarks/prefect-combined-coderabbit

Length of output: 236


🏁 Script executed:

#!/bin/bash
# Check imports and context in flow_runs.py around lines 148-150
head -150 src/prefect/deployments/flow_runs.py | tail -50

Repository: agentic-review-benchmarks/prefect-combined-coderabbit

Length of output: 1575


🏁 Script executed:

#!/bin/bash
# Check the function definition and full context around lines 140-170
sed -n '1,20p' src/prefect/deployments/flow_runs.py | head -20
sed -n '100,170p' src/prefect/deployments/flow_runs.py

Repository: agentic-review-benchmarks/prefect-combined-coderabbit

Length of output: 3318


🏁 Script executed:

#!/bin/bash
# Find the function definition
sed -n '1,150p' src/prefect/deployments/flow_runs.py | grep -n "^async def\|^def" | tail -1
# Also get more context
sed -n '40,110p' src/prefect/deployments/flow_runs.py

Repository: agentic-review-benchmarks/prefect-combined-coderabbit

Length of output: 3271


Fix unawaited async function call in dictionary comprehension.

The collect_task_run_inputs(v) function is async but is called without await in the dictionary comprehension at lines 148-150. This will create a dictionary containing coroutine objects instead of actual task inputs. Either await each call (using an async approach to build the dictionary) or switch to the synchronous version collect_task_run_inputs_sync(v).

🤖 Prompt for AI Agents
In `@src/prefect/deployments/flow_runs.py` around lines 148 - 150, the
comprehension builds task_inputs out of coroutine objects because
collect_task_run_inputs is async. Either await each call (for example, build the
dictionary in an async loop, or use asyncio.gather over the values to resolve
them all), or call the synchronous helper collect_task_run_inputs_sync(v)
instead. Update the code that assigns task_inputs (and make the surrounding
function async if you choose to await) so that task_inputs becomes a dict of
real inputs rather than coroutines, using the parameters variable and the
collect_task_run_inputs / collect_task_run_inputs_sync helpers.


# Track parent task if this is being called from within a task
Expand Down Expand Up @@ -196,7 +209,7 @@ async def run_deployment(
trace_labels = {LABELS_TRACEPARENT_KEY: traceparent} if traceparent else {}

flow_run = await client.create_flow_run_from_deployment(
deployment.id,
deployment_id,
Comment on lines 211 to +212
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🔴 Critical

Bug: deployment_id may be None when passing to create_flow_run_from_deployment.

When name is a string (not a UUID), deployment_id remains None (set on line 123). The code correctly fetches the deployment by name on line 138, but then passes the still-None deployment_id to create_flow_run_from_deployment instead of deployment.id.

This will cause the API call to fail or behave unexpectedly.

🐛 Proposed fix
     flow_run = await client.create_flow_run_from_deployment(
-        deployment_id,
+        deployment.id,
         parameters=parameters,
         state=Scheduled(scheduled_time=scheduled_time),
📝 Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

Suggested change
flow_run = await client.create_flow_run_from_deployment(
deployment.id,
deployment_id,
flow_run = await client.create_flow_run_from_deployment(
deployment.id,
🤖 Prompt for AI Agents
In `@src/prefect/deployments/flow_runs.py` around lines 211 - 212, the code passes
deployment_id (which can be None) into create_flow_run_from_deployment after
resolving a deployment by name. Update the call to use the fetched deployment's
id (e.g., set deployment_id = deployment.id, or pass deployment.id directly)
once the deployment object has been retrieved by the `deployment = ...` lookup,
and ensure the case where deployment is None is handled (raise or return an
error) before calling create_flow_run_from_deployment.

parameters=parameters,
state=Scheduled(scheduled_time=scheduled_time),
name=flow_run_name,
Expand All @@ -215,10 +228,14 @@ async def run_deployment(

with anyio.move_on_after(timeout):
while True:
await anyio.sleep(poll_interval)
flow_run = await client.read_flow_run(flow_run_id)
flow_state = flow_run.state
if flow_state and flow_state.is_final():
return flow_run
await anyio.sleep(poll_interval)

return flow_run


# Alias for backwards compatibility
run_deployment = arun_deployment
Loading