-
Notifications
You must be signed in to change notification settings - Fork 175
Expand file tree
/
Copy pathrequest.py
More file actions
148 lines (120 loc) · 5.25 KB
/
request.py
File metadata and controls
148 lines (120 loc) · 5.25 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
"""
Request DTOs for Domain v2 admin REST API.
Shared between Client SDK and Manager API.
"""
from __future__ import annotations
from pydantic import Field
from ai.backend.common.api_handlers import SENTINEL, BaseRequestModel, Sentinel
from ai.backend.common.dto.manager.defs import DEFAULT_PAGE_LIMIT, MAX_PAGE_LIMIT
from ai.backend.common.dto.manager.query import DateTimeFilter, StringFilter
from ai.backend.common.dto.manager.v2.common import BaseFilter
from ai.backend.common.dto.manager.v2.domain.types import (
DomainOrderField,
DomainProjectFilter,
DomainUserFilter,
OrderDirection,
)
# Names exported by this module via ``from ... import *``.
# Each entry corresponds to a DTO class defined below in this file.
__all__ = (
    "AdminSearchDomainsInput",
    "CreateDomainInput",
    "DeleteDomainInput",
    "DomainFilter",
    "DomainOrder",
    "PurgeDomainInput",
    "SearchDomainsRequest",
    "UpdateDomainInput",
)
class CreateDomainInput(BaseRequestModel):
    """Input for creating a new domain.

    Only ``name`` is required; every other field falls back to its default
    (``None`` for the optional values, ``True`` for ``is_active``).
    """

    # An empty string can never be a usable domain name, so it is rejected
    # here together with names longer than 64 characters.
    name: str = Field(
        description="Domain name. Must be unique across the system.",
        min_length=1,
        max_length=64,
    )
    description: str | None = Field(
        default=None,
        description="Optional description of the domain.",
    )
    is_active: bool = Field(
        default=True,
        description="Whether the domain is active upon creation.",
    )
    allowed_docker_registries: list[str] | None = Field(
        default=None,
        description="List of allowed Docker registry URLs for this domain.",
    )
    integration_name: str | None = Field(
        default=None,
        description="External integration identifier for the domain.",
    )
class UpdateDomainInput(BaseRequestModel):
    """Input for updating domain information.

    All fields are optional — only provided fields will be updated.

    ``name`` and ``is_active`` default to ``None``. The clearable fields
    (``description``, ``allowed_docker_registries``, ``integration_name``)
    default to ``SENTINEL`` instead, which keeps an omitted field
    distinguishable from an explicit null ("clear the value"). How the
    sentinel is interpreted is decided by the consumer of this model,
    outside this module.
    """

    # ``None`` means "no rename". When a new name is given it must not be
    # empty and must not exceed 64 characters.
    name: str | None = Field(
        default=None,
        description="New domain name.",
        min_length=1,
        max_length=64,
    )
    description: str | Sentinel | None = Field(
        default=SENTINEL,
        description="New domain description. Set to null to clear.",
    )
    is_active: bool | None = Field(
        default=None,
        description="Updated active status.",
    )
    allowed_docker_registries: list[str] | Sentinel | None = Field(
        default=SENTINEL,
        description="New list of allowed Docker registry URLs. Set to null to clear.",
    )
    integration_name: str | Sentinel | None = Field(
        default=SENTINEL,
        description="New external integration identifier. Set to null to clear.",
    )
class DeleteDomainInput(BaseRequestModel):
    """Request payload naming the domain to soft-delete."""

    # Required: the model declares no default for the target name.
    name: str = Field(
        description="Name of the domain to soft-delete.",
    )
class PurgeDomainInput(BaseRequestModel):
    """Request payload naming the domain to purge permanently.

    Per the field description, purging is the permanent counterpart of a
    soft delete; the purge itself is implemented outside this module.
    """

    # Required: the model declares no default for the target name.
    name: str = Field(
        description="Name of the domain to permanently purge.",
    )
class DomainFilter(BaseFilter):
    """Search conditions that can be applied to domains.

    Every condition is optional and defaults to ``None``.
    """

    # Conditions on the domain's own fields.
    name: StringFilter | None = Field(
        description="Filter by domain name.",
        default=None,
    )
    description: StringFilter | None = Field(
        description="Filter by description.",
        default=None,
    )
    is_active: bool | None = Field(
        description="Filter by active status.",
        default=None,
    )
    created_at: DateTimeFilter | None = Field(
        description="Filter by creation time.",
        default=None,
    )
    modified_at: DateTimeFilter | None = Field(
        description="Filter by last modification time.",
        default=None,
    )

    # Nested conditions on related projects and users.
    project: DomainProjectFilter | None = Field(
        description="Filter by nested project conditions.",
        default=None,
    )
    user: DomainUserFilter | None = Field(
        description="Filter by nested user conditions.",
        default=None,
    )
class DomainOrder(BaseRequestModel):
    """A single ordering criterion for domain search results.

    ``field`` is required; ``direction`` falls back to ``OrderDirection.ASC``.
    """

    field: DomainOrderField = Field(
        description="Field to order by.",
    )
    direction: OrderDirection = Field(
        description="Order direction.",
        default=OrderDirection.ASC,
    )
class SearchDomainsRequest(BaseRequestModel):
    """Body of a domain search request: filter, ordering, and offset pagination."""

    filter: DomainFilter | None = Field(
        description="Filter conditions.",
        default=None,
    )
    order: list[DomainOrder] | None = Field(
        description="Order specifications.",
        default=None,
    )

    # Page size is validated to lie within [1, MAX_PAGE_LIMIT].
    limit: int = Field(
        description="Maximum items to return.",
        default=DEFAULT_PAGE_LIMIT,
        le=MAX_PAGE_LIMIT,
        ge=1,
    )
    # Skipping zero items (the default) starts from the first result.
    offset: int = Field(
        description="Number of items to skip.",
        default=0,
        ge=0,
    )
class AdminSearchDomainsInput(BaseRequestModel):
    """Input for admin search of domains with cursor and offset pagination.

    Cursor-style arguments (``first``/``after``, ``last``/``before``) and
    offset-style arguments (``limit``/``offset``) are all optional and
    default to ``None``. This model does not check which combination is
    supplied; it only rejects negative counts, which are never meaningful
    for pagination. No upper bound is imposed here.
    """

    filter: DomainFilter | None = Field(default=None, description="Filter conditions.")
    order: list[DomainOrder] | None = Field(default=None, description="Order specifications.")

    # Cursor pagination. Counts must be non-negative when given.
    first: int | None = Field(
        default=None,
        ge=0,
        description="Cursor pagination: number of items.",
    )
    after: str | None = Field(default=None, description="Cursor pagination: after cursor.")
    last: int | None = Field(
        default=None,
        ge=0,
        description="Cursor pagination: last N items.",
    )
    before: str | None = Field(default=None, description="Cursor pagination: before cursor.")

    # Offset pagination. Both values must be non-negative when given.
    limit: int | None = Field(
        default=None,
        ge=0,
        description="Offset pagination: maximum items.",
    )
    offset: int | None = Field(
        default=None,
        ge=0,
        description="Offset pagination: number to skip.",
    )