Skip to content

Commit 584d934

Browse files
HyeockJinKimclaude
andauthored
feat(BA-6032): implement PROVISIONING sub-status pipeline for routes (#11613)
Co-authored-by: Claude Sonnet 4.6 <noreply@anthropic.com>
1 parent c992f82 commit 584d934

36 files changed

Lines changed: 557 additions & 169 deletions

File tree

changes/11613.feature.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1 @@
1+
Add three-stage PROVISIONING sub-status pipeline (PENDING→STARTING→WARMING_UP→RUNNING) for model service routes with ReplicaID typed identifier.
Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,6 @@
1+
from typing import NewType
2+
from uuid import UUID
3+
4+
__all__ = ("ReplicaID",)
5+
6+
ReplicaID = NewType("ReplicaID", UUID)

src/ai/backend/manager/api/adapters/scheduling_history/adapter.py

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,7 @@
2525
SessionHistoryNode,
2626
)
2727
from ai.backend.common.dto.manager.v2.scheduling_history.types import SubStepResultInfo
28+
from ai.backend.common.identifier.replica import ReplicaID
2829
from ai.backend.manager.api.adapter_options.pagination.pagination import PaginationSpec
2930
from ai.backend.manager.api.adapters.base import BaseAdapter
3031
from ai.backend.manager.data.deployment.types import DeploymentHistoryData, RouteHistoryData
@@ -537,7 +538,7 @@ async def route_scoped_search(
537538
input: AdminSearchRouteHistoriesInput,
538539
) -> AdminSearchRouteHistoriesPayload:
539540
"""Search route histories scoped to a route."""
540-
scope = RouteHistorySearchScope(route_id=route_id)
541+
scope = RouteHistorySearchScope(route_id=ReplicaID(route_id))
541542
querier = self._build_route_querier(input)
542543
action_result = (
543544
await self._processors.scheduling_history.search_route_scoped_history.wait_for_complete(

src/ai/backend/manager/data/deployment/types.py

Lines changed: 1 addition & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -110,6 +110,7 @@ class RouteHandlerCategory(enum.StrEnum):
110110

111111
LIFECYCLE = "lifecycle"
112112
HEALTH = "health"
113+
SYNC = "sync"
113114

114115

115116
class DeploymentHandlerCategory(enum.StrEnum):
@@ -291,9 +292,6 @@ class RouteTransitionTarget:
291292
class RouteStatusTransitions:
292293
"""Status transitions for route handlers.
293294
294-
Route handlers have success/failure/stale outcomes (no expired/give_up).
295-
Each outcome can change lifecycle status, health status, or both.
296-
297295
Attributes:
298296
success: Target state when handler succeeds, None means no change
299297
failure: Target state when handler fails, None means no change

src/ai/backend/manager/models/routing/row.py

Lines changed: 2 additions & 25 deletions
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,7 @@
1313
from sqlalchemy.orm import Mapped, mapped_column, relationship, selectinload
1414

1515
from ai.backend.common.identifier.deployment import DeploymentID
16+
from ai.backend.common.identifier.replica import ReplicaID
1617
from ai.backend.common.types import SessionId
1718
from ai.backend.logging import BraceStyleAdapter
1819
from ai.backend.manager.data.deployment.types import (
@@ -53,7 +54,7 @@ class RoutingRow(Base): # type: ignore[misc]
5354
sa.UniqueConstraint("endpoint", "session", name="uq_routings_endpoint_session"),
5455
)
5556

56-
id: Mapped[uuid.UUID] = mapped_column(
57+
id: Mapped[ReplicaID] = mapped_column(
5758
"id", GUID, primary_key=True, server_default=sa.text("uuid_generate_v4()")
5859
)
5960
endpoint: Mapped[DeploymentID] = mapped_column(
@@ -237,30 +238,6 @@ async def get(
237238
raise NoResultFound
238239
return row
239240

240-
def __init__(
241-
self,
242-
id: uuid.UUID,
243-
endpoint: DeploymentID,
244-
session: uuid.UUID | None,
245-
session_owner: uuid.UUID,
246-
domain: str,
247-
project: uuid.UUID,
248-
revision: uuid.UUID,
249-
status: RouteStatus = RouteStatus.PROVISIONING,
250-
traffic_ratio: float = 1.0,
251-
traffic_status: RouteTrafficStatus = RouteTrafficStatus.ACTIVE,
252-
) -> None:
253-
self.id = id
254-
self.endpoint = endpoint
255-
self.session = session
256-
self.session_owner = session_owner
257-
self.domain = domain
258-
self.project = project
259-
self.status = status
260-
self.traffic_ratio = traffic_ratio
261-
self.revision = revision
262-
self.traffic_status = traffic_status
263-
264241
def delegate_ownership(self, user_uuid: uuid.UUID) -> None:
265242
self.session_owner = user_uuid
266243

src/ai/backend/manager/models/scheduling_history/row.py

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,7 @@
77
from sqlalchemy.orm import Mapped, mapped_column
88

99
from ai.backend.common.data.model_deployment.types import ModelDeploymentStatus
10+
from ai.backend.common.identifier.replica import ReplicaID
1011
from ai.backend.common.types import KernelId, SessionId
1112
from ai.backend.manager.data.deployment.types import (
1213
DeploymentHandlerCategory,
@@ -248,7 +249,7 @@ class RouteHistoryRow(Base): # type: ignore[misc]
248249
id: Mapped[uuid.UUID] = mapped_column(
249250
"id", GUID, primary_key=True, server_default=sa.text("uuid_generate_v4()")
250251
)
251-
route_id: Mapped[uuid.UUID] = mapped_column("route_id", GUID, nullable=False, index=True)
252+
route_id: Mapped[ReplicaID] = mapped_column("route_id", GUID, nullable=False, index=True)
252253
deployment_id: Mapped[uuid.UUID] = mapped_column(
253254
"deployment_id", GUID, nullable=False, index=True
254255
)

src/ai/backend/manager/repositories/deployment/creators/route.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -39,13 +39,13 @@ class RouteCreatorSpec(CreatorSpec[RoutingRow]):
3939
@override
4040
def build_row(self) -> RoutingRow:
4141
return RoutingRow(
42-
id=uuid.uuid4(),
4342
endpoint=self.deployment_id,
4443
session=None,
4544
session_owner=self.session_owner_id,
4645
domain=self.domain,
4746
project=self.project_id,
4847
status=RouteStatus.PROVISIONING,
48+
sub_status=RouteSubStatus.PENDING,
4949
traffic_ratio=self.traffic_ratio,
5050
revision=self.revision_id,
5151
traffic_status=self.traffic_status,

src/ai/backend/manager/repositories/deployment/db_source/db_source.py

Lines changed: 86 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -29,6 +29,7 @@
2929
from ai.backend.common.identifier.deployment_preset import DeploymentPresetID
3030
from ai.backend.common.identifier.deployment_revision import DeploymentRevisionID
3131
from ai.backend.common.identifier.image import ImageID
32+
from ai.backend.common.identifier.replica import ReplicaID
3233
from ai.backend.common.identifier.resource_group import ResourceGroupName
3334
from ai.backend.common.identifier.runtime_variant import RuntimeVariantID
3435
from ai.backend.common.identifier.vfolder import VFolderUUID
@@ -179,6 +180,8 @@
179180
ProjectDeploymentSearchScope,
180181
RouteData,
181182
RouteServiceDiscoveryInfo,
183+
RouteSessionInfo,
184+
RouteSessionKernelInfo,
182185
)
183186
from ai.backend.manager.repositories.scheduler.types.session_creation import (
184187
ContainerUserContext,
@@ -1208,7 +1211,7 @@ async def get_endpoint_id_by_session(
12081211

12091212
async def fetch_route_service_discovery_info(
12101213
self,
1211-
route_ids: set[uuid.UUID],
1214+
route_ids: set[ReplicaID],
12121215
) -> list[RouteServiceDiscoveryInfo]:
12131216
"""Fetch service discovery information for routes.
12141217
@@ -1770,9 +1773,9 @@ async def update_route_status_bulk_with_history(
17701773
async def _get_last_route_histories_by_category(
17711774
self,
17721775
db_sess: SASession,
1773-
route_ids: list[uuid.UUID],
1776+
route_ids: list[ReplicaID],
17741777
category: RouteHandlerCategory,
1775-
) -> dict[uuid.UUID, RouteHistoryRow]:
1778+
) -> dict[ReplicaID, RouteHistoryRow]:
17761779
"""Get last history records per route filtered by handler category."""
17771780
if not route_ids:
17781781
return {}
@@ -1796,8 +1799,8 @@ async def _get_last_route_histories_by_category(
17961799
async def _get_last_route_histories_bulk(
17971800
self,
17981801
db_sess: SASession,
1799-
route_ids: list[uuid.UUID],
1800-
) -> dict[uuid.UUID, RouteHistoryRow]:
1802+
route_ids: list[ReplicaID],
1803+
) -> dict[ReplicaID, RouteHistoryRow]:
18011804
"""Get last history records for multiple routes efficiently."""
18021805
if not route_ids:
18031806
return {}
@@ -1927,15 +1930,15 @@ async def fetch_kernel_connection_info(
19271930

19281931
async def update_route_replica_info(
19291932
self,
1930-
updates: dict[uuid.UUID, tuple[str, int]],
1933+
updates: dict[ReplicaID, RouteSessionKernelInfo],
19311934
) -> None:
19321935
"""Update replica_host and replica_port for routes."""
19331936
async with self._begin_session_read_committed() as db_sess:
1934-
for route_id, (host, port) in updates.items():
1937+
for route_id, kernel in updates.items():
19351938
query = (
19361939
sa.update(RoutingRow)
19371940
.where(RoutingRow.id == route_id)
1938-
.values(replica_host=host, replica_port=port)
1941+
.values(replica_host=kernel.replica_host, replica_port=kernel.replica_port)
19391942
)
19401943
await db_sess.execute(query)
19411944

@@ -2129,8 +2132,8 @@ async def fetch_deployment_context(
21292132

21302133
async def fetch_session_statuses_by_route_ids(
21312134
self,
2132-
route_ids: set[uuid.UUID],
2133-
) -> Mapping[uuid.UUID, SessionStatus | None]:
2135+
route_ids: set[ReplicaID],
2136+
) -> Mapping[ReplicaID, SessionStatus | None]:
21342137
"""Fetch session statuses for multiple routes.
21352138
21362139
Args:
@@ -2158,12 +2161,83 @@ async def fetch_session_statuses_by_route_ids(
21582161
rows = result.all()
21592162

21602163
# 결과를 매핑으로 변환
2161-
status_map: dict[uuid.UUID, SessionStatus | None] = {}
2164+
status_map: dict[ReplicaID, SessionStatus | None] = {}
21622165
for route_id, session_status in rows:
2163-
status_map[route_id] = session_status
2166+
status_map[ReplicaID(route_id)] = session_status
21642167

21652168
return status_map
21662169

2170+
async def fetch_route_session_kernel_infos(
2171+
self,
2172+
route_ids: set[ReplicaID],
2173+
) -> Mapping[ReplicaID, RouteSessionInfo | None]:
2174+
"""Fetch session status and kernel connection info for multiple routes.
2175+
2176+
Args:
2177+
route_ids: Set of route IDs to fetch information for
2178+
2179+
Returns:
2180+
Mapping of route_id to RouteSessionInfo:
2181+
- None → route has no session linked
2182+
- RouteSessionInfo(status=TERMINAL, kernel=None) → session terminated
2183+
- RouteSessionInfo(status=RUNNING, kernel=RouteSessionKernelInfo(host, port)) → ready
2184+
- RouteSessionInfo(status=PREPARING, kernel=None) → not yet running
2185+
"""
2186+
if not route_ids:
2187+
return {}
2188+
2189+
async with self._begin_readonly_session_read_committed() as db_sess:
2190+
query = (
2191+
sa.select(
2192+
RoutingRow.id,
2193+
SessionRow.status,
2194+
KernelRow.kernel_host,
2195+
KernelRow.service_ports,
2196+
)
2197+
.select_from(RoutingRow)
2198+
.outerjoin(SessionRow, RoutingRow.session == SessionRow.id)
2199+
.outerjoin(
2200+
KernelRow,
2201+
sa.and_(
2202+
KernelRow.session_id == RoutingRow.session,
2203+
KernelRow.cluster_role == "main",
2204+
),
2205+
)
2206+
.where(RoutingRow.id.in_(route_ids))
2207+
)
2208+
2209+
result = await db_sess.execute(query)
2210+
rows = result.all()
2211+
2212+
info_map: dict[ReplicaID, RouteSessionInfo | None] = {}
2213+
for row in rows:
2214+
route_id = ReplicaID(row.id)
2215+
if row.status is None:
2216+
info_map[route_id] = None
2217+
continue
2218+
2219+
kernel: RouteSessionKernelInfo | None = None
2220+
if row.kernel_host and row.service_ports:
2221+
inference_port: int | None = None
2222+
for port_info in row.service_ports:
2223+
if port_info.get("is_inference", False):
2224+
host_ports = port_info.get("host_ports", [])
2225+
if host_ports:
2226+
inference_port = host_ports[0]
2227+
break
2228+
if inference_port is not None:
2229+
kernel = RouteSessionKernelInfo(
2230+
replica_host=row.kernel_host,
2231+
replica_port=inference_port,
2232+
)
2233+
2234+
info_map[route_id] = RouteSessionInfo(
2235+
status=row.status,
2236+
kernel=kernel,
2237+
)
2238+
2239+
return info_map
2240+
21672241
async def fetch_route_connection_infos(
21682242
self,
21692243
*,

src/ai/backend/manager/repositories/deployment/repository.py

Lines changed: 28 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@
2121
from ai.backend.common.identifier.deployment_preset import DeploymentPresetID
2222
from ai.backend.common.identifier.deployment_revision import DeploymentRevisionID
2323
from ai.backend.common.identifier.image import ImageID
24+
from ai.backend.common.identifier.replica import ReplicaID
2425
from ai.backend.common.identifier.resource_group import ResourceGroupName
2526
from ai.backend.common.identifier.runtime_variant import RuntimeVariantID
2627
from ai.backend.common.identifier.vfolder import VFolderUUID
@@ -104,7 +105,13 @@
104105

105106
from .db_source import DeploymentDBSource
106107
from .storage_source import DeploymentStorageSource
107-
from .types import ProjectDeploymentSearchScope, RouteData, RouteServiceDiscoveryInfo
108+
from .types import (
109+
ProjectDeploymentSearchScope,
110+
RouteData,
111+
RouteServiceDiscoveryInfo,
112+
RouteSessionInfo,
113+
RouteSessionKernelInfo,
114+
)
108115

109116
log = BraceStyleAdapter(logging.getLogger(__name__))
110117

@@ -758,7 +765,7 @@ async def fetch_kernel_connection_info(
758765
@deployment_repository_resilience.apply()
759766
async def update_route_replica_info(
760767
self,
761-
updates: dict[uuid.UUID, tuple[str, int]],
768+
updates: dict[ReplicaID, RouteSessionKernelInfo],
762769
) -> None:
763770
"""Update replica_host and replica_port for routes."""
764771
await self._db_source.update_route_replica_info(updates)
@@ -1105,11 +1112,27 @@ async def calculate_desired_replicas_for_deployment(
11051112
@deployment_repository_resilience.apply()
11061113
async def fetch_session_statuses_by_route_ids(
11071114
self,
1108-
route_ids: set[uuid.UUID],
1109-
) -> Mapping[uuid.UUID, SessionStatus | None]:
1115+
route_ids: set[ReplicaID],
1116+
) -> Mapping[ReplicaID, SessionStatus | None]:
11101117
"""Fetch session IDs for multiple routes."""
11111118
return await self._db_source.fetch_session_statuses_by_route_ids(route_ids)
11121119

1120+
@deployment_repository_resilience.apply()
1121+
async def fetch_route_session_kernel_infos(
1122+
self,
1123+
route_ids: set[ReplicaID],
1124+
) -> Mapping[ReplicaID, RouteSessionInfo | None]:
1125+
"""Fetch session status and kernel connection info for multiple routes.
1126+
1127+
Returns:
1128+
Mapping of route_id to RouteSessionInfo:
1129+
- None → route has no session linked
1130+
- RouteSessionInfo(status=TERMINAL, kernel=None) → session terminated
1131+
- RouteSessionInfo(status=RUNNING, kernel=RouteSessionKernelInfo(host, port)) → ready
1132+
- RouteSessionInfo(status=PREPARING, kernel=None) → not yet running
1133+
"""
1134+
return await self._db_source.fetch_route_session_kernel_infos(route_ids)
1135+
11131136
@deployment_repository_resilience.apply()
11141137
async def fetch_route_connection_infos(
11151138
self,
@@ -1150,7 +1173,7 @@ async def get_endpoint_id_by_session(
11501173
@deployment_repository_resilience.apply()
11511174
async def fetch_route_service_discovery_info(
11521175
self,
1153-
route_ids: set[uuid.UUID],
1176+
route_ids: set[ReplicaID],
11541177
) -> list[RouteServiceDiscoveryInfo]:
11551178
"""Fetch service discovery information for routes.
11561179

src/ai/backend/manager/repositories/deployment/types/__init__.py

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,8 @@
77
ProjectDeploymentSearchScope,
88
RouteData,
99
RouteServiceDiscoveryInfo,
10+
RouteSessionInfo,
11+
RouteSessionKernelInfo,
1012
)
1113

1214
__all__ = [
@@ -16,4 +18,6 @@
1618
"ProjectDeploymentSearchScope",
1719
"RouteData",
1820
"RouteServiceDiscoveryInfo",
21+
"RouteSessionInfo",
22+
"RouteSessionKernelInfo",
1923
]

0 commit comments

Comments
 (0)