44import logging
55import uuid
66from collections import Counter , defaultdict
7- from collections .abc import AsyncIterator , Collection , Mapping , Sequence
7+ from collections .abc import AsyncIterator , Mapping , Sequence
88from contextlib import asynccontextmanager as actxmgr
99from dataclasses import dataclass
1010from datetime import UTC , datetime
7979)
8080from ai .backend .manager .data .deployment_revision_preset .types import ResourceSlotEntryData
8181from ai .backend .manager .data .image .types import ImageIdentifier
82+ from ai .backend .manager .data .model_serving .types import AppProxyRouteEntry
8283from ai .backend .manager .data .permission .types import RBACElementRef
8384from ai .backend .manager .data .resource .types import ScalingGroupProxyTarget
8485from ai .backend .manager .data .session .types import SessionStatus
@@ -980,6 +981,8 @@ async def get_routes_by_endpoint(
980981 async with self ._begin_readonly_session_read_committed () as db_sess :
981982 query = sa .select (RoutingRow ).where (RoutingRow .endpoint == endpoint_id )
982983 result = await db_sess .execute (query )
984+ rows = result .scalars ().all ()
985+
983986 return [
984987 RouteData (
985988 route_id = row .id ,
@@ -994,7 +997,7 @@ async def get_routes_by_endpoint(
994997 replica_port = row .replica_port ,
995998 error_data = row .error_data or {},
996999 )
997- for row in result . scalars (). all ()
1000+ for row in rows
9981001 ]
9991002
10001003 async def update_route_session (
@@ -1581,63 +1584,38 @@ async def scale_routes(
15811584
15821585 # Route operations
15831586
1584- async def search_route_datas (
1587+ async def get_routes_by_statuses (
15851588 self ,
1586- querier : BatchQuerier ,
1589+ statuses : list [RouteStatus ],
1590+ health_statuses : list [RouteHealthStatus ],
15871591 ) -> list [RouteData ]:
1588- """Return :class:`RouteData` rows matched by ``querier`` (no joins).
1589-
1590- Consumers that need session status or revision-level health-check
1591- config call :meth:`fetch_session_statuses_by_route_ids` or
1592- :meth:`fetch_health_check_configs` separately.
1593- """
1592+ """Get routes by lifecycle and health statuses."""
15941593 async with self ._begin_readonly_session_read_committed () as db_sess :
1595- query = sa .select (RoutingRow )
1596- result = await execute_batch_querier (db_sess , query , querier )
1597- return [
1598- RouteData (
1599- route_id = row .RoutingRow .id ,
1600- deployment_id = row .RoutingRow .endpoint ,
1601- session_id = SessionId (row .RoutingRow .session )
1602- if row .RoutingRow .session
1603- else None ,
1604- status = row .RoutingRow .status ,
1605- health_status = row .RoutingRow .health_status ,
1606- traffic_ratio = row .RoutingRow .traffic_ratio ,
1607- created_at = row .RoutingRow .created_at ,
1608- revision_id = DeploymentRevisionID (row .RoutingRow .revision ),
1609- replica_host = row .RoutingRow .replica_host ,
1610- replica_port = row .RoutingRow .replica_port ,
1611- error_data = row .RoutingRow .error_data or {},
1612- )
1613- for row in result .rows
1614- ]
1615-
1616- async def fetch_health_check_configs (
1617- self ,
1618- revision_ids : Collection [DeploymentRevisionID ],
1619- ) -> Mapping [DeploymentRevisionID , ModelHealthCheck | None ]:
1620- """Resolve revision-level ``ModelHealthCheck`` for each revision id.
1621-
1622- Missing revisions are omitted from the result; revisions that opted
1623- out of ``service.health_check`` return ``None``.
1624- """
1625- if not revision_ids :
1626- return {}
1627- unique_ids = {uuid .UUID (str (rid )) for rid in revision_ids }
1628- async with self ._begin_readonly_session_read_committed () as db_sess :
1629- query = sa .select (
1630- DeploymentRevisionRow .id ,
1631- DeploymentRevisionRow .model_definition ,
1632- ).where (DeploymentRevisionRow .id .in_ (unique_ids ))
1594+ query = sa .select (RoutingRow ).where (
1595+ RoutingRow .status .in_ (statuses ),
1596+ RoutingRow .health_status .in_ (health_statuses ),
1597+ )
16331598 result = await db_sess .execute (query )
1634- configs : dict [DeploymentRevisionID , ModelHealthCheck | None ] = {}
1635- for revision_id , model_definition in result .all ():
1636- config = (
1637- model_definition .health_check_config () if model_definition is not None else None
1599+ rows : Sequence [RoutingRow ] = result .scalars ().all ()
1600+
1601+ route_data_list : list [RouteData ] = []
1602+ for row in rows :
1603+ route_data = RouteData (
1604+ route_id = row .id ,
1605+ deployment_id = row .endpoint ,
1606+ session_id = SessionId (row .session ) if row .session else None ,
1607+ status = row .status ,
1608+ health_status = row .health_status ,
1609+ traffic_ratio = row .traffic_ratio ,
1610+ created_at = row .created_at ,
1611+ revision_id = DeploymentRevisionID (row .revision ),
1612+ replica_host = row .replica_host ,
1613+ replica_port = row .replica_port ,
1614+ error_data = row .error_data or {},
16381615 )
1639- configs [DeploymentRevisionID (revision_id )] = config
1640- return configs
1616+ route_data_list .append (route_data )
1617+
1618+ return route_data_list
16411619
16421620 async def update_route_status_bulk (
16431621 self ,
@@ -1879,6 +1857,38 @@ async def update_route_replica_info(
18791857 )
18801858 await db_sess .execute (query )
18811859
1860+ async def fetch_health_check_configs_by_revision_ids (
1861+ self ,
1862+ revision_ids : set [DeploymentRevisionID ],
1863+ ) -> dict [DeploymentRevisionID , ModelHealthCheck | None ]:
1864+ """Fetch health check configurations for revisions.
1865+
1866+ Reads only the ``model_definition`` column — variant-specific
1867+ health check defaults are already baked into that column at
1868+ revision-creation time, so no runtime dispatch by variant name
1869+ and no other row fields are needed. SET NULL state on
1870+ ``image`` / ``model`` does not affect this lookup.
1871+
1872+ Returns:
1873+ Mapping of revision_id to ModelHealthCheck (None if the
1874+ revision has no model_definition or no health_check inside).
1875+ """
1876+ if not revision_ids :
1877+ return {}
1878+
1879+ async with self ._begin_readonly_session_read_committed () as db_sess :
1880+ query = sa .select (
1881+ DeploymentRevisionRow .id ,
1882+ DeploymentRevisionRow .model_definition ,
1883+ ).where (DeploymentRevisionRow .id .in_ (revision_ids ))
1884+ result = await db_sess .execute (query )
1885+ configs : dict [DeploymentRevisionID , ModelHealthCheck | None ] = {}
1886+ for revision_id , model_definition in result .all ():
1887+ configs [DeploymentRevisionID (revision_id )] = (
1888+ model_definition .health_check_config () if model_definition is not None else None
1889+ )
1890+ return configs
1891+
18821892 async def delete_routes_by_route_ids (
18831893 self ,
18841894 route_ids : set [uuid .UUID ],
@@ -2072,6 +2082,80 @@ async def fetch_session_statuses_by_route_ids(
20722082
20732083 return status_map
20742084
2085+ async def fetch_route_connection_infos (
2086+ self ,
2087+ * ,
2088+ route_querier : BatchQuerier ,
2089+ ) -> Mapping [uuid .UUID , list [AppProxyRouteEntry ]]:
2090+ """Resolve routing-table entries grouped by endpoint id.
2091+
2092+ The caller composes ``route_querier`` with every filter that
2093+ applies (lifecycle / health / traffic_status / endpoint id set,
2094+ etc.) — db_source does not impose defaults and does not take a
2095+ separate ``endpoint_ids`` argument. The returned mapping only
2096+ contains endpoints that actually have at least one matching
2097+ route; the caller treats a missing key as "no traffic-receiving
2098+ routes for this endpoint" itself.
2099+
2100+ Internally fetches the filtered ``RoutingRow`` set, then bulk-
2101+ loads the main kernel for each running session and extracts
2102+ the inference port. Sessions that are not RUNNING/CREATING are
2103+ skipped because their kernel host:port is not stable.
2104+ """
2105+ result_map : dict [uuid .UUID , list [AppProxyRouteEntry ]] = {}
2106+
2107+ async with self ._begin_readonly_session_read_committed () as db_sess :
2108+ route_query = sa .select (RoutingRow ).options (
2109+ selectinload (RoutingRow .session_row ),
2110+ )
2111+ route_result = await execute_batch_querier (db_sess , route_query , route_querier )
2112+ route_rows : list [RoutingRow ] = [r .RoutingRow for r in route_result .rows ]
2113+ if not route_rows :
2114+ return result_map
2115+
2116+ # Only sessions whose kernel network address is stable contribute
2117+ # to the routing table; the rest will fall in on the next sync
2118+ # cycle once they reach RUNNING.
2119+ route_by_session : dict [uuid .UUID , RoutingRow ] = {}
2120+ for r in route_rows :
2121+ if r .session is None or r .session_row is None :
2122+ continue
2123+ if r .session_row .status not in (
2124+ SessionStatus .RUNNING ,
2125+ SessionStatus .CREATING ,
2126+ ):
2127+ continue
2128+ route_by_session [r .session ] = r
2129+
2130+ if not route_by_session :
2131+ return result_map
2132+
2133+ kernels = await KernelRow .batch_load_main_kernels_by_session_id (
2134+ db_sess , list (route_by_session .keys ())
2135+ )
2136+
2137+ for kernel in kernels :
2138+ route = route_by_session .get (kernel .session_id )
2139+ if route is None or kernel .service_ports is None or not kernel .kernel_host :
2140+ continue
2141+ # First inference port wins (legacy single-inference-port
2142+ # contract preserved during the row-method removal).
2143+ inference_port = next (
2144+ (p for p in kernel .service_ports if p .get ("is_inference" )),
2145+ None ,
2146+ )
2147+ if inference_port is None or not inference_port .get ("host_ports" ):
2148+ continue
2149+ entry = AppProxyRouteEntry (
2150+ session_id = kernel .session_id ,
2151+ route_id = route .id ,
2152+ kernel_host = kernel .kernel_host ,
2153+ kernel_port = inference_port ["host_ports" ][0 ],
2154+ )
2155+ result_map .setdefault (uuid .UUID (str (route .endpoint )), []).append (entry )
2156+
2157+ return result_map
2158+
20752159 async def search_deployment_ids (self , * , querier : BatchQuerier ) -> list [DeploymentID ]:
20762160 """Search deployment ids using ``BatchQuerier``.
20772161
0 commit comments