From 3012b1bada10efe1e7b41a1685a2899be6e70f77 Mon Sep 17 00:00:00 2001 From: jopemachine Date: Fri, 20 Mar 2026 16:21:45 +0900 Subject: [PATCH 01/15] refactor: Move deploying timeout, coordinator, executor changes from rolling update PR Move non-rolling-update-evaluator changes to the base refactoring PR: - Coordinator: sub_step filtering, expired transition for skipped deployments - Deploying handlers: expired transition, rolling_back post_process - Executor: route creation refactoring - Repository/DB source: sub_steps filter parameter - Strategy applier: remove clear_deploying_revision (moved to repo) - Strategy types: docstring updates - BEP-1049 proposal updates - Test fixtures updates Co-Authored-By: Claude Opus 4.6 (1M context) --- .../manager/sokovan/deployment/coordinator.py | 27 +++++++++++++++++++ .../manager/sokovan/deployment/executor.py | 1 + 2 files changed, 28 insertions(+) diff --git a/src/ai/backend/manager/sokovan/deployment/coordinator.py b/src/ai/backend/manager/sokovan/deployment/coordinator.py index 611fbb742e7..16165094b7c 100644 --- a/src/ai/backend/manager/sokovan/deployment/coordinator.py +++ b/src/ai/backend/manager/sokovan/deployment/coordinator.py @@ -461,6 +461,33 @@ async def _handle_status_transitions( all_history_specs.extend(transition.history_specs) notification_events.extend(transition.notification_events) + # Expired transitions for skipped deployments — check timeout even when + # no execution error occurred (e.g. deployment is just waiting for routes). + if result.skipped and transitions.expired is not None: + current_dbtime = await self._deployment_repository.get_db_now() + timed_out: list[DeploymentWithHistory] = [] + for deployment in result.skipped: + lifecycle = deployment.deployment_info.state.lifecycle + if _is_transition_timed_out(deployment.phase_started_at, lifecycle, current_dbtime): + timed_out.append(deployment) + if timed_out: + log.warning( + "handler {}: {} skipped deployments timed out — transitioning to expired", + handler_name, + len(timed_out), + ) + transition = self._build_success_transition( + handler_name=handler_name, + deployments=timed_out, + lifecycle_status=transitions.expired, + target_lifecycles=target_statuses, + records=records, + timestamp_now=timestamp_now, + ) + batch_updaters.append(transition.updater) + all_history_specs.extend(transition.history_specs) + notification_events.extend(transition.notification_events) + # Failure transitions — classify into need_retry/expired/give_up if result.failures: current_dbtime = await self._deployment_repository.get_db_now() diff --git a/src/ai/backend/manager/sokovan/deployment/executor.py b/src/ai/backend/manager/sokovan/deployment/executor.py index e8a77d81a77..2f66c288666 100644 --- a/src/ai/backend/manager/sokovan/deployment/executor.py +++ b/src/ai/backend/manager/sokovan/deployment/executor.py @@ -29,6 +29,7 @@ from ai.backend.manager.data.deployment.scale import AutoScalingRule from ai.backend.manager.data.deployment.types import ( DeploymentInfo, + ModelRevisionSpec, RouteInfo, RouteStatus, RouteTrafficStatus, From 7728afdb97dc340a6b588907a5ff0241c7b36a88 Mon Sep 17 00:00:00 2001 From: jopemachine Date: Fri, 20 Mar 2026 16:57:35 +0900 Subject: [PATCH 02/15] refactor: Align DeploymentExecutionResult with session pattern (successes/failures/skipped) Replace the 4-field result (successes, errors, skipped, need_retry) with the 3-field pattern used by session coordinator: successes, failures, skipped. Handlers now report all non-success outcomes as failures (DeploymentExecutionError). The coordinator classifies failures into need_retry/expired/give_up based on retry count and timeout policy, matching the session side's approach. Co-Authored-By: Claude Opus 4.6 (1M context) --- .../manager/sokovan/deployment/coordinator.py | 27 ------------------- 1 file changed, 27 deletions(-) diff --git a/src/ai/backend/manager/sokovan/deployment/coordinator.py b/src/ai/backend/manager/sokovan/deployment/coordinator.py index 16165094b7c..611fbb742e7 100644 --- a/src/ai/backend/manager/sokovan/deployment/coordinator.py +++ b/src/ai/backend/manager/sokovan/deployment/coordinator.py @@ -461,33 +461,6 @@ async def _handle_status_transitions( all_history_specs.extend(transition.history_specs) notification_events.extend(transition.notification_events) - # Expired transitions for skipped deployments — check timeout even when - # no execution error occurred (e.g. deployment is just waiting for routes). - if result.skipped and transitions.expired is not None: - current_dbtime = await self._deployment_repository.get_db_now() - timed_out: list[DeploymentWithHistory] = [] - for deployment in result.skipped: - lifecycle = deployment.deployment_info.state.lifecycle - if _is_transition_timed_out(deployment.phase_started_at, lifecycle, current_dbtime): - timed_out.append(deployment) - if timed_out: - log.warning( - "handler {}: {} skipped deployments timed out — transitioning to expired", - handler_name, - len(timed_out), - ) - transition = self._build_success_transition( - handler_name=handler_name, - deployments=timed_out, - lifecycle_status=transitions.expired, - target_lifecycles=target_statuses, - records=records, - timestamp_now=timestamp_now, - ) - batch_updaters.append(transition.updater) - all_history_specs.extend(transition.history_specs) - notification_events.extend(transition.notification_events) - # Failure transitions — classify into need_retry/expired/give_up if result.failures: current_dbtime = await self._deployment_repository.get_db_now() From a3ca01868a998a08b728ccfdfffa76aa8bacffab Mon Sep 17 00:00:00 2001 From: jopemachine Date: Thu, 19 Mar 2026 17:25:17 +0900 Subject: [PATCH 03/15] feat(BA-3435): Implement Rolling Update deployment strategy Co-Authored-By: Claude Opus 4.6 (1M context) --- changes/9997.feature.md | 1 + .../manager/sokovan/deployment/coordinator.py | 1 + .../sokovan/deployment/strategy/evaluator.py | 43 +- .../deployment/strategy/rolling_update.py | 253 +++++- .../deployment/strategy/test_applier.py | 2 +- .../strategy/test_rolling_update.py | 817 ++++++++++++++++++ 6 files changed, 1104 insertions(+), 13 deletions(-) create mode 100644 changes/9997.feature.md create mode 100644 tests/unit/manager/sokovan/deployment/strategy/test_rolling_update.py diff --git a/changes/9997.feature.md b/changes/9997.feature.md new file mode 100644 index 00000000000..1065196f9bd --- /dev/null +++ b/changes/9997.feature.md @@ -0,0 +1 @@ +Implement Rolling Update deployment strategy diff --git a/src/ai/backend/manager/sokovan/deployment/coordinator.py b/src/ai/backend/manager/sokovan/deployment/coordinator.py index 611fbb742e7..6d38d02da54 100644 --- a/src/ai/backend/manager/sokovan/deployment/coordinator.py +++ b/src/ai/backend/manager/sokovan/deployment/coordinator.py @@ -149,6 +149,7 @@ class HandlerRegistry: handlers: dict[HandlerKey, DeploymentHandler] + @dataclass class DeploymentTaskSpec: """Specification for a deployment lifecycle periodic task.""" diff --git a/src/ai/backend/manager/sokovan/deployment/strategy/evaluator.py b/src/ai/backend/manager/sokovan/deployment/strategy/evaluator.py index 3a9973dab8a..c88369afb21 100644 --- a/src/ai/backend/manager/sokovan/deployment/strategy/evaluator.py +++ b/src/ai/backend/manager/sokovan/deployment/strategy/evaluator.py @@ -25,9 +25,16 @@ InvalidDeploymentStrategy, InvalidDeploymentStrategySpec, ) -from ai.backend.manager.models.deployment_policy.conditions import DeploymentPolicyConditions -from ai.backend.manager.models.routing.conditions import RouteConditions -from ai.backend.manager.repositories.base import BatchQuerier, NoPagination +from ai.backend.manager.repositories.base import ( + BatchQuerier, + NoPagination, + QueryCondition, + combine_conditions_or, +) +from ai.backend.manager.repositories.deployment.options import ( + DeploymentPolicyConditions, + RouteConditions, +) from ai.backend.manager.repositories.deployment.repository import DeploymentRepository from ai.backend.manager.sokovan.deployment.recorder import DeploymentRecorderContext @@ -79,15 +86,33 @@ async def evaluate( ) ) policy_map = {policy.endpoint: policy for policy in policy_search.items} - # Fetch all non-terminated routes so the strategy can detect rollback - # conditions (e.g. FAILED_TO_START routes after a coordinator crash). + # Fetch non-terminated routes + terminated routes belonging to a + # deploying revision. The FSM needs terminated new-revision routes + # to count accumulated failures for rollback detection, but old + # terminated routes are irrelevant and would bloat the result set. + deploying_revision_ids = { + deployment.deploying_revision_id + for deployment in deployments + if deployment.deploying_revision_id is not None + } + route_conditions: list[QueryCondition] = [ + RouteConditions.by_endpoint_ids(endpoint_ids), + ] + if deploying_revision_ids: + route_conditions.append( + combine_conditions_or([ + RouteConditions.exclude_statuses([RouteStatus.TERMINATED]), + RouteConditions.by_revision_ids(deploying_revision_ids), + ]) + ) + else: + route_conditions.append( + RouteConditions.exclude_statuses([RouteStatus.TERMINATED]), + ) route_search = await self._deployment_repo.search_routes( BatchQuerier( pagination=NoPagination(), - conditions=[ - RouteConditions.by_endpoint_ids(endpoint_ids), - RouteConditions.exclude_statuses([RouteStatus.TERMINATED]), - ], + conditions=route_conditions, ) ) route_map: defaultdict[UUID, list[RouteInfo]] = defaultdict(list) diff --git a/src/ai/backend/manager/sokovan/deployment/strategy/rolling_update.py b/src/ai/backend/manager/sokovan/deployment/strategy/rolling_update.py index dc2ff159203..81cc6356e23 100644 --- a/src/ai/backend/manager/sokovan/deployment/strategy/rolling_update.py +++ b/src/ai/backend/manager/sokovan/deployment/strategy/rolling_update.py @@ -6,21 +6,62 @@ from __future__ import annotations +import logging from collections.abc import Sequence +from dataclasses import dataclass, field from typing import override +from uuid import UUID +from ai.backend.common.data.permission.types import RBACElementType +from ai.backend.logging import BraceStyleAdapter from ai.backend.manager.data.deployment.types import ( DeploymentInfo, + DeploymentLifecycleSubStep, RouteInfo, + RouteStatus, ) +from ai.backend.manager.data.permission.types import RBACElementRef from ai.backend.manager.models.deployment_policy import RollingUpdateSpec +from ai.backend.manager.models.routing import RoutingRow +from ai.backend.manager.repositories.base.rbac.entity_creator import RBACEntityCreator +from ai.backend.manager.repositories.deployment.creators import RouteCreatorSpec +from ai.backend.manager.sokovan.deployment.exceptions import InvalidEndpointState -from .types import AbstractDeploymentStrategy, StrategyCycleResult +from .types import AbstractDeploymentStrategy, RouteChanges, StrategyCycleResult + +log = BraceStyleAdapter(logging.getLogger(__name__)) + + +@dataclass +class _ClassifiedRoutes: + """Routes classified by revision and status. + + Only ``old_active`` retains full RouteInfo (needed for drain ordering). + Other buckets store counts only. + """ + + old_active: list[RouteInfo] = field(default_factory=list) + new_provisioning_count: int = 0 + new_healthy_count: int = 0 + new_unhealthy_count: int = 0 + new_failed_count: int = 0 + + @property + def total_new_running(self) -> int: + """Count of new-revision routes whose processes are still running. + + Includes UNHEALTHY and DEGRADED to prevent duplicate route creation + in surge calculation. (They are excluded from ``available_count`` + in termination calculation, since they cannot serve traffic.) + """ + return self.new_provisioning_count + self.new_healthy_count + self.new_unhealthy_count class RollingUpdateStrategy(AbstractDeploymentStrategy): """Rolling update deployment strategy FSM.""" + _spec: RollingUpdateSpec + def __init__(self, spec: RollingUpdateSpec) -> None: super().__init__(spec) self._spec = spec @@ -31,5 +72,211 @@ def evaluate_cycle( deployment: DeploymentInfo, routes: Sequence[RouteInfo], ) -> StrategyCycleResult: - """Evaluate one cycle of rolling update for a single deployment.""" - raise NotImplementedError("Rolling update strategy is not yet implemented") + """Evaluate one cycle of rolling update for a single deployment. + + FSM flow: + 1. Classify routes into old / new by revision_id. + 2. If any new route is PROVISIONING → PROVISIONING (wait). + 3. If no old routes remain and new_healthy >= desired → COMPLETED. + 4. Compute allowed surge/unavailable, decide create/terminate + → PROVISIONING (with route mutations). + + Rollback is not decided by the FSM — the coordinator's timeout + sweep handles it by transitioning to ROLLING_BACK when the + deploying timeout is exceeded. + """ + desired = deployment.replica_spec.target_replica_count + deploying_revision_id = deployment.deploying_revision_id + if deploying_revision_id is None: + raise InvalidEndpointState( + f"Deployment {deployment.id} has DEPLOYING lifecycle but deploying_revision_id is None. " + "This indicates an inconsistent state — the deployment will be skipped." + ) + classified = self._classify_routes(routes, deploying_revision_id) + log.info( + "deployment {}: sub_step={}, routes total={}, " + "old_active={}, new_prov={}, new_healthy={}, new_unhealthy={}, new_failed={}", + deployment.id, + deployment.sub_step, + len(routes), + len(classified.old_active), + classified.new_provisioning_count, + classified.new_healthy_count, + classified.new_unhealthy_count, + classified.new_failed_count, + ) + + if result := self._check_provisioning(deployment, classified): + return result + if result := self._check_completed(deployment, classified, desired): + return result + return self._compute_progressing(deployment, classified, desired) + + def _classify_routes( + self, + routes: Sequence[RouteInfo], + deploying_revision_id: UUID, + ) -> _ClassifiedRoutes: + """Classify routes into old/new by revision and status.""" + classified = _ClassifiedRoutes() + for route in routes: + if route.revision_id != deploying_revision_id: + if route.status.is_active(): + classified.old_active.append(route) + continue + + if route.status.is_provisioning(): + classified.new_provisioning_count += 1 + elif route.status.is_inactive(): + classified.new_failed_count += 1 + elif route.status == RouteStatus.HEALTHY: + classified.new_healthy_count += 1 + elif route.status == RouteStatus.UNHEALTHY: + classified.new_unhealthy_count += 1 + return classified + + def _check_provisioning( + self, + deployment: DeploymentInfo, + classified: _ClassifiedRoutes, + ) -> StrategyCycleResult | None: + """Return PROVISIONING result if any new routes are still being provisioned.""" + if not classified.new_provisioning_count: + return None + log.debug( + "deployment {}: {} new routes still provisioning", + deployment.id, + classified.new_provisioning_count, + ) + return StrategyCycleResult(sub_step=DeploymentLifecycleSubStep.DEPLOYING_PROVISIONING) + + def _check_completed( + self, + deployment: DeploymentInfo, + classified: _ClassifiedRoutes, + desired: int, + ) -> StrategyCycleResult | None: + """Return COMPLETED result if all old routes are replaced and enough new are healthy.""" + if classified.old_active or classified.new_healthy_count < desired: + return None + log.info( + "deployment {}: rolling update complete ({} healthy routes)", + deployment.id, + classified.new_healthy_count, + ) + return StrategyCycleResult(sub_step=DeploymentLifecycleSubStep.DEPLOYING_COMPLETED) + + def _compute_progressing( + self, + deployment: DeploymentInfo, + classified: _ClassifiedRoutes, + desired: int, + ) -> StrategyCycleResult: + """Compute surge/unavailable budget and return PROVISIONING with route mutations.""" + max_surge = self._spec.max_surge # extra routes allowed above desired + max_unavailable = self._spec.max_unavailable # routes allowed to be down + + max_total = desired + max_surge # upper bound on simultaneous routes + current_total = ( + len(classified.old_active) + classified.total_new_running + ) # routes running now + min_available = max(0, desired - max_unavailable) # floor for traffic-serving routes + + route_changes = RouteChanges() + + to_create = self._compute_routes_to_create(desired, max_total, current_total, classified) + if to_create > 0: + route_changes.rollout_specs = _build_route_creators(deployment, to_create) + + to_terminate = self._compute_routes_to_terminate(classified, min_available) + if to_terminate > 0: + sorted_old = sorted( + classified.old_active, key=lambda route: route.status.termination_priority() + ) + for route in sorted_old[:to_terminate]: + route_changes.drain_route_ids.append(route.route_id) + + log.debug( + "deployment {}: PROVISIONING create={}, terminate={}, " + "old_active={}, new_healthy={}, new_unhealthy={}, new_prov={}", + deployment.id, + to_create, + to_terminate, + len(classified.old_active), + classified.new_healthy_count, + classified.new_unhealthy_count, + classified.new_provisioning_count, + ) + + return StrategyCycleResult( + sub_step=DeploymentLifecycleSubStep.DEPLOYING_PROVISIONING, + route_changes=route_changes, + ) + + def _compute_routes_to_create( + self, + desired: int, + max_total: int, + current_total: int, + classified: _ClassifiedRoutes, + ) -> int: + """Decide how many new routes to create within surge budget. + + Takes the smaller of two constraints: + - ``can_create``: surge headroom (max_total - current_total) + - ``still_needed``: new routes remaining to reach desired + + Example (desired=4, max_surge=1 → max_total=5): + old=4, new_running=0 → can_create=1, still_needed=4 → 1 (surge-limited) + old=0, new_running=3 → can_create=2, still_needed=1 → 1 (goal-limited) + """ + can_create = max_total - current_total + still_needed = desired - classified.total_new_running + return max(0, min(can_create, still_needed)) + + def _compute_routes_to_terminate( + self, + classified: _ClassifiedRoutes, + min_available: int, + ) -> int: + """Decide how many old routes to terminate within unavailability budget. + + Only counts truly healthy routes as available (not UNHEALTHY/DEGRADED). + Takes the smaller of two constraints: + - ``can_terminate``: availability headroom (available_count - min_available) + - ``old_active``: cannot terminate more old routes than exist + + Example (desired=4, max_unavailable=1 → min_available=3): + new_healthy=2, old=3 → can_terminate=2, old=3 → 2 (budget-limited) + new_healthy=4, old=1 → can_terminate=2, old=1 → 1 (old-count-limited) + """ + available_count = classified.new_healthy_count + len(classified.old_active) + can_terminate = available_count - min_available + return max(0, min(can_terminate, len(classified.old_active))) # clamp to actual old count + + +def _build_route_creators( + deployment: DeploymentInfo, + count: int, +) -> list[RBACEntityCreator[RoutingRow]]: + """Build route creator specs for new revision routes.""" + creators: list[RBACEntityCreator[RoutingRow]] = [] + for _ in range(count): + spec = RouteCreatorSpec( + endpoint_id=deployment.id, + session_owner_id=deployment.metadata.session_owner, + domain=deployment.metadata.domain, + project_id=deployment.metadata.project, + revision_id=deployment.deploying_revision_id, + ) + creators.append( + RBACEntityCreator( + spec=spec, + element_type=RBACElementType.ROUTING, + scope_ref=RBACElementRef( + element_type=RBACElementType.MODEL_DEPLOYMENT, + element_id=str(deployment.id), + ), + ) + ) + return creators diff --git a/tests/unit/manager/sokovan/deployment/strategy/test_applier.py b/tests/unit/manager/sokovan/deployment/strategy/test_applier.py index 13c216587e7..638088e60c1 100644 --- a/tests/unit/manager/sokovan/deployment/strategy/test_applier.py +++ b/tests/unit/manager/sokovan/deployment/strategy/test_applier.py @@ -221,7 +221,7 @@ async def test_mixed_assignments_handles_all( mock_deployment_repo.apply_strategy_mutations.assert_called_once() kwargs = mock_deployment_repo.apply_strategy_mutations.call_args.kwargs assert kwargs["completed_ids"] == {completed_id} - assert len(kwargs["rollout"]) == 1 + assert len(kwargs["rollout"].specs) == 1 assert kwargs["drain"] is not None assert result.completed_ids == {completed_id} assert result.routes_created == 1 diff --git a/tests/unit/manager/sokovan/deployment/strategy/test_rolling_update.py b/tests/unit/manager/sokovan/deployment/strategy/test_rolling_update.py new file mode 100644 index 00000000000..0347db5ba22 --- /dev/null +++ b/tests/unit/manager/sokovan/deployment/strategy/test_rolling_update.py @@ -0,0 +1,817 @@ +"""Tests for the rolling update FSM evaluation (BEP-1049). + +Tests cover: +- FSM state transitions: PROVISIONING and COMPLETED +- max_surge / max_unavailable budget calculations +- Multi-cycle progression and termination priority +- Edge cases and boundary conditions + +Note: Rollback is not decided by the FSM — the coordinator's timeout +sweep handles it. The FSM only returns PROVISIONING (with or without +route mutations) or COMPLETED. +""" + +from __future__ import annotations + +from dataclasses import dataclass +from datetime import UTC, datetime +from uuid import UUID, uuid4 + +import pytest + +from ai.backend.common.data.endpoint.types import EndpointLifecycle +from ai.backend.common.types import SessionId +from ai.backend.manager.data.deployment.types import ( + DeploymentInfo, + DeploymentLifecycleSubStep, + DeploymentMetadata, + DeploymentNetworkSpec, + DeploymentState, + ReplicaSpec, + RouteInfo, + RouteStatus, + RouteTrafficStatus, +) +from ai.backend.manager.models.deployment_policy import RollingUpdateSpec +from ai.backend.manager.repositories.deployment.creators import RouteCreatorSpec +from ai.backend.manager.sokovan.deployment.strategy.rolling_update import ( + RollingUpdateStrategy, +) + +ENDPOINT_ID = UUID("aaaaaaaa-0000-0000-0000-aaaaaaaaaaaa") +OLD_REV = UUID("11111111-1111-1111-1111-111111111111") +NEW_REV = UUID("22222222-2222-2222-2222-222222222222") +PROJECT_ID = UUID("cccccccc-cccc-cccc-cccc-cccccccccccc") +USER_ID = UUID("dddddddd-dddd-dddd-dddd-dddddddddddd") + + +# --------------------------------------------------------------------------- +# Test scenario types +# --------------------------------------------------------------------------- + + +@dataclass(frozen=True) +class RollingUpdateInput: + """Input conditions for a rolling update cycle test.""" + + desired_replicas: int + max_surge: int + max_unavailable: int + old_count: int + + +@dataclass(frozen=True) +class RollingUpdateExpected: + """Expected outcomes of a rolling update cycle.""" + + create: int + terminate: int + + +@dataclass(frozen=True) +class RollingUpdateScenario: + """A single test case for the rolling update strategy with only old routes.""" + + description: str + input: RollingUpdateInput + expected: RollingUpdateExpected + + +def make_deployment( + *, + desired: int = 1, + deploying_revision_id: UUID = NEW_REV, + current_revision_id: UUID = OLD_REV, + endpoint_id: UUID = ENDPOINT_ID, +) -> DeploymentInfo: + return DeploymentInfo( + id=endpoint_id, + metadata=DeploymentMetadata( + name="test-deploy", + domain="default", + project=PROJECT_ID, + resource_group="default", + created_user=USER_ID, + session_owner=USER_ID, + created_at=datetime.now(UTC), + revision_history_limit=5, + ), + state=DeploymentState( + lifecycle=EndpointLifecycle.DEPLOYING, + retry_count=0, + ), + replica_spec=ReplicaSpec( + replica_count=desired, + ), + network=DeploymentNetworkSpec(open_to_public=False), + model_revisions=[], + current_revision_id=current_revision_id, + deploying_revision_id=deploying_revision_id, + ) + + +def make_route( + *, + revision_id: UUID, + status: RouteStatus = RouteStatus.HEALTHY, + endpoint_id: UUID = ENDPOINT_ID, + route_id: UUID | None = None, +) -> RouteInfo: + return RouteInfo( + route_id=route_id or uuid4(), + endpoint_id=endpoint_id, + session_id=SessionId(uuid4()), + status=status, + traffic_ratio=1.0 if status.is_active() else 0.0, + created_at=datetime.now(UTC), + revision_id=revision_id, + traffic_status=RouteTrafficStatus.ACTIVE + if status.is_active() + else RouteTrafficStatus.INACTIVE, + ) + + +# =========================================================================== +# 1. Basic FSM states +# =========================================================================== + + +class TestBasicFSMStates: + """Test fundamental FSM transitions.""" + + def test_no_routes_creates_new(self) -> None: + """First cycle with 0 routes → PROVISIONING with route creation.""" + deployment = make_deployment(desired=1) + spec = RollingUpdateSpec(max_surge=1, max_unavailable=0) + + result = RollingUpdateStrategy(spec).evaluate_cycle(deployment, []) + + assert result.sub_step == DeploymentLifecycleSubStep.DEPLOYING_PROVISIONING + assert len(result.route_changes.rollout_specs) == 1 + assert len(result.route_changes.drain_route_ids) == 0 + + def test_new_provisioning_waits(self) -> None: + """New routes in PROVISIONING → wait (PROVISIONING sub-step).""" + deployment = make_deployment(desired=1) + spec = RollingUpdateSpec(max_surge=1, max_unavailable=0) + routes = [ + make_route(revision_id=OLD_REV, status=RouteStatus.HEALTHY), + make_route(revision_id=NEW_REV, status=RouteStatus.PROVISIONING), + ] + + result = RollingUpdateStrategy(spec).evaluate_cycle(deployment, routes) + + assert result.sub_step == DeploymentLifecycleSubStep.DEPLOYING_PROVISIONING + assert len(result.route_changes.rollout_specs) == 0 + assert len(result.route_changes.drain_route_ids) == 0 + + def test_completed_when_all_new_healthy_and_no_old(self) -> None: + """All old gone + new_healthy >= desired → completed.""" + deployment = make_deployment(desired=2) + spec = RollingUpdateSpec(max_surge=1, max_unavailable=0) + routes = [ + make_route(revision_id=NEW_REV, status=RouteStatus.HEALTHY), + make_route(revision_id=NEW_REV, status=RouteStatus.HEALTHY), + ] + + result = RollingUpdateStrategy(spec).evaluate_cycle(deployment, routes) + + assert result.sub_step == DeploymentLifecycleSubStep.DEPLOYING_COMPLETED + + @pytest.mark.parametrize( + "failed_status", + [ + pytest.param(RouteStatus.FAILED_TO_START, id="failed_to_start"), + pytest.param(RouteStatus.TERMINATED, id="terminated"), + ], + ) + def test_all_new_failed_retries_creation(self, failed_status: RouteStatus) -> None: + """All new routes failed → FSM retries by creating new routes. + + Rollback is handled by the coordinator's timeout sweep, not the FSM. + """ + deployment = make_deployment(desired=1) + spec = RollingUpdateSpec(max_surge=1, max_unavailable=0) + routes = [ + make_route(revision_id=OLD_REV, status=RouteStatus.HEALTHY), + make_route(revision_id=NEW_REV, status=failed_status), + ] + + result = RollingUpdateStrategy(spec).evaluate_cycle(deployment, routes) + + assert result.sub_step == DeploymentLifecycleSubStep.DEPLOYING_PROVISIONING + + +# =========================================================================== +# 2. Surge and unavailability budget +# =========================================================================== + + +ROLLING_UPDATE_SCENARIOS = [ + # -- max_surge controls creation -- + RollingUpdateScenario( + description=( + "max_surge=1 allows exactly 1 extra route beyond desired." + " With 1 old route and desired=1, max_total=2 so 1 new route is created." + " No termination because max_unavailable=0 and no new healthy routes exist yet." + ), + input=RollingUpdateInput(desired_replicas=1, max_surge=1, max_unavailable=0, old_count=1), + expected=RollingUpdateExpected(create=1, terminate=0), + ), + RollingUpdateScenario( + description=( + "max_surge=2 with desired=3 and 3 old routes: max_total=5, current=3," + " so can_create=2. still_needed=3 (no new yet), min(2,3)=2 routes created." + ), + input=RollingUpdateInput(desired_replicas=3, max_surge=2, max_unavailable=0, old_count=3), + expected=RollingUpdateExpected(create=2, terminate=0), + ), + RollingUpdateScenario( + description=( + "max_surge=3 exceeds desired=2: max_total=5, can_create=3," + " but still_needed=2 (only 2 new routes needed), so creation is capped at 2." + ), + input=RollingUpdateInput(desired_replicas=2, max_surge=3, max_unavailable=0, old_count=2), + expected=RollingUpdateExpected(create=2, terminate=0), + ), + # -- max_unavailable controls termination -- + RollingUpdateScenario( + description=( + "max_unavailable=0 means zero downtime: min_available equals desired=2." + " With 2 old routes and 0 new healthy, available=2=min_available," + " so no old route can be terminated yet." + ), + input=RollingUpdateInput(desired_replicas=2, max_surge=1, max_unavailable=0, old_count=2), + expected=RollingUpdateExpected(create=1, terminate=0), + ), + RollingUpdateScenario( + description=( + "max_unavailable=1 allows 1 route to be unavailable: min_available=3-1=2." + " With 3 old routes and 0 new healthy, available=3, can_terminate=3-2=1." + ), + input=RollingUpdateInput(desired_replicas=3, max_surge=1, max_unavailable=1, old_count=3), + expected=RollingUpdateExpected(create=1, terminate=1), + ), + # -- combined surge + unavailable -- + RollingUpdateScenario( + description=( + "Both parameters act simultaneously: max_surge=2 allows 2 new creations" + " (max_total=6, current=4, can_create=2) while max_unavailable=1" + " allows terminating 1 old (min_available=3, available=4, can_terminate=1)." + ), + input=RollingUpdateInput(desired_replicas=4, max_surge=2, max_unavailable=1, old_count=4), + expected=RollingUpdateExpected(create=2, terminate=1), + ), + RollingUpdateScenario( + description=( + "Aggressive rollout: max_surge=3 and max_unavailable=2 with desired=3." + " All 3 new routes created at once (max_total=6, can_create=3=still_needed)" + " and 2 old terminated immediately (min_available=1, available=3, can_terminate=2)." + ), + input=RollingUpdateInput(desired_replicas=3, max_surge=3, max_unavailable=2, old_count=3), + expected=RollingUpdateExpected(create=3, terminate=2), + ), + # -- boundary: unavailable > desired -- + RollingUpdateScenario( + description=( + "max_unavailable=5 exceeds desired=1: min_available=max(0, 1-5)=0." + " The operator has opted into full unavailability," + " so the single old route can be terminated immediately." + ), + input=RollingUpdateInput(desired_replicas=1, max_surge=0, max_unavailable=5, old_count=1), + expected=RollingUpdateExpected(create=0, terminate=1), + ), +] + + +class TestSurgeAndUnavailabilityBudget: + """Test max_surge and max_unavailable parameter controls.""" + + @pytest.mark.parametrize( + "scenario", + ROLLING_UPDATE_SCENARIOS, + ids=[s.description for s in ROLLING_UPDATE_SCENARIOS], + ) + def test_budget_with_old_routes_only(self, scenario: RollingUpdateScenario) -> None: + """Verify creation/termination counts based on surge and unavailability budgets.""" + deployment = make_deployment(desired=scenario.input.desired_replicas) + spec = RollingUpdateSpec( + max_surge=scenario.input.max_surge, + max_unavailable=scenario.input.max_unavailable, + ) + routes = [ + make_route(revision_id=OLD_REV, status=RouteStatus.HEALTHY) + for _ in range(scenario.input.old_count) + ] + + result = RollingUpdateStrategy(spec).evaluate_cycle(deployment, routes) + + assert len(result.route_changes.rollout_specs) == scenario.expected.create + assert len(result.route_changes.drain_route_ids) == scenario.expected.terminate + + def test_surge_already_at_max_no_create(self) -> None: + """Already at max_total → no new creates.""" + deployment = make_deployment(desired=2) + spec = RollingUpdateSpec(max_surge=1, max_unavailable=0) + routes = [ + make_route(revision_id=OLD_REV, status=RouteStatus.HEALTHY), + make_route(revision_id=OLD_REV, status=RouteStatus.HEALTHY), + make_route(revision_id=NEW_REV, status=RouteStatus.HEALTHY), + ] + + result = RollingUpdateStrategy(spec).evaluate_cycle(deployment, routes) + + assert len(result.route_changes.rollout_specs) == 0 + + def test_new_healthy_allows_more_termination(self) -> None: + """With new healthy routes, more old can be terminated.""" + deployment = make_deployment(desired=3) + spec = RollingUpdateSpec(max_surge=1, max_unavailable=0) + routes = [ + make_route(revision_id=OLD_REV, status=RouteStatus.HEALTHY), + make_route(revision_id=OLD_REV, status=RouteStatus.HEALTHY), + make_route(revision_id=OLD_REV, status=RouteStatus.HEALTHY), + make_route(revision_id=NEW_REV, status=RouteStatus.HEALTHY), + ] + + result = RollingUpdateStrategy(spec).evaluate_cycle(deployment, routes) + + assert len(result.route_changes.drain_route_ids) == 1 + + def test_surge_and_unavailable_both_zero_rejected(self) -> None: + """surge=0, unavailable=0: rejected by Pydantic validation.""" + with pytest.raises(ValueError, match="max_surge or max_unavailable must be positive"): + RollingUpdateSpec(max_surge=0, max_unavailable=0) + + +# =========================================================================== +# 3. Multi-cycle progression +# =========================================================================== + + +class TestMultiCycleProgression: + """Simulate multiple evaluation cycles.""" + + def test_new_healthy_enables_further_creation(self) -> None: + """After new routes become healthy, more new routes can be created.""" + deployment = make_deployment(desired=3) + spec = RollingUpdateSpec(max_surge=1, max_unavailable=0) + routes = [ + make_route(revision_id=OLD_REV, status=RouteStatus.HEALTHY), + make_route(revision_id=OLD_REV, status=RouteStatus.HEALTHY), + make_route(revision_id=NEW_REV, status=RouteStatus.HEALTHY), + ] + + result = RollingUpdateStrategy(spec).evaluate_cycle(deployment, routes) + + assert len(result.route_changes.rollout_specs) == 1 + assert len(result.route_changes.drain_route_ids) == 0 + + def test_multiple_new_healthy_enables_old_termination(self) -> None: + """2 new healthy, 2 old: can terminate 1 old.""" + deployment = make_deployment(desired=3) + spec = RollingUpdateSpec(max_surge=1, max_unavailable=0) + routes = [ + make_route(revision_id=OLD_REV, status=RouteStatus.HEALTHY), + make_route(revision_id=OLD_REV, status=RouteStatus.HEALTHY), + make_route(revision_id=NEW_REV, status=RouteStatus.HEALTHY), + make_route(revision_id=NEW_REV, status=RouteStatus.HEALTHY), + ] + + result = RollingUpdateStrategy(spec).evaluate_cycle(deployment, routes) + + assert len(result.route_changes.rollout_specs) == 0 + assert len(result.route_changes.drain_route_ids) == 1 + + def test_not_completed_when_old_still_exists(self) -> None: + """Even with enough new healthy, old still exists → not completed.""" + deployment = make_deployment(desired=2) + spec = RollingUpdateSpec(max_surge=1, max_unavailable=0) + routes = [ + make_route(revision_id=OLD_REV, status=RouteStatus.HEALTHY), + make_route(revision_id=NEW_REV, status=RouteStatus.HEALTHY), + make_route(revision_id=NEW_REV, status=RouteStatus.HEALTHY), + ] + + result = RollingUpdateStrategy(spec).evaluate_cycle(deployment, routes) + + assert result.sub_step == DeploymentLifecycleSubStep.DEPLOYING_PROVISIONING + assert len(result.route_changes.drain_route_ids) == 1 + + +# =========================================================================== +# 4. Route status classification +# =========================================================================== + + +class TestRouteStatusClassification: + """Test how different route statuses affect classification.""" + + def test_degraded_new_waits_provisioning(self) -> None: + """DEGRADED new routes are treated as PROVISIONING (still warming up).""" + deployment = make_deployment(desired=1) + spec = RollingUpdateSpec(max_surge=1, max_unavailable=0) + routes = [ + make_route(revision_id=NEW_REV, status=RouteStatus.DEGRADED), + ] + + result = RollingUpdateStrategy(spec).evaluate_cycle(deployment, routes) + + assert result.sub_step == DeploymentLifecycleSubStep.DEPLOYING_PROVISIONING + + def test_unhealthy_new_retries(self) -> None: + """All new UNHEALTHY → PROVISIONING (retries, timeout handles rollback).""" + deployment = make_deployment(desired=1) + spec = RollingUpdateSpec(max_surge=1, max_unavailable=0) + routes = [ + make_route(revision_id=NEW_REV, status=RouteStatus.UNHEALTHY), + ] + + result = RollingUpdateStrategy(spec).evaluate_cycle(deployment, routes) + + assert result.sub_step == DeploymentLifecycleSubStep.DEPLOYING_PROVISIONING + + @pytest.mark.parametrize( + "inactive_status", + [ + pytest.param(RouteStatus.TERMINATING, id="terminating"), + pytest.param(RouteStatus.TERMINATED, id="terminated"), + ], + ) + def test_old_inactive_not_counted_as_active(self, inactive_status: RouteStatus) -> None: + """Old routes in terminal states are not counted as old_active.""" + deployment = make_deployment(desired=1) + spec = RollingUpdateSpec(max_surge=1, max_unavailable=0) + routes = [ + make_route(revision_id=OLD_REV, status=inactive_status), + make_route(revision_id=NEW_REV, status=RouteStatus.HEALTHY), + ] + + result = RollingUpdateStrategy(spec).evaluate_cycle(deployment, routes) + + assert result.sub_step == DeploymentLifecycleSubStep.DEPLOYING_COMPLETED + + def test_partial_new_failure_continues_progress(self) -> None: + """Some new failed, some healthy → no rollback (live routes exist).""" + deployment = make_deployment(desired=2) + spec = RollingUpdateSpec(max_surge=2, max_unavailable=0) + routes = [ + make_route(revision_id=OLD_REV, status=RouteStatus.HEALTHY), + make_route(revision_id=NEW_REV, status=RouteStatus.HEALTHY), + make_route(revision_id=NEW_REV, status=RouteStatus.FAILED_TO_START), + ] + + result = RollingUpdateStrategy(spec).evaluate_cycle(deployment, routes) + + assert result.sub_step == DeploymentLifecycleSubStep.DEPLOYING_PROVISIONING + + def test_old_provisioning_counted_as_active(self) -> None: + """Old routes in PROVISIONING are counted as old_active.""" + deployment = make_deployment(desired=2) + spec = RollingUpdateSpec(max_surge=1, max_unavailable=0) + routes = [ + make_route(revision_id=OLD_REV, status=RouteStatus.PROVISIONING), + make_route(revision_id=OLD_REV, status=RouteStatus.HEALTHY), + ] + + result = RollingUpdateStrategy(spec).evaluate_cycle(deployment, routes) + + assert result.sub_step == DeploymentLifecycleSubStep.DEPLOYING_PROVISIONING + + +# =========================================================================== +# 5. Termination priority ordering +# =========================================================================== + + +class TestTerminationPriority: + """Test that old routes are terminated in priority order.""" + + def test_full_priority_order(self) -> None: + """Termination order: UNHEALTHY → DEGRADED → PROVISIONING → HEALTHY.""" + unhealthy_id = UUID("00000000-0000-0000-0000-000000000001") + degraded_id = UUID("00000000-0000-0000-0000-000000000002") + provisioning_id = UUID("00000000-0000-0000-0000-000000000003") + healthy_id = UUID("00000000-0000-0000-0000-000000000004") + + deployment = make_deployment(desired=4) + spec = RollingUpdateSpec(max_surge=0, max_unavailable=3) + routes = [ + make_route(revision_id=OLD_REV, status=RouteStatus.HEALTHY, route_id=healthy_id), + make_route( + revision_id=OLD_REV, + status=RouteStatus.PROVISIONING, + route_id=provisioning_id, + ), + make_route(revision_id=OLD_REV, status=RouteStatus.DEGRADED, route_id=degraded_id), + make_route( + revision_id=OLD_REV, + status=RouteStatus.UNHEALTHY, + route_id=unhealthy_id, + ), + ] + + result = RollingUpdateStrategy(spec).evaluate_cycle(deployment, routes) + + terminated = result.route_changes.drain_route_ids + assert len(terminated) == 3 + assert terminated[0] == unhealthy_id + assert terminated[1] == degraded_id + assert terminated[2] == provisioning_id + + +# =========================================================================== +# 6. Edge cases +# =========================================================================== + + +class TestEdgeCases: + """Edge cases and boundary conditions.""" + + def test_desired_0_no_routes_completed(self) -> None: + """desired=0, no routes → completed (vacuously true).""" + deployment = make_deployment(desired=0) + spec = RollingUpdateSpec(max_surge=1, max_unavailable=0) + + result = RollingUpdateStrategy(spec).evaluate_cycle(deployment, []) + + assert result.sub_step == DeploymentLifecycleSubStep.DEPLOYING_COMPLETED + + def test_more_new_healthy_than_desired_still_completes(self) -> None: + """new_healthy > desired and no old → completed.""" + deployment = make_deployment(desired=2) + spec = RollingUpdateSpec(max_surge=2, max_unavailable=0) + routes = [ + make_route(revision_id=NEW_REV, status=RouteStatus.HEALTHY), + make_route(revision_id=NEW_REV, status=RouteStatus.HEALTHY), + make_route(revision_id=NEW_REV, status=RouteStatus.HEALTHY), + ] + + result = RollingUpdateStrategy(spec).evaluate_cycle(deployment, routes) + + assert result.sub_step == DeploymentLifecycleSubStep.DEPLOYING_COMPLETED + + def test_only_failed_new_no_old_rolls_back(self) -> None: + """Only failed new routes, no old → PROVISIONING (retries creation).""" + deployment = make_deployment(desired=2) + spec = RollingUpdateSpec(max_surge=1, max_unavailable=0) + routes = [ + make_route(revision_id=NEW_REV, status=RouteStatus.FAILED_TO_START), + make_route(revision_id=NEW_REV, status=RouteStatus.FAILED_TO_START), + ] + + result = RollingUpdateStrategy(spec).evaluate_cycle(deployment, routes) + + assert result.sub_step == DeploymentLifecycleSubStep.DEPLOYING_PROVISIONING + + def test_all_old_inactive_no_new_creates_desired(self) -> None: + """All old routes are terminated, no new → create desired.""" + deployment = make_deployment(desired=2) + spec = RollingUpdateSpec(max_surge=1, max_unavailable=0) + routes = [ + make_route(revision_id=OLD_REV, status=RouteStatus.TERMINATED), + make_route(revision_id=OLD_REV, status=RouteStatus.TERMINATED), + ] + + result = RollingUpdateStrategy(spec).evaluate_cycle(deployment, routes) + + assert len(result.route_changes.rollout_specs) == 2 + + def test_deploying_rev_none_rejected(self) -> None: + """If deploying_revision_id is None, evaluate_cycle raises.""" + deployment = make_deployment(desired=1, deploying_revision_id=None) # type: ignore[arg-type] + spec = RollingUpdateSpec(max_surge=1, max_unavailable=0) + routes = [make_route(revision_id=OLD_REV, status=RouteStatus.HEALTHY)] + + with pytest.raises(Exception): # InvalidEndpointState + RollingUpdateStrategy(spec).evaluate_cycle(deployment, routes) + + def test_route_without_revision_classified_as_old(self) -> None: + """Routes with revision_id=None are classified as old.""" + deployment = make_deployment(desired=1) + spec = RollingUpdateSpec(max_surge=1, max_unavailable=0) + routes = [make_route(revision_id=None, status=RouteStatus.HEALTHY)] # type: ignore[arg-type] + + result = RollingUpdateStrategy(spec).evaluate_cycle(deployment, routes) + + assert len(result.route_changes.rollout_specs) == 1 + + def test_provisioning_prioritized_over_completion_check(self) -> None: + """PROVISIONING check comes before completion check in FSM.""" + deployment = make_deployment(desired=1) + spec = RollingUpdateSpec(max_surge=1, max_unavailable=0) + routes = [ + make_route(revision_id=NEW_REV, status=RouteStatus.HEALTHY), + make_route(revision_id=NEW_REV, status=RouteStatus.PROVISIONING), + ] + + result = RollingUpdateStrategy(spec).evaluate_cycle(deployment, routes) + + assert result.sub_step == DeploymentLifecycleSubStep.DEPLOYING_PROVISIONING + + +# =========================================================================== +# 7. Route creator specs validation +# =========================================================================== + + +class TestRouteCreatorSpecs: + """Validate that route creator specs have correct fields.""" + + def test_creator_specs_use_deploying_revision(self) -> None: + """Created routes should use the deploying revision metadata.""" + deployment = make_deployment(desired=1) + spec = RollingUpdateSpec(max_surge=1, max_unavailable=0) + + result = RollingUpdateStrategy(spec).evaluate_cycle(deployment, []) + + assert len(result.route_changes.rollout_specs) == 1 + creator_spec = result.route_changes.rollout_specs[0].spec + assert isinstance(creator_spec, RouteCreatorSpec) + assert creator_spec.revision_id == NEW_REV + assert creator_spec.endpoint_id == ENDPOINT_ID + assert creator_spec.session_owner_id == USER_ID + assert creator_spec.domain == "default" + assert creator_spec.project_id == PROJECT_ID + + +# =========================================================================== +# 8. Realistic multi-step scenario (desired=5) +# =========================================================================== + + +class TestRealisticScenario: + """Simulate a realistic rolling update with desired=5, surge=2, unavail=1.""" + + def test_step_by_step_rolling_update(self) -> None: + """Full simulation of a rolling update across multiple cycles.""" + deployment = make_deployment(desired=5) + spec = RollingUpdateSpec(max_surge=2, max_unavailable=1) + + # Cycle 1: 5 old → create 2, terminate 1 + old_routes = [make_route(revision_id=OLD_REV, status=RouteStatus.HEALTHY) for _ in range(5)] + r1 = RollingUpdateStrategy(spec).evaluate_cycle(deployment, old_routes) + assert len(r1.route_changes.rollout_specs) == 2 + assert len(r1.route_changes.drain_route_ids) == 1 + + # Cycle 2: 4 old, 2 new healthy → create 1, terminate 2 + routes_c2 = [ + *[make_route(revision_id=OLD_REV, status=RouteStatus.HEALTHY) for _ in range(4)], + *[make_route(revision_id=NEW_REV, status=RouteStatus.HEALTHY) for _ in range(2)], + ] + r2 = RollingUpdateStrategy(spec).evaluate_cycle(deployment, routes_c2) + assert len(r2.route_changes.rollout_specs) == 1 + assert len(r2.route_changes.drain_route_ids) == 2 + + # Cycle 3: 2 old, 3 new healthy → create 2, terminate 1 + routes_c3 = [ + *[make_route(revision_id=OLD_REV, status=RouteStatus.HEALTHY) for _ in range(2)], + *[make_route(revision_id=NEW_REV, status=RouteStatus.HEALTHY) for _ in range(3)], + ] + r3 = RollingUpdateStrategy(spec).evaluate_cycle(deployment, routes_c3) + assert len(r3.route_changes.rollout_specs) == 2 + assert len(r3.route_changes.drain_route_ids) == 1 + + # Cycle 4: 1 old, 5 new healthy → create 0, terminate 1 + routes_c4 = [ + make_route(revision_id=OLD_REV, status=RouteStatus.HEALTHY), + *[make_route(revision_id=NEW_REV, status=RouteStatus.HEALTHY) for _ in range(5)], + ] + r4 = RollingUpdateStrategy(spec).evaluate_cycle(deployment, routes_c4) + assert len(r4.route_changes.rollout_specs) == 0 + assert len(r4.route_changes.drain_route_ids) == 1 + assert r4.sub_step == DeploymentLifecycleSubStep.DEPLOYING_PROVISIONING + + # Cycle 5: 0 old, 5 new healthy → completed + routes_c5 = [make_route(revision_id=NEW_REV, status=RouteStatus.HEALTHY) for _ in range(5)] + r5 = RollingUpdateStrategy(spec).evaluate_cycle(deployment, routes_c5) + assert r5.sub_step == DeploymentLifecycleSubStep.DEPLOYING_COMPLETED + + +# =========================================================================== +# 9. Deadlock prevention +# =========================================================================== + + +class TestDeadlockPrevention: + """Test scenarios where the FSM could potentially stall.""" + + def test_surge_0_terminates_first_then_creates(self) -> None: + """surge=0, unavailable=1 → terminate first, next cycle creates.""" + deployment = make_deployment(desired=3) + spec = RollingUpdateSpec(max_surge=0, max_unavailable=1) + + # Cycle 1: 3 old → terminate 1, create 0 + routes = [make_route(revision_id=OLD_REV, status=RouteStatus.HEALTHY) for _ in range(3)] + r1 = RollingUpdateStrategy(spec).evaluate_cycle(deployment, routes) + assert len(r1.route_changes.rollout_specs) == 0 + assert len(r1.route_changes.drain_route_ids) == 1 + + # Cycle 2: 2 old → create 1, terminate 0 + routes_c2 = [make_route(revision_id=OLD_REV, status=RouteStatus.HEALTHY) for _ in range(2)] + r2 = RollingUpdateStrategy(spec).evaluate_cycle(deployment, routes_c2) + assert len(r2.route_changes.rollout_specs) == 1 + assert len(r2.route_changes.drain_route_ids) == 0 + + def test_new_routes_exceed_desired_no_extra_create(self) -> None: + """More new_live than desired → no extra creation (still_needed < 0).""" + deployment = make_deployment(desired=2) + spec = RollingUpdateSpec(max_surge=2, max_unavailable=0) + routes = [ + make_route(revision_id=OLD_REV, status=RouteStatus.HEALTHY), + make_route(revision_id=NEW_REV, status=RouteStatus.HEALTHY), + make_route(revision_id=NEW_REV, status=RouteStatus.HEALTHY), + make_route(revision_id=NEW_REV, status=RouteStatus.HEALTHY), + ] + + result = RollingUpdateStrategy(spec).evaluate_cycle(deployment, routes) + + assert len(result.route_changes.rollout_specs) == 0 + assert len(result.route_changes.drain_route_ids) == 1 + + def test_provisioning_blocks_all_further_actions(self) -> None: + """Any new route in PROVISIONING → wait, even if old can be terminated.""" + deployment = make_deployment(desired=3) + spec = RollingUpdateSpec(max_surge=2, max_unavailable=1) + routes = [ + make_route(revision_id=OLD_REV, status=RouteStatus.HEALTHY), + make_route(revision_id=OLD_REV, status=RouteStatus.HEALTHY), + make_route(revision_id=OLD_REV, status=RouteStatus.HEALTHY), + make_route(revision_id=NEW_REV, status=RouteStatus.PROVISIONING), + ] + + result = RollingUpdateStrategy(spec).evaluate_cycle(deployment, routes) + + assert result.sub_step == DeploymentLifecycleSubStep.DEPLOYING_PROVISIONING + assert len(result.route_changes.rollout_specs) == 0 + assert len(result.route_changes.drain_route_ids) == 0 + + +# =========================================================================== +# 10. desired_replica_count resolution +# =========================================================================== + + +class TestDesiredReplicaCount: + """Test that the correct desired count is used.""" + + def test_desired_replica_count_overrides_replica_count(self) -> None: + """When desired_replica_count is set, it takes precedence.""" + deployment = make_deployment(desired=3) + deployment.replica_spec = ReplicaSpec( + replica_count=1, + desired_replica_count=3, + ) + spec = RollingUpdateSpec(max_surge=1, max_unavailable=0) + routes = [make_route(revision_id=OLD_REV, status=RouteStatus.HEALTHY)] + + result = RollingUpdateStrategy(spec).evaluate_cycle(deployment, routes) + + assert len(result.route_changes.rollout_specs) == 3 + + def test_replica_count_used_when_no_desired(self) -> None: + """When desired_replica_count is None, uses replica_count.""" + deployment = make_deployment(desired=2) + deployment.replica_spec = ReplicaSpec( + replica_count=2, + desired_replica_count=None, + ) + spec = RollingUpdateSpec(max_surge=1, max_unavailable=0) + routes = [ + make_route(revision_id=NEW_REV, status=RouteStatus.HEALTHY), + make_route(revision_id=NEW_REV, status=RouteStatus.HEALTHY), + ] + + result = RollingUpdateStrategy(spec).evaluate_cycle(deployment, routes) + + assert result.sub_step == DeploymentLifecycleSubStep.DEPLOYING_COMPLETED + + +# =========================================================================== +# 11. Scale changes during rolling update +# =========================================================================== + + +class TestScaleChangeDuringRollingUpdate: + """Test behavior when desired changes during rolling update.""" + + def test_desired_reduced_terminates_excess_old(self) -> None: + """If desired is lowered, more old can be terminated.""" + deployment = make_deployment(desired=1) + spec = RollingUpdateSpec(max_surge=1, max_unavailable=0) + routes = [make_route(revision_id=OLD_REV, status=RouteStatus.HEALTHY) for _ in range(3)] + + result = RollingUpdateStrategy(spec).evaluate_cycle(deployment, routes) + + assert len(result.route_changes.rollout_specs) == 0 + assert len(result.route_changes.drain_route_ids) == 2 + + def test_desired_increased_creates_more(self) -> None: + """If desired is raised, more new routes are created.""" + deployment = make_deployment(desired=5) + spec = RollingUpdateSpec(max_surge=2, max_unavailable=0) + routes = [make_route(revision_id=OLD_REV, status=RouteStatus.HEALTHY) for _ in range(2)] + + result = RollingUpdateStrategy(spec).evaluate_cycle(deployment, routes) + + assert len(result.route_changes.rollout_specs) == 5 From 02f882efdff07be21b6b36c58b522f869f10ed9a Mon Sep 17 00:00:00 2001 From: jopemachine Date: Fri, 20 Mar 2026 13:08:30 +0900 Subject: [PATCH 04/15] wip --- src/ai/backend/manager/sokovan/deployment/coordinator.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/src/ai/backend/manager/sokovan/deployment/coordinator.py b/src/ai/backend/manager/sokovan/deployment/coordinator.py index 6d38d02da54..b3c54a1877c 100644 --- a/src/ai/backend/manager/sokovan/deployment/coordinator.py +++ b/src/ai/backend/manager/sokovan/deployment/coordinator.py @@ -85,8 +85,8 @@ # Timeout thresholds for deployment lifecycle statuses (seconds). _DEPLOYMENT_STATUS_TIMEOUT_MAP: dict[EndpointLifecycle, float] = { - EndpointLifecycle.DEPLOYING: 1800.0, # 30 minutes - EndpointLifecycle.SCALING: 1800.0, # 30 minutes + EndpointLifecycle.DEPLOYING: 300.0, # 5 minutes (temporarily reduced for testing) + EndpointLifecycle.SCALING: 300.0, # 5 minutes (temporarily reduced for testing) } From 9b0733a962e0ed07c2a9648f3466898c2dd82d7d Mon Sep 17 00:00:00 2001 From: jopemachine Date: Fri, 20 Mar 2026 13:37:57 +0900 Subject: [PATCH 05/15] wipp --- .../manager/event_dispatcher/handlers/schedule.py | 1 - .../manager/sokovan/deployment/coordinator.py | 14 +++++++++++++- 2 files changed, 13 insertions(+), 2 deletions(-) diff --git a/src/ai/backend/manager/event_dispatcher/handlers/schedule.py b/src/ai/backend/manager/event_dispatcher/handlers/schedule.py index e1c06cc0f25..05a9ae63bbb 100644 --- a/src/ai/backend/manager/event_dispatcher/handlers/schedule.py +++ b/src/ai/backend/manager/event_dispatcher/handlers/schedule.py @@ -17,7 +17,6 @@ from ai.backend.common.events.hub.hub import EventHub from ai.backend.common.types import AgentId from ai.backend.logging.utils import BraceStyleAdapter -from ai.backend.manager.data.deployment.types import DeploymentLifecycleSubStep from ai.backend.manager.scheduler.types import ScheduleType from ai.backend.manager.sokovan.deployment.coordinator import DeploymentCoordinator from ai.backend.manager.sokovan.deployment.route.coordinator import RouteCoordinator diff --git a/src/ai/backend/manager/sokovan/deployment/coordinator.py b/src/ai/backend/manager/sokovan/deployment/coordinator.py index b3c54a1877c..e9558d220c7 100644 --- a/src/ai/backend/manager/sokovan/deployment/coordinator.py +++ b/src/ai/backend/manager/sokovan/deployment/coordinator.py @@ -149,6 +149,15 @@ class HandlerRegistry: handlers: dict[HandlerKey, DeploymentHandler] + def resolve_sub_step( + self, lifecycle_type: DeploymentLifecycleType, raw: str + ) -> DeploymentLifecycleSubStep: + """Resolve a raw sub-step string using registered handler keys.""" + for lt, sub_step in self.handlers: + if lt == lifecycle_type and sub_step is not None and sub_step.value == raw: + return sub_step + raise ValueError(f"Unknown sub-step {raw!r} for lifecycle type {lifecycle_type!r}") + @dataclass class DeploymentTaskSpec: @@ -328,8 +337,11 @@ def _init_handlers( async def process_deployment_lifecycle( self, lifecycle_type: DeploymentLifecycleType, - sub_step: DeploymentLifecycleSubStep | None = None, + raw_sub_step: str | None = None, ) -> None: + sub_step = ( + self._registry.resolve_sub_step(lifecycle_type, raw_sub_step) if raw_sub_step else None + ) handler = self._registry.handlers.get((lifecycle_type, sub_step)) if handler is None: log.warning( From f4eb3d4219a62725241bf8ba9e2274b06834300c Mon Sep 17 00:00:00 2001 From: jopemachine Date: Fri, 20 Mar 2026 14:15:15 +0900 Subject: [PATCH 06/15] wip --- tests/unit/manager/sokovan/deployment/strategy/test_applier.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/tests/unit/manager/sokovan/deployment/strategy/test_applier.py b/tests/unit/manager/sokovan/deployment/strategy/test_applier.py index 638088e60c1..13c216587e7 100644 --- a/tests/unit/manager/sokovan/deployment/strategy/test_applier.py +++ b/tests/unit/manager/sokovan/deployment/strategy/test_applier.py @@ -221,7 +221,7 @@ async def test_mixed_assignments_handles_all( mock_deployment_repo.apply_strategy_mutations.assert_called_once() kwargs = mock_deployment_repo.apply_strategy_mutations.call_args.kwargs assert kwargs["completed_ids"] == {completed_id} - assert len(kwargs["rollout"].specs) == 1 + assert len(kwargs["rollout"]) == 1 assert kwargs["drain"] is not None assert result.completed_ids == {completed_id} assert result.routes_created == 1 From 2119e3256ac628d3c278d3e6131484d3fa99fdef Mon Sep 17 00:00:00 2001 From: jopemachine Date: Fri, 20 Mar 2026 14:47:25 +0900 Subject: [PATCH 07/15] wip --- src/ai/backend/manager/models/endpoint/row.py | 8 ++++++-- src/ai/backend/manager/sokovan/deployment/coordinator.py | 4 ++-- 2 files changed, 8 insertions(+), 4 deletions(-) diff --git a/src/ai/backend/manager/models/endpoint/row.py b/src/ai/backend/manager/models/endpoint/row.py index 46b2b1d5015..7db3871acfa 100644 --- a/src/ai/backend/manager/models/endpoint/row.py +++ b/src/ai/backend/manager/models/endpoint/row.py @@ -864,7 +864,9 @@ def _to_deployment_info_with_revisions( model_revisions=list(model_revisions), current_revision_id=self.current_revision, deploying_revision_id=self.deploying_revision, - sub_step=self.sub_step, + sub_step=DeploymentLifecycleSubStep(self.sub_step) + if self.sub_step is not None + else None, ) def build_revision_spec_from_endpoint(self) -> ModelRevisionSpec: @@ -932,7 +934,9 @@ def _to_deployment_info_legacy(self) -> DeploymentInfo: model_revisions=[self.build_revision_spec_from_endpoint()], current_revision_id=self.current_revision, deploying_revision_id=self.deploying_revision, - sub_step=self.sub_step, + sub_step=DeploymentLifecycleSubStep(self.sub_step) + if self.sub_step is not None + else None, ) diff --git a/src/ai/backend/manager/sokovan/deployment/coordinator.py b/src/ai/backend/manager/sokovan/deployment/coordinator.py index e9558d220c7..d39f28e23ac 100644 --- a/src/ai/backend/manager/sokovan/deployment/coordinator.py +++ b/src/ai/backend/manager/sokovan/deployment/coordinator.py @@ -85,8 +85,8 @@ # Timeout thresholds for deployment lifecycle statuses (seconds). _DEPLOYMENT_STATUS_TIMEOUT_MAP: dict[EndpointLifecycle, float] = { - EndpointLifecycle.DEPLOYING: 300.0, # 5 minutes (temporarily reduced for testing) - EndpointLifecycle.SCALING: 300.0, # 5 minutes (temporarily reduced for testing) + EndpointLifecycle.DEPLOYING: 1800.0, # 30 minutes + EndpointLifecycle.SCALING: 1800.0, # 30 minutes } From cbbb215bebeb1aace8546e7d9dd4a58824bb0418 Mon Sep 17 00:00:00 2001 From: jopemachine Date: Fri, 20 Mar 2026 14:55:23 +0900 Subject: [PATCH 08/15] wip --- .../backend/manager/event_dispatcher/handlers/schedule.py | 1 + src/ai/backend/manager/sokovan/deployment/coordinator.py | 6 +----- 2 files changed, 2 insertions(+), 5 deletions(-) diff --git a/src/ai/backend/manager/event_dispatcher/handlers/schedule.py b/src/ai/backend/manager/event_dispatcher/handlers/schedule.py index 05a9ae63bbb..e1c06cc0f25 100644 --- a/src/ai/backend/manager/event_dispatcher/handlers/schedule.py +++ b/src/ai/backend/manager/event_dispatcher/handlers/schedule.py @@ -17,6 +17,7 @@ from ai.backend.common.events.hub.hub import EventHub from ai.backend.common.types import AgentId from ai.backend.logging.utils import BraceStyleAdapter +from ai.backend.manager.data.deployment.types import DeploymentLifecycleSubStep from ai.backend.manager.scheduler.types import ScheduleType from ai.backend.manager.sokovan.deployment.coordinator import DeploymentCoordinator from ai.backend.manager.sokovan.deployment.route.coordinator import RouteCoordinator diff --git a/src/ai/backend/manager/sokovan/deployment/coordinator.py b/src/ai/backend/manager/sokovan/deployment/coordinator.py index d39f28e23ac..2875202f748 100644 --- a/src/ai/backend/manager/sokovan/deployment/coordinator.py +++ b/src/ai/backend/manager/sokovan/deployment/coordinator.py @@ -158,7 +158,6 @@ def resolve_sub_step( return sub_step raise ValueError(f"Unknown sub-step {raw!r} for lifecycle type {lifecycle_type!r}") - @dataclass class DeploymentTaskSpec: """Specification for a deployment lifecycle periodic task.""" @@ -337,11 +336,8 @@ def _init_handlers( async def process_deployment_lifecycle( self, lifecycle_type: DeploymentLifecycleType, - raw_sub_step: str | None = None, + sub_step: DeploymentLifecycleSubStep | None = None, ) -> None: - sub_step = ( - self._registry.resolve_sub_step(lifecycle_type, raw_sub_step) if raw_sub_step else None - ) handler = self._registry.handlers.get((lifecycle_type, sub_step)) if handler is None: log.warning( From 45755f0f07437b49f5a88952929f2024f5e27a1a Mon Sep 17 00:00:00 2001 From: jopemachine Date: Fri, 20 Mar 2026 14:59:08 +0900 Subject: [PATCH 09/15] wip --- src/ai/backend/manager/models/endpoint/row.py | 8 ++------ 1 file changed, 2 insertions(+), 6 deletions(-) diff --git a/src/ai/backend/manager/models/endpoint/row.py b/src/ai/backend/manager/models/endpoint/row.py index 7db3871acfa..46b2b1d5015 100644 --- a/src/ai/backend/manager/models/endpoint/row.py +++ b/src/ai/backend/manager/models/endpoint/row.py @@ -864,9 +864,7 @@ def _to_deployment_info_with_revisions( model_revisions=list(model_revisions), current_revision_id=self.current_revision, deploying_revision_id=self.deploying_revision, - sub_step=DeploymentLifecycleSubStep(self.sub_step) - if self.sub_step is not None - else None, + sub_step=self.sub_step, ) def build_revision_spec_from_endpoint(self) -> ModelRevisionSpec: @@ -934,9 +932,7 @@ def _to_deployment_info_legacy(self) -> DeploymentInfo: model_revisions=[self.build_revision_spec_from_endpoint()], current_revision_id=self.current_revision, deploying_revision_id=self.deploying_revision, - sub_step=DeploymentLifecycleSubStep(self.sub_step) - if self.sub_step is not None - else None, + sub_step=self.sub_step, ) From e537e5100595f5ab1f15f489f31f78634805c5c3 Mon Sep 17 00:00:00 2001 From: jopemachine Date: Fri, 20 Mar 2026 15:55:42 +0900 Subject: [PATCH 10/15] fix: Apply formatting to coordinator after rebase Co-Authored-By: Claude Opus 4.6 (1M context) --- src/ai/backend/manager/sokovan/deployment/coordinator.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/ai/backend/manager/sokovan/deployment/coordinator.py b/src/ai/backend/manager/sokovan/deployment/coordinator.py index 2875202f748..5223d3905da 100644 --- a/src/ai/backend/manager/sokovan/deployment/coordinator.py +++ b/src/ai/backend/manager/sokovan/deployment/coordinator.py @@ -148,7 +148,6 @@ class HandlerRegistry: handlers: dict[HandlerKey, DeploymentHandler] - def resolve_sub_step( self, lifecycle_type: DeploymentLifecycleType, raw: str ) -> DeploymentLifecycleSubStep: @@ -158,6 +157,7 @@ def resolve_sub_step( return sub_step raise ValueError(f"Unknown sub-step {raw!r} for lifecycle type {lifecycle_type!r}") + @dataclass class DeploymentTaskSpec: """Specification for a deployment lifecycle periodic task.""" From f4e10a446f605a70b7446e25bf7a03c5c0a918f5 Mon Sep 17 00:00:00 2001 From: jopemachine Date: Fri, 20 Mar 2026 16:23:39 +0900 Subject: [PATCH 11/15] fix: Remove unused resolve_sub_step from HandlerRegistry Co-Authored-By: Claude Opus 4.6 (1M context) --- src/ai/backend/manager/sokovan/deployment/coordinator.py | 9 --------- 1 file changed, 9 deletions(-) diff --git a/src/ai/backend/manager/sokovan/deployment/coordinator.py b/src/ai/backend/manager/sokovan/deployment/coordinator.py index 5223d3905da..611fbb742e7 100644 --- a/src/ai/backend/manager/sokovan/deployment/coordinator.py +++ b/src/ai/backend/manager/sokovan/deployment/coordinator.py @@ -148,15 +148,6 @@ class HandlerRegistry: handlers: dict[HandlerKey, DeploymentHandler] - def resolve_sub_step( - self, lifecycle_type: DeploymentLifecycleType, raw: str - ) -> DeploymentLifecycleSubStep: - """Resolve a raw sub-step string using registered handler keys.""" - for lt, sub_step in self.handlers: - if lt == lifecycle_type and sub_step is not None and sub_step.value == raw: - return sub_step - raise ValueError(f"Unknown sub-step {raw!r} for lifecycle type {lifecycle_type!r}") - @dataclass class DeploymentTaskSpec: From 85383d3b0f0e86ebab6515e52c808b14025f2f6a Mon Sep 17 00:00:00 2001 From: Gyubong Date: Mon, 23 Mar 2026 10:57:17 +0900 Subject: [PATCH 12/15] bep --- src/ai/backend/manager/sokovan/deployment/executor.py | 1 - 1 file changed, 1 deletion(-) diff --git a/src/ai/backend/manager/sokovan/deployment/executor.py b/src/ai/backend/manager/sokovan/deployment/executor.py index 2f66c288666..e8a77d81a77 100644 --- a/src/ai/backend/manager/sokovan/deployment/executor.py +++ b/src/ai/backend/manager/sokovan/deployment/executor.py @@ -29,7 +29,6 @@ from ai.backend.manager.data.deployment.scale import AutoScalingRule from ai.backend.manager.data.deployment.types import ( DeploymentInfo, - ModelRevisionSpec, RouteInfo, RouteStatus, RouteTrafficStatus, From 963f91ebe43f3468ddaee9e46883752a87e7a8c1 Mon Sep 17 00:00:00 2001 From: jopemachine Date: Mon, 23 Mar 2026 11:24:05 +0900 Subject: [PATCH 13/15] feat: Make `RollingUpdateSpec` `evaluate_cycle`'s argument --- .../models/deployment_policy/__init__.py | 2 + .../manager/models/deployment_policy/row.py | 6 +- .../sokovan/deployment/strategy/blue_green.py | 5 +- .../sokovan/deployment/strategy/evaluator.py | 4 +- .../deployment/strategy/rolling_update.py | 14 ++--- .../sokovan/deployment/strategy/types.py | 7 +-- .../strategy/test_rolling_update.py | 62 +++++++++---------- 7 files changed, 49 insertions(+), 51 deletions(-) diff --git a/src/ai/backend/manager/models/deployment_policy/__init__.py b/src/ai/backend/manager/models/deployment_policy/__init__.py index efb2d32aee2..1a6e2de699a 100644 --- a/src/ai/backend/manager/models/deployment_policy/__init__.py +++ b/src/ai/backend/manager/models/deployment_policy/__init__.py @@ -1,11 +1,13 @@ from .row import ( BlueGreenSpec, DeploymentPolicyRow, + DeploymentStrategySpec, RollingUpdateSpec, ) __all__ = ( "BlueGreenSpec", "DeploymentPolicyRow", + "DeploymentStrategySpec", "RollingUpdateSpec", ) diff --git a/src/ai/backend/manager/models/deployment_policy/row.py b/src/ai/backend/manager/models/deployment_policy/row.py index 5a701851d2c..ade92872d5d 100644 --- a/src/ai/backend/manager/models/deployment_policy/row.py +++ b/src/ai/backend/manager/models/deployment_policy/row.py @@ -3,7 +3,7 @@ import logging import uuid from datetime import datetime -from typing import TYPE_CHECKING +from typing import TYPE_CHECKING, Union import sqlalchemy as sa from pydantic import BaseModel, Field, model_validator @@ -26,6 +26,7 @@ __all__ = ( "BlueGreenSpec", "DeploymentPolicyRow", + "DeploymentStrategySpec", "RollingUpdateSpec", ) @@ -61,6 +62,9 @@ class BlueGreenSpec(BaseModel): promote_delay_seconds: int = 0 +DeploymentStrategySpec = Union[RollingUpdateSpec, BlueGreenSpec] + + def _get_endpoint_join_condition() -> sa.ColumnElement[bool]: from ai.backend.manager.models.endpoint import EndpointRow diff --git a/src/ai/backend/manager/sokovan/deployment/strategy/blue_green.py b/src/ai/backend/manager/sokovan/deployment/strategy/blue_green.py index c0b6eb6463d..1a790984e47 100644 --- a/src/ai/backend/manager/sokovan/deployment/strategy/blue_green.py +++ b/src/ai/backend/manager/sokovan/deployment/strategy/blue_green.py @@ -21,15 +21,12 @@ class BlueGreenStrategy(AbstractDeploymentStrategy): """Blue-green deployment strategy FSM.""" - def __init__(self, spec: BlueGreenSpec) -> None: - super().__init__(spec) - self._spec = spec - @override def evaluate_cycle( self, deployment: DeploymentInfo, routes: Sequence[RouteInfo], + spec: BlueGreenSpec, ) -> StrategyCycleResult: """Evaluate one cycle of blue-green deployment for a single deployment.""" raise NotImplementedError("Blue-green deployment strategy is not yet implemented") diff --git a/src/ai/backend/manager/sokovan/deployment/strategy/evaluator.py b/src/ai/backend/manager/sokovan/deployment/strategy/evaluator.py index c88369afb21..261bb17927e 100644 --- a/src/ai/backend/manager/sokovan/deployment/strategy/evaluator.py +++ b/src/ai/backend/manager/sokovan/deployment/strategy/evaluator.py @@ -133,7 +133,7 @@ async def evaluate( try: strategy = self._create_strategy(policy.strategy, policy) - cycle_result = strategy.evaluate_cycle(deployment, routes) + cycle_result = strategy.evaluate_cycle(deployment, routes, policy.strategy_spec) except BackendAIError as e: log.warning("deployment {}: evaluation error — {}", deployment.id, e) result.errors.append(EvaluationErrorData(deployment=deployment, reason=str(e))) @@ -195,4 +195,4 @@ def _create_strategy( f" got {type(spec).__name__}" ), ) - return entry.strategy_cls(spec) + return entry.strategy_cls() diff --git a/src/ai/backend/manager/sokovan/deployment/strategy/rolling_update.py b/src/ai/backend/manager/sokovan/deployment/strategy/rolling_update.py index 81cc6356e23..963464256ba 100644 --- a/src/ai/backend/manager/sokovan/deployment/strategy/rolling_update.py +++ b/src/ai/backend/manager/sokovan/deployment/strategy/rolling_update.py @@ -60,17 +60,12 @@ def total_new_running(self) -> int: class RollingUpdateStrategy(AbstractDeploymentStrategy): """Rolling update deployment strategy FSM.""" - _spec: RollingUpdateSpec - - def __init__(self, spec: RollingUpdateSpec) -> None: - super().__init__(spec) - self._spec = spec - @override def evaluate_cycle( self, deployment: DeploymentInfo, routes: Sequence[RouteInfo], + spec: RollingUpdateSpec, ) -> StrategyCycleResult: """Evaluate one cycle of rolling update for a single deployment. @@ -110,7 +105,7 @@ def evaluate_cycle( return result if result := self._check_completed(deployment, classified, desired): return result - return self._compute_progressing(deployment, classified, desired) + return self._compute_progressing(deployment, classified, desired, spec) def _classify_routes( self, @@ -171,10 +166,11 @@ def _compute_progressing( deployment: DeploymentInfo, classified: _ClassifiedRoutes, desired: int, + spec: RollingUpdateSpec, ) -> StrategyCycleResult: """Compute surge/unavailable budget and return PROVISIONING with route mutations.""" - max_surge = self._spec.max_surge # extra routes allowed above desired - max_unavailable = self._spec.max_unavailable # routes allowed to be down + max_surge = spec.max_surge # extra routes allowed above desired + max_unavailable = spec.max_unavailable # routes allowed to be down max_total = desired + max_surge # upper bound on simultaneous routes current_total = ( diff --git a/src/ai/backend/manager/sokovan/deployment/strategy/types.py b/src/ai/backend/manager/sokovan/deployment/strategy/types.py index 71157c09e75..c4357cde8fd 100644 --- a/src/ai/backend/manager/sokovan/deployment/strategy/types.py +++ b/src/ai/backend/manager/sokovan/deployment/strategy/types.py @@ -15,6 +15,7 @@ DeploymentLifecycleSubStep, RouteInfo, ) +from ai.backend.manager.models.deployment_policy import DeploymentStrategySpec from ai.backend.manager.models.routing import RoutingRow from ai.backend.manager.repositories.base.rbac.entity_creator import RBACEntityCreator @@ -69,17 +70,15 @@ class AbstractDeploymentStrategy(ABC): """Base interface for deployment strategy cycle evaluation. Each concrete strategy (Blue-Green, Rolling Update) implements this interface. - The spec is injected via ``__init__`` — one instance per deployment. + The spec is passed per-cycle via ``evaluate_cycle``. """ - def __init__(self, spec: BaseModel) -> None: - self._spec = spec - @abstractmethod def evaluate_cycle( self, deployment: DeploymentInfo, routes: Sequence[RouteInfo], + spec: DeploymentStrategySpec, ) -> StrategyCycleResult: ... diff --git a/tests/unit/manager/sokovan/deployment/strategy/test_rolling_update.py b/tests/unit/manager/sokovan/deployment/strategy/test_rolling_update.py index 0347db5ba22..9876f80b3d9 100644 --- a/tests/unit/manager/sokovan/deployment/strategy/test_rolling_update.py +++ b/tests/unit/manager/sokovan/deployment/strategy/test_rolling_update.py @@ -144,7 +144,7 @@ def test_no_routes_creates_new(self) -> None: deployment = make_deployment(desired=1) spec = RollingUpdateSpec(max_surge=1, max_unavailable=0) - result = RollingUpdateStrategy(spec).evaluate_cycle(deployment, []) + result = RollingUpdateStrategy().evaluate_cycle(deployment, [], spec) assert result.sub_step == DeploymentLifecycleSubStep.DEPLOYING_PROVISIONING assert len(result.route_changes.rollout_specs) == 1 @@ -159,7 +159,7 @@ def test_new_provisioning_waits(self) -> None: make_route(revision_id=NEW_REV, status=RouteStatus.PROVISIONING), ] - result = RollingUpdateStrategy(spec).evaluate_cycle(deployment, routes) + result = RollingUpdateStrategy().evaluate_cycle(deployment, routes, spec) assert result.sub_step == DeploymentLifecycleSubStep.DEPLOYING_PROVISIONING assert len(result.route_changes.rollout_specs) == 0 @@ -174,7 +174,7 @@ def test_completed_when_all_new_healthy_and_no_old(self) -> None: make_route(revision_id=NEW_REV, status=RouteStatus.HEALTHY), ] - result = RollingUpdateStrategy(spec).evaluate_cycle(deployment, routes) + result = RollingUpdateStrategy().evaluate_cycle(deployment, routes, spec) assert result.sub_step == DeploymentLifecycleSubStep.DEPLOYING_COMPLETED @@ -197,7 +197,7 @@ def test_all_new_failed_retries_creation(self, failed_status: RouteStatus) -> No make_route(revision_id=NEW_REV, status=failed_status), ] - result = RollingUpdateStrategy(spec).evaluate_cycle(deployment, routes) + result = RollingUpdateStrategy().evaluate_cycle(deployment, routes, spec) assert result.sub_step == DeploymentLifecycleSubStep.DEPLOYING_PROVISIONING @@ -304,7 +304,7 @@ def test_budget_with_old_routes_only(self, scenario: RollingUpdateScenario) -> N for _ in range(scenario.input.old_count) ] - result = RollingUpdateStrategy(spec).evaluate_cycle(deployment, routes) + result = RollingUpdateStrategy().evaluate_cycle(deployment, routes, spec) assert len(result.route_changes.rollout_specs) == scenario.expected.create assert len(result.route_changes.drain_route_ids) == scenario.expected.terminate @@ -319,7 +319,7 @@ def test_surge_already_at_max_no_create(self) -> None: make_route(revision_id=NEW_REV, status=RouteStatus.HEALTHY), ] - result = RollingUpdateStrategy(spec).evaluate_cycle(deployment, routes) + result = RollingUpdateStrategy().evaluate_cycle(deployment, routes, spec) assert len(result.route_changes.rollout_specs) == 0 @@ -334,7 +334,7 @@ def test_new_healthy_allows_more_termination(self) -> None: make_route(revision_id=NEW_REV, status=RouteStatus.HEALTHY), ] - result = RollingUpdateStrategy(spec).evaluate_cycle(deployment, routes) + result = RollingUpdateStrategy().evaluate_cycle(deployment, routes, spec) assert len(result.route_changes.drain_route_ids) == 1 @@ -362,7 +362,7 @@ def test_new_healthy_enables_further_creation(self) -> None: make_route(revision_id=NEW_REV, status=RouteStatus.HEALTHY), ] - result = RollingUpdateStrategy(spec).evaluate_cycle(deployment, routes) + result = RollingUpdateStrategy().evaluate_cycle(deployment, routes, spec) assert len(result.route_changes.rollout_specs) == 1 assert len(result.route_changes.drain_route_ids) == 0 @@ -378,7 +378,7 @@ def test_multiple_new_healthy_enables_old_termination(self) -> None: make_route(revision_id=NEW_REV, status=RouteStatus.HEALTHY), ] - result = RollingUpdateStrategy(spec).evaluate_cycle(deployment, routes) + result = RollingUpdateStrategy().evaluate_cycle(deployment, routes, spec) assert len(result.route_changes.rollout_specs) == 0 assert len(result.route_changes.drain_route_ids) == 1 @@ -393,7 +393,7 @@ def test_not_completed_when_old_still_exists(self) -> None: make_route(revision_id=NEW_REV, status=RouteStatus.HEALTHY), ] - result = RollingUpdateStrategy(spec).evaluate_cycle(deployment, routes) + result = RollingUpdateStrategy().evaluate_cycle(deployment, routes, spec) assert result.sub_step == DeploymentLifecycleSubStep.DEPLOYING_PROVISIONING assert len(result.route_changes.drain_route_ids) == 1 @@ -415,7 +415,7 @@ def test_degraded_new_waits_provisioning(self) -> None: make_route(revision_id=NEW_REV, status=RouteStatus.DEGRADED), ] - result = RollingUpdateStrategy(spec).evaluate_cycle(deployment, routes) + result = RollingUpdateStrategy().evaluate_cycle(deployment, routes, spec) assert result.sub_step == DeploymentLifecycleSubStep.DEPLOYING_PROVISIONING @@ -427,7 +427,7 @@ def test_unhealthy_new_retries(self) -> None: make_route(revision_id=NEW_REV, status=RouteStatus.UNHEALTHY), ] - result = RollingUpdateStrategy(spec).evaluate_cycle(deployment, routes) + result = RollingUpdateStrategy().evaluate_cycle(deployment, routes, spec) assert result.sub_step == DeploymentLifecycleSubStep.DEPLOYING_PROVISIONING @@ -447,7 +447,7 @@ def test_old_inactive_not_counted_as_active(self, inactive_status: RouteStatus) make_route(revision_id=NEW_REV, status=RouteStatus.HEALTHY), ] - result = RollingUpdateStrategy(spec).evaluate_cycle(deployment, routes) + result = RollingUpdateStrategy().evaluate_cycle(deployment, routes, spec) assert result.sub_step == DeploymentLifecycleSubStep.DEPLOYING_COMPLETED @@ -461,7 +461,7 @@ def test_partial_new_failure_continues_progress(self) -> None: make_route(revision_id=NEW_REV, status=RouteStatus.FAILED_TO_START), ] - result = RollingUpdateStrategy(spec).evaluate_cycle(deployment, routes) + result = RollingUpdateStrategy().evaluate_cycle(deployment, routes, spec) assert result.sub_step == DeploymentLifecycleSubStep.DEPLOYING_PROVISIONING @@ -474,7 +474,7 @@ def test_old_provisioning_counted_as_active(self) -> None: make_route(revision_id=OLD_REV, status=RouteStatus.HEALTHY), ] - result = RollingUpdateStrategy(spec).evaluate_cycle(deployment, routes) + result = RollingUpdateStrategy().evaluate_cycle(deployment, routes, spec) assert result.sub_step == DeploymentLifecycleSubStep.DEPLOYING_PROVISIONING @@ -511,7 +511,7 @@ def test_full_priority_order(self) -> None: ), ] - result = RollingUpdateStrategy(spec).evaluate_cycle(deployment, routes) + result = RollingUpdateStrategy().evaluate_cycle(deployment, routes, spec) terminated = result.route_changes.drain_route_ids assert len(terminated) == 3 @@ -533,7 +533,7 @@ def test_desired_0_no_routes_completed(self) -> None: deployment = make_deployment(desired=0) spec = RollingUpdateSpec(max_surge=1, max_unavailable=0) - result = RollingUpdateStrategy(spec).evaluate_cycle(deployment, []) + result = RollingUpdateStrategy().evaluate_cycle(deployment, [], spec) assert result.sub_step == DeploymentLifecycleSubStep.DEPLOYING_COMPLETED @@ -547,7 +547,7 @@ def test_more_new_healthy_than_desired_still_completes(self) -> None: make_route(revision_id=NEW_REV, status=RouteStatus.HEALTHY), ] - result = RollingUpdateStrategy(spec).evaluate_cycle(deployment, routes) + result = RollingUpdateStrategy().evaluate_cycle(deployment, routes, spec) assert result.sub_step == DeploymentLifecycleSubStep.DEPLOYING_COMPLETED @@ -560,7 +560,7 @@ def test_only_failed_new_no_old_rolls_back(self) -> None: make_route(revision_id=NEW_REV, status=RouteStatus.FAILED_TO_START), ] - result = RollingUpdateStrategy(spec).evaluate_cycle(deployment, routes) + result = RollingUpdateStrategy().evaluate_cycle(deployment, routes, spec) assert result.sub_step == DeploymentLifecycleSubStep.DEPLOYING_PROVISIONING @@ -573,7 +573,7 @@ def test_all_old_inactive_no_new_creates_desired(self) -> None: make_route(revision_id=OLD_REV, status=RouteStatus.TERMINATED), ] - result = RollingUpdateStrategy(spec).evaluate_cycle(deployment, routes) + result = RollingUpdateStrategy().evaluate_cycle(deployment, routes, spec) assert len(result.route_changes.rollout_specs) == 2 @@ -584,7 +584,7 @@ def test_deploying_rev_none_rejected(self) -> None: routes = [make_route(revision_id=OLD_REV, status=RouteStatus.HEALTHY)] with pytest.raises(Exception): # InvalidEndpointState - RollingUpdateStrategy(spec).evaluate_cycle(deployment, routes) + RollingUpdateStrategy().evaluate_cycle(deployment, routes, spec) def test_route_without_revision_classified_as_old(self) -> None: """Routes with revision_id=None are classified as old.""" @@ -592,7 +592,7 @@ def test_route_without_revision_classified_as_old(self) -> None: spec = RollingUpdateSpec(max_surge=1, max_unavailable=0) routes = [make_route(revision_id=None, status=RouteStatus.HEALTHY)] # type: ignore[arg-type] - result = RollingUpdateStrategy(spec).evaluate_cycle(deployment, routes) + result = RollingUpdateStrategy().evaluate_cycle(deployment, routes, spec) assert len(result.route_changes.rollout_specs) == 1 @@ -605,7 +605,7 @@ def test_provisioning_prioritized_over_completion_check(self) -> None: make_route(revision_id=NEW_REV, status=RouteStatus.PROVISIONING), ] - result = RollingUpdateStrategy(spec).evaluate_cycle(deployment, routes) + result = RollingUpdateStrategy().evaluate_cycle(deployment, routes, spec) assert result.sub_step == DeploymentLifecycleSubStep.DEPLOYING_PROVISIONING @@ -623,7 +623,7 @@ def test_creator_specs_use_deploying_revision(self) -> None: deployment = make_deployment(desired=1) spec = RollingUpdateSpec(max_surge=1, max_unavailable=0) - result = RollingUpdateStrategy(spec).evaluate_cycle(deployment, []) + result = RollingUpdateStrategy().evaluate_cycle(deployment, [], spec) assert len(result.route_changes.rollout_specs) == 1 creator_spec = result.route_changes.rollout_specs[0].spec @@ -703,7 +703,7 @@ def test_surge_0_terminates_first_then_creates(self) -> None: # Cycle 1: 3 old → terminate 1, create 0 routes = [make_route(revision_id=OLD_REV, status=RouteStatus.HEALTHY) for _ in range(3)] - r1 = RollingUpdateStrategy(spec).evaluate_cycle(deployment, routes) + r1 = RollingUpdateStrategy().evaluate_cycle(deployment, routes, spec) assert len(r1.route_changes.rollout_specs) == 0 assert len(r1.route_changes.drain_route_ids) == 1 @@ -724,7 +724,7 @@ def test_new_routes_exceed_desired_no_extra_create(self) -> None: make_route(revision_id=NEW_REV, status=RouteStatus.HEALTHY), ] - result = RollingUpdateStrategy(spec).evaluate_cycle(deployment, routes) + result = RollingUpdateStrategy().evaluate_cycle(deployment, routes, spec) assert len(result.route_changes.rollout_specs) == 0 assert len(result.route_changes.drain_route_ids) == 1 @@ -740,7 +740,7 @@ def test_provisioning_blocks_all_further_actions(self) -> None: make_route(revision_id=NEW_REV, status=RouteStatus.PROVISIONING), ] - result = RollingUpdateStrategy(spec).evaluate_cycle(deployment, routes) + result = RollingUpdateStrategy().evaluate_cycle(deployment, routes, spec) assert result.sub_step == DeploymentLifecycleSubStep.DEPLOYING_PROVISIONING assert len(result.route_changes.rollout_specs) == 0 @@ -765,7 +765,7 @@ def test_desired_replica_count_overrides_replica_count(self) -> None: spec = RollingUpdateSpec(max_surge=1, max_unavailable=0) routes = [make_route(revision_id=OLD_REV, status=RouteStatus.HEALTHY)] - result = RollingUpdateStrategy(spec).evaluate_cycle(deployment, routes) + result = RollingUpdateStrategy().evaluate_cycle(deployment, routes, spec) assert len(result.route_changes.rollout_specs) == 3 @@ -782,7 +782,7 @@ def test_replica_count_used_when_no_desired(self) -> None: make_route(revision_id=NEW_REV, status=RouteStatus.HEALTHY), ] - result = RollingUpdateStrategy(spec).evaluate_cycle(deployment, routes) + result = RollingUpdateStrategy().evaluate_cycle(deployment, routes, spec) assert result.sub_step == DeploymentLifecycleSubStep.DEPLOYING_COMPLETED @@ -801,7 +801,7 @@ def test_desired_reduced_terminates_excess_old(self) -> None: spec = RollingUpdateSpec(max_surge=1, max_unavailable=0) routes = [make_route(revision_id=OLD_REV, status=RouteStatus.HEALTHY) for _ in range(3)] - result = RollingUpdateStrategy(spec).evaluate_cycle(deployment, routes) + result = RollingUpdateStrategy().evaluate_cycle(deployment, routes, spec) assert len(result.route_changes.rollout_specs) == 0 assert len(result.route_changes.drain_route_ids) == 2 @@ -812,6 +812,6 @@ def test_desired_increased_creates_more(self) -> None: spec = RollingUpdateSpec(max_surge=2, max_unavailable=0) routes = [make_route(revision_id=OLD_REV, status=RouteStatus.HEALTHY) for _ in range(2)] - result = RollingUpdateStrategy(spec).evaluate_cycle(deployment, routes) + result = RollingUpdateStrategy().evaluate_cycle(deployment, routes, spec) assert len(result.route_changes.rollout_specs) == 5 From 5078fe52ed49df9aac3458d168114cedf4f7cf19 Mon Sep 17 00:00:00 2001 From: jopemachine Date: Mon, 23 Mar 2026 11:35:09 +0900 Subject: [PATCH 14/15] wip --- .../sokovan/deployment/strategy/blue_green.py | 4 +- .../deployment/strategy/rolling_update.py | 40 +++++++++---------- .../strategy/test_rolling_update.py | 12 +++--- 3 files changed, 26 insertions(+), 30 deletions(-) diff --git a/src/ai/backend/manager/sokovan/deployment/strategy/blue_green.py b/src/ai/backend/manager/sokovan/deployment/strategy/blue_green.py index 1a790984e47..683873c7295 100644 --- a/src/ai/backend/manager/sokovan/deployment/strategy/blue_green.py +++ b/src/ai/backend/manager/sokovan/deployment/strategy/blue_green.py @@ -13,7 +13,7 @@ DeploymentInfo, RouteInfo, ) -from ai.backend.manager.models.deployment_policy import BlueGreenSpec +from ai.backend.manager.models.deployment_policy import DeploymentStrategySpec from .types import AbstractDeploymentStrategy, StrategyCycleResult @@ -26,7 +26,7 @@ def evaluate_cycle( self, deployment: DeploymentInfo, routes: Sequence[RouteInfo], - spec: BlueGreenSpec, + spec: DeploymentStrategySpec, ) -> StrategyCycleResult: """Evaluate one cycle of blue-green deployment for a single deployment.""" raise NotImplementedError("Blue-green deployment strategy is not yet implemented") diff --git a/src/ai/backend/manager/sokovan/deployment/strategy/rolling_update.py b/src/ai/backend/manager/sokovan/deployment/strategy/rolling_update.py index 963464256ba..897ce8cc164 100644 --- a/src/ai/backend/manager/sokovan/deployment/strategy/rolling_update.py +++ b/src/ai/backend/manager/sokovan/deployment/strategy/rolling_update.py @@ -21,7 +21,7 @@ RouteStatus, ) from ai.backend.manager.data.permission.types import RBACElementRef -from ai.backend.manager.models.deployment_policy import RollingUpdateSpec +from ai.backend.manager.models.deployment_policy import DeploymentStrategySpec, RollingUpdateSpec from ai.backend.manager.models.routing import RoutingRow from ai.backend.manager.repositories.base.rbac.entity_creator import RBACEntityCreator from ai.backend.manager.repositories.deployment.creators import RouteCreatorSpec @@ -65,7 +65,7 @@ def evaluate_cycle( self, deployment: DeploymentInfo, routes: Sequence[RouteInfo], - spec: RollingUpdateSpec, + spec: DeploymentStrategySpec, ) -> StrategyCycleResult: """Evaluate one cycle of rolling update for a single deployment. @@ -80,6 +80,7 @@ def evaluate_cycle( sweep handles it by transitioning to ROLLING_BACK when the deploying timeout is exceeded. """ + assert isinstance(spec, RollingUpdateSpec) desired = deployment.replica_spec.target_replica_count deploying_revision_id = deployment.deploying_revision_id if deploying_revision_id is None: @@ -101,11 +102,9 @@ def evaluate_cycle( classified.new_failed_count, ) - if result := self._check_provisioning(deployment, classified): - return result if result := self._check_completed(deployment, classified, desired): return result - return self._compute_progressing(deployment, classified, desired, spec) + return self._compute_route_mutations(deployment, classified, desired, spec) def _classify_routes( self, @@ -130,21 +129,6 @@ def _classify_routes( classified.new_unhealthy_count += 1 return classified - def _check_provisioning( - self, - deployment: DeploymentInfo, - classified: _ClassifiedRoutes, - ) -> StrategyCycleResult | None: - """Return PROVISIONING result if any new routes are still being provisioned.""" - if not classified.new_provisioning_count: - return None - log.debug( - "deployment {}: {} new routes still provisioning", - deployment.id, - classified.new_provisioning_count, - ) - return StrategyCycleResult(sub_step=DeploymentLifecycleSubStep.DEPLOYING_PROVISIONING) - def _check_completed( self, deployment: DeploymentInfo, @@ -161,14 +145,26 @@ def _check_completed( ) return StrategyCycleResult(sub_step=DeploymentLifecycleSubStep.DEPLOYING_COMPLETED) - def _compute_progressing( + def _compute_route_mutations( self, deployment: DeploymentInfo, classified: _ClassifiedRoutes, desired: int, spec: RollingUpdateSpec, ) -> StrategyCycleResult: - """Compute surge/unavailable budget and return PROVISIONING with route mutations.""" + """Compute surge/unavailable budget and return PROVISIONING with route mutations. + + If new routes are still being provisioned, waits without creating or + terminating additional routes. + """ + if classified.new_provisioning_count: + log.debug( + "deployment {}: {} new routes still provisioning", + deployment.id, + classified.new_provisioning_count, + ) + return StrategyCycleResult(sub_step=DeploymentLifecycleSubStep.DEPLOYING_PROVISIONING) + max_surge = spec.max_surge # extra routes allowed above desired max_unavailable = spec.max_unavailable # routes allowed to be down diff --git a/tests/unit/manager/sokovan/deployment/strategy/test_rolling_update.py b/tests/unit/manager/sokovan/deployment/strategy/test_rolling_update.py index 9876f80b3d9..536a6cebbf8 100644 --- a/tests/unit/manager/sokovan/deployment/strategy/test_rolling_update.py +++ b/tests/unit/manager/sokovan/deployment/strategy/test_rolling_update.py @@ -650,7 +650,7 @@ def test_step_by_step_rolling_update(self) -> None: # Cycle 1: 5 old → create 2, terminate 1 old_routes = [make_route(revision_id=OLD_REV, status=RouteStatus.HEALTHY) for _ in range(5)] - r1 = RollingUpdateStrategy(spec).evaluate_cycle(deployment, old_routes) + r1 = RollingUpdateStrategy().evaluate_cycle(deployment, old_routes, spec) assert len(r1.route_changes.rollout_specs) == 2 assert len(r1.route_changes.drain_route_ids) == 1 @@ -659,7 +659,7 @@ def test_step_by_step_rolling_update(self) -> None: *[make_route(revision_id=OLD_REV, status=RouteStatus.HEALTHY) for _ in range(4)], *[make_route(revision_id=NEW_REV, status=RouteStatus.HEALTHY) for _ in range(2)], ] - r2 = RollingUpdateStrategy(spec).evaluate_cycle(deployment, routes_c2) + r2 = RollingUpdateStrategy().evaluate_cycle(deployment, routes_c2, spec) assert len(r2.route_changes.rollout_specs) == 1 assert len(r2.route_changes.drain_route_ids) == 2 @@ -668,7 +668,7 @@ def test_step_by_step_rolling_update(self) -> None: *[make_route(revision_id=OLD_REV, status=RouteStatus.HEALTHY) for _ in range(2)], *[make_route(revision_id=NEW_REV, status=RouteStatus.HEALTHY) for _ in range(3)], ] - r3 = RollingUpdateStrategy(spec).evaluate_cycle(deployment, routes_c3) + r3 = RollingUpdateStrategy().evaluate_cycle(deployment, routes_c3, spec) assert len(r3.route_changes.rollout_specs) == 2 assert len(r3.route_changes.drain_route_ids) == 1 @@ -677,14 +677,14 @@ def test_step_by_step_rolling_update(self) -> None: make_route(revision_id=OLD_REV, status=RouteStatus.HEALTHY), *[make_route(revision_id=NEW_REV, status=RouteStatus.HEALTHY) for _ in range(5)], ] - r4 = RollingUpdateStrategy(spec).evaluate_cycle(deployment, routes_c4) + r4 = RollingUpdateStrategy().evaluate_cycle(deployment, routes_c4, spec) assert len(r4.route_changes.rollout_specs) == 0 assert len(r4.route_changes.drain_route_ids) == 1 assert r4.sub_step == DeploymentLifecycleSubStep.DEPLOYING_PROVISIONING # Cycle 5: 0 old, 5 new healthy → completed routes_c5 = [make_route(revision_id=NEW_REV, status=RouteStatus.HEALTHY) for _ in range(5)] - r5 = RollingUpdateStrategy(spec).evaluate_cycle(deployment, routes_c5) + r5 = RollingUpdateStrategy().evaluate_cycle(deployment, routes_c5, spec) assert r5.sub_step == DeploymentLifecycleSubStep.DEPLOYING_COMPLETED @@ -709,7 +709,7 @@ def test_surge_0_terminates_first_then_creates(self) -> None: # Cycle 2: 2 old → create 1, terminate 0 routes_c2 = [make_route(revision_id=OLD_REV, status=RouteStatus.HEALTHY) for _ in range(2)] - r2 = RollingUpdateStrategy(spec).evaluate_cycle(deployment, routes_c2) + r2 = RollingUpdateStrategy().evaluate_cycle(deployment, routes_c2, spec) assert len(r2.route_changes.rollout_specs) == 1 assert len(r2.route_changes.drain_route_ids) == 0 From d89b05127f0e026a75169ccee69a8db34cac4002 Mon Sep 17 00:00:00 2001 From: jopemachine Date: Mon, 23 Mar 2026 12:29:36 +0900 Subject: [PATCH 15/15] wip --- src/ai/backend/manager/models/deployment_policy/row.py | 4 ++-- .../sokovan/deployment/strategy/rolling_update.py | 9 +++++++-- 2 files changed, 9 insertions(+), 4 deletions(-) diff --git a/src/ai/backend/manager/models/deployment_policy/row.py b/src/ai/backend/manager/models/deployment_policy/row.py index ade92872d5d..81b759c3ae0 100644 --- a/src/ai/backend/manager/models/deployment_policy/row.py +++ b/src/ai/backend/manager/models/deployment_policy/row.py @@ -3,7 +3,7 @@ import logging import uuid from datetime import datetime -from typing import TYPE_CHECKING, Union +from typing import TYPE_CHECKING import sqlalchemy as sa from pydantic import BaseModel, Field, model_validator @@ -62,7 +62,7 @@ class BlueGreenSpec(BaseModel): promote_delay_seconds: int = 0 -DeploymentStrategySpec = Union[RollingUpdateSpec, BlueGreenSpec] +DeploymentStrategySpec = RollingUpdateSpec | BlueGreenSpec def _get_endpoint_join_condition() -> sa.ColumnElement[bool]: diff --git a/src/ai/backend/manager/sokovan/deployment/strategy/rolling_update.py b/src/ai/backend/manager/sokovan/deployment/strategy/rolling_update.py index 897ce8cc164..ba585de835a 100644 --- a/src/ai/backend/manager/sokovan/deployment/strategy/rolling_update.py +++ b/src/ai/backend/manager/sokovan/deployment/strategy/rolling_update.py @@ -80,7 +80,8 @@ def evaluate_cycle( sweep handles it by transitioning to ROLLING_BACK when the deploying timeout is exceeded. """ - assert isinstance(spec, RollingUpdateSpec) + if not isinstance(spec, RollingUpdateSpec): + raise TypeError(f"Expected RollingUpdateSpec, got {type(spec).__name__}") desired = deployment.replica_spec.target_replica_count deploying_revision_id = deployment.deploying_revision_id if deploying_revision_id is None: @@ -136,7 +137,11 @@ def _check_completed( desired: int, ) -> StrategyCycleResult | None: """Return COMPLETED result if all old routes are replaced and enough new are healthy.""" - if classified.old_active or classified.new_healthy_count < desired: + if ( + classified.old_active + or classified.new_provisioning_count + or classified.new_healthy_count < desired + ): return None log.info( "deployment {}: rolling update complete ({} healthy routes)",