|
| 1 | +"""AppConfig (merged view) domain adapter — BEP-1052 §5. |
| 2 | +
|
| 3 | +Reads the per-user merged AppConfig view and writes the underlying USER |
| 4 | +fragments via the same `app_config_fragment` service processors. The |
| 5 | +merged-view surface lives on its own adapter (separate from |
| 6 | +`AppConfigFragmentAdapter`) so each adapter handles a single domain |
| 7 | +DTO surface — convention in this repo. |
| 8 | +""" |
| 9 | + |
| 10 | +from __future__ import annotations |
| 11 | + |
| 12 | +from ai.backend.common.contexts.user import current_user |
| 13 | +from ai.backend.common.dto.manager.v2.app_config.request import ( |
| 14 | + AppConfigFilter, |
| 15 | + AppConfigOrder, |
| 16 | + GetUserAppConfigInput, |
| 17 | + SearchAppConfigsInput, |
| 18 | + SearchMyAppConfigsInput, |
| 19 | +) |
| 20 | +from ai.backend.common.dto.manager.v2.app_config.response import ( |
| 21 | + AppConfigNode, |
| 22 | + BulkCreateMyAppConfigFragmentsPayload, |
| 23 | + BulkUpdateMyAppConfigFragmentsPayload, |
| 24 | + GetUserAppConfigPayload, |
| 25 | + SearchAppConfigsPayload, |
| 26 | +) |
| 27 | +from ai.backend.common.dto.manager.v2.app_config.types import AppConfigOrderField, OrderDirection |
| 28 | +from ai.backend.common.dto.manager.v2.app_config_fragment.request import ( |
| 29 | + BulkCreateMyAppConfigFragmentsInput, |
| 30 | + BulkUpdateMyAppConfigFragmentsInput, |
| 31 | +) |
| 32 | +from ai.backend.common.dto.manager.v2.app_config_fragment.response import ( |
| 33 | + AppConfigFragmentBulkError, |
| 34 | + AppConfigFragmentNode, |
| 35 | +) |
| 36 | +from ai.backend.common.dto.manager.v2.app_config_fragment.types import ( |
| 37 | + AppConfigScopeType as DTOAppConfigScopeType, |
| 38 | +) |
| 39 | +from ai.backend.common.exception import UnreachableError |
| 40 | +from ai.backend.manager.api.adapter_options.pagination.pagination import PaginationSpec |
| 41 | +from ai.backend.manager.data.app_config.types import AppConfigData |
| 42 | +from ai.backend.manager.data.app_config_fragment.bulk_types import ( |
| 43 | + AppConfigFragmentBulkItemError, |
| 44 | + MyAppConfigFragmentBulkItem, |
| 45 | +) |
| 46 | +from ai.backend.manager.data.app_config_fragment.types import AppConfigFragmentData |
| 47 | +from ai.backend.manager.models.app_config_fragment.conditions import AppConfigFragmentConditions |
| 48 | +from ai.backend.manager.models.app_config_fragment.orders import AppConfigFragmentOrders |
| 49 | +from ai.backend.manager.models.app_config_fragment.row import AppConfigFragmentRow |
| 50 | +from ai.backend.manager.repositories.app_config_fragment.types import UserAppConfigSearchScope |
| 51 | +from ai.backend.manager.repositories.base import BatchQuerier, QueryCondition, QueryOrder |
| 52 | +from ai.backend.manager.services.app_config_fragment.actions.admin_search_app_configs import ( |
| 53 | + AdminSearchAppConfigsAction, |
| 54 | +) |
| 55 | +from ai.backend.manager.services.app_config_fragment.actions.bulk_create_my import ( |
| 56 | + BulkCreateMyAppConfigFragmentsAction, |
| 57 | +) |
| 58 | +from ai.backend.manager.services.app_config_fragment.actions.bulk_update_my import ( |
| 59 | + BulkUpdateMyAppConfigFragmentsAction, |
| 60 | +) |
| 61 | +from ai.backend.manager.services.app_config_fragment.actions.get_user_app_config import ( |
| 62 | + GetUserAppConfigAction, |
| 63 | +) |
| 64 | +from ai.backend.manager.services.app_config_fragment.actions.search_user_app_configs import ( |
| 65 | + SearchUserAppConfigsAction, |
| 66 | +) |
| 67 | + |
| 68 | +from .base import BaseAdapter |
| 69 | + |
| 70 | + |
| 71 | +class AppConfigAdapter(BaseAdapter): |
| 72 | + """Adapter for the merged AppConfig view (BEP-1052 §5). |
| 73 | +
|
| 74 | + Backed by the `app_config_fragment` service processors — the merged |
| 75 | + view is computed from raw fragments — but exposed as a separate |
| 76 | + transport-layer surface so the Fragment adapter stays focused on |
| 77 | + raw-row operations. |
| 78 | + """ |
| 79 | + |
| 80 | + # ── Reads ──────────────────────────────────────────────────────── |
| 81 | + |
| 82 | + async def my_app_config(self, name: str) -> GetUserAppConfigPayload: |
| 83 | + """Read the caller's own merged AppConfig for `name`. |
| 84 | +
|
| 85 | + Resolves the current user from the context; there is no way to |
| 86 | + target another user through this method. |
| 87 | + """ |
| 88 | + me = current_user() |
| 89 | + if me is None: |
| 90 | + raise UnreachableError("User context is not available") |
| 91 | + result = await self._processors.app_config_fragment.get_user_app_config.wait_for_complete( |
| 92 | + GetUserAppConfigAction(user_id=me.user_id, config_name=name) |
| 93 | + ) |
| 94 | + return GetUserAppConfigPayload(item=self._data_to_dto(result.app_config)) |
| 95 | + |
| 96 | + async def admin_get_user_app_config( |
| 97 | + self, input: GetUserAppConfigInput |
| 98 | + ) -> GetUserAppConfigPayload: |
| 99 | + """Read a specific user's merged AppConfig (admin only).""" |
| 100 | + result = await self._processors.app_config_fragment.get_user_app_config.wait_for_complete( |
| 101 | + GetUserAppConfigAction(user_id=input.user_id, config_name=input.name) |
| 102 | + ) |
| 103 | + return GetUserAppConfigPayload(item=self._data_to_dto(result.app_config)) |
| 104 | + |
| 105 | + async def my_search_app_configs( |
| 106 | + self, input: SearchMyAppConfigsInput |
| 107 | + ) -> SearchAppConfigsPayload: |
| 108 | + """Paginated merged-view search over the caller's own AppConfigs.""" |
| 109 | + me = current_user() |
| 110 | + if me is None: |
| 111 | + raise UnreachableError("User context is not available") |
| 112 | + querier = self._build_querier_from_input(input) |
| 113 | + result = ( |
| 114 | + await self._processors.app_config_fragment.search_user_app_configs.wait_for_complete( |
| 115 | + SearchUserAppConfigsAction( |
| 116 | + scope=UserAppConfigSearchScope(user_id=me.user_id), |
| 117 | + querier=querier, |
| 118 | + ) |
| 119 | + ) |
| 120 | + ) |
| 121 | + return SearchAppConfigsPayload( |
| 122 | + items=[self._data_to_dto(item) for item in result.items], |
| 123 | + total_count=result.total_count, |
| 124 | + has_next_page=result.has_next_page, |
| 125 | + has_previous_page=result.has_previous_page, |
| 126 | + ) |
| 127 | + |
| 128 | + async def admin_search_app_configs( |
| 129 | + self, input: SearchAppConfigsInput |
| 130 | + ) -> SearchAppConfigsPayload: |
| 131 | + """Cross-user merged-view search (admin only). |
| 132 | +
|
| 133 | + `filter.user_id` pins the query to a single user; otherwise |
| 134 | + pagination walks across every user. |
| 135 | + """ |
| 136 | + querier = self._build_querier_from_input(input) |
| 137 | + result = ( |
| 138 | + await self._processors.app_config_fragment.admin_search_app_configs.wait_for_complete( |
| 139 | + AdminSearchAppConfigsAction(querier=querier) |
| 140 | + ) |
| 141 | + ) |
| 142 | + return SearchAppConfigsPayload( |
| 143 | + items=[self._data_to_dto(item) for item in result.items], |
| 144 | + total_count=result.total_count, |
| 145 | + has_next_page=result.has_next_page, |
| 146 | + has_previous_page=result.has_previous_page, |
| 147 | + ) |
| 148 | + |
| 149 | + # ── Self-service bulk writes (BEP-1052 §3) ─────────────────────── |
| 150 | + # |
| 151 | + # Each bulk processor returns a `BulkProcessResult[T]` whose |
| 152 | + # `.result` field is the underlying `*ActionResult` produced by the |
| 153 | + # service. We discard the validator-decision trail here — RBAC |
| 154 | + # reasons travel back through the per-item `failed` list. |
| 155 | + |
| 156 | + async def my_bulk_create( |
| 157 | + self, input: BulkCreateMyAppConfigFragmentsInput |
| 158 | + ) -> BulkCreateMyAppConfigFragmentsPayload: |
| 159 | + me = current_user() |
| 160 | + if me is None: |
| 161 | + raise UnreachableError("User context is not available") |
| 162 | + items = [ |
| 163 | + MyAppConfigFragmentBulkItem(name=item.name, extra_config=dict(item.extra_config)) |
| 164 | + for item in input.items |
| 165 | + ] |
| 166 | + wrapper = await self._processors.app_config_fragment.bulk_create_my.wait_for_complete( |
| 167 | + BulkCreateMyAppConfigFragmentsAction( |
| 168 | + entity_ids=[], |
| 169 | + user_id=me.user_id, |
| 170 | + items=items, |
| 171 | + ) |
| 172 | + ) |
| 173 | + result = wrapper.result |
| 174 | + return BulkCreateMyAppConfigFragmentsPayload( |
| 175 | + created=[self._data_to_dto(item) for item in result.created], |
| 176 | + failed=[self._bulk_error_to_dto(err) for err in result.failed], |
| 177 | + ) |
| 178 | + |
| 179 | + async def my_bulk_update( |
| 180 | + self, input: BulkUpdateMyAppConfigFragmentsInput |
| 181 | + ) -> BulkUpdateMyAppConfigFragmentsPayload: |
| 182 | + me = current_user() |
| 183 | + if me is None: |
| 184 | + raise UnreachableError("User context is not available") |
| 185 | + items = [ |
| 186 | + MyAppConfigFragmentBulkItem(name=item.name, extra_config=dict(item.extra_config)) |
| 187 | + for item in input.items |
| 188 | + ] |
| 189 | + wrapper = await self._processors.app_config_fragment.bulk_update_my.wait_for_complete( |
| 190 | + BulkUpdateMyAppConfigFragmentsAction( |
| 191 | + entity_ids=[], |
| 192 | + user_id=me.user_id, |
| 193 | + items=items, |
| 194 | + ) |
| 195 | + ) |
| 196 | + result = wrapper.result |
| 197 | + return BulkUpdateMyAppConfigFragmentsPayload( |
| 198 | + updated=[self._data_to_dto(item) for item in result.updated], |
| 199 | + failed=[self._bulk_error_to_dto(err) for err in result.failed], |
| 200 | + ) |
| 201 | + |
| 202 | + # ── Querier / DTO helpers ──────────────────────────────────────── |
| 203 | + |
| 204 | + _PAGINATION_SPEC = PaginationSpec( |
| 205 | + forward_order=AppConfigFragmentOrders.created_at(ascending=False), |
| 206 | + backward_order=AppConfigFragmentOrders.created_at(ascending=True), |
| 207 | + forward_condition_factory=AppConfigFragmentConditions.by_cursor_forward, |
| 208 | + backward_condition_factory=AppConfigFragmentConditions.by_cursor_backward, |
| 209 | + tiebreaker_order=AppConfigFragmentRow.id.asc(), |
| 210 | + ) |
| 211 | + |
| 212 | + def _build_querier_from_input( |
| 213 | + self, |
| 214 | + input: SearchMyAppConfigsInput | SearchAppConfigsInput, |
| 215 | + ) -> BatchQuerier: |
| 216 | + """Querier builder for the merged-view searches. |
| 217 | +
|
| 218 | + The merged-view SQL resolves cursor / order internally via the |
| 219 | + repository layer; this helper forwards only the filter / order / |
| 220 | + pagination fields so cursor tiebreakers stay consistent with |
| 221 | + the raw-fragment querier. |
| 222 | + """ |
| 223 | + conditions = self._convert_filter(input.filter) if input.filter else [] |
| 224 | + orders = self._convert_orders(input.order) if input.order else [] |
| 225 | + return self._build_querier( |
| 226 | + conditions=conditions, |
| 227 | + orders=orders, |
| 228 | + pagination_spec=self._PAGINATION_SPEC, |
| 229 | + first=input.first, |
| 230 | + after=input.after, |
| 231 | + last=input.last, |
| 232 | + before=input.before, |
| 233 | + limit=input.limit, |
| 234 | + offset=input.offset, |
| 235 | + ) |
| 236 | + |
| 237 | + def _convert_filter(self, filter: AppConfigFilter) -> list[QueryCondition]: |
| 238 | + conditions: list[QueryCondition] = [] |
| 239 | + if filter.name is not None: |
| 240 | + condition = self.convert_string_filter( |
| 241 | + filter.name, |
| 242 | + contains_factory=AppConfigFragmentConditions.by_name_contains, |
| 243 | + equals_factory=AppConfigFragmentConditions.by_name_equals, |
| 244 | + starts_with_factory=AppConfigFragmentConditions.by_name_starts_with, |
| 245 | + ends_with_factory=AppConfigFragmentConditions.by_name_ends_with, |
| 246 | + in_factory=AppConfigFragmentConditions.by_name_in, |
| 247 | + ) |
| 248 | + if condition is not None: |
| 249 | + conditions.append(condition) |
| 250 | + # `filter.user_id` handling lives inside the merged-view SQL |
| 251 | + # (repository layer) rather than in a BatchQuerier condition — |
| 252 | + # see `AppConfigFragmentDBSource.admin_search_app_configs`. |
| 253 | + return conditions |
| 254 | + |
| 255 | + @staticmethod |
| 256 | + def _convert_orders(orders: list[AppConfigOrder]) -> list[QueryOrder]: |
| 257 | + result: list[QueryOrder] = [] |
| 258 | + for order in orders: |
| 259 | + ascending = order.direction == OrderDirection.ASC |
| 260 | + match order.field: |
| 261 | + case AppConfigOrderField.NAME: |
| 262 | + result.append(AppConfigFragmentOrders.name(ascending)) |
| 263 | + case AppConfigOrderField.USER_ID: |
| 264 | + # USER_ID ordering is applied inside the merged-view SQL |
| 265 | + # because the raw `app_config_fragments` row does not |
| 266 | + # carry a user_id column directly. |
| 267 | + continue |
| 268 | + return result |
| 269 | + |
| 270 | + def _data_to_dto(self, data: AppConfigData) -> AppConfigNode: |
| 271 | + return AppConfigNode( |
| 272 | + user_id=data.user_id, |
| 273 | + name=data.name, |
| 274 | + fragments=[self._fragment_data_to_dto(fragment) for fragment in data.fragments], |
| 275 | + config=dict(data.config) if data.config is not None else None, |
| 276 | + ) |
| 277 | + |
| 278 | + @staticmethod |
| 279 | + def _fragment_data_to_dto(data: AppConfigFragmentData) -> AppConfigFragmentNode: |
| 280 | + return AppConfigFragmentNode( |
| 281 | + id=data.id, |
| 282 | + scope_type=DTOAppConfigScopeType(data.scope_type.value), |
| 283 | + scope_id=data.scope_id, |
| 284 | + name=data.name, |
| 285 | + extra_config=dict(data.extra_config) if data.extra_config is not None else None, |
| 286 | + created_at=data.created_at, |
| 287 | + updated_at=data.updated_at, |
| 288 | + ) |
| 289 | + |
| 290 | + @staticmethod |
| 291 | + def _bulk_error_to_dto( |
| 292 | + err: AppConfigFragmentBulkItemError, |
| 293 | + ) -> AppConfigFragmentBulkError: |
| 294 | + return AppConfigFragmentBulkError( |
| 295 | + index=err.index, |
| 296 | + scope_type=DTOAppConfigScopeType(err.scope_type), |
| 297 | + scope_id=err.scope_id, |
| 298 | + name=err.name, |
| 299 | + message=err.message, |
| 300 | + ) |
0 commit comments