Skip to content

Commit a2e1502

Browse files
committed
refactor(BA-5714): drop stray deployment changes and restore dropped comments
- Revert deployment/executor.py, deployment/route/coordinator.py, deployment/route/executor.py, deployment/route/handlers/observer/ health_check.py to the parent branch state — those edits pre-dated main and are unrelated to BA-5650; they slipped in when this slice pulled the whole sokovan/ subtree. - sokovan/data/allocation.py: restore the explanatory comment above ``main_access_key`` and fix ``from_agent_selections`` to pass ``main_access_key=`` (matches the renamed field). - sokovan/data/workload.py: restore the explanatory comment above ``SessionWorkload.main_access_key``.
1 parent 9d05db6 commit a2e1502

6 files changed

Lines changed: 62 additions & 5 deletions

File tree

src/ai/backend/manager/sokovan/data/allocation.py

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -80,6 +80,7 @@ class SessionAllocation:
8080
kernel_allocations: list[KernelAllocation]
8181
# List of agent allocations for this session
8282
agent_allocations: list[AgentAllocation]
83+
# Owner's resolved main_access_key; required for keypair-scoped concurrency tracking and resource policy lookups.
8384
main_access_key: AccessKey
8485
# Phases that passed during scheduling
8586
passed_phases: list[SchedulingPredicate] = field(default_factory=list)
@@ -140,7 +141,7 @@ def from_agent_selections(
140141
scaling_group=scaling_group,
141142
kernel_allocations=kernel_allocations,
142143
agent_allocations=agent_allocations,
143-
access_key=session_workload.main_access_key,
144+
main_access_key=session_workload.main_access_key,
144145
)
145146

146147
def unique_agent_ids(self) -> list[AgentId]:

src/ai/backend/manager/sokovan/data/workload.py

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -77,6 +77,7 @@ class SessionWorkload:
7777

7878
# Session identifier
7979
session_id: SessionId
80+
# Owner's resolved main_access_key; required for keypair-scoped concurrency tracking and resource policy lookups.
8081
main_access_key: AccessKey
8182
# Resource requirements
8283
requested_slots: ResourceSlot

src/ai/backend/manager/sokovan/deployment/executor.py

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -12,7 +12,7 @@
1212
ClientPool,
1313
)
1414
from ai.backend.common.clients.prometheus.client import PrometheusClient
15-
from ai.backend.common.clients.prometheus.preset import MetricPreset
15+
from ai.backend.common.clients.prometheus.preset import LabelMatcher, MetricPreset
1616
from ai.backend.common.clients.valkey_client.valkey_stat.client import ValkeyStatClient
1717
from ai.backend.common.config import ModelHealthCheck
1818
from ai.backend.common.data.permission.types import RBACElementType
@@ -762,7 +762,7 @@ async def _fetch_prometheus_metric(
762762
preset_data: PrometheusQueryPresetData = presets[preset_id]
763763

764764
# Auto-inject deployment-specific label for scoping
765-
labels: dict[str, str] = {"model_service_name": deployment.metadata.name}
765+
labels = {"model_service_name": LabelMatcher.exact(deployment.metadata.name)}
766766

767767
# time_window: preset default → fallback to "5m"
768768
time_window = preset_data.time_window or "5m"

src/ai/backend/manager/sokovan/deployment/route/coordinator.py

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -370,6 +370,15 @@ async def _handle_status_transitions(
370370
batch_updaters, BulkCreator(specs=all_history_specs)
371371
)
372372

373+
# Record running_at in Valkey for routes that just transitioned to RUNNING
374+
if (
375+
transitions.success is not None
376+
and transitions.success.status == RouteStatus.RUNNING
377+
and result.successes
378+
):
379+
for route in result.successes:
380+
await self._valkey_schedule.mark_route_running_at(str(route.route_id))
381+
373382
async def process_if_needed(self, lifecycle_type: RouteLifecycleType) -> None:
374383
"""
375384
Process route lifecycle operation if needed (based on internal state).

src/ai/backend/manager/sokovan/deployment/route/executor.py

Lines changed: 43 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -193,6 +193,11 @@ async def check_running_routes(self, routes: Sequence[RouteData]) -> RouteExecut
193193
with RouteRecorderContext.shared_step("fetch_kernel_connection_info"):
194194
await self._populate_replica_info(routes_missing_replica)
195195

196+
# Phase 4: Ensure RouteHealthRecords exist in Valkey for routes with replica info
197+
routes_with_replica = [r for r in successes if r.replica_host and r.replica_port]
198+
if routes_with_replica:
199+
await self._ensure_health_records(routes_with_replica)
200+
196201
return RouteExecutionResult(
197202
successes=successes,
198203
errors=errors,
@@ -220,6 +225,29 @@ async def _populate_replica_info(self, routes: Sequence[RouteData]) -> None:
220225
if populated_routes:
221226
await self._initialize_health_records(populated_routes, updates)
222227

228+
async def _ensure_health_records(self, routes: Sequence[RouteData]) -> None:
229+
"""Ensure RouteHealthRecords exist in Valkey for routes that already have replica info.
230+
231+
Routes may already have replica_host/port in DB (set by a previous cycle or legacy code)
232+
but lack a RouteHealthRecord in Valkey. This method checks and initializes missing records.
233+
"""
234+
route_id_strs = [str(r.route_id) for r in routes]
235+
existing = await self._valkey_schedule.get_route_health_records_batch(route_id_strs)
236+
missing = [r for r in routes if existing.get(str(r.route_id)) is None]
237+
if not missing:
238+
return
239+
log.warning(
240+
"RouteHealthRecord missing in Valkey for {} routes, re-initializing: {}",
241+
len(missing),
242+
[str(r.route_id)[:8] for r in missing],
243+
)
244+
replica_info = {
245+
r.route_id: (r.replica_host, r.replica_port)
246+
for r in missing
247+
if r.replica_host and r.replica_port
248+
}
249+
await self._initialize_health_records(missing, replica_info)
250+
223251
async def _initialize_health_records(
224252
self,
225253
routes: Sequence[RouteData],
@@ -230,6 +258,14 @@ async def _initialize_health_records(
230258
health_configs = await self._deployment_repo.fetch_health_check_configs_by_revision_ids(
231259
revision_ids
232260
)
261+
redis_time = await self._valkey_schedule.get_redis_time()
262+
263+
# Read existing running_at values that were set when routes transitioned to RUNNING
264+
# These may be in partial hashes (only running_at field), so read raw field directly
265+
running_at_map = await self._valkey_schedule.get_route_running_at_batch([
266+
str(r.route_id) for r in routes
267+
])
268+
233269
records: list[RouteHealthRecord] = []
234270
for route in routes:
235271
host, port = replica_info[route.route_id]
@@ -238,16 +274,21 @@ async def _initialize_health_records(
238274
health_path = health_config.path if health_config else "/"
239275
initial_delay = health_config.initial_delay if health_config else 60.0
240276
created_at = int(route.created_at.timestamp())
241-
initial_delay_until = created_at + int(initial_delay)
277+
278+
# Use running_at from Valkey (set at RUNNING transition), fallback to redis_time
279+
route_id_str = str(route.route_id)
280+
running_at = running_at_map.get(route_id_str) or redis_time
281+
initial_delay_until = running_at + int(initial_delay)
242282

243283
records.append(
244284
RouteHealthRecord(
245-
route_id=str(route.route_id),
285+
route_id=route_id_str,
246286
created_at=created_at,
247287
initial_delay_until=initial_delay_until,
248288
health_path=health_path,
249289
inference_port=port,
250290
replica_host=host,
291+
running_at=running_at,
251292
)
252293
)
253294

src/ai/backend/manager/sokovan/deployment/route/handlers/observer/health_check.py

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -73,6 +73,11 @@ async def observe(self, routes: Sequence[RouteData]) -> RouteObservationResult:
7373
targets.append((route_id_str, record))
7474

7575
if not targets:
76+
if checkable:
77+
log.warning(
78+
"Health observer: {} checkable routes but 0 have records in Valkey",
79+
len(checkable),
80+
)
7681
return RouteObservationResult(observed_count=0)
7782

7883
# Perform HTTP health checks in parallel

0 commit comments

Comments
 (0)