Skip to content

Commit 0233669

Browse files
committed
refactor(BA-5650-F): propagate owner_id rename into sokovan
Rename `access_key` -> `main_access_key` on the sokovan data types (`SessionAllocation`, `PreparedSessionData`, `SessionDataForPull`, `SessionDataForStart`, `ScheduledSessionData`, `SessionWorkload`), rename `user_uuid` -> `owner_id` on the data types that carry it, and update every sokovan caller accordingly.

Affects:
- sokovan/data/{allocation,lifecycle,workload}.py
- sokovan/scheduler/handlers/lifecycle/*
- sokovan/scheduler/handlers/maintenance/sweep_sessions.py
- sokovan/scheduler/provisioner/{provisioner,sequencers,validators}/*
- sokovan/scheduler/launcher/launcher.py
- sokovan/scheduler/post_processors/cache_invalidation.py
- sokovan/scheduler/fair_share/aggregator.py
- sokovan/scheduling_controller/{preparers,scheduling_controller}.py
- sokovan/deployment/executor.py and sokovan/deployment/route/*

No external behavior change.
1 parent 3b7a216 commit 0233669

27 files changed

Lines changed: 90 additions & 123 deletions

changes/BA-5650-F.misc.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1 @@
1+
Propagate the `owner_id` / `main_access_key` signature rename into the sokovan data classes (`allocation`, `lifecycle`, `workload`), scheduler handlers, provisioner validators, launcher, scheduling controller, sequencers, and deployment executor. No external behavior change.

src/ai/backend/manager/sokovan/data/allocation.py

Lines changed: 2 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -80,8 +80,7 @@ class SessionAllocation:
8080
kernel_allocations: list[KernelAllocation]
8181
# List of agent allocations for this session
8282
agent_allocations: list[AgentAllocation]
83-
# Keypair associated with the session
84-
access_key: AccessKey
83+
main_access_key: AccessKey
8584
# Phases that passed during scheduling
8685
passed_phases: list[SchedulingPredicate] = field(default_factory=list)
8786
# Phases that failed during scheduling (normally empty for successful allocations)
@@ -141,7 +140,7 @@ def from_agent_selections(
141140
scaling_group=scaling_group,
142141
kernel_allocations=kernel_allocations,
143142
agent_allocations=agent_allocations,
144-
access_key=session_workload.access_key,
143+
access_key=session_workload.main_access_key,
145144
)
146145

147146
def unique_agent_ids(self) -> list[AgentId]:

src/ai/backend/manager/sokovan/data/lifecycle.py

Lines changed: 7 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -64,7 +64,7 @@ class SessionDataForPull:
6464

6565
session_id: SessionId
6666
creation_id: str
67-
access_key: AccessKey
67+
main_access_key: AccessKey
6868
kernels: list[KernelBindingData]
6969

7070

@@ -74,12 +74,12 @@ class SessionDataForStart:
7474

7575
session_id: SessionId
7676
creation_id: str
77-
access_key: AccessKey
77+
main_access_key: AccessKey
7878
session_type: SessionTypes
7979
name: str
8080
cluster_mode: ClusterMode
8181
kernels: list[KernelBindingData]
82-
user_uuid: UUID
82+
owner_id: UUID
8383
user_email: str
8484
user_name: str
8585
environ: dict[str, str]
@@ -93,13 +93,13 @@ class ScheduledSessionData:
9393

9494
session_id: SessionId
9595
creation_id: str
96-
access_key: AccessKey
96+
main_access_key: AccessKey
9797
session_type: SessionTypes
9898
name: str
9999
kernels: list[KernelBindingData]
100100
# Additional fields for PREPARED sessions
101101
cluster_mode: ClusterMode | None = None
102-
user_uuid: UUID | None = None
102+
owner_id: UUID | None = None
103103
user_email: str | None = None
104104
user_name: str | None = None
105105
network_type: NetworkType | None = None
@@ -157,12 +157,12 @@ class PreparedSessionData:
157157

158158
session_id: SessionId
159159
creation_id: str
160-
access_key: AccessKey
160+
main_access_key: AccessKey
161161
session_type: SessionTypes
162162
name: str
163163
cluster_mode: ClusterMode
164164
kernels: list[KernelStartData]
165-
user_uuid: UUID
165+
owner_id: UUID
166166
user_email: str
167167
user_name: str
168168
network_type: NetworkType | None = None

src/ai/backend/manager/sokovan/data/workload.py

Lines changed: 3 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -77,12 +77,11 @@ class SessionWorkload:
7777

7878
# Session identifier
7979
session_id: SessionId
80-
# User identification for fairness calculation
81-
access_key: AccessKey
80+
main_access_key: AccessKey
8281
# Resource requirements
8382
requested_slots: ResourceSlot
84-
# User UUID for user resource limit checks
85-
user_uuid: UUID
83+
# Owner (user) UUID for user resource limit checks
84+
owner_id: UUID
8685
# Group ID for group resource limit checks
8786
group_id: UUID
8887
# Domain name for domain resource limit checks

src/ai/backend/manager/sokovan/deployment/executor.py

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -12,7 +12,7 @@
1212
ClientPool,
1313
)
1414
from ai.backend.common.clients.prometheus.client import PrometheusClient
15-
from ai.backend.common.clients.prometheus.preset import LabelMatcher, MetricPreset
15+
from ai.backend.common.clients.prometheus.preset import MetricPreset
1616
from ai.backend.common.clients.valkey_client.valkey_stat.client import ValkeyStatClient
1717
from ai.backend.common.config import ModelHealthCheck
1818
from ai.backend.common.data.permission.types import RBACElementType
@@ -762,7 +762,7 @@ async def _fetch_prometheus_metric(
762762
preset_data: PrometheusQueryPresetData = presets[preset_id]
763763

764764
# Auto-inject deployment-specific label for scoping
765-
labels = {"model_service_name": LabelMatcher.exact(deployment.metadata.name)}
765+
labels: dict[str, str] = {"model_service_name": deployment.metadata.name}
766766

767767
# time_window: preset default → fallback to "5m"
768768
time_window = preset_data.time_window or "5m"

src/ai/backend/manager/sokovan/deployment/route/coordinator.py

Lines changed: 0 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -370,15 +370,6 @@ async def _handle_status_transitions(
370370
batch_updaters, BulkCreator(specs=all_history_specs)
371371
)
372372

373-
# Record running_at in Valkey for routes that just transitioned to RUNNING
374-
if (
375-
transitions.success is not None
376-
and transitions.success.status == RouteStatus.RUNNING
377-
and result.successes
378-
):
379-
for route in result.successes:
380-
await self._valkey_schedule.mark_route_running_at(str(route.route_id))
381-
382373
async def process_if_needed(self, lifecycle_type: RouteLifecycleType) -> None:
383374
"""
384375
Process route lifecycle operation if needed (based on internal state).

src/ai/backend/manager/sokovan/deployment/route/executor.py

Lines changed: 2 additions & 43 deletions
Original file line numberDiff line numberDiff line change
@@ -193,11 +193,6 @@ async def check_running_routes(self, routes: Sequence[RouteData]) -> RouteExecut
193193
with RouteRecorderContext.shared_step("fetch_kernel_connection_info"):
194194
await self._populate_replica_info(routes_missing_replica)
195195

196-
# Phase 4: Ensure RouteHealthRecords exist in Valkey for routes with replica info
197-
routes_with_replica = [r for r in successes if r.replica_host and r.replica_port]
198-
if routes_with_replica:
199-
await self._ensure_health_records(routes_with_replica)
200-
201196
return RouteExecutionResult(
202197
successes=successes,
203198
errors=errors,
@@ -225,29 +220,6 @@ async def _populate_replica_info(self, routes: Sequence[RouteData]) -> None:
225220
if populated_routes:
226221
await self._initialize_health_records(populated_routes, updates)
227222

228-
async def _ensure_health_records(self, routes: Sequence[RouteData]) -> None:
229-
"""Ensure RouteHealthRecords exist in Valkey for routes that already have replica info.
230-
231-
Routes may already have replica_host/port in DB (set by a previous cycle or legacy code)
232-
but lack a RouteHealthRecord in Valkey. This method checks and initializes missing records.
233-
"""
234-
route_id_strs = [str(r.route_id) for r in routes]
235-
existing = await self._valkey_schedule.get_route_health_records_batch(route_id_strs)
236-
missing = [r for r in routes if existing.get(str(r.route_id)) is None]
237-
if not missing:
238-
return
239-
log.warning(
240-
"RouteHealthRecord missing in Valkey for {} routes, re-initializing: {}",
241-
len(missing),
242-
[str(r.route_id)[:8] for r in missing],
243-
)
244-
replica_info = {
245-
r.route_id: (r.replica_host, r.replica_port)
246-
for r in missing
247-
if r.replica_host and r.replica_port
248-
}
249-
await self._initialize_health_records(missing, replica_info)
250-
251223
async def _initialize_health_records(
252224
self,
253225
routes: Sequence[RouteData],
@@ -258,14 +230,6 @@ async def _initialize_health_records(
258230
health_configs = await self._deployment_repo.fetch_health_check_configs_by_revision_ids(
259231
revision_ids
260232
)
261-
redis_time = await self._valkey_schedule.get_redis_time()
262-
263-
# Read existing running_at values that were set when routes transitioned to RUNNING
264-
# These may be in partial hashes (only running_at field), so read raw field directly
265-
running_at_map = await self._valkey_schedule.get_route_running_at_batch([
266-
str(r.route_id) for r in routes
267-
])
268-
269233
records: list[RouteHealthRecord] = []
270234
for route in routes:
271235
host, port = replica_info[route.route_id]
@@ -274,21 +238,16 @@ async def _initialize_health_records(
274238
health_path = health_config.path if health_config else "/"
275239
initial_delay = health_config.initial_delay if health_config else 60.0
276240
created_at = int(route.created_at.timestamp())
277-
278-
# Use running_at from Valkey (set at RUNNING transition), fallback to redis_time
279-
route_id_str = str(route.route_id)
280-
running_at = running_at_map.get(route_id_str) or redis_time
281-
initial_delay_until = running_at + int(initial_delay)
241+
initial_delay_until = created_at + int(initial_delay)
282242

283243
records.append(
284244
RouteHealthRecord(
285-
route_id=route_id_str,
245+
route_id=str(route.route_id),
286246
created_at=created_at,
287247
initial_delay_until=initial_delay_until,
288248
health_path=health_path,
289249
inference_port=port,
290250
replica_host=host,
291-
running_at=running_at,
292251
)
293252
)
294253

src/ai/backend/manager/sokovan/deployment/route/handlers/observer/health_check.py

Lines changed: 0 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -73,11 +73,6 @@ async def observe(self, routes: Sequence[RouteData]) -> RouteObservationResult:
7373
targets.append((route_id_str, record))
7474

7575
if not targets:
76-
if checkable:
77-
log.warning(
78-
"Health observer: {} checkable routes but 0 have records in Valkey",
79-
len(checkable),
80-
)
8176
return RouteObservationResult(observed_count=0)
8277

8378
# Perform HTTP health checks in parallel

src/ai/backend/manager/sokovan/scheduler/coordinator.py

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -25,7 +25,7 @@
2525
)
2626
from ai.backend.common.events.types import AbstractBroadcastEvent
2727
from ai.backend.common.leader.tasks import EventTaskSpec
28-
from ai.backend.common.types import AccessKey, AgentId, SessionId
28+
from ai.backend.common.types import AgentId, SessionId
2929
from ai.backend.logging import BraceStyleAdapter
3030
from ai.backend.manager.config.provider import ManagerConfigProvider
3131
from ai.backend.manager.data.kernel.types import KernelStatus
@@ -831,6 +831,8 @@ async def _process_promotion_scaling_group(
831831
"check_kernel_status",
832832
success_detail=f"All kernels ready for {spec.success_status.value}",
833833
):
834+
# BA-5609: resolve main_access_key for cache invalidation consumer.
835+
access_key_by_id = await self._repository.resolve_main_access_keys(session_ids)
834836
result = SessionExecutionResult()
835837
for session_info in session_infos:
836838
result.successes.append(
@@ -839,7 +841,7 @@ async def _process_promotion_scaling_group(
839841
from_status=session_info.lifecycle.status,
840842
reason=spec.reason,
841843
creation_id=session_info.identity.creation_id,
842-
access_key=AccessKey(session_info.metadata.access_key),
844+
access_key=access_key_by_id.get(session_info.identity.id),
843845
)
844846
)
845847

src/ai/backend/manager/sokovan/scheduler/fair_share/aggregator.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -484,7 +484,7 @@ def _generate_slice_specs(
484484
spec = KernelUsageRecordCreatorSpec(
485485
kernel_id=UUID(str(kernel.id)),
486486
session_id=UUID(kernel.session.session_id),
487-
user_uuid=kernel.user_permission.user_uuid,
487+
user_uuid=kernel.user_permission.owner_id,
488488
project_id=kernel.user_permission.group_id,
489489
domain_name=kernel.user_permission.domain_name,
490490
resource_group=scaling_group,

0 commit comments

Comments
 (0)