Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
95 changes: 50 additions & 45 deletions src/prefect/cli/deploy.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,9 +12,9 @@
from typing import TYPE_CHECKING, Any, Dict, List, Optional, Union
from uuid import UUID

import pydantic
import typer
import yaml
from pydantic import TypeAdapter
from rich.console import Console
from rich.panel import Panel
from rich.table import Table
Expand Down Expand Up @@ -44,6 +44,7 @@
)
from prefect.cli.root import app, is_interactive
from prefect.client.base import ServerType
from prefect.client.orchestration import get_client
from prefect.client.schemas.actions import DeploymentScheduleCreate
from prefect.client.schemas.filters import WorkerFilter
from prefect.client.schemas.objects import ConcurrencyLimitConfig
Expand All @@ -52,7 +53,6 @@
IntervalSchedule,
RRuleSchedule,
)
from prefect.client.utilities import inject_client
from prefect.deployments import initialize_project
from prefect.deployments.base import (
_format_deployment_for_saving_to_prefect_file,
Expand Down Expand Up @@ -83,11 +83,17 @@
from prefect.client.orchestration import PrefectClient


DeploymentTriggerAdapter: TypeAdapter[DeploymentTriggerTypes] = TypeAdapter(
DeploymentTriggerTypes
)
SlaAdapter: TypeAdapter[SlaTypes] = TypeAdapter(SlaTypes)


@app.command()
async def init(
name: Optional[str] = None,
recipe: Optional[str] = None,
fields: Optional[List[str]] = typer.Option(
fields: Optional[list[str]] = typer.Option(
None,
"-f",
"--field",
Expand All @@ -100,7 +106,7 @@ async def init(
"""
Initialize a new deployment configuration recipe.
"""
inputs = {}
inputs: dict[str, Any] = {}
fields = fields or []
recipe_paths = prefect.__module_path__ / "deployments" / "recipes"

Expand All @@ -110,7 +116,7 @@ async def init(

if not recipe and is_interactive():
recipe_paths = prefect.__module_path__ / "deployments" / "recipes"
recipes = []
recipes: list[dict[str, Any]] = []

for r in recipe_paths.iterdir():
if r.is_dir() and (r / "prefect.yaml").exists():
Expand Down Expand Up @@ -471,14 +477,14 @@ async def deploy(
exit_with_error(str(exc))


@inject_client
Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

you shall not pass

async def _run_single_deploy(
deploy_config: dict[str, Any],
actions: dict[str, Any],
options: Optional[dict[str, Any]] = None,
options: dict[str, Any] | None = None,
client: Optional["PrefectClient"] = None,
prefect_file: Path = Path("prefect.yaml"),
):
client = client or get_client()
deploy_config = deepcopy(deploy_config) if deploy_config else {}
actions = deepcopy(actions) if actions else {}
options = deepcopy(options) if options else {}
Expand Down Expand Up @@ -863,9 +869,9 @@ async def _run_single_deploy(


async def _run_multi_deploy(
deploy_configs: List[Dict],
actions: Dict,
names: Optional[List[str]] = None,
deploy_configs: list[dict[str, Any]],
actions: dict[str, Any],
names: Optional[list[str]] = None,
deploy_all: bool = False,
prefect_file: Path = Path("prefect.yaml"),
):
Expand Down Expand Up @@ -902,8 +908,8 @@ async def _run_multi_deploy(


def _construct_schedules(
deploy_config: Dict,
) -> List[DeploymentScheduleCreate]:
deploy_config: dict[str, Any],
) -> list[DeploymentScheduleCreate]:
"""
Constructs a schedule from a deployment configuration.

Expand All @@ -913,6 +919,7 @@ def _construct_schedules(
Returns:
A list of schedule objects
"""
schedules: list[DeploymentScheduleCreate] = [] # Initialize with empty list
schedule_configs = deploy_config.get("schedules", NotSet) or []

if schedule_configs is not NotSet:
Expand All @@ -923,14 +930,12 @@ def _construct_schedules(
elif schedule_configs is NotSet:
if is_interactive():
schedules = prompt_schedules(app.console)
else:
schedules = []

return schedules


def _schedule_config_to_deployment_schedule(
schedule_config: Dict,
schedule_config: dict[str, Any],
) -> DeploymentScheduleCreate:
anchor_date = schedule_config.get("anchor_date")
timezone = schedule_config.get("timezone")
Expand Down Expand Up @@ -969,7 +974,7 @@ def _schedule_config_to_deployment_schedule(
)


def _merge_with_default_deploy_config(deploy_config: Dict):
def _merge_with_default_deploy_config(deploy_config: dict[str, Any]) -> dict[str, Any]:
"""
Merge a base deploy config with the default deploy config.
If a key is missing in the base deploy config, it will be filled with the
Expand All @@ -983,7 +988,7 @@ def _merge_with_default_deploy_config(deploy_config: Dict):
The merged deploy config.
"""
deploy_config = deepcopy(deploy_config)
DEFAULT_DEPLOY_CONFIG = {
DEFAULT_DEPLOY_CONFIG: dict[str, Any] = {
"name": None,
"version": None,
"tags": [],
Expand Down Expand Up @@ -1014,9 +1019,9 @@ def _merge_with_default_deploy_config(deploy_config: Dict):

async def _generate_git_clone_pull_step(
console: Console,
deploy_config: Dict,
deploy_config: dict[str, Any],
remote_url: str,
):
) -> list[dict[str, Any]]:
branch = get_git_branch() or "main"

if not remote_url:
Expand Down Expand Up @@ -1104,9 +1109,9 @@ async def _generate_git_clone_pull_step(


async def _generate_pull_step_for_build_docker_image(
console: Console, deploy_config: Dict, auto: bool = True
):
pull_step = {}
console: Console, deploy_config: dict[str, Any], auto: bool = True
) -> list[dict[str, Any]]:
pull_step: dict[str, Any] = {}
dir_name = os.path.basename(os.getcwd())
if auto:
pull_step["directory"] = f"/opt/prefect/{dir_name}"
Expand Down Expand Up @@ -1138,9 +1143,9 @@ async def _check_for_build_docker_image_step(


async def _generate_actions_for_remote_flow_storage(
console: Console, deploy_config: dict, actions: List[Dict]
) -> Dict[str, List[Dict[str, Any]]]:
storage_provider_to_collection = {
console: Console, deploy_config: dict[str, Any], actions: list[dict[str, Any]]
) -> dict[str, list[dict[str, Any]]]:
storage_provider_to_collection: dict[str, str] = {
"s3": "prefect_aws",
"gcs": "prefect_gcp",
"azure_blob_storage": "prefect_azure",
Expand Down Expand Up @@ -1195,9 +1200,9 @@ async def _generate_actions_for_remote_flow_storage(

async def _generate_default_pull_action(
console: Console,
deploy_config: Dict,
actions: List[Dict],
):
deploy_config: dict[str, Any],
actions: list[dict[str, Any]],
) -> list[dict[str, Any]]:
build_docker_image_step = await _check_for_build_docker_image_step(
deploy_config.get("build") or actions["build"]
)
Expand All @@ -1224,7 +1229,7 @@ async def _generate_default_pull_action(
entrypoint_path, _ = deploy_config["entrypoint"].split(":")
console.print(
"Your Prefect workers will attempt to load your flow from:"
f" [green]{(Path.cwd()/Path(entrypoint_path)).absolute().resolve()}[/]. To"
f" [green]{(Path.cwd() / Path(entrypoint_path)).absolute().resolve()}[/]. To"
" see more options for managing your flow's code, run:\n\n\t[blue]$"
" prefect init[/]\n"
)
Expand Down Expand Up @@ -1318,10 +1323,12 @@ def _log_missing_deployment_names(missing_names, matched_deploy_configs, names):
)


def _filter_matching_deploy_config(name, deploy_configs):
def _filter_matching_deploy_config(
name: str, deploy_configs: list[dict[str, Any]]
) -> list[dict[str, Any]]:
# Logic to find the deploy_config matching the given name
# This function handles both "flow-name/deployment-name" and just "deployment-name"
matching_deployments = []
matching_deployments: list[dict[str, Any]] = []
if "/" in name:
flow_name, deployment_name = name.split("/")
flow_name = flow_name.replace("-", "_")
Expand Down Expand Up @@ -1637,7 +1644,7 @@ def _check_for_matching_deployment_name_and_entrypoint_in_prefect_file(


def _check_if_identical_deployment_in_prefect_file(
untemplated_deploy_config: Dict, prefect_file: Path = Path("prefect.yaml")
untemplated_deploy_config: dict[str, Any], prefect_file: Path = Path("prefect.yaml")
) -> bool:
"""
Check if the given deploy config is identical to an existing deploy config in the
Expand All @@ -1662,22 +1669,20 @@ def _check_if_identical_deployment_in_prefect_file(


def _initialize_deployment_triggers(
deployment_name: str, triggers_spec: List[Dict[str, Any]]
) -> List[DeploymentTriggerTypes]:
triggers = []
deployment_name: str, triggers_spec: list[dict[str, Any]]
) -> list[DeploymentTriggerTypes]:
triggers: list[DeploymentTriggerTypes] = []
for i, spec in enumerate(triggers_spec, start=1):
spec.setdefault("name", f"{deployment_name}__automation_{i}")
triggers.append(
pydantic.TypeAdapter(DeploymentTriggerTypes).validate_python(spec)
)
triggers.append(DeploymentTriggerAdapter.validate_python(spec))

return triggers


async def _create_deployment_triggers(
client: "PrefectClient",
deployment_id: UUID,
triggers: List[Union[DeploymentTriggerTypes, TriggerTypes]],
triggers: list[DeploymentTriggerTypes | TriggerTypes],
):
try:
# The triggers defined in the deployment spec are, essentially,
Expand All @@ -1701,8 +1706,8 @@ async def _create_deployment_triggers(


def _gather_deployment_trigger_definitions(
trigger_flags: List[str], existing_triggers: List[Dict[str, Any]]
) -> List[Dict[str, Any]]:
trigger_flags: list[str], existing_triggers: list[dict[str, Any]]
) -> list[dict[str, Any]]:
"""Parses trigger flags from CLI and existing deployment config in `prefect.yaml`.

Args:
Expand All @@ -1717,7 +1722,7 @@ def _gather_deployment_trigger_definitions(
"""

if trigger_flags:
trigger_specs = []
trigger_specs: list[dict[str, Any]] = []
for t in trigger_flags:
try:
if t.endswith(".yaml"):
Expand All @@ -1735,7 +1740,7 @@ def _gather_deployment_trigger_definitions(
return existing_triggers


def _handle_deprecated_schedule_fields(deploy_config: Dict):
def _handle_deprecated_schedule_fields(deploy_config: dict[str, Any]):
deploy_config = deepcopy(deploy_config)

legacy_schedule = deploy_config.get("schedule", NotSet)
Expand Down Expand Up @@ -1779,7 +1784,7 @@ def _gather_deployment_sla_definitions(
Prefers CLI-provided SLAs over config in `prefect.yaml`.
"""
if sla_flags is not None:
sla_specs = []
sla_specs: list[dict[str, Any]] = []
for s in sla_flags:
try:
if s.endswith(".yaml"):
Expand Down Expand Up @@ -1812,7 +1817,7 @@ def _initialize_deployment_slas(
if sla_specs == [] or sla_specs == [[]]:
return []

slas = [pydantic.TypeAdapter(SlaTypes).validate_python(spec) for spec in sla_specs]
slas = [SlaAdapter.validate_python(spec) for spec in sla_specs]

for sla in slas:
sla.set_deployment_id(deployment_id)
Expand Down
Loading