Skip to content

Commit 1bcfc03

Browse files
committed
Remove unused print statement, unify bulk action types as enums and move them to common.py under datamodels
1 parent c977c80 commit 1bcfc03

File tree

13 files changed

+250
-225
lines changed

13 files changed

+250
-225
lines changed
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,52 @@
1+
# Licensed to the Apache Software Foundation (ASF) under one
2+
# or more contributor license agreements. See the NOTICE file
3+
# distributed with this work for additional information
4+
# regarding copyright ownership. The ASF licenses this file
5+
# to you under the Apache License, Version 2.0 (the
6+
# "License"); you may not use this file except in compliance
7+
# with the License. You may obtain a copy of the License at
8+
#
9+
# http://www.apache.org/licenses/LICENSE-2.0
10+
#
11+
# Unless required by applicable law or agreed to in writing,
12+
# software distributed under the License is distributed on an
13+
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14+
# KIND, either express or implied. See the License for the
15+
# specific language governing permissions and limitations
16+
# under the License.
17+
"""
18+
Common Data Models for Airflow REST API.
19+
20+
:meta private:
21+
"""
22+
23+
from __future__ import annotations
24+
25+
import enum
26+
27+
from airflow.api_fastapi.core_api.base import BaseModel
28+
29+
30+
# Common Bulk Data Models
31+
class BulkAction(enum.Enum):
32+
"""Bulk Action to be performed on the used model."""
33+
34+
CREATE = "create"
35+
DELETE = "delete"
36+
UPDATE = "update"
37+
38+
39+
class BulkActionOnExistence(enum.Enum):
40+
"""Bulk Action to be taken if the entity already exists or not."""
41+
42+
FAIL = "fail"
43+
SKIP = "skip"
44+
OVERWRITE = "overwrite"
45+
46+
47+
# TODO: Unify All Bulk Operation Related Base Data Models
48+
class BulkBaseAction(BaseModel):
49+
"""Base class for bulk actions."""
50+
51+
action: BulkAction
52+
action_on_existence: BulkActionOnExistence = BulkActionOnExistence.FAIL

airflow/api_fastapi/core_api/datamodels/connections.py

+8-10
Original file line numberDiff line numberDiff line change
@@ -18,12 +18,13 @@
1818
from __future__ import annotations
1919

2020
import json
21-
from typing import Any, Literal
21+
from typing import Any
2222

2323
from pydantic import Field, field_validator
2424
from pydantic_core.core_schema import ValidationInfo
2525

2626
from airflow.api_fastapi.core_api.base import BaseModel
27+
from airflow.api_fastapi.core_api.datamodels.common import BulkAction, BulkBaseAction
2728
from airflow.utils.log.secrets_masker import redact
2829

2930

@@ -91,28 +92,25 @@ class ConnectionBody(BaseModel):
9192
extra: str | None = Field(default=None)
9293

9394

94-
class ConnectionBulkCreateAction(BaseModel):
95+
class ConnectionBulkCreateAction(BulkBaseAction):
9596
"""Bulk Create Variable serializer for request bodies."""
9697

97-
action: Literal["create"] = "create"
98+
action: BulkAction = BulkAction.CREATE
9899
connections: list[ConnectionBody] = Field(..., description="A list of connections to be created.")
99-
action_if_exists: Literal["skip", "overwrite", "fail"] = "fail"
100100

101101

102-
class ConnectionBulkUpdateAction(BaseModel):
102+
class ConnectionBulkUpdateAction(BulkBaseAction):
103103
"""Bulk Update Connection serializer for request bodies."""
104104

105-
action: Literal["update"] = "update"
105+
action: BulkAction = BulkAction.UPDATE
106106
connections: list[ConnectionBody] = Field(..., description="A list of connections to be updated.")
107-
action_if_not_exists: Literal["skip", "fail"] = "fail"
108107

109108

110-
class ConnectionBulkDeleteAction(BaseModel):
109+
class ConnectionBulkDeleteAction(BulkBaseAction):
111110
"""Bulk Delete Connection serializer for request bodies."""
112111

113-
action: Literal["delete"] = "delete"
112+
action: BulkAction = BulkAction.DELETE
114113
connection_ids: list[str] = Field(..., description="A list of connection IDs to be deleted.")
115-
action_if_not_exists: Literal["skip", "fail"] = "fail"
116114

117115

118116
class ConnectionBulkBody(BaseModel):

airflow/api_fastapi/core_api/datamodels/variables.py

+8-10
Original file line numberDiff line numberDiff line change
@@ -18,11 +18,12 @@
1818
from __future__ import annotations
1919

2020
import json
21-
from typing import Any, Literal
21+
from typing import Any
2222

2323
from pydantic import ConfigDict, Field, model_validator
2424

2525
from airflow.api_fastapi.core_api.base import BaseModel
26+
from airflow.api_fastapi.core_api.datamodels.common import BulkAction, BulkBaseAction
2627
from airflow.models.base import ID_LEN
2728
from airflow.typing_compat import Self
2829
from airflow.utils.log.secrets_masker import redact
@@ -76,28 +77,25 @@ class VariablesImportResponse(BaseModel):
7677
created_count: int
7778

7879

79-
class VariableBulkCreateAction(BaseModel):
80+
class VariableBulkCreateAction(BulkBaseAction):
8081
"""Bulk Create Variable serializer for request bodies."""
8182

82-
action: Literal["create"] = "create"
83+
action: BulkAction = BulkAction.CREATE
8384
variables: list[VariableBody] = Field(..., description="A list of variables to be created.")
84-
action_if_exists: Literal["skip", "overwrite", "fail"] = "fail"
8585

8686

87-
class VariableBulkUpdateAction(BaseModel):
87+
class VariableBulkUpdateAction(BulkBaseAction):
8888
"""Bulk Update Variable serializer for request bodies."""
8989

90-
action: Literal["update"] = "update"
90+
action: BulkAction = BulkAction.UPDATE
9191
variables: list[VariableBody] = Field(..., description="A list of variables to be updated.")
92-
action_if_not_exists: Literal["skip", "fail"] = "fail"
9392

9493

95-
class VariableBulkDeleteAction(BaseModel):
94+
class VariableBulkDeleteAction(BulkBaseAction):
9695
"""Bulk Delete Variable serializer for request bodies."""
9796

98-
action: Literal["delete"] = "delete"
97+
action: BulkAction = BulkAction.DELETE
9998
keys: list[str] = Field(..., description="A list of variable keys to be deleted.")
100-
action_if_not_exists: Literal["skip", "fail"] = "fail"
10199

102100

103101
class VariableBulkBody(BaseModel):

airflow/api_fastapi/core_api/openapi/v1-generated.yaml

+40-62
Original file line numberDiff line numberDiff line change
@@ -6554,6 +6554,22 @@ components:
65546554
- status
65556555
title: BaseInfoResponse
65566556
description: Base info serializer for responses.
6557+
BulkAction:
6558+
type: string
6559+
enum:
6560+
- create
6561+
- delete
6562+
- update
6563+
title: BulkAction
6564+
description: Bulk Action to be performed on the used model.
6565+
BulkActionOnExistence:
6566+
type: string
6567+
enum:
6568+
- fail
6569+
- skip
6570+
- overwrite
6571+
title: BulkActionOnExistence
6572+
description: Bulk Action to be taken if the entity already exists or not.
65576573
ClearTaskInstancesBody:
65586574
properties:
65596575
dry_run:
@@ -6847,24 +6863,17 @@ components:
68476863
ConnectionBulkCreateAction:
68486864
properties:
68496865
action:
6850-
type: string
6851-
const: create
6852-
title: Action
6866+
$ref: '#/components/schemas/BulkAction'
68536867
default: create
6868+
action_on_existence:
6869+
$ref: '#/components/schemas/BulkActionOnExistence'
6870+
default: fail
68546871
connections:
68556872
items:
68566873
$ref: '#/components/schemas/ConnectionBody'
68576874
type: array
68586875
title: Connections
68596876
description: A list of connections to be created.
6860-
action_if_exists:
6861-
type: string
6862-
enum:
6863-
- skip
6864-
- overwrite
6865-
- fail
6866-
title: Action If Exists
6867-
default: fail
68686877
type: object
68696878
required:
68706879
- connections
@@ -6873,23 +6882,17 @@ components:
68736882
ConnectionBulkDeleteAction:
68746883
properties:
68756884
action:
6876-
type: string
6877-
const: delete
6878-
title: Action
6885+
$ref: '#/components/schemas/BulkAction'
68796886
default: delete
6887+
action_on_existence:
6888+
$ref: '#/components/schemas/BulkActionOnExistence'
6889+
default: fail
68806890
connection_ids:
68816891
items:
68826892
type: string
68836893
type: array
68846894
title: Connection Ids
68856895
description: A list of connection IDs to be deleted.
6886-
action_if_not_exists:
6887-
type: string
6888-
enum:
6889-
- skip
6890-
- fail
6891-
title: Action If Not Exists
6892-
default: fail
68936896
type: object
68946897
required:
68956898
- connection_ids
@@ -6931,23 +6934,17 @@ components:
69316934
ConnectionBulkUpdateAction:
69326935
properties:
69336936
action:
6934-
type: string
6935-
const: update
6936-
title: Action
6937+
$ref: '#/components/schemas/BulkAction'
69376938
default: update
6939+
action_on_existence:
6940+
$ref: '#/components/schemas/BulkActionOnExistence'
6941+
default: fail
69386942
connections:
69396943
items:
69406944
$ref: '#/components/schemas/ConnectionBody'
69416945
type: array
69426946
title: Connections
69436947
description: A list of connections to be updated.
6944-
action_if_not_exists:
6945-
type: string
6946-
enum:
6947-
- skip
6948-
- fail
6949-
title: Action If Not Exists
6950-
default: fail
69516948
type: object
69526949
required:
69536950
- connections
@@ -9969,24 +9966,17 @@ components:
99699966
VariableBulkCreateAction:
99709967
properties:
99719968
action:
9972-
type: string
9973-
const: create
9974-
title: Action
9969+
$ref: '#/components/schemas/BulkAction'
99759970
default: create
9971+
action_on_existence:
9972+
$ref: '#/components/schemas/BulkActionOnExistence'
9973+
default: fail
99769974
variables:
99779975
items:
99789976
$ref: '#/components/schemas/VariableBody'
99799977
type: array
99809978
title: Variables
99819979
description: A list of variables to be created.
9982-
action_if_exists:
9983-
type: string
9984-
enum:
9985-
- skip
9986-
- overwrite
9987-
- fail
9988-
title: Action If Exists
9989-
default: fail
99909980
type: object
99919981
required:
99929982
- variables
@@ -9995,23 +9985,17 @@ components:
99959985
VariableBulkDeleteAction:
99969986
properties:
99979987
action:
9998-
type: string
9999-
const: delete
10000-
title: Action
9988+
$ref: '#/components/schemas/BulkAction'
100019989
default: delete
9990+
action_on_existence:
9991+
$ref: '#/components/schemas/BulkActionOnExistence'
9992+
default: fail
100029993
keys:
100039994
items:
100049995
type: string
100059996
type: array
100069997
title: Keys
100079998
description: A list of variable keys to be deleted.
10008-
action_if_not_exists:
10009-
type: string
10010-
enum:
10011-
- skip
10012-
- fail
10013-
title: Action If Not Exists
10014-
default: fail
100159999
type: object
1001610000
required:
1001710001
- keys
@@ -10053,23 +10037,17 @@ components:
1005310037
VariableBulkUpdateAction:
1005410038
properties:
1005510039
action:
10056-
type: string
10057-
const: update
10058-
title: Action
10040+
$ref: '#/components/schemas/BulkAction'
1005910041
default: update
10042+
action_on_existence:
10043+
$ref: '#/components/schemas/BulkActionOnExistence'
10044+
default: fail
1006010045
variables:
1006110046
items:
1006210047
$ref: '#/components/schemas/VariableBody'
1006310048
type: array
1006410049
title: Variables
1006510050
description: A list of variables to be updated.
10066-
action_if_not_exists:
10067-
type: string
10068-
enum:
10069-
- skip
10070-
- fail
10071-
title: Action If Not Exists
10072-
default: fail
1007310051
type: object
1007410052
required:
1007510053
- variables

airflow/api_fastapi/core_api/routes/public/connections.py

+10-9
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,7 @@
2727
from airflow.api_fastapi.common.db.common import SessionDep, paginated_select
2828
from airflow.api_fastapi.common.parameters import QueryLimit, QueryOffset, SortParam
2929
from airflow.api_fastapi.common.router import AirflowRouter
30+
from airflow.api_fastapi.core_api.datamodels.common import BulkAction
3031
from airflow.api_fastapi.core_api.datamodels.connections import (
3132
ConnectionBody,
3233
ConnectionBulkActionResponse,
@@ -152,15 +153,15 @@ def bulk_connections(
152153
results: dict[str, ConnectionBulkActionResponse] = {}
153154

154155
for action in request.actions:
155-
if action.action not in results:
156-
results[action.action] = ConnectionBulkActionResponse()
157-
158-
if action.action == "create":
159-
handle_bulk_create(session, action, results[action.action])
160-
elif action.action == "update":
161-
handle_bulk_update(session, action, results[action.action])
162-
elif action.action == "delete":
163-
handle_bulk_delete(session, action, results[action.action])
156+
if action.action.value not in results:
157+
results[action.action.value] = ConnectionBulkActionResponse()
158+
159+
if action.action == BulkAction.CREATE:
160+
handle_bulk_create(session, action, results[action.action.value]) # type: ignore
161+
elif action.action == BulkAction.UPDATE:
162+
handle_bulk_update(session, action, results[action.action.value]) # type: ignore
163+
elif action.action == BulkAction.DELETE:
164+
handle_bulk_delete(session, action, results[action.action.value]) # type: ignore
164165

165166
return ConnectionBulkResponse(**results)
166167

0 commit comments

Comments
 (0)