| Author | HyeokJin Kim (hyeokjin@lablup.com) |
|---|---|
| Status | Draft |
| Created | 2025-01-15 |
| Created-Version | 26.1.0 |
| Target-Version | 26.1.0 |
| Implemented-Version | |
- JIRA: BA-3896
- Parent Epic: BA-3060 (Fair Share Scheduler)
- Related BEP: BEP-1026: Fair Share Scheduler
The Sokovan scheduler currently has the SessionLifecycleHandler pattern for handling session state transitions. However, there are use cases that require periodic observation and recording of data without state changes.
- Kernel usage snapshot: Periodically record resource_usage of running kernels
- Usage aggregation: Aggregate kernel records into user/project/domain buckets
- Fair share calculation: Calculate fair_share_factor from aggregated usage
- Service discovery registration: Register kernel state to service discovery (future)
SessionLifecycleHandler requires state transitions (success_status, failure_status, stale_status). The above use cases:
- Do not change state
- Simply read data and record to external systems
- Do not affect session state even on failure
Therefore, a separate handler pattern is needed.
```python
class SessionLifecycleHandler(ABC):
    """Handler for session state transitions."""

    @classmethod
    @abstractmethod
    def name(cls) -> str: ...

    @property
    @abstractmethod
    def target_statuses(self) -> frozenset[SessionStatus]: ...

    @property
    @abstractmethod
    def target_kernel_statuses(self) -> frozenset[KernelStatus]: ...

    @property
    @abstractmethod
    def success_status(self) -> SessionStatus: ...  # State to transition on success

    @property
    @abstractmethod
    def failure_status(self) -> SessionStatus: ...  # State to transition on failure

    @property
    @abstractmethod
    def stale_status(self) -> SessionStatus: ...  # State to transition on stale

    @abstractmethod
    async def execute(self, targets: Sequence[TTarget]) -> SessionExecutionResult: ...
```

This pattern is centered on state transitions, making it unsuitable for observation-only tasks.
A handler that observes and records data without state transitions.
```python
from __future__ import annotations

from abc import ABC, abstractmethod
from dataclasses import dataclass
from typing import Generic, Sequence, TypeVar

from ai.backend.manager.models.kernel import KernelStatus
from ai.backend.manager.models.session import SessionStatus

TTarget = TypeVar("TTarget")


@dataclass
class ObserverResult:
    """Result of observer execution.

    Aligned with SessionExecutionResult field naming convention.
    Used for logging and Prometheus metrics collection.
    """

    # Both counters start at zero, so ObserverResult() is an empty result.
    success_count: int = 0
    failure_count: int = 0


class ObserverHandler(ABC, Generic[TTarget]):
    """Handler that observes and records data without state changes.

    Unlike SessionLifecycleHandler:
    - No success_status, failure_status, stale_status
    - Observation/recording only, no state changes
    - No LockID needed (no state changes)
    """

    @classmethod
    @abstractmethod
    def name(cls) -> str:
        """Handler name."""
        raise NotImplementedError

    @property
    @abstractmethod
    def target_statuses(self) -> frozenset[SessionStatus]:
        """Target session statuses. Empty set means no filtering by session status."""
        raise NotImplementedError

    @property
    @abstractmethod
    def target_kernel_statuses(self) -> frozenset[KernelStatus]:
        """Target kernel statuses. Empty set means no filtering by kernel status."""
        raise NotImplementedError

    @abstractmethod
    async def execute(self, targets: Sequence[TTarget]) -> ObserverResult:
        """Execute observation on targets."""
        raise NotImplementedError

    async def post_process(self, result: ObserverResult) -> None:
        """Post-processing (optional). Default: no-op."""
        pass
```

No separate Coordinator - add observer registration to existing ScheduleCoordinator.
```python
class ScheduleCoordinator:
    # Registered observers, keyed by ObserverHandler.name().
    _observer_handlers: dict[str, ObserverHandler]

    def __init__(self, ...):
        ...
        self._observer_handlers = {}
        self._init_observers()

    def register_observer(self, handler: ObserverHandler) -> None:
        """Register an observer handler."""
        self._observer_handlers[handler.name()] = handler

    async def process_observer(self, handler_name: str) -> ObserverResult:
        """Execute observer (no lock needed - no state changes)."""
        handler = self._observer_handlers.get(handler_name)
        if handler is None:
            # Unknown handler name: return an empty result instead of raising.
            return ObserverResult()
        targets = await self._fetch_observer_targets(handler)
        if not targets:
            # Nothing to observe: execute() and post_process() are skipped.
            return ObserverResult()
        result = await handler.execute(targets)
        await handler.post_process(result)
        return result

    def _init_observers(self) -> None:
        """Initialize observer handlers."""
        self.register_observer(UsageRecordObserver(self._processors))
        self.register_observer(UsageAggregationObserver(self._processors))
        self.register_observer(FairShareCalculationObserver(self._processors))
```

- No separate Coordinator: Integrate into existing `ScheduleCoordinator`
- No LockID needed: No state changes, so no distributed lock required. Single execution at a point in time is guaranteed by the scheduler task itself.
- No short timer: No user interaction, so short timer polling is unnecessary
```python
class UsageRecordObserver(ObserverHandler[KernelRow]):
    """Record resource_usage of running kernels.

    Resource Usage = Allocated Resources × Time
    Note: measured_usage is NOT stored in DB - use Prometheus instead
    """

    @classmethod
    def name(cls) -> str:
        return "usage-record"

    @property
    def target_statuses(self) -> frozenset[SessionStatus]:
        return frozenset({SessionStatus.RUNNING})

    @property
    def target_kernel_statuses(self) -> frozenset[KernelStatus]:
        return frozenset({KernelStatus.RUNNING})

    async def execute(self, targets: Sequence[KernelRow]) -> ObserverResult:
        result = ObserverResult()
        for kernel in targets:
            try:
                await self._record_usage(kernel)
                result.success_count += 1
            except Exception:
                # One failing kernel is logged and counted; the rest of the batch continues.
                log.exception(f"Failed to record usage for kernel {kernel.id}")
                result.failure_count += 1
        return result
```

```python
class UsageAggregationObserver(ObserverHandler[KernelUsageRecordRow]):
    """Aggregate kernel_usage_records into user/project/domain buckets."""

    @classmethod
    def name(cls) -> str:
        return "usage-aggregation"

    @property
    def target_statuses(self) -> frozenset[SessionStatus]:
        return frozenset()  # No filtering by session status

    @property
    def target_kernel_statuses(self) -> frozenset[KernelStatus]:
        return frozenset()  # No filtering by kernel status
```

```python
class FairShareCalculationObserver(ObserverHandler[UsageBucketData]):
    """Calculate fair_share_factor from usage buckets.

    Formula (Slurm compatible): F = 2^(-normalized_usage / weight)
    """

    @classmethod
    def name(cls) -> str:
        return "fair-share-calculation"
```

```python
def _get_observer_tasks(self) -> list[SchedulerTaskSpec]:
    """Observer periodic tasks - no short timer (no user interaction)."""
    return [
        SchedulerTaskSpec(name="usage-record", interval=timedelta(minutes=5)),
        SchedulerTaskSpec(name="usage-aggregation", interval=timedelta(hours=1)),
        SchedulerTaskSpec(name="fair-share-calculation", interval=timedelta(hours=1)),
    ]
```

- No impact on existing `SessionLifecycleHandler` pattern
- `ObserverHandler` is an additional pattern
- None
- Create `sokovan/scheduler/observers/__init__.py`
- Create `sokovan/scheduler/observers/base.py` - `ObserverResult`, `ObserverHandler` ABC
- `usage_record.py` - `UsageRecordObserver`
- `usage_aggregation.py` - `UsageAggregationObserver`
- `fair_share_calculation.py` - `FairShareCalculationObserver`
- Add `register_observer()`, `process_observer()` to `ScheduleCoordinator`
- Register periodic tasks
- Target fetching: How should data be fetched when `target_statuses` is an empty set?
  - UsageAggregationObserver: kernel_usage_records within a specific period
  - FairShareCalculationObserver: usage_buckets within a specific period
- Error handling: Stop the entire batch on an individual target failure, or continue?
  - Current design: Continue and log errors
- Interval configuration: Hardcoded vs. config file?
  - Can be moved to a config file in the future
- BEP-1026: Fair Share Scheduler
- Sokovan Scheduler: `src/ai/backend/manager/sokovan/scheduler/`
- SessionLifecycleHandler: `sokovan/scheduler/handlers/base.py`