Skip to content

Commit

Permalink
Unified Bulk Patch Endpoint for Connections in Rest API (FastAPI) (ap…
Browse files Browse the repository at this point in the history
…ache#45715)

* Initial implementation of Bulk Patch approach for Connections, replace default for mutable fields and fix a comment, Remove unit test for old method

* Comment update,Remove key from model_dump, Update Service logic according to tests, include unit tests, Fix Session Add for Create Action

* Remove unused print statement, unify bulk action types as enums and move them to common.py under datamodels
  • Loading branch information
bugraoz93 authored and got686-yandex committed Jan 30, 2025
1 parent 0a93802 commit fdf7bf8
Show file tree
Hide file tree
Showing 16 changed files with 1,155 additions and 704 deletions.
52 changes: 52 additions & 0 deletions airflow/api_fastapi/core_api/datamodels/common.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,52 @@
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
"""
Common Data Models for Airflow REST API.
:meta private:
"""

from __future__ import annotations

import enum

from airflow.api_fastapi.core_api.base import BaseModel


# Common Bulk Data Models
class BulkAction(enum.Enum):
"""Bulk Action to be performed on the used model."""

CREATE = "create"
DELETE = "delete"
UPDATE = "update"


class BulkActionOnExistence(enum.Enum):
"""Bulk Action to be taken if the entity already exists or not."""

FAIL = "fail"
SKIP = "skip"
OVERWRITE = "overwrite"


# TODO: Unify All Bulk Operation Related Base Data Models
class BulkBaseAction(BaseModel):
"""Base class for bulk actions."""

action: BulkAction
action_on_existence: BulkActionOnExistence = BulkActionOnExistence.FAIL
73 changes: 69 additions & 4 deletions airflow/api_fastapi/core_api/datamodels/connections.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,11 +18,13 @@
from __future__ import annotations

import json
from typing import Any

from pydantic import Field, field_validator
from pydantic_core.core_schema import ValidationInfo

from airflow.api_fastapi.core_api.base import BaseModel
from airflow.api_fastapi.core_api.datamodels.common import BulkAction, BulkBaseAction
from airflow.utils.log.secrets_masker import redact


Expand Down Expand Up @@ -90,8 +92,71 @@ class ConnectionBody(BaseModel):
extra: str | None = Field(default=None)


class ConnectionBulkBody(BaseModel):
"""Connections Serializer for requests body."""
class ConnectionBulkCreateAction(BulkBaseAction):
"""Bulk Create Variable serializer for request bodies."""

action: BulkAction = BulkAction.CREATE
connections: list[ConnectionBody] = Field(..., description="A list of connections to be created.")


class ConnectionBulkUpdateAction(BulkBaseAction):
"""Bulk Update Connection serializer for request bodies."""

action: BulkAction = BulkAction.UPDATE
connections: list[ConnectionBody] = Field(..., description="A list of connections to be updated.")

connections: list[ConnectionBody]
overwrite: bool | None = Field(default=False)

class ConnectionBulkDeleteAction(BulkBaseAction):
"""Bulk Delete Connection serializer for request bodies."""

action: BulkAction = BulkAction.DELETE
connection_ids: list[str] = Field(..., description="A list of connection IDs to be deleted.")


class ConnectionBulkBody(BaseModel):
"""Request body for bulk Connection operations (create, update, delete)."""

actions: list[ConnectionBulkCreateAction | ConnectionBulkUpdateAction | ConnectionBulkDeleteAction] = (
Field(..., description="A list of Connection actions to perform.")
)


class ConnectionBulkActionResponse(BaseModel):
"""
Serializer for individual bulk action responses.
Represents the outcome of a single bulk operation (create, update, or delete).
The response includes a list of successful connection_ids and any errors encountered during the operation.
This structure helps users understand which key actions succeeded and which failed.
"""

success: list[str] = Field(
default_factory=list, description="A list of connection_ids representing successful operations."
)
errors: list[dict[str, Any]] = Field(
default_factory=list,
description="A list of errors encountered during the operation, each containing details about the issue.",
)


class ConnectionBulkResponse(BaseModel):
"""
Serializer for responses to bulk connection operations.
This represents the results of create, update, and delete actions performed on connections in bulk.
Each action (if requested) is represented as a field containing details about successful connection_ids and any encountered errors.
Fields are populated in the response only if the respective action was part of the request, else are set None.
"""

create: ConnectionBulkActionResponse | None = Field(
default=None,
description="Details of the bulk create operation, including successful connection_ids and errors.",
)
update: ConnectionBulkActionResponse | None = Field(
default=None,
description="Details of the bulk update operation, including successful connection_ids and errors.",
)
delete: ConnectionBulkActionResponse | None = Field(
default=None,
description="Details of the bulk delete operation, including successful connection_ids and errors.",
)
18 changes: 8 additions & 10 deletions airflow/api_fastapi/core_api/datamodels/variables.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,11 +18,12 @@
from __future__ import annotations

import json
from typing import Any, Literal
from typing import Any

from pydantic import ConfigDict, Field, model_validator

from airflow.api_fastapi.core_api.base import BaseModel
from airflow.api_fastapi.core_api.datamodels.common import BulkAction, BulkBaseAction
from airflow.models.base import ID_LEN
from airflow.typing_compat import Self
from airflow.utils.log.secrets_masker import redact
Expand Down Expand Up @@ -76,28 +77,25 @@ class VariablesImportResponse(BaseModel):
created_count: int


class VariableBulkCreateAction(BaseModel):
class VariableBulkCreateAction(BulkBaseAction):
"""Bulk Create Variable serializer for request bodies."""

action: Literal["create"] = "create"
action: BulkAction = BulkAction.CREATE
variables: list[VariableBody] = Field(..., description="A list of variables to be created.")
action_if_exists: Literal["skip", "overwrite", "fail"] = "fail"


class VariableBulkUpdateAction(BaseModel):
class VariableBulkUpdateAction(BulkBaseAction):
"""Bulk Update Variable serializer for request bodies."""

action: Literal["update"] = "update"
action: BulkAction = BulkAction.UPDATE
variables: list[VariableBody] = Field(..., description="A list of variables to be updated.")
action_if_not_exists: Literal["skip", "fail"] = "fail"


class VariableBulkDeleteAction(BaseModel):
class VariableBulkDeleteAction(BulkBaseAction):
"""Bulk Delete Variable serializer for request bodies."""

action: Literal["delete"] = "delete"
action: BulkAction = BulkAction.DELETE
keys: list[str] = Field(..., description="A list of variable keys to be deleted.")
action_if_not_exists: Literal["skip", "fail"] = "fail"


class VariableBulkBody(BaseModel):
Expand Down
Loading

0 comments on commit fdf7bf8

Please sign in to comment.