Skip to content

Commit 61d6a03

Browse files
fregataaclaude
andcommitted
refactor: replace SQL JOIN gating with BatchQuerier + per-handler hc fetch
``DeploymentRepository.get_routes_by_statuses`` (which carried the ``DeploymentRevisionRow.model_definition`` and ``SessionRow.status`` JOINs and the in-memory ``RouteHealthCheckFilter`` post-filter) is split into two narrower entry points so handlers carry the cost of the data they actually need: - ``search_route_datas(querier: BatchQuerier)`` returns bare ``RouteData`` from ``RoutingRow`` alone; the coordinator builds a ``BatchQuerier`` from each handler's ``RouteTargetStatuses`` (now ``lifecycle``/``health``/optional list ``traffic``) at the call site with ``RouteConditions``. - ``fetch_health_check_configs(revision_ids)`` returns a per-revision ``ModelHealthCheck | None`` map, called from ``HealthCheckRouteHandler``, ``AppProxySyncRouteHandler``, ``RouteHealthObserver``, and ``RouteExecutor.check_running_routes`` to gate their per-route behaviour. ``RouteHealthCheckFilter``, ``RouteHandler.health_check_filter()`` and every override are removed; ``RouteSessionData`` is dropped and ``RouteData.session_id`` becomes a direct field again. In ``check_running_routes``, the ``(RouteData, ModelHealthCheck)`` pairs are bundled into a private ``_RouteWithHealthCheck`` frozen dataclass that flows down through ``_ensure_health_records`` and ``_initialize_health_records``; ``_populate_replica_info`` now also mutates ``replica_host``/``replica_port`` on the in-memory routes so the newly populated rows fall through to the single Phase-4 record init. ``sync_appproxy`` self-fetches ``session_statuses`` instead of relying on the dropped SQL join. Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
1 parent c25b818 commit 61d6a03

28 files changed

Lines changed: 397 additions & 469 deletions

src/ai/backend/manager/data/deployment/types.py

Lines changed: 2 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -253,23 +253,12 @@ class RouteTargetStatuses:
253253
"""Target statuses for route handler filtering (lifecycle x health x traffic).
254254
255255
``traffic=None`` skips the filter; otherwise the row's
256-
``traffic_status`` column must match.
256+
``traffic_status`` column must be in the given list.
257257
"""
258258

259259
lifecycle: list[RouteStatus]
260260
health: list[RouteHealthStatus]
261-
traffic: RouteTrafficStatus | None = None
262-
263-
264-
@dataclass(frozen=True)
265-
class RouteHealthCheckFilter:
266-
"""In-memory gating on a revision's ``health_check_config``.
267-
268-
``health_check_required=True`` keeps only routes whose revision has
269-
a resolved health-check block; ``False`` (default) skips the filter.
270-
"""
271-
272-
health_check_required: bool = False
261+
traffic: list[RouteTrafficStatus] | None = None
273262

274263

275264
@dataclass(frozen=True)

src/ai/backend/manager/repositories/deployment/db_source/db_source.py

Lines changed: 54 additions & 87 deletions
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,7 @@
44
import logging
55
import uuid
66
from collections import Counter, defaultdict
7-
from collections.abc import AsyncIterator, Mapping, Sequence
7+
from collections.abc import AsyncIterator, Collection, Mapping, Sequence
88
from contextlib import asynccontextmanager as actxmgr
99
from dataclasses import dataclass
1010
from datetime import UTC, datetime
@@ -18,7 +18,7 @@
1818
from sqlalchemy.ext.asyncio import async_sessionmaker
1919
from sqlalchemy.orm import selectinload
2020

21-
from ai.backend.common.config import ModelDefinition, ModelHealthCheck
21+
from ai.backend.common.config import ModelHealthCheck
2222
from ai.backend.common.data.endpoint.types import EndpointLifecycle
2323
from ai.backend.common.data.permission.types import RBACElementType
2424
from ai.backend.common.dto.manager.v2.runtime_variant_preset.types import (
@@ -71,12 +71,10 @@
7171
ModelDeploymentAutoScalingRuleData,
7272
ModelRevisionData,
7373
RevisionSearchResult,
74-
RouteHealthCheckFilter,
7574
RouteHealthStatus,
7675
RouteInfo,
7776
RouteSearchResult,
7877
RouteStatus,
79-
RouteTargetStatuses,
8078
ScalingGroupCleanupConfig,
8179
)
8280
from ai.backend.manager.data.deployment_revision_preset.types import ResourceSlotEntryData
@@ -178,7 +176,6 @@
178176
ProjectDeploymentSearchScope,
179177
RouteData,
180178
RouteServiceDiscoveryInfo,
181-
RouteSessionData,
182179
)
183180
from ai.backend.manager.repositories.scheduler.types.session_creation import (
184181
ContainerUserContext,
@@ -204,34 +201,6 @@ class EndpointWithRoutesRawData:
204201
log = BraceStyleAdapter(logging.getLogger(__name__))
205202

206203

207-
def _build_session_data(
208-
session: uuid.UUID | None,
209-
session_status: SessionStatus | None,
210-
) -> RouteSessionData | None:
211-
"""Compose ``RouteSessionData`` from a route's joined session columns.
212-
213-
Returns ``None`` when the route has no session yet (``session IS NULL``)
214-
or when the joined session row is missing.
215-
"""
216-
if session is None or session_status is None:
217-
return None
218-
return RouteSessionData(session_id=SessionId(session), status=session_status)
219-
220-
221-
def _project_health_check_config(
222-
model_definition: ModelDefinition | None,
223-
) -> ModelHealthCheck | None:
224-
"""Project the joined ``model_definition`` to its health-check block.
225-
226-
``None`` means the revision opted out of HTTP probing — the manager
227-
skips probe scheduling and AppProxy receives traffic as soon as the
228-
route reaches RUNNING.
229-
"""
230-
if model_definition is None:
231-
return None
232-
return model_definition.health_check_config()
233-
234-
235204
def _project_preset_slots(
236205
preset_row: DeploymentRevisionPresetRow | None,
237206
slot_entries: list[tuple[str, Decimal]],
@@ -1009,17 +978,13 @@ async def get_routes_by_endpoint(
1009978
) -> list[RouteData]:
1010979
"""Get all routes for an endpoint."""
1011980
async with self._begin_readonly_session_read_committed() as db_sess:
1012-
query = (
1013-
sa.select(RoutingRow, SessionRow.status)
1014-
.outerjoin(SessionRow, RoutingRow.session == SessionRow.id)
1015-
.where(RoutingRow.endpoint == endpoint_id)
1016-
)
981+
query = sa.select(RoutingRow).where(RoutingRow.endpoint == endpoint_id)
1017982
result = await db_sess.execute(query)
1018983
return [
1019984
RouteData(
1020985
route_id=row.id,
1021986
deployment_id=row.endpoint,
1022-
session_data=_build_session_data(row.session, session_status),
987+
session_id=SessionId(row.session) if row.session else None,
1023988
status=row.status,
1024989
health_status=row.health_status,
1025990
traffic_ratio=row.traffic_ratio,
@@ -1029,7 +994,7 @@ async def get_routes_by_endpoint(
1029994
replica_port=row.replica_port,
1030995
error_data=row.error_data or {},
1031996
)
1032-
for row, session_status in result.all()
997+
for row in result.scalars().all()
1033998
]
1034999

10351000
async def update_route_session(
@@ -1616,61 +1581,63 @@ async def scale_routes(
16161581

16171582
# Route operations
16181583

1619-
async def get_routes_by_statuses(
1584+
async def search_route_datas(
16201585
self,
1621-
target: RouteTargetStatuses,
1622-
health_check_filter: RouteHealthCheckFilter,
1586+
querier: BatchQuerier,
16231587
) -> list[RouteData]:
1624-
"""Routes matching ``(lifecycle, health, traffic)`` with
1625-
revision-level ``health_check_config`` gating applied in memory.
1588+
"""Return :class:`RouteData` rows matched by ``querier`` (no joins).
16261589
1627-
``model_definition`` is selected so the resolved
1628-
``ModelHealthCheck`` (or ``None``) is attached to each
1629-
:class:`RouteData`; ``health_check_filter`` then runs over those
1630-
materialized rows.
1590+
Consumers that need session status or revision-level health-check
1591+
config call :meth:`fetch_session_statuses_by_route_ids` or
1592+
:meth:`fetch_health_check_configs` separately.
16311593
"""
16321594
async with self._begin_readonly_session_read_committed() as db_sess:
1633-
query = (
1634-
sa.select(
1635-
RoutingRow,
1636-
DeploymentRevisionRow.model_definition,
1637-
SessionRow.status,
1638-
)
1639-
.join(
1640-
DeploymentRevisionRow,
1641-
DeploymentRevisionRow.id == RoutingRow.revision,
1642-
)
1643-
.outerjoin(SessionRow, RoutingRow.session == SessionRow.id)
1644-
.where(
1645-
RoutingRow.status.in_(target.lifecycle),
1646-
RoutingRow.health_status.in_(target.health),
1595+
query = sa.select(RoutingRow)
1596+
result = await execute_batch_querier(db_sess, query, querier)
1597+
return [
1598+
RouteData(
1599+
route_id=row.RoutingRow.id,
1600+
deployment_id=row.RoutingRow.endpoint,
1601+
session_id=SessionId(row.RoutingRow.session)
1602+
if row.RoutingRow.session
1603+
else None,
1604+
status=row.RoutingRow.status,
1605+
health_status=row.RoutingRow.health_status,
1606+
traffic_ratio=row.RoutingRow.traffic_ratio,
1607+
created_at=row.RoutingRow.created_at,
1608+
revision_id=DeploymentRevisionID(row.RoutingRow.revision),
1609+
replica_host=row.RoutingRow.replica_host,
1610+
replica_port=row.RoutingRow.replica_port,
1611+
error_data=row.RoutingRow.error_data or {},
16471612
)
1648-
)
1649-
if target.traffic is not None:
1650-
query = query.where(RoutingRow.traffic_status == target.traffic)
1613+
for row in result.rows
1614+
]
1615+
1616+
async def fetch_health_check_configs(
1617+
self,
1618+
revision_ids: Collection[DeploymentRevisionID],
1619+
) -> Mapping[DeploymentRevisionID, ModelHealthCheck | None]:
1620+
"""Resolve revision-level ``ModelHealthCheck`` for each revision id.
1621+
1622+
Missing revisions are omitted from the result; revisions that opted
1623+
out of ``service.health_check`` return ``None``.
1624+
"""
1625+
if not revision_ids:
1626+
return {}
1627+
unique_ids = {uuid.UUID(str(rid)) for rid in revision_ids}
1628+
async with self._begin_readonly_session_read_committed() as db_sess:
1629+
query = sa.select(
1630+
DeploymentRevisionRow.id,
1631+
DeploymentRevisionRow.model_definition,
1632+
).where(DeploymentRevisionRow.id.in_(unique_ids))
16511633
result = await db_sess.execute(query)
1652-
routes: list[RouteData] = []
1653-
for row, model_definition, session_status in result.all():
1654-
health_check_config = _project_health_check_config(model_definition)
1655-
if health_check_filter.health_check_required and health_check_config is None:
1656-
continue
1657-
routes.append(
1658-
RouteData(
1659-
route_id=row.id,
1660-
deployment_id=row.endpoint,
1661-
session_data=_build_session_data(row.session, session_status),
1662-
status=row.status,
1663-
health_status=row.health_status,
1664-
traffic_ratio=row.traffic_ratio,
1665-
created_at=row.created_at,
1666-
revision_id=DeploymentRevisionID(row.revision),
1667-
replica_host=row.replica_host,
1668-
replica_port=row.replica_port,
1669-
error_data=row.error_data or {},
1670-
health_check_config=health_check_config,
1671-
)
1634+
configs: dict[DeploymentRevisionID, ModelHealthCheck | None] = {}
1635+
for revision_id, model_definition in result.all():
1636+
config = (
1637+
model_definition.health_check_config() if model_definition is not None else None
16721638
)
1673-
return routes
1639+
configs[DeploymentRevisionID(revision_id)] = config
1640+
return configs
16741641

16751642
async def update_route_status_bulk(
16761643
self,

src/ai/backend/manager/repositories/deployment/repository.py

Lines changed: 23 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,7 @@
33
import logging
44
import uuid
55
from collections import defaultdict
6-
from collections.abc import Mapping, Sequence
6+
from collections.abc import Collection, Mapping, Sequence
77
from dataclasses import dataclass, field
88
from datetime import UTC, datetime, timedelta
99
from decimal import Decimal, DecimalException
@@ -14,7 +14,7 @@
1414
from ai.backend.common.clients.valkey_client.valkey_live.client import ValkeyLiveClient
1515
from ai.backend.common.clients.valkey_client.valkey_schedule.client import ValkeyScheduleClient
1616
from ai.backend.common.clients.valkey_client.valkey_stat.client import ValkeyStatClient
17-
from ai.backend.common.config import ModelDefinitionDraft
17+
from ai.backend.common.config import ModelDefinitionDraft, ModelHealthCheck
1818
from ai.backend.common.data.endpoint.types import EndpointLifecycle
1919
from ai.backend.common.exception import BackendAIError, InvalidAPIParameters
2020
from ai.backend.common.identifier.deployment import DeploymentID
@@ -67,11 +67,9 @@
6767
ModelDeploymentAutoScalingRuleData,
6868
ModelRevisionData,
6969
RevisionSearchResult,
70-
RouteHealthCheckFilter,
7170
RouteInfo,
7271
RouteSearchResult,
7372
RouteStatus,
74-
RouteTargetStatuses,
7573
ScalingGroupCleanupConfig,
7674
)
7775
from ai.backend.manager.data.image.types import ImageIdentifier
@@ -629,15 +627,30 @@ async def scale_routes(
629627
# Route operations
630628

631629
@deployment_repository_resilience.apply()
632-
async def get_routes_by_statuses(
630+
async def search_route_datas(
633631
self,
634-
target: RouteTargetStatuses,
635-
health_check_filter: RouteHealthCheckFilter,
632+
querier: BatchQuerier,
636633
) -> list[RouteData]:
637-
"""Routes matching ``(lifecycle, health, traffic)`` with the
638-
in-memory ``health_check_config`` post-filter applied.
634+
"""Return :class:`RouteData` rows matched by ``querier`` (no joins).
635+
636+
Session status and revision-level health-check config are not
637+
joined — callers fetch them via
638+
:meth:`fetch_session_statuses_by_route_ids` and
639+
:meth:`fetch_health_check_configs`.
640+
"""
641+
return await self._db_source.search_route_datas(querier)
642+
643+
@deployment_repository_resilience.apply()
644+
async def fetch_health_check_configs(
645+
self,
646+
revision_ids: Collection[DeploymentRevisionID],
647+
) -> Mapping[DeploymentRevisionID, ModelHealthCheck | None]:
648+
"""Resolve revision-level ``ModelHealthCheck`` for each id.
649+
650+
Revisions that opted out of ``service.health_check`` map to
651+
``None``; unknown revision ids are omitted.
639652
"""
640-
return await self._db_source.get_routes_by_statuses(target, health_check_filter)
653+
return await self._db_source.fetch_health_check_configs(revision_ids)
641654

642655
@deployment_repository_resilience.apply()
643656
async def update_route_status_bulk(

src/ai/backend/manager/repositories/deployment/types/__init__.py

Lines changed: 0 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -7,7 +7,6 @@
77
ProjectDeploymentSearchScope,
88
RouteData,
99
RouteServiceDiscoveryInfo,
10-
RouteSessionData,
1110
)
1211

1312
__all__ = [
@@ -17,5 +16,4 @@
1716
"ProjectDeploymentSearchScope",
1817
"RouteData",
1918
"RouteServiceDiscoveryInfo",
20-
"RouteSessionData",
2119
]

src/ai/backend/manager/repositories/deployment/types/endpoint.py

Lines changed: 1 addition & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -11,13 +11,11 @@
1111

1212
import sqlalchemy as sa
1313

14-
from ai.backend.common.config import ModelHealthCheck
1514
from ai.backend.common.data.endpoint.types import EndpointLifecycle
1615
from ai.backend.common.identifier.deployment import DeploymentID
1716
from ai.backend.common.identifier.deployment_revision import DeploymentRevisionID
1817
from ai.backend.common.types import SessionId
1918
from ai.backend.manager.data.deployment.types import RouteHealthStatus, RouteStatus
20-
from ai.backend.manager.data.session.types import SessionStatus
2119
from ai.backend.manager.errors.resource import ProjectNotFound
2220
from ai.backend.manager.models.endpoint.row import EndpointRow
2321
from ai.backend.manager.models.group.row import GroupRow
@@ -60,21 +58,13 @@ class EndpointData:
6058
resource_opts: dict[str, Any] = field(default_factory=dict)
6159

6260

63-
@dataclass(frozen=True)
64-
class RouteSessionData:
65-
"""Session id paired with its current status."""
66-
67-
session_id: SessionId
68-
status: SessionStatus
69-
70-
7161
@dataclass
7262
class RouteData:
7363
"""Data structure for model service route."""
7464

7565
route_id: uuid.UUID
7666
deployment_id: DeploymentID
77-
session_data: RouteSessionData | None
67+
session_id: SessionId | None
7868
status: RouteStatus
7969
health_status: RouteHealthStatus
8070
traffic_ratio: float
@@ -84,12 +74,6 @@ class RouteData:
8474
replica_port: int | None = None
8575
updated_at: datetime | None = None
8676
error_data: dict[str, Any] = field(default_factory=dict)
87-
health_check_config: ModelHealthCheck | None = None
88-
89-
@property
90-
def session_id(self) -> SessionId | None:
91-
"""Convenience accessor; reads from ``session_data``."""
92-
return self.session_data.session_id if self.session_data else None
9377

9478

9579
@dataclass

0 commit comments

Comments
 (0)