Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
28 commits
Select commit Hold shift + click to select a range
7703255
refactor(BA-5650-D): switch session repository to owner_id
jopemachine Apr 14, 2026
1dbb5ef
refactor(BA-5650-F): propagate owner_id rename into sokovan
jopemachine Apr 14, 2026
88bdd72
refactor(BA-5715): drop stale misc fragments from intermediate slices
jopemachine Apr 14, 2026
183a263
fix(BA-5650-G): align remaining test fixtures and adapter call sites
jopemachine Apr 14, 2026
8192de4
breaking(BA-5650-H): drop owner_access_key from REST v1 session API
jopemachine Apr 14, 2026
6a3887b
refactor(BA-5650-I): test and remaining ORM updates
jopemachine Apr 14, 2026
0525dcb
docs: rename news fragment to 11051; drop stale 10916.breaking.md
jopemachine Apr 14, 2026
8fe8c7e
chore: update api schema dump
jopemachine Apr 14, 2026
a89a65f
docs(BA-5650): use enhance news fragment type for slice
jopemachine Apr 14, 2026
95b1bba
refactor(BA-5650-I): drop stray non-BA-5650 changes from slice
jopemachine Apr 14, 2026
d95ce09
fix(BA-5650-I): resolve cascaded rebase conflicts and align tests
jopemachine Apr 14, 2026
f248c1c
refactor(BA-5650-D): switch session repository to owner_id
jopemachine Apr 14, 2026
250f12e
refactor(BA-5650-F): propagate owner_id rename into sokovan
jopemachine Apr 14, 2026
a709085
refactor(BA-5715): drop stale misc fragments from intermediate slices
jopemachine Apr 14, 2026
20c339f
fix(BA-5650-G): align remaining test fixtures and adapter call sites
jopemachine Apr 14, 2026
4804704
refactor(BA-5650-I): test and remaining ORM updates
jopemachine Apr 14, 2026
6b5011e
refactor(BA-5650-I): drop stray non-BA-5650 changes from slice
jopemachine Apr 14, 2026
a8e467a
fix(BA-5650-I): resolve cascaded rebase conflicts and align tests
jopemachine Apr 14, 2026
e6b048b
breaking(BA-5650-H): drop owner_access_key from REST v1 session API
jopemachine Apr 14, 2026
a7488d0
refactor(BA-5650-I): test and remaining ORM updates
jopemachine Apr 14, 2026
1a4ed0b
fix(BA-5650-I): resolve cascaded rebase conflicts and align tests
jopemachine Apr 14, 2026
410379e
refactor(BA-5650-I): test and remaining ORM updates
jopemachine Apr 14, 2026
e6c3ff5
fix(BA-5650-I): resolve cascaded rebase conflicts and align tests
jopemachine Apr 14, 2026
53fbe69
refactor(BA-5650-I): test and remaining ORM updates
jopemachine Apr 14, 2026
5573b20
breaking(BA-5653): drop sessions/kernels access_key columns
jopemachine Apr 14, 2026
a64ac5c
fix(BA-5653): address review feedback on schema migration
jopemachine Apr 14, 2026
23aeac6
fix(BA-5653): resolve cascaded rebase conflicts on top of slice I
jopemachine Apr 14, 2026
b41888f
chore: update api schema dump
jopemachine Apr 14, 2026
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions changes/10916.breaking.md
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
Change the REST v1 session API's delegation mechanism from the `owner_access_key` query parameter to an `owner_id` (user UUID) field. The `owner_access_key` parameter is removed from all session endpoints (`GET /session/{name}`, `DELETE /session/{name}`, `POST /session/_/create-from-template`, `POST /session/_/create`, `POST /session/_/create-cluster`, `GET /session/{name}/logs`, `GET /session/{name}/status-history`, etc.). Clients that previously passed `owner_access_key=<keypair>` to act on behalf of another user must now pass `owner_id=<user uuid>` (and only on the session-creation endpoints; for read/control endpoints the caller always acts as themselves). Session and kernel `access_key` semantics also change: the value is no longer tied to the keypair used to create the session but resolved at read time from the owner's `main_access_key`.
1 change: 1 addition & 0 deletions changes/11040.breaking.md
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
Drop the `access_key` column from `sessions` and `kernels` tables; the owner's keypair is now resolved from `users.main_access_key` at read time, with `user_uuid` remaining as the canonical owner reference.
1 change: 1 addition & 0 deletions changes/11051.enhance.md
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
Test updates and remaining ORM/repository touch-ups for the BA-5650 stack: scheduler db_source, keypair and endpoint row cleanup, gql_legacy endpoint/routing, session lifecycle/service tests, sokovan scheduler tests, and dependency-injection tests.
1 change: 0 additions & 1 deletion changes/BA-5650-E.misc.md

This file was deleted.

1 change: 1 addition & 0 deletions changes/BA-5650-H.breaking.md
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
**Breaking**: Remove `owner_access_key` query parameter from REST v1 session endpoints. Delegation is now performed via `owner_id` (user UUID) and only on the session creation endpoints (`/session/_/create-from-template`, `/session/_/create`, `/session/_/create-cluster`). Read/control endpoints always act as the authenticated caller. Clients that previously passed `owner_access_key=<AK>` must migrate to `owner_id=<user-uuid>`.
1 change: 1 addition & 0 deletions changes/BA-5650-I.misc.md
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
Test updates and remaining ORM/repository touch-ups for the BA-5650 stack: scheduler db_source, keypair and endpoint row cleanup, gql_legacy endpoint/routing, session lifecycle/service tests, sokovan scheduler tests, and dependency-injection tests. No external behavior change beyond what earlier slices documented.
30 changes: 6 additions & 24 deletions dev
Original file line number Diff line number Diff line change
Expand Up @@ -149,32 +149,14 @@ cmd_restart() {

cmd_log() {
local svc=$1
local follow=${2:-}
local winname
winname=$(_tmux_window_name "$svc")
local win
win=$(tmux list-windows -t "$TMUX_SESSION" -F "#{window_name}" 2>/dev/null | grep "^${winname}$" | head -1) || true
if [ -z "$win" ]; then
echo "$(_color red "No tmux window found for $svc")"
return 1
fi
if [ "$follow" = "-f" ]; then
local last_hash=""
trap 'exit 0' INT
while true; do
local output
output=$(tmux capture-pane -t "$TMUX_SESSION:$win" -p -S -50 2>/dev/null)
local cur_hash
cur_hash=$(echo "$output" | md5sum | cut -d' ' -f1)
if [ "$cur_hash" != "$last_hash" ]; then
clear
echo "$output"
last_hash=$cur_hash
fi
sleep 1
done
else
if [ -n "$win" ]; then
tmux capture-pane -t "$TMUX_SESSION:$win" -p -S -50
else
echo "$(_color red "No tmux window found for $svc")"
fi
}

Expand All @@ -188,7 +170,7 @@ Commands:
start <service|all> Start a service
stop <service|all> Stop a service
restart <service|all> Restart a service
log <service> [-f] Show recent log output (-f to follow)
log <service> Show recent log output

Services:
mgr, agent, storage, web, proxy-coordinator, proxy-worker, all
Expand Down Expand Up @@ -225,9 +207,9 @@ case "$1" in
"cmd_$1" "$2"
;;
log)
[ $# -lt 2 ] && { echo "$(_color red "Usage: ./dev log <service> [-f]")"; exit 1; }
[ $# -lt 2 ] && { echo "$(_color red "Usage: ./dev log <service>")"; exit 1; }
_validate_service "$2"
cmd_log "$2" "${3:-}"
cmd_log "$2"
;;
*) echo "$(_color red "Unknown command: $1")"; usage; exit 1 ;;
esac
3 changes: 0 additions & 3 deletions docs/manager/graphql-reference/schema.graphql
Original file line number Diff line number Diff line change
Expand Up @@ -1836,9 +1836,6 @@ type Routing implements Item {
endpoint: String
session: UUID
status: String

"""Added in 26.4.1."""
health_status: String
traffic_ratio: Float
created_at: DateTime
error_data: JSONString
Expand Down
17 changes: 2 additions & 15 deletions docs/manager/graphql-reference/supergraph.graphql
Original file line number Diff line number Diff line change
Expand Up @@ -166,16 +166,6 @@ input AddRevisionInput
extraMounts: [ExtraVFolderMountInput!] = null
}

"""Added in 26.4.1. Options for the add_model_revision mutation."""
input AddRevisionOptions
@join__type(graph: STRAWBERRY)
{
"""
When true, automatically activate the newly added revision immediately after creation.
"""
autoActivate: Boolean! = false
}

"""Added in 25.19.0. Payload for adding a revision."""
type AddRevisionPayload
@join__type(graph: STRAWBERRY)
Expand Down Expand Up @@ -4683,9 +4673,9 @@ input DeploymentOrderBy
enum DeploymentOrderField
@join__type(graph: STRAWBERRY)
{
NAME @join__enumValue(graph: STRAWBERRY)
CREATED_AT @join__enumValue(graph: STRAWBERRY)
UPDATED_AT @join__enumValue(graph: STRAWBERRY)
NAME @join__enumValue(graph: STRAWBERRY)
}

"""Added in 25.19.0. Deployment policy configuration."""
Expand Down Expand Up @@ -10019,7 +10009,7 @@ type Mutation
syncReplicas(input: SyncReplicaInput!): SyncReplicaPayload! @join__field(graph: STRAWBERRY)

"""Added in 25.16.0. Add model revision."""
addModelRevision(input: AddRevisionInput!, options: AddRevisionOptions = null): AddRevisionPayload! @join__field(graph: STRAWBERRY)
addModelRevision(input: AddRevisionInput!): AddRevisionPayload! @join__field(graph: STRAWBERRY)

"""
Added in 26.4.1. Create or update the deployment policy for a given deployment (upsert semantics). If the deployment already has a policy, it is replaced entirely with the new configuration
Expand Down Expand Up @@ -14933,9 +14923,6 @@ type Routing implements Item
endpoint: String
session: UUID
status: String

"""Added in 26.4.1."""
health_status: String
traffic_ratio: Float
created_at: DateTime
error_data: JSONString
Expand Down
12 changes: 2 additions & 10 deletions docs/manager/graphql-reference/v2-schema.graphql
Original file line number Diff line number Diff line change
Expand Up @@ -126,14 +126,6 @@ input AddRevisionInput {
extraMounts: [ExtraVFolderMountInput!] = null
}

"""Added in 26.4.1. Options for the add_model_revision mutation."""
input AddRevisionOptions {
"""
When true, automatically activate the newly added revision immediately after creation.
"""
autoActivate: Boolean! = false
}

"""Added in 25.19.0. Payload for adding a revision."""
type AddRevisionPayload {
"""Added revision"""
Expand Down Expand Up @@ -3012,9 +3004,9 @@ input DeploymentOrderBy {
}

enum DeploymentOrderField {
NAME
CREATED_AT
UPDATED_AT
NAME
}

"""Added in 25.19.0. Deployment policy configuration."""
Expand Down Expand Up @@ -6021,7 +6013,7 @@ type Mutation {
syncReplicas(input: SyncReplicaInput!): SyncReplicaPayload!

"""Added in 25.16.0. Add model revision."""
addModelRevision(input: AddRevisionInput!, options: AddRevisionOptions = null): AddRevisionPayload!
addModelRevision(input: AddRevisionInput!): AddRevisionPayload!

"""
Added in 26.4.1. Create or update the deployment policy for a given deployment (upsert semantics). If the deployment already has a policy, it is replaced entirely with the new configuration
Expand Down
4 changes: 1 addition & 3 deletions src/ai/backend/common/clients/prometheus/__init__.py
Original file line number Diff line number Diff line change
@@ -1,11 +1,9 @@
from .client import PrometheusClient
from .preset import LabelMatcher, LabelOperator, MetricPreset
from .preset import MetricPreset
from .querier import ContainerMetricQuerier, MetricQuerier
from .types import ValueType

__all__ = [
"LabelMatcher",
"LabelOperator",
"PrometheusClient",
"MetricPreset",
"MetricQuerier",
Expand Down
32 changes: 2 additions & 30 deletions src/ai/backend/common/clients/prometheus/preset.py
Original file line number Diff line number Diff line change
@@ -1,30 +1,5 @@
from collections.abc import Mapping, Set
from dataclasses import dataclass, field
from enum import StrEnum
from typing import Self


class LabelOperator(StrEnum):
EQUAL = "="
NOT_EQUAL = "!="
REGEX = "=~"
NOT_REGEX = "!~"


@dataclass(frozen=True)
class LabelMatcher:
"""PromQL label matcher with an explicit operator."""

value: str
operator: LabelOperator = LabelOperator.EQUAL

@classmethod
def exact(cls, value: str) -> Self:
return cls(value=value, operator=LabelOperator.EQUAL)

@classmethod
def regex(cls, value: str) -> Self:
return cls(value=value, operator=LabelOperator.REGEX)


def _escape_label_value(value: str) -> str:
Expand All @@ -40,7 +15,7 @@ class MetricPreset:
template: str

# Query labels (injected into {labels} placeholder)
labels: Mapping[str, LabelMatcher] = field(default_factory=dict)
labels: Mapping[str, str] = field(default_factory=dict)

# Group by labels (injected into {group_by} placeholder)
group_by: Set[str] = field(default_factory=frozenset)
Expand All @@ -50,10 +25,7 @@ class MetricPreset:

def render(self) -> str:
"""Render the PromQL query with all values injected."""
label_str = ",".join(
f'{key}{value.operator}"{_escape_label_value(value.value)}"'
for key, value in self.labels.items()
)
label_str = ",".join(f'{k}="{_escape_label_value(v)}"' for k, v in self.labels.items())
return self.template.format(
labels=label_str,
window=self.window,
Expand Down
21 changes: 10 additions & 11 deletions src/ai/backend/common/clients/prometheus/querier.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,6 @@
from dataclasses import dataclass
from uuid import UUID

from ai.backend.common.clients.prometheus.preset import LabelMatcher
from ai.backend.common.clients.prometheus.types import ValueType


Expand All @@ -15,7 +14,7 @@ class MetricQuerier(ABC):
"""

@abstractmethod
def labels(self) -> Mapping[str, LabelMatcher]:
def labels(self) -> Mapping[str, str]:
"""Return the labels to be used in the Prometheus query."""
...

Expand All @@ -35,22 +34,22 @@ class ContainerMetricQuerier(MetricQuerier):
user_id: UUID | None = None
project_id: UUID | None = None

def labels(self) -> Mapping[str, LabelMatcher]:
def labels(self) -> Mapping[str, str]:
"""Return the labels for the container metric query."""
result: dict[str, LabelMatcher] = {
"container_metric_name": LabelMatcher.exact(self.metric_name),
"value_type": LabelMatcher.exact(self.value_type),
result: dict[str, str] = {
"container_metric_name": self.metric_name,
"value_type": self.value_type,
}
if self.kernel_id is not None:
result["kernel_id"] = LabelMatcher.exact(str(self.kernel_id))
result["kernel_id"] = str(self.kernel_id)
if self.session_id is not None:
result["session_id"] = LabelMatcher.exact(str(self.session_id))
result["session_id"] = str(self.session_id)
if self.agent_id is not None:
result["agent_id"] = LabelMatcher.exact(self.agent_id)
result["agent_id"] = self.agent_id
if self.user_id is not None:
result["user_id"] = LabelMatcher.exact(str(self.user_id))
result["user_id"] = str(self.user_id)
if self.project_id is not None:
result["project_id"] = LabelMatcher.exact(str(self.project_id))
result["project_id"] = str(self.project_id)
return result

def group_by_labels(self) -> frozenset[str]:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -95,14 +95,11 @@ class RouteHealthRecord:

route_id: str
created_at: int # Unix timestamp when route was created
initial_delay_until: int # Unix timestamp = running_at + initial_delay
initial_delay_until: int # Unix timestamp = created_at + initial_delay
health_path: str # extracted from model_definition
inference_port: int # extracted from kernel
replica_host: str # extracted from kernel

# Timestamp when route entered RUNNING state (set by coordinator)
running_at: int | None = None

# Agent check results
agent_healthy: bool = False
agent_last_check: int = 0 # Unix timestamp
Expand Down Expand Up @@ -131,7 +128,7 @@ def is_stale(self, current_time: int, staleness_sec: int = MAX_HEALTH_STALENESS_

def to_valkey_hash(self) -> Mapping[str, str]:
"""Serialize to Valkey hash fields."""
data: dict[str, str] = {
return {
"route_id": self.route_id,
"created_at": str(self.created_at),
"initial_delay_until": str(self.initial_delay_until),
Expand All @@ -143,9 +140,6 @@ def to_valkey_hash(self) -> Mapping[str, str]:
"manager_healthy": "1" if self.manager_healthy else "0",
"manager_last_check": str(self.manager_last_check),
}
if self.running_at is not None:
data["running_at"] = str(self.running_at)
return data

@classmethod
def from_valkey_hash(cls, data: Mapping[str, str]) -> RouteHealthRecord:
Expand All @@ -157,7 +151,6 @@ def from_valkey_hash(cls, data: Mapping[str, str]) -> RouteHealthRecord:
health_path=data["health_path"],
inference_port=int(data["inference_port"]),
replica_host=data["replica_host"],
running_at=int(raw) if (raw := data.get("running_at")) and raw != "0" else None,
agent_healthy=data.get("agent_healthy", "0") == "1",
agent_last_check=int(data.get("agent_last_check", "0")),
manager_healthy=data.get("manager_healthy", "0") == "1",
Expand Down Expand Up @@ -675,52 +668,6 @@ async def update_route_liveness(self, route_id: str, liveness: bool) -> None:
async with self._client.client() as conn:
await conn.exec(batch, raise_on_error=True)

@valkey_schedule_resilience.apply()
async def mark_route_running_at(self, route_id: str) -> None:
"""
Record the RUNNING transition timestamp for a route.
Called when a route transitions to RUNNING status.
Uses Redis time for consistency with health check comparisons.

:param route_id: The route ID that entered RUNNING state
"""
key = self._get_route_health_key(route_id)
current_time = str(await self._get_redis_time())
async with self._client.client() as conn:
await conn.hset(key, {"running_at": current_time})
await conn.expire(key, ROUTE_HEALTH_TTL_SEC)

@valkey_schedule_resilience.apply()
async def get_route_running_at_batch(self, route_ids: Sequence[str]) -> dict[str, int | None]:
"""
Batch read running_at field from route health hashes.
Works even on partial hashes (before full RouteHealthRecord is initialized).

:param route_ids: Route IDs to look up
:return: Mapping of route_id to running_at timestamp (None if not set)
"""
if not route_ids:
return {}

batch = Batch(is_atomic=False)
for route_id in route_ids:
key = self._get_route_health_key(route_id)
batch.hget(key, "running_at")

async with self._client.client() as conn:
results = await conn.exec(batch, raise_on_error=False)
if results is None:
return dict.fromkeys(route_ids)

running_at_map: dict[str, int | None] = {}
for i, route_id in enumerate(route_ids):
raw = results[i] if len(results) > i else None
if raw and raw != b"0":
running_at_map[route_id] = int(raw)
else:
running_at_map[route_id] = None
return running_at_map

@valkey_schedule_resilience.apply()
async def refresh_route_health_ttl(self, route_id: str) -> None:
"""
Expand Down Expand Up @@ -881,8 +828,6 @@ async def get_route_health_record(self, route_id: str) -> RouteHealthRecord | No
return None

data = {k.decode(): v.decode() for k, v in result.items()}
if "route_id" not in data:
return None
return RouteHealthRecord.from_valkey_hash(data)

@valkey_schedule_resilience.apply()
Expand Down Expand Up @@ -921,10 +866,6 @@ async def get_route_health_records_batch(
continue

data = {k.decode(): v.decode() for k, v in raw.items()}
if "route_id" not in data:
# Partial hash (e.g., only running_at set by mark_route_running_at)
records[route_id] = None
continue
records[route_id] = RouteHealthRecord.from_valkey_hash(data)

return records
Expand Down
Loading
Loading