7272 ModelRevisionData ,
7373 ModelRevisionSpec ,
7474 RevisionSearchResult ,
75+ RouteHealthCheckFilter ,
7576 RouteHealthStatus ,
7677 RouteInfo ,
7778 RouteSearchResult ,
7879 RouteStatus ,
80+ RouteTargetStatuses ,
7981 ScalingGroupCleanupConfig ,
8082)
8183from ai .backend .manager .data .deployment_revision_preset .types import ResourceSlotEntryData
8284from ai .backend .manager .data .image .types import ImageIdentifier
83- from ai .backend .manager .data .model_serving .types import AppProxyRouteEntry
8485from ai .backend .manager .data .permission .types import RBACElementRef
8586from ai .backend .manager .data .resource .types import ScalingGroupProxyTarget
8687from ai .backend .manager .data .session .types import SessionStatus
178179 ProjectDeploymentSearchScope ,
179180 RouteData ,
180181 RouteServiceDiscoveryInfo ,
182+ RouteSessionData ,
181183)
182184from ai .backend .manager .repositories .scheduler .types .session_creation import (
183185 ContainerUserContext ,
@@ -203,6 +205,20 @@ class EndpointWithRoutesRawData:
203205log = BraceStyleAdapter (logging .getLogger (__name__ ))
204206
205207
208+ def _build_session_data (
209+ session : uuid .UUID | None ,
210+ session_status : SessionStatus | None ,
211+ ) -> RouteSessionData | None :
212+ """Compose ``RouteSessionData`` from a route's joined session columns.
213+
214+ Returns ``None`` when the route has no session yet (``session IS NULL``)
215+ or when the joined session row is missing.
216+ """
217+ if session is None or session_status is None :
218+ return None
219+ return RouteSessionData (session_id = SessionId (session ), status = session_status )
220+
221+
206222def _project_preset_slots (
207223 preset_row : DeploymentRevisionPresetRow | None ,
208224 slot_entries : list [tuple [str , Decimal ]],
@@ -980,15 +996,17 @@ async def get_routes_by_endpoint(
980996 ) -> list [RouteData ]:
981997 """Get all routes for an endpoint."""
982998 async with self ._begin_readonly_session_read_committed () as db_sess :
983- query = sa .select (RoutingRow ).where (RoutingRow .endpoint == endpoint_id )
999+ query = (
1000+ sa .select (RoutingRow , SessionRow .status )
1001+ .outerjoin (SessionRow , RoutingRow .session == SessionRow .id )
1002+ .where (RoutingRow .endpoint == endpoint_id )
1003+ )
9841004 result = await db_sess .execute (query )
985- rows = result .scalars ().all ()
986-
9871005 return [
9881006 RouteData (
9891007 route_id = row .id ,
9901008 deployment_id = row .endpoint ,
991- session_id = SessionId (row .session ) if row . session else None ,
1009+ session_data = _build_session_data (row .session , session_status ) ,
9921010 status = row .status ,
9931011 health_status = row .health_status ,
9941012 traffic_ratio = row .traffic_ratio ,
@@ -998,7 +1016,7 @@ async def get_routes_by_endpoint(
9981016 replica_port = row .replica_port ,
9991017 error_data = row .error_data or {},
10001018 )
1001- for row in rows
1019+ for row , session_status in result . all ()
10021020 ]
10031021
10041022 async def update_route_session (
@@ -1587,24 +1605,49 @@ async def scale_routes(
15871605
15881606 async def get_routes_by_statuses (
15891607 self ,
1590- statuses : list [ RouteStatus ] ,
1591- health_statuses : list [ RouteHealthStatus ] ,
1608+ target : RouteTargetStatuses ,
1609+ health_check_filter : RouteHealthCheckFilter ,
15921610 ) -> list [RouteData ]:
1593- """Get routes by lifecycle and health statuses."""
1611+ """Routes matching ``(lifecycle, health, traffic_status)`` with
1612+ revision-level ``health_check_enabled`` gating per ``health_check_filter``.
1613+ """
15941614 async with self ._begin_readonly_session_read_committed () as db_sess :
1595- query = sa .select (RoutingRow ).where (
1596- RoutingRow .status .in_ (statuses ),
1597- RoutingRow .health_status .in_ (health_statuses ),
1615+ query = (
1616+ sa .select (
1617+ RoutingRow ,
1618+ DeploymentRevisionRow .health_check_enabled ,
1619+ SessionRow .status ,
1620+ )
1621+ .join (
1622+ DeploymentRevisionRow ,
1623+ DeploymentRevisionRow .id == RoutingRow .revision ,
1624+ )
1625+ .outerjoin (SessionRow , RoutingRow .session == SessionRow .id )
15981626 )
1627+ query = query .where (RoutingRow .status .in_ (target .lifecycle ))
1628+ if health_check_filter .include_health_check_disabled :
1629+ query = query .where (
1630+ sa .or_ (
1631+ RoutingRow .health_status .in_ (target .health ),
1632+ DeploymentRevisionRow .health_check_enabled .is_ (False ),
1633+ )
1634+ )
1635+ else :
1636+ query = query .where (RoutingRow .health_status .in_ (target .health ))
1637+ if health_check_filter .health_check_required is not None :
1638+ query = query .where (
1639+ DeploymentRevisionRow .health_check_enabled .is_ (
1640+ health_check_filter .health_check_required
1641+ )
1642+ )
1643+ if target .traffic_status is not None :
1644+ query = query .where (RoutingRow .traffic_status == target .traffic_status )
15991645 result = await db_sess .execute (query )
1600- rows : Sequence [RoutingRow ] = result .scalars ().all ()
1601-
1602- route_data_list : list [RouteData ] = []
1603- for row in rows :
1604- route_data = RouteData (
1646+ return [
1647+ RouteData (
16051648 route_id = row .id ,
16061649 deployment_id = row .endpoint ,
1607- session_id = SessionId (row .session ) if row . session else None ,
1650+ session_data = _build_session_data (row .session , session_status ) ,
16081651 status = row .status ,
16091652 health_status = row .health_status ,
16101653 traffic_ratio = row .traffic_ratio ,
@@ -1613,10 +1656,46 @@ async def get_routes_by_statuses(
16131656 replica_host = row .replica_host ,
16141657 replica_port = row .replica_port ,
16151658 error_data = row .error_data or {},
1659+ health_check_enabled = bool (health_check_enabled ),
16161660 )
1617- route_data_list .append (route_data )
1661+ for row , health_check_enabled , session_status in result .all ()
1662+ ]
16181663
1619- return route_data_list
1664+ async def get_routes_for_health_observation (self ) -> list [RouteData ]:
1665+ """RUNNING routes whose revision declared ``service.health_check``."""
1666+ async with self ._begin_readonly_session_read_committed () as db_sess :
1667+ query = (
1668+ sa .select (
1669+ RoutingRow ,
1670+ DeploymentRevisionRow .health_check_enabled ,
1671+ SessionRow .status ,
1672+ )
1673+ .join (
1674+ DeploymentRevisionRow ,
1675+ DeploymentRevisionRow .id == RoutingRow .revision ,
1676+ )
1677+ .outerjoin (SessionRow , RoutingRow .session == SessionRow .id )
1678+ .where (RoutingRow .status == RouteStatus .RUNNING )
1679+ .where (DeploymentRevisionRow .health_check_enabled .is_ (True ))
1680+ )
1681+ result = await db_sess .execute (query )
1682+ return [
1683+ RouteData (
1684+ route_id = row .id ,
1685+ deployment_id = row .endpoint ,
1686+ session_data = _build_session_data (row .session , session_status ),
1687+ status = row .status ,
1688+ health_status = row .health_status ,
1689+ traffic_ratio = row .traffic_ratio ,
1690+ created_at = row .created_at ,
1691+ revision_id = DeploymentRevisionID (row .revision ),
1692+ replica_host = row .replica_host ,
1693+ replica_port = row .replica_port ,
1694+ error_data = row .error_data or {},
1695+ health_check_enabled = bool (health_check_enabled ),
1696+ )
1697+ for row , health_check_enabled , session_status in result .all ()
1698+ ]
16201699
16211700 async def update_route_status_bulk (
16221701 self ,
@@ -2083,80 +2162,6 @@ async def fetch_session_statuses_by_route_ids(
20832162
20842163 return status_map
20852164
2086- async def fetch_route_connection_infos (
2087- self ,
2088- * ,
2089- route_querier : BatchQuerier ,
2090- ) -> Mapping [uuid .UUID , list [AppProxyRouteEntry ]]:
2091- """Resolve routing-table entries grouped by endpoint id.
2092-
2093- The caller composes ``route_querier`` with every filter that
2094- applies (lifecycle / health / traffic_status / endpoint id set,
2095- etc.) — db_source does not impose defaults and does not take a
2096- separate ``endpoint_ids`` argument. The returned mapping only
2097- contains endpoints that actually have at least one matching
2098- route; the caller treats a missing key as "no traffic-receiving
2099- routes for this endpoint" itself.
2100-
2101- Internally fetches the filtered ``RoutingRow`` set, then bulk-
2102- loads the main kernel for each running session and extracts
2103- the inference port. Sessions that are not RUNNING/CREATING are
2104- skipped because their kernel host:port is not stable.
2105- """
2106- result_map : dict [uuid .UUID , list [AppProxyRouteEntry ]] = {}
2107-
2108- async with self ._begin_readonly_session_read_committed () as db_sess :
2109- route_query = sa .select (RoutingRow ).options (
2110- selectinload (RoutingRow .session_row ),
2111- )
2112- route_result = await execute_batch_querier (db_sess , route_query , route_querier )
2113- route_rows : list [RoutingRow ] = [r .RoutingRow for r in route_result .rows ]
2114- if not route_rows :
2115- return result_map
2116-
2117- # Only sessions whose kernel network address is stable contribute
2118- # to the routing table; the rest will fall in on the next sync
2119- # cycle once they reach RUNNING.
2120- route_by_session : dict [uuid .UUID , RoutingRow ] = {}
2121- for r in route_rows :
2122- if r .session is None or r .session_row is None :
2123- continue
2124- if r .session_row .status not in (
2125- SessionStatus .RUNNING ,
2126- SessionStatus .CREATING ,
2127- ):
2128- continue
2129- route_by_session [r .session ] = r
2130-
2131- if not route_by_session :
2132- return result_map
2133-
2134- kernels = await KernelRow .batch_load_main_kernels_by_session_id (
2135- db_sess , list (route_by_session .keys ())
2136- )
2137-
2138- for kernel in kernels :
2139- route = route_by_session .get (kernel .session_id )
2140- if route is None or kernel .service_ports is None or not kernel .kernel_host :
2141- continue
2142- # First inference port wins (legacy single-inference-port
2143- # contract preserved during the row-method removal).
2144- inference_port = next (
2145- (p for p in kernel .service_ports if p .get ("is_inference" )),
2146- None ,
2147- )
2148- if inference_port is None or not inference_port .get ("host_ports" ):
2149- continue
2150- entry = AppProxyRouteEntry (
2151- session_id = kernel .session_id ,
2152- route_id = route .id ,
2153- kernel_host = kernel .kernel_host ,
2154- kernel_port = inference_port ["host_ports" ][0 ],
2155- )
2156- result_map .setdefault (uuid .UUID (str (route .endpoint )), []).append (entry )
2157-
2158- return result_map
2159-
21602165 async def search_deployment_ids (self , * , querier : BatchQuerier ) -> list [DeploymentID ]:
21612166 """Search deployment ids using ``BatchQuerier``.
21622167
0 commit comments