Skip to content

Commit be7a905

Browse files
bugraoz93dauinh
authored andcommitted
Unified Bulk Patch Endpoint for Connections in Rest API (FastAPI) (apache#45715)
* Initial implementation of Bulk Patch approach for Connections, replace default for mutable fields and fix a comment, Remove unit test for old method * Comment update,Remove key from model_dump, Update Service logic according to tests, include unit tests, Fix Session Add for Create Action * Remove unused print statement, unify bulk action types as enums and move them to common.py under datamodels
1 parent a153977 commit be7a905

File tree

16 files changed

+1155
-704
lines changed

16 files changed

+1155
-704
lines changed
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,52 @@
1+
# Licensed to the Apache Software Foundation (ASF) under one
2+
# or more contributor license agreements. See the NOTICE file
3+
# distributed with this work for additional information
4+
# regarding copyright ownership. The ASF licenses this file
5+
# to you under the Apache License, Version 2.0 (the
6+
# "License"); you may not use this file except in compliance
7+
# with the License. You may obtain a copy of the License at
8+
#
9+
# http://www.apache.org/licenses/LICENSE-2.0
10+
#
11+
# Unless required by applicable law or agreed to in writing,
12+
# software distributed under the License is distributed on an
13+
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14+
# KIND, either express or implied. See the License for the
15+
# specific language governing permissions and limitations
16+
# under the License.
17+
"""
18+
Common Data Models for Airflow REST API.
19+
20+
:meta private:
21+
"""
22+
23+
from __future__ import annotations
24+
25+
import enum
26+
27+
from airflow.api_fastapi.core_api.base import BaseModel
28+
29+
30+
# Common Bulk Data Models
31+
class BulkAction(enum.Enum):
32+
"""Bulk Action to be performed on the used model."""
33+
34+
CREATE = "create"
35+
DELETE = "delete"
36+
UPDATE = "update"
37+
38+
39+
class BulkActionOnExistence(enum.Enum):
40+
"""Bulk Action to be taken if the entity already exists or not."""
41+
42+
FAIL = "fail"
43+
SKIP = "skip"
44+
OVERWRITE = "overwrite"
45+
46+
47+
# TODO: Unify All Bulk Operation Related Base Data Models
48+
class BulkBaseAction(BaseModel):
49+
"""Base class for bulk actions."""
50+
51+
action: BulkAction
52+
action_on_existence: BulkActionOnExistence = BulkActionOnExistence.FAIL

airflow/api_fastapi/core_api/datamodels/connections.py

+69-4
Original file line numberDiff line numberDiff line change
@@ -18,11 +18,13 @@
1818
from __future__ import annotations
1919

2020
import json
21+
from typing import Any
2122

2223
from pydantic import Field, field_validator
2324
from pydantic_core.core_schema import ValidationInfo
2425

2526
from airflow.api_fastapi.core_api.base import BaseModel
27+
from airflow.api_fastapi.core_api.datamodels.common import BulkAction, BulkBaseAction
2628
from airflow.utils.log.secrets_masker import redact
2729

2830

@@ -90,8 +92,71 @@ class ConnectionBody(BaseModel):
9092
extra: str | None = Field(default=None)
9193

9294

93-
class ConnectionBulkBody(BaseModel):
94-
"""Connections Serializer for requests body."""
95+
class ConnectionBulkCreateAction(BulkBaseAction):
96+
"""Bulk Create Variable serializer for request bodies."""
97+
98+
action: BulkAction = BulkAction.CREATE
99+
connections: list[ConnectionBody] = Field(..., description="A list of connections to be created.")
100+
101+
102+
class ConnectionBulkUpdateAction(BulkBaseAction):
103+
"""Bulk Update Connection serializer for request bodies."""
104+
105+
action: BulkAction = BulkAction.UPDATE
106+
connections: list[ConnectionBody] = Field(..., description="A list of connections to be updated.")
95107

96-
connections: list[ConnectionBody]
97-
overwrite: bool | None = Field(default=False)
108+
109+
class ConnectionBulkDeleteAction(BulkBaseAction):
110+
"""Bulk Delete Connection serializer for request bodies."""
111+
112+
action: BulkAction = BulkAction.DELETE
113+
connection_ids: list[str] = Field(..., description="A list of connection IDs to be deleted.")
114+
115+
116+
class ConnectionBulkBody(BaseModel):
117+
"""Request body for bulk Connection operations (create, update, delete)."""
118+
119+
actions: list[ConnectionBulkCreateAction | ConnectionBulkUpdateAction | ConnectionBulkDeleteAction] = (
120+
Field(..., description="A list of Connection actions to perform.")
121+
)
122+
123+
124+
class ConnectionBulkActionResponse(BaseModel):
125+
"""
126+
Serializer for individual bulk action responses.
127+
128+
Represents the outcome of a single bulk operation (create, update, or delete).
129+
The response includes a list of successful connection_ids and any errors encountered during the operation.
130+
This structure helps users understand which key actions succeeded and which failed.
131+
"""
132+
133+
success: list[str] = Field(
134+
default_factory=list, description="A list of connection_ids representing successful operations."
135+
)
136+
errors: list[dict[str, Any]] = Field(
137+
default_factory=list,
138+
description="A list of errors encountered during the operation, each containing details about the issue.",
139+
)
140+
141+
142+
class ConnectionBulkResponse(BaseModel):
143+
"""
144+
Serializer for responses to bulk connection operations.
145+
146+
This represents the results of create, update, and delete actions performed on connections in bulk.
147+
Each action (if requested) is represented as a field containing details about successful connection_ids and any encountered errors.
148+
Fields are populated in the response only if the respective action was part of the request, else are set None.
149+
"""
150+
151+
create: ConnectionBulkActionResponse | None = Field(
152+
default=None,
153+
description="Details of the bulk create operation, including successful connection_ids and errors.",
154+
)
155+
update: ConnectionBulkActionResponse | None = Field(
156+
default=None,
157+
description="Details of the bulk update operation, including successful connection_ids and errors.",
158+
)
159+
delete: ConnectionBulkActionResponse | None = Field(
160+
default=None,
161+
description="Details of the bulk delete operation, including successful connection_ids and errors.",
162+
)

airflow/api_fastapi/core_api/datamodels/variables.py

+8-10
Original file line numberDiff line numberDiff line change
@@ -18,11 +18,12 @@
1818
from __future__ import annotations
1919

2020
import json
21-
from typing import Any, Literal
21+
from typing import Any
2222

2323
from pydantic import ConfigDict, Field, model_validator
2424

2525
from airflow.api_fastapi.core_api.base import BaseModel
26+
from airflow.api_fastapi.core_api.datamodels.common import BulkAction, BulkBaseAction
2627
from airflow.models.base import ID_LEN
2728
from airflow.typing_compat import Self
2829
from airflow.utils.log.secrets_masker import redact
@@ -76,28 +77,25 @@ class VariablesImportResponse(BaseModel):
7677
created_count: int
7778

7879

79-
class VariableBulkCreateAction(BaseModel):
80+
class VariableBulkCreateAction(BulkBaseAction):
8081
"""Bulk Create Variable serializer for request bodies."""
8182

82-
action: Literal["create"] = "create"
83+
action: BulkAction = BulkAction.CREATE
8384
variables: list[VariableBody] = Field(..., description="A list of variables to be created.")
84-
action_if_exists: Literal["skip", "overwrite", "fail"] = "fail"
8585

8686

87-
class VariableBulkUpdateAction(BaseModel):
87+
class VariableBulkUpdateAction(BulkBaseAction):
8888
"""Bulk Update Variable serializer for request bodies."""
8989

90-
action: Literal["update"] = "update"
90+
action: BulkAction = BulkAction.UPDATE
9191
variables: list[VariableBody] = Field(..., description="A list of variables to be updated.")
92-
action_if_not_exists: Literal["skip", "fail"] = "fail"
9392

9493

95-
class VariableBulkDeleteAction(BaseModel):
94+
class VariableBulkDeleteAction(BulkBaseAction):
9695
"""Bulk Delete Variable serializer for request bodies."""
9796

98-
action: Literal["delete"] = "delete"
97+
action: BulkAction = BulkAction.DELETE
9998
keys: list[str] = Field(..., description="A list of variable keys to be deleted.")
100-
action_if_not_exists: Literal["skip", "fail"] = "fail"
10199

102100

103101
class VariableBulkBody(BaseModel):

0 commit comments

Comments
 (0)