diff --git a/changes/9997.feature.md b/changes/9997.feature.md new file mode 100644 index 00000000000..1065196f9bd --- /dev/null +++ b/changes/9997.feature.md @@ -0,0 +1 @@ +Implement Rolling Update deployment strategy diff --git a/src/ai/backend/manager/models/deployment_policy/__init__.py b/src/ai/backend/manager/models/deployment_policy/__init__.py index efb2d32aee2..1a6e2de699a 100644 --- a/src/ai/backend/manager/models/deployment_policy/__init__.py +++ b/src/ai/backend/manager/models/deployment_policy/__init__.py @@ -1,11 +1,13 @@ from .row import ( BlueGreenSpec, DeploymentPolicyRow, + DeploymentStrategySpec, RollingUpdateSpec, ) __all__ = ( "BlueGreenSpec", "DeploymentPolicyRow", + "DeploymentStrategySpec", "RollingUpdateSpec", ) diff --git a/src/ai/backend/manager/models/deployment_policy/row.py b/src/ai/backend/manager/models/deployment_policy/row.py index 5a701851d2c..81b759c3ae0 100644 --- a/src/ai/backend/manager/models/deployment_policy/row.py +++ b/src/ai/backend/manager/models/deployment_policy/row.py @@ -26,6 +26,7 @@ __all__ = ( "BlueGreenSpec", "DeploymentPolicyRow", + "DeploymentStrategySpec", "RollingUpdateSpec", ) @@ -61,6 +62,9 @@ class BlueGreenSpec(BaseModel): promote_delay_seconds: int = 0 +DeploymentStrategySpec = RollingUpdateSpec | BlueGreenSpec + + def _get_endpoint_join_condition() -> sa.ColumnElement[bool]: from ai.backend.manager.models.endpoint import EndpointRow diff --git a/src/ai/backend/manager/sokovan/deployment/strategy/blue_green.py b/src/ai/backend/manager/sokovan/deployment/strategy/blue_green.py index c0b6eb6463d..683873c7295 100644 --- a/src/ai/backend/manager/sokovan/deployment/strategy/blue_green.py +++ b/src/ai/backend/manager/sokovan/deployment/strategy/blue_green.py @@ -13,7 +13,7 @@ DeploymentInfo, RouteInfo, ) -from ai.backend.manager.models.deployment_policy import BlueGreenSpec +from ai.backend.manager.models.deployment_policy import DeploymentStrategySpec from .types import AbstractDeploymentStrategy, StrategyCycleResult @@ -21,15 +21,12 @@ class BlueGreenStrategy(AbstractDeploymentStrategy): """Blue-green deployment strategy FSM.""" - def __init__(self, spec: BlueGreenSpec) -> None: - super().__init__(spec) - self._spec = spec - @override def evaluate_cycle( self, deployment: DeploymentInfo, routes: Sequence[RouteInfo], + spec: DeploymentStrategySpec, ) -> StrategyCycleResult: """Evaluate one cycle of blue-green deployment for a single deployment.""" raise NotImplementedError("Blue-green deployment strategy is not yet implemented") diff --git a/src/ai/backend/manager/sokovan/deployment/strategy/evaluator.py b/src/ai/backend/manager/sokovan/deployment/strategy/evaluator.py index 3a9973dab8a..261bb17927e 100644 --- a/src/ai/backend/manager/sokovan/deployment/strategy/evaluator.py +++ b/src/ai/backend/manager/sokovan/deployment/strategy/evaluator.py @@ -25,9 +25,16 @@ InvalidDeploymentStrategy, InvalidDeploymentStrategySpec, ) -from ai.backend.manager.models.deployment_policy.conditions import DeploymentPolicyConditions -from ai.backend.manager.models.routing.conditions import RouteConditions -from ai.backend.manager.repositories.base import BatchQuerier, NoPagination +from ai.backend.manager.repositories.base import ( + BatchQuerier, + NoPagination, + QueryCondition, + combine_conditions_or, +) +from ai.backend.manager.repositories.deployment.options import ( + DeploymentPolicyConditions, + RouteConditions, +) from ai.backend.manager.repositories.deployment.repository import DeploymentRepository from ai.backend.manager.sokovan.deployment.recorder import DeploymentRecorderContext @@ -79,15 +86,33 @@ async def evaluate( ) ) policy_map = {policy.endpoint: policy for policy in policy_search.items} - # Fetch all non-terminated routes so the strategy can detect rollback - # conditions (e.g. FAILED_TO_START routes after a coordinator crash). + # Fetch non-terminated routes + terminated routes belonging to a + # deploying revision. The FSM needs terminated new-revision routes + # to count accumulated failures for rollback detection, but old + # terminated routes are irrelevant and would bloat the result set. + deploying_revision_ids = { + deployment.deploying_revision_id + for deployment in deployments + if deployment.deploying_revision_id is not None + } + route_conditions: list[QueryCondition] = [ + RouteConditions.by_endpoint_ids(endpoint_ids), + ] + if deploying_revision_ids: + route_conditions.append( + combine_conditions_or([ + RouteConditions.exclude_statuses([RouteStatus.TERMINATED]), + RouteConditions.by_revision_ids(deploying_revision_ids), + ]) + ) + else: + route_conditions.append( + RouteConditions.exclude_statuses([RouteStatus.TERMINATED]), + ) route_search = await self._deployment_repo.search_routes( BatchQuerier( pagination=NoPagination(), - conditions=[ - RouteConditions.by_endpoint_ids(endpoint_ids), - RouteConditions.exclude_statuses([RouteStatus.TERMINATED]), - ], + conditions=route_conditions, ) ) route_map: defaultdict[UUID, list[RouteInfo]] = defaultdict(list) @@ -108,7 +133,7 @@ async def evaluate( try: strategy = self._create_strategy(policy.strategy, policy) - cycle_result = strategy.evaluate_cycle(deployment, routes) + cycle_result = strategy.evaluate_cycle(deployment, routes, policy.strategy_spec) except BackendAIError as e: log.warning("deployment {}: evaluation error — {}", deployment.id, e) result.errors.append(EvaluationErrorData(deployment=deployment, reason=str(e))) @@ -170,4 +195,4 @@ def _create_strategy( f" got {type(spec).__name__}" ), ) - return entry.strategy_cls(spec) + return entry.strategy_cls() diff --git a/src/ai/backend/manager/sokovan/deployment/strategy/rolling_update.py b/src/ai/backend/manager/sokovan/deployment/strategy/rolling_update.py index dc2ff159203..ba585de835a 100644 --- a/src/ai/backend/manager/sokovan/deployment/strategy/rolling_update.py +++ b/src/ai/backend/manager/sokovan/deployment/strategy/rolling_update.py @@ -6,30 +6,274 @@ from __future__ import annotations +import logging from collections.abc import Sequence +from dataclasses import dataclass, field from typing import override +from uuid import UUID +from ai.backend.common.data.permission.types import RBACElementType +from ai.backend.logging import BraceStyleAdapter from ai.backend.manager.data.deployment.types import ( DeploymentInfo, + DeploymentLifecycleSubStep, RouteInfo, + RouteStatus, ) -from ai.backend.manager.models.deployment_policy import RollingUpdateSpec +from ai.backend.manager.data.permission.types import RBACElementRef +from ai.backend.manager.models.deployment_policy import DeploymentStrategySpec, RollingUpdateSpec +from ai.backend.manager.models.routing import RoutingRow +from ai.backend.manager.repositories.base.rbac.entity_creator import RBACEntityCreator +from ai.backend.manager.repositories.deployment.creators import RouteCreatorSpec +from ai.backend.manager.sokovan.deployment.exceptions import InvalidEndpointState -from .types import AbstractDeploymentStrategy, StrategyCycleResult +from .types import AbstractDeploymentStrategy, RouteChanges, StrategyCycleResult + +log = BraceStyleAdapter(logging.getLogger(__name__)) + + +@dataclass +class _ClassifiedRoutes: + """Routes classified by revision and status. + + Only ``old_active`` retains full RouteInfo (needed for drain ordering). + Other buckets store counts only. + """ + + old_active: list[RouteInfo] = field(default_factory=list) + new_provisioning_count: int = 0 + new_healthy_count: int = 0 + new_unhealthy_count: int = 0 + new_failed_count: int = 0 + + @property + def total_new_running(self) -> int: + """Count of new-revision routes whose processes are still running. + + Includes UNHEALTHY and DEGRADED to prevent duplicate route creation + in surge calculation. (They are excluded from ``available_count`` + in termination calculation, since they cannot serve traffic.) + """ + return self.new_provisioning_count + self.new_healthy_count + self.new_unhealthy_count class RollingUpdateStrategy(AbstractDeploymentStrategy): """Rolling update deployment strategy FSM.""" - def __init__(self, spec: RollingUpdateSpec) -> None: - super().__init__(spec) - self._spec = spec - @override def evaluate_cycle( self, deployment: DeploymentInfo, routes: Sequence[RouteInfo], + spec: DeploymentStrategySpec, + ) -> StrategyCycleResult: + """Evaluate one cycle of rolling update for a single deployment. + + FSM flow: + 1. Classify routes into old / new by revision_id. + 2. If any new route is PROVISIONING → PROVISIONING (wait). + 3. If no old routes remain and new_healthy >= desired → COMPLETED. + 4. Compute allowed surge/unavailable, decide create/terminate + → PROVISIONING (with route mutations). + + Rollback is not decided by the FSM — the coordinator's timeout + sweep handles it by transitioning to ROLLING_BACK when the + deploying timeout is exceeded. + """ + if not isinstance(spec, RollingUpdateSpec): + raise TypeError(f"Expected RollingUpdateSpec, got {type(spec).__name__}") + desired = deployment.replica_spec.target_replica_count + deploying_revision_id = deployment.deploying_revision_id + if deploying_revision_id is None: + raise InvalidEndpointState( + f"Deployment {deployment.id} has DEPLOYING lifecycle but deploying_revision_id is None. " + "This indicates an inconsistent state — the deployment will be skipped." + ) + classified = self._classify_routes(routes, deploying_revision_id) + log.info( + "deployment {}: sub_step={}, routes total={}, " + "old_active={}, new_prov={}, new_healthy={}, new_unhealthy={}, new_failed={}", + deployment.id, + deployment.sub_step, + len(routes), + len(classified.old_active), + classified.new_provisioning_count, + classified.new_healthy_count, + classified.new_unhealthy_count, + classified.new_failed_count, + ) + + if result := self._check_completed(deployment, classified, desired): + return result + return self._compute_route_mutations(deployment, classified, desired, spec) + + def _classify_routes( + self, + routes: Sequence[RouteInfo], + deploying_revision_id: UUID, + ) -> _ClassifiedRoutes: + """Classify routes into old/new by revision and status.""" + classified = _ClassifiedRoutes() + for route in routes: + if route.revision_id != deploying_revision_id: + if route.status.is_active(): + classified.old_active.append(route) + continue + + if route.status.is_provisioning(): + classified.new_provisioning_count += 1 + elif route.status.is_inactive(): + classified.new_failed_count += 1 + elif route.status == RouteStatus.HEALTHY: + classified.new_healthy_count += 1 + elif route.status == RouteStatus.UNHEALTHY: + classified.new_unhealthy_count += 1 + return classified + + def _check_completed( + self, + deployment: DeploymentInfo, + classified: _ClassifiedRoutes, + desired: int, + ) -> StrategyCycleResult | None: + """Return COMPLETED result if all old routes are replaced and enough new are healthy.""" + if ( + classified.old_active + or classified.new_provisioning_count + or classified.new_healthy_count < desired + ): + return None + log.info( + "deployment {}: rolling update complete ({} healthy routes)", + deployment.id, + classified.new_healthy_count, + ) + return StrategyCycleResult(sub_step=DeploymentLifecycleSubStep.DEPLOYING_COMPLETED) + + def _compute_route_mutations( + self, + deployment: DeploymentInfo, + classified: _ClassifiedRoutes, + desired: int, + spec: RollingUpdateSpec, ) -> StrategyCycleResult: - """Evaluate one cycle of rolling update for a single deployment.""" - raise NotImplementedError("Rolling update strategy is not yet implemented") + """Compute surge/unavailable budget and return PROVISIONING with route mutations. + + If new routes are still being provisioned, waits without creating or + terminating additional routes. + """ + if classified.new_provisioning_count: + log.debug( + "deployment {}: {} new routes still provisioning", + deployment.id, + classified.new_provisioning_count, + ) + return StrategyCycleResult(sub_step=DeploymentLifecycleSubStep.DEPLOYING_PROVISIONING) + + max_surge = spec.max_surge # extra routes allowed above desired + max_unavailable = spec.max_unavailable # routes allowed to be down + + max_total = desired + max_surge # upper bound on simultaneous routes + current_total = ( + len(classified.old_active) + classified.total_new_running + ) # routes running now + min_available = max(0, desired - max_unavailable) # floor for traffic-serving routes + + route_changes = RouteChanges() + + to_create = self._compute_routes_to_create(desired, max_total, current_total, classified) + if to_create > 0: + route_changes.rollout_specs = _build_route_creators(deployment, to_create) + + to_terminate = self._compute_routes_to_terminate(classified, min_available) + if to_terminate > 0: + sorted_old = sorted( + classified.old_active, key=lambda route: route.status.termination_priority() + ) + for route in sorted_old[:to_terminate]: + route_changes.drain_route_ids.append(route.route_id) + + log.debug( + "deployment {}: PROVISIONING create={}, terminate={}, " + "old_active={}, new_healthy={}, new_unhealthy={}, new_prov={}", + deployment.id, + to_create, + to_terminate, + len(classified.old_active), + classified.new_healthy_count, + classified.new_unhealthy_count, + classified.new_provisioning_count, + ) + + return StrategyCycleResult( + sub_step=DeploymentLifecycleSubStep.DEPLOYING_PROVISIONING, + route_changes=route_changes, + ) + + def _compute_routes_to_create( + self, + desired: int, + max_total: int, + current_total: int, + classified: _ClassifiedRoutes, + ) -> int: + """Decide how many new routes to create within surge budget. + + Takes the smaller of two constraints: + - ``can_create``: surge headroom (max_total - current_total) + - ``still_needed``: new routes remaining to reach desired + + Example (desired=4, max_surge=1 → max_total=5): + old=4, new_running=0 → can_create=1, still_needed=4 → 1 (surge-limited) + old=0, new_running=3 → can_create=2, still_needed=1 → 1 (goal-limited) + """ + can_create = max_total - current_total + still_needed = desired - classified.total_new_running + return max(0, min(can_create, still_needed)) + + def _compute_routes_to_terminate( + self, + classified: _ClassifiedRoutes, + min_available: int, + ) -> int: + """Decide how many old routes to terminate within unavailability budget. + + Only counts truly healthy routes as available (not UNHEALTHY/DEGRADED). + Takes the smaller of two constraints: + - ``can_terminate``: availability headroom (available_count - min_available) + - ``old_active``: cannot terminate more old routes than exist + + Example (desired=4, max_unavailable=1 → min_available=3): + new_healthy=2, old=3 → can_terminate=2, old=3 → 2 (budget-limited) + new_healthy=4, old=1 → can_terminate=2, old=1 → 1 (old-count-limited) + """ + available_count = classified.new_healthy_count + len(classified.old_active) + can_terminate = available_count - min_available + return max(0, min(can_terminate, len(classified.old_active))) # clamp to actual old count + + +def _build_route_creators( + deployment: DeploymentInfo, + count: int, +) -> list[RBACEntityCreator[RoutingRow]]: + """Build route creator specs for new revision routes.""" + creators: list[RBACEntityCreator[RoutingRow]] = [] + for _ in range(count): + spec = RouteCreatorSpec( + endpoint_id=deployment.id, + session_owner_id=deployment.metadata.session_owner, + domain=deployment.metadata.domain, + project_id=deployment.metadata.project, + revision_id=deployment.deploying_revision_id, + ) + creators.append( + RBACEntityCreator( + spec=spec, + element_type=RBACElementType.ROUTING, + scope_ref=RBACElementRef( + element_type=RBACElementType.MODEL_DEPLOYMENT, + element_id=str(deployment.id), + ), + ) + ) + return creators diff --git a/src/ai/backend/manager/sokovan/deployment/strategy/types.py b/src/ai/backend/manager/sokovan/deployment/strategy/types.py index 71157c09e75..c4357cde8fd 100644 --- a/src/ai/backend/manager/sokovan/deployment/strategy/types.py +++ b/src/ai/backend/manager/sokovan/deployment/strategy/types.py @@ -15,6 +15,7 @@ DeploymentLifecycleSubStep, RouteInfo, ) +from ai.backend.manager.models.deployment_policy import DeploymentStrategySpec from ai.backend.manager.models.routing import RoutingRow from ai.backend.manager.repositories.base.rbac.entity_creator import RBACEntityCreator @@ -69,17 +70,15 @@ class AbstractDeploymentStrategy(ABC): """Base interface for deployment strategy cycle evaluation. Each concrete strategy (Blue-Green, Rolling Update) implements this interface. - The spec is injected via ``__init__`` — one instance per deployment. + The spec is passed per-cycle via ``evaluate_cycle``. """ - def __init__(self, spec: BaseModel) -> None: - self._spec = spec - @abstractmethod def evaluate_cycle( self, deployment: DeploymentInfo, routes: Sequence[RouteInfo], + spec: DeploymentStrategySpec, ) -> StrategyCycleResult: ... diff --git a/tests/unit/manager/sokovan/deployment/strategy/test_rolling_update.py b/tests/unit/manager/sokovan/deployment/strategy/test_rolling_update.py new file mode 100644 index 00000000000..536a6cebbf8 --- /dev/null +++ b/tests/unit/manager/sokovan/deployment/strategy/test_rolling_update.py @@ -0,0 +1,817 @@ +"""Tests for the rolling update FSM evaluation (BEP-1049). + +Tests cover: +- FSM state transitions: PROVISIONING and COMPLETED +- max_surge / max_unavailable budget calculations +- Multi-cycle progression and termination priority +- Edge cases and boundary conditions + +Note: Rollback is not decided by the FSM — the coordinator's timeout +sweep handles it. The FSM only returns PROVISIONING (with or without +route mutations) or COMPLETED. +""" + +from __future__ import annotations + +from dataclasses import dataclass +from datetime import UTC, datetime +from uuid import UUID, uuid4 + +import pytest + +from ai.backend.common.data.endpoint.types import EndpointLifecycle +from ai.backend.common.types import SessionId +from ai.backend.manager.data.deployment.types import ( + DeploymentInfo, + DeploymentLifecycleSubStep, + DeploymentMetadata, + DeploymentNetworkSpec, + DeploymentState, + ReplicaSpec, + RouteInfo, + RouteStatus, + RouteTrafficStatus, +) +from ai.backend.manager.models.deployment_policy import RollingUpdateSpec +from ai.backend.manager.repositories.deployment.creators import RouteCreatorSpec +from ai.backend.manager.sokovan.deployment.strategy.rolling_update import ( + RollingUpdateStrategy, +) + +ENDPOINT_ID = UUID("aaaaaaaa-0000-0000-0000-aaaaaaaaaaaa") +OLD_REV = UUID("11111111-1111-1111-1111-111111111111") +NEW_REV = UUID("22222222-2222-2222-2222-222222222222") +PROJECT_ID = UUID("cccccccc-cccc-cccc-cccc-cccccccccccc") +USER_ID = UUID("dddddddd-dddd-dddd-dddd-dddddddddddd") + + +# --------------------------------------------------------------------------- +# Test scenario types +# --------------------------------------------------------------------------- + + +@dataclass(frozen=True) +class RollingUpdateInput: + """Input conditions for a rolling update cycle test.""" + + desired_replicas: int + max_surge: int + max_unavailable: int + old_count: int + + +@dataclass(frozen=True) +class RollingUpdateExpected: + """Expected outcomes of a rolling update cycle.""" + + create: int + terminate: int + + +@dataclass(frozen=True) +class RollingUpdateScenario: + """A single test case for the rolling update strategy with only old routes.""" + + description: str + input: RollingUpdateInput + expected: RollingUpdateExpected + + +def make_deployment( + *, + desired: int = 1, + deploying_revision_id: UUID = NEW_REV, + current_revision_id: UUID = OLD_REV, + endpoint_id: UUID = ENDPOINT_ID, +) -> DeploymentInfo: + return DeploymentInfo( + id=endpoint_id, + metadata=DeploymentMetadata( + name="test-deploy", + domain="default", + project=PROJECT_ID, + resource_group="default", + created_user=USER_ID, + session_owner=USER_ID, + created_at=datetime.now(UTC), + revision_history_limit=5, + ), + state=DeploymentState( + lifecycle=EndpointLifecycle.DEPLOYING, + retry_count=0, + ), + replica_spec=ReplicaSpec( + replica_count=desired, + ), + network=DeploymentNetworkSpec(open_to_public=False), + model_revisions=[], + current_revision_id=current_revision_id, + deploying_revision_id=deploying_revision_id, + ) + + +def make_route( + *, + revision_id: UUID, + status: RouteStatus = RouteStatus.HEALTHY, + endpoint_id: UUID = ENDPOINT_ID, + route_id: UUID | None = None, +) -> RouteInfo: + return RouteInfo( + route_id=route_id or uuid4(), + endpoint_id=endpoint_id, + session_id=SessionId(uuid4()), + status=status, + traffic_ratio=1.0 if status.is_active() else 0.0, + created_at=datetime.now(UTC), + revision_id=revision_id, + traffic_status=RouteTrafficStatus.ACTIVE + if status.is_active() + else RouteTrafficStatus.INACTIVE, + ) + + +# =========================================================================== +# 1. Basic FSM states +# =========================================================================== + + +class TestBasicFSMStates: + """Test fundamental FSM transitions.""" + + def test_no_routes_creates_new(self) -> None: + """First cycle with 0 routes → PROVISIONING with route creation.""" + deployment = make_deployment(desired=1) + spec = RollingUpdateSpec(max_surge=1, max_unavailable=0) + + result = RollingUpdateStrategy().evaluate_cycle(deployment, [], spec) + + assert result.sub_step == DeploymentLifecycleSubStep.DEPLOYING_PROVISIONING + assert len(result.route_changes.rollout_specs) == 1 + assert len(result.route_changes.drain_route_ids) == 0 + + def test_new_provisioning_waits(self) -> None: + """New routes in PROVISIONING → wait (PROVISIONING sub-step).""" + deployment = make_deployment(desired=1) + spec = RollingUpdateSpec(max_surge=1, max_unavailable=0) + routes = [ + make_route(revision_id=OLD_REV, status=RouteStatus.HEALTHY), + make_route(revision_id=NEW_REV, status=RouteStatus.PROVISIONING), + ] + + result = RollingUpdateStrategy().evaluate_cycle(deployment, routes, spec) + + assert result.sub_step == DeploymentLifecycleSubStep.DEPLOYING_PROVISIONING + assert len(result.route_changes.rollout_specs) == 0 + assert len(result.route_changes.drain_route_ids) == 0 + + def test_completed_when_all_new_healthy_and_no_old(self) -> None: + """All old gone + new_healthy >= desired → completed.""" + deployment = make_deployment(desired=2) + spec = RollingUpdateSpec(max_surge=1, max_unavailable=0) + routes = [ + make_route(revision_id=NEW_REV, status=RouteStatus.HEALTHY), + make_route(revision_id=NEW_REV, status=RouteStatus.HEALTHY), + ] + + result = RollingUpdateStrategy().evaluate_cycle(deployment, routes, spec) + + assert result.sub_step == DeploymentLifecycleSubStep.DEPLOYING_COMPLETED + + @pytest.mark.parametrize( + "failed_status", + [ + pytest.param(RouteStatus.FAILED_TO_START, id="failed_to_start"), + pytest.param(RouteStatus.TERMINATED, id="terminated"), + ], + ) + def test_all_new_failed_retries_creation(self, failed_status: RouteStatus) -> None: + """All new routes failed → FSM retries by creating new routes. + + Rollback is handled by the coordinator's timeout sweep, not the FSM. + """ + deployment = make_deployment(desired=1) + spec = RollingUpdateSpec(max_surge=1, max_unavailable=0) + routes = [ + make_route(revision_id=OLD_REV, status=RouteStatus.HEALTHY), + make_route(revision_id=NEW_REV, status=failed_status), + ] + + result = RollingUpdateStrategy().evaluate_cycle(deployment, routes, spec) + + assert result.sub_step == DeploymentLifecycleSubStep.DEPLOYING_PROVISIONING + + +# =========================================================================== +# 2. Surge and unavailability budget +# =========================================================================== + + +ROLLING_UPDATE_SCENARIOS = [ + # -- max_surge controls creation -- + RollingUpdateScenario( + description=( + "max_surge=1 allows exactly 1 extra route beyond desired." + " With 1 old route and desired=1, max_total=2 so 1 new route is created." + " No termination because max_unavailable=0 and no new healthy routes exist yet." + ), + input=RollingUpdateInput(desired_replicas=1, max_surge=1, max_unavailable=0, old_count=1), + expected=RollingUpdateExpected(create=1, terminate=0), + ), + RollingUpdateScenario( + description=( + "max_surge=2 with desired=3 and 3 old routes: max_total=5, current=3," + " so can_create=2. still_needed=3 (no new yet), min(2,3)=2 routes created." + ), + input=RollingUpdateInput(desired_replicas=3, max_surge=2, max_unavailable=0, old_count=3), + expected=RollingUpdateExpected(create=2, terminate=0), + ), + RollingUpdateScenario( + description=( + "max_surge=3 exceeds desired=2: max_total=5, can_create=3," + " but still_needed=2 (only 2 new routes needed), so creation is capped at 2." + ), + input=RollingUpdateInput(desired_replicas=2, max_surge=3, max_unavailable=0, old_count=2), + expected=RollingUpdateExpected(create=2, terminate=0), + ), + # -- max_unavailable controls termination -- + RollingUpdateScenario( + description=( + "max_unavailable=0 means zero downtime: min_available equals desired=2." + " With 2 old routes and 0 new healthy, available=2=min_available," + " so no old route can be terminated yet." + ), + input=RollingUpdateInput(desired_replicas=2, max_surge=1, max_unavailable=0, old_count=2), + expected=RollingUpdateExpected(create=1, terminate=0), + ), + RollingUpdateScenario( + description=( + "max_unavailable=1 allows 1 route to be unavailable: min_available=3-1=2." + " With 3 old routes and 0 new healthy, available=3, can_terminate=3-2=1." + ), + input=RollingUpdateInput(desired_replicas=3, max_surge=1, max_unavailable=1, old_count=3), + expected=RollingUpdateExpected(create=1, terminate=1), + ), + # -- combined surge + unavailable -- + RollingUpdateScenario( + description=( + "Both parameters act simultaneously: max_surge=2 allows 2 new creations" + " (max_total=6, current=4, can_create=2) while max_unavailable=1" + " allows terminating 1 old (min_available=3, available=4, can_terminate=1)." + ), + input=RollingUpdateInput(desired_replicas=4, max_surge=2, max_unavailable=1, old_count=4), + expected=RollingUpdateExpected(create=2, terminate=1), + ), + RollingUpdateScenario( + description=( + "Aggressive rollout: max_surge=3 and max_unavailable=2 with desired=3." + " All 3 new routes created at once (max_total=6, can_create=3=still_needed)" + " and 2 old terminated immediately (min_available=1, available=3, can_terminate=2)." + ), + input=RollingUpdateInput(desired_replicas=3, max_surge=3, max_unavailable=2, old_count=3), + expected=RollingUpdateExpected(create=3, terminate=2), + ), + # -- boundary: unavailable > desired -- + RollingUpdateScenario( + description=( + "max_unavailable=5 exceeds desired=1: min_available=max(0, 1-5)=0." + " The operator has opted into full unavailability," + " so the single old route can be terminated immediately." + ), + input=RollingUpdateInput(desired_replicas=1, max_surge=0, max_unavailable=5, old_count=1), + expected=RollingUpdateExpected(create=0, terminate=1), + ), +] + + +class TestSurgeAndUnavailabilityBudget: + """Test max_surge and max_unavailable parameter controls.""" + + @pytest.mark.parametrize( + "scenario", + ROLLING_UPDATE_SCENARIOS, + ids=[s.description for s in ROLLING_UPDATE_SCENARIOS], + ) + def test_budget_with_old_routes_only(self, scenario: RollingUpdateScenario) -> None: + """Verify creation/termination counts based on surge and unavailability budgets.""" + deployment = make_deployment(desired=scenario.input.desired_replicas) + spec = RollingUpdateSpec( + max_surge=scenario.input.max_surge, + max_unavailable=scenario.input.max_unavailable, + ) + routes = [ + make_route(revision_id=OLD_REV, status=RouteStatus.HEALTHY) + for _ in range(scenario.input.old_count) + ] + + result = RollingUpdateStrategy().evaluate_cycle(deployment, routes, spec) + + assert len(result.route_changes.rollout_specs) == scenario.expected.create + assert len(result.route_changes.drain_route_ids) == scenario.expected.terminate + + def test_surge_already_at_max_no_create(self) -> None: + """Already at max_total → no new creates.""" + deployment = make_deployment(desired=2) + spec = RollingUpdateSpec(max_surge=1, max_unavailable=0) + routes = [ + make_route(revision_id=OLD_REV, status=RouteStatus.HEALTHY), + make_route(revision_id=OLD_REV, status=RouteStatus.HEALTHY), + make_route(revision_id=NEW_REV, status=RouteStatus.HEALTHY), + ] + + result = RollingUpdateStrategy().evaluate_cycle(deployment, routes, spec) + + assert len(result.route_changes.rollout_specs) == 0 + + def test_new_healthy_allows_more_termination(self) -> None: + """With new healthy routes, more old can be terminated.""" + deployment = make_deployment(desired=3) + spec = RollingUpdateSpec(max_surge=1, max_unavailable=0) + routes = [ + make_route(revision_id=OLD_REV, status=RouteStatus.HEALTHY), + make_route(revision_id=OLD_REV, status=RouteStatus.HEALTHY), + make_route(revision_id=OLD_REV, status=RouteStatus.HEALTHY), + make_route(revision_id=NEW_REV, status=RouteStatus.HEALTHY), + ] + + result = RollingUpdateStrategy().evaluate_cycle(deployment, routes, spec) + + assert len(result.route_changes.drain_route_ids) == 1 + + def test_surge_and_unavailable_both_zero_rejected(self) -> None: + """surge=0, unavailable=0: rejected by Pydantic validation.""" + with pytest.raises(ValueError, match="max_surge or max_unavailable must be positive"): + RollingUpdateSpec(max_surge=0, max_unavailable=0) + + +# =========================================================================== +# 3. Multi-cycle progression +# =========================================================================== + + +class TestMultiCycleProgression: + """Simulate multiple evaluation cycles.""" + + def test_new_healthy_enables_further_creation(self) -> None: + """After new routes become healthy, more new routes can be created.""" + deployment = make_deployment(desired=3) + spec = RollingUpdateSpec(max_surge=1, max_unavailable=0) + routes = [ + make_route(revision_id=OLD_REV, status=RouteStatus.HEALTHY), + make_route(revision_id=OLD_REV, status=RouteStatus.HEALTHY), + make_route(revision_id=NEW_REV, status=RouteStatus.HEALTHY), + ] + + result = RollingUpdateStrategy().evaluate_cycle(deployment, routes, spec) + + assert len(result.route_changes.rollout_specs) == 1 + assert len(result.route_changes.drain_route_ids) == 0 + + def test_multiple_new_healthy_enables_old_termination(self) -> None: + """2 new healthy, 2 old: can terminate 1 old.""" + deployment = make_deployment(desired=3) + spec = RollingUpdateSpec(max_surge=1, max_unavailable=0) + routes = [ + make_route(revision_id=OLD_REV, status=RouteStatus.HEALTHY), + make_route(revision_id=OLD_REV, status=RouteStatus.HEALTHY), + make_route(revision_id=NEW_REV, status=RouteStatus.HEALTHY), + make_route(revision_id=NEW_REV, status=RouteStatus.HEALTHY), + ] + + result = RollingUpdateStrategy().evaluate_cycle(deployment, routes, spec) + + assert len(result.route_changes.rollout_specs) == 0 + assert len(result.route_changes.drain_route_ids) == 1 + + def test_not_completed_when_old_still_exists(self) -> None: + """Even with enough new healthy, old still exists → not completed.""" + deployment = make_deployment(desired=2) + spec = RollingUpdateSpec(max_surge=1, max_unavailable=0) + routes = [ + make_route(revision_id=OLD_REV, status=RouteStatus.HEALTHY), + make_route(revision_id=NEW_REV, status=RouteStatus.HEALTHY), + make_route(revision_id=NEW_REV, status=RouteStatus.HEALTHY), + ] + + result = RollingUpdateStrategy().evaluate_cycle(deployment, routes, spec) + + assert result.sub_step == DeploymentLifecycleSubStep.DEPLOYING_PROVISIONING + assert len(result.route_changes.drain_route_ids) == 1 + + +# =========================================================================== +# 4. Route status classification +# =========================================================================== + + +class TestRouteStatusClassification: + """Test how different route statuses affect classification.""" + + def test_degraded_new_waits_provisioning(self) -> None: + """DEGRADED new routes are treated as PROVISIONING (still warming up).""" + deployment = make_deployment(desired=1) + spec = RollingUpdateSpec(max_surge=1, max_unavailable=0) + routes = [ + make_route(revision_id=NEW_REV, status=RouteStatus.DEGRADED), + ] + + result = RollingUpdateStrategy().evaluate_cycle(deployment, routes, spec) + + assert result.sub_step == DeploymentLifecycleSubStep.DEPLOYING_PROVISIONING + + def test_unhealthy_new_retries(self) -> None: + """All new UNHEALTHY → PROVISIONING (retries, timeout handles rollback).""" + deployment = make_deployment(desired=1) + spec = RollingUpdateSpec(max_surge=1, max_unavailable=0) + routes = [ + make_route(revision_id=NEW_REV, status=RouteStatus.UNHEALTHY), + ] + + result = RollingUpdateStrategy().evaluate_cycle(deployment, routes, spec) + + assert result.sub_step == DeploymentLifecycleSubStep.DEPLOYING_PROVISIONING + + @pytest.mark.parametrize( + "inactive_status", + [ + pytest.param(RouteStatus.TERMINATING, id="terminating"), + pytest.param(RouteStatus.TERMINATED, id="terminated"), + ], + ) + def test_old_inactive_not_counted_as_active(self, inactive_status: RouteStatus) -> None: + """Old routes in terminal states are not counted as old_active.""" + deployment = make_deployment(desired=1) + spec = RollingUpdateSpec(max_surge=1, max_unavailable=0) + routes = [ + make_route(revision_id=OLD_REV, status=inactive_status), + make_route(revision_id=NEW_REV, status=RouteStatus.HEALTHY), + ] + + result = RollingUpdateStrategy().evaluate_cycle(deployment, routes, spec) + + assert result.sub_step == DeploymentLifecycleSubStep.DEPLOYING_COMPLETED + + def test_partial_new_failure_continues_progress(self) -> None: + """Some new failed, some healthy → no rollback (live routes exist).""" + deployment = make_deployment(desired=2) + spec = RollingUpdateSpec(max_surge=2, max_unavailable=0) + routes = [ + make_route(revision_id=OLD_REV, status=RouteStatus.HEALTHY), + make_route(revision_id=NEW_REV, status=RouteStatus.HEALTHY), + make_route(revision_id=NEW_REV, status=RouteStatus.FAILED_TO_START), + ] + + result = RollingUpdateStrategy().evaluate_cycle(deployment, routes, spec) + + assert result.sub_step == DeploymentLifecycleSubStep.DEPLOYING_PROVISIONING + + def test_old_provisioning_counted_as_active(self) -> None: + """Old routes in PROVISIONING are counted as old_active.""" + deployment = make_deployment(desired=2) + spec = RollingUpdateSpec(max_surge=1, max_unavailable=0) + routes = [ + make_route(revision_id=OLD_REV, status=RouteStatus.PROVISIONING), + make_route(revision_id=OLD_REV, status=RouteStatus.HEALTHY), + ] + + result = RollingUpdateStrategy().evaluate_cycle(deployment, routes, spec) + + assert result.sub_step == DeploymentLifecycleSubStep.DEPLOYING_PROVISIONING + + +# =========================================================================== +# 5. Termination priority ordering +# =========================================================================== + + +class TestTerminationPriority: + """Test that old routes are terminated in priority order.""" + + def test_full_priority_order(self) -> None: + """Termination order: UNHEALTHY → DEGRADED → PROVISIONING → HEALTHY.""" + unhealthy_id = UUID("00000000-0000-0000-0000-000000000001") + degraded_id = UUID("00000000-0000-0000-0000-000000000002") + provisioning_id = UUID("00000000-0000-0000-0000-000000000003") + healthy_id = UUID("00000000-0000-0000-0000-000000000004") + + deployment = make_deployment(desired=4) + spec = RollingUpdateSpec(max_surge=0, max_unavailable=3) + routes = [ + make_route(revision_id=OLD_REV, status=RouteStatus.HEALTHY, route_id=healthy_id), + make_route( + revision_id=OLD_REV, + status=RouteStatus.PROVISIONING, + route_id=provisioning_id, + ), + make_route(revision_id=OLD_REV, status=RouteStatus.DEGRADED, route_id=degraded_id), + make_route( + revision_id=OLD_REV, + status=RouteStatus.UNHEALTHY, + route_id=unhealthy_id, + ), + ] + + result = RollingUpdateStrategy().evaluate_cycle(deployment, routes, spec) + + terminated = result.route_changes.drain_route_ids + assert len(terminated) == 3 + assert terminated[0] == unhealthy_id + assert terminated[1] == degraded_id + assert terminated[2] == provisioning_id + + +# =========================================================================== +# 6. Edge cases +# =========================================================================== + + +class TestEdgeCases: + """Edge cases and boundary conditions.""" + + def test_desired_0_no_routes_completed(self) -> None: + """desired=0, no routes → completed (vacuously true).""" + deployment = make_deployment(desired=0) + spec = RollingUpdateSpec(max_surge=1, max_unavailable=0) + + result = RollingUpdateStrategy().evaluate_cycle(deployment, [], spec) + + assert result.sub_step == DeploymentLifecycleSubStep.DEPLOYING_COMPLETED + + def test_more_new_healthy_than_desired_still_completes(self) -> None: + """new_healthy > desired and no old → completed.""" + deployment = make_deployment(desired=2) + spec = RollingUpdateSpec(max_surge=2, max_unavailable=0) + routes = [ + make_route(revision_id=NEW_REV, status=RouteStatus.HEALTHY), + make_route(revision_id=NEW_REV, status=RouteStatus.HEALTHY), + make_route(revision_id=NEW_REV, status=RouteStatus.HEALTHY), + ] + + result = RollingUpdateStrategy().evaluate_cycle(deployment, routes, spec) + + assert result.sub_step == DeploymentLifecycleSubStep.DEPLOYING_COMPLETED + + def test_only_failed_new_no_old_rolls_back(self) -> None: + """Only failed new routes, no old → PROVISIONING (retries creation).""" + deployment = make_deployment(desired=2) + spec = RollingUpdateSpec(max_surge=1, max_unavailable=0) + routes = [ + make_route(revision_id=NEW_REV, status=RouteStatus.FAILED_TO_START), + make_route(revision_id=NEW_REV, status=RouteStatus.FAILED_TO_START), + ] + + result = RollingUpdateStrategy().evaluate_cycle(deployment, routes, spec) + + assert result.sub_step == DeploymentLifecycleSubStep.DEPLOYING_PROVISIONING + + def test_all_old_inactive_no_new_creates_desired(self) -> None: + """All old routes are terminated, no new → create desired.""" + deployment = make_deployment(desired=2) + spec = RollingUpdateSpec(max_surge=1, max_unavailable=0) + routes = [ + make_route(revision_id=OLD_REV, status=RouteStatus.TERMINATED), + make_route(revision_id=OLD_REV, status=RouteStatus.TERMINATED), + ] + + result = RollingUpdateStrategy().evaluate_cycle(deployment, routes, spec) + + assert len(result.route_changes.rollout_specs) == 2 + + def test_deploying_rev_none_rejected(self) -> None: + """If deploying_revision_id is None, evaluate_cycle raises.""" + deployment = make_deployment(desired=1, deploying_revision_id=None) # type: ignore[arg-type] + spec = RollingUpdateSpec(max_surge=1, max_unavailable=0) + routes = [make_route(revision_id=OLD_REV, status=RouteStatus.HEALTHY)] + + with pytest.raises(Exception): # InvalidEndpointState + RollingUpdateStrategy().evaluate_cycle(deployment, routes, spec) + + def test_route_without_revision_classified_as_old(self) -> None: + """Routes with revision_id=None are classified as old.""" + deployment = make_deployment(desired=1) + spec = RollingUpdateSpec(max_surge=1, max_unavailable=0) + routes = [make_route(revision_id=None, status=RouteStatus.HEALTHY)] # type: ignore[arg-type] + + result = RollingUpdateStrategy().evaluate_cycle(deployment, routes, spec) + + assert len(result.route_changes.rollout_specs) == 1 + + def test_provisioning_prioritized_over_completion_check(self) -> None: + """PROVISIONING check comes before completion check in FSM.""" + deployment = make_deployment(desired=1) + spec = RollingUpdateSpec(max_surge=1, max_unavailable=0) + routes = [ + make_route(revision_id=NEW_REV, status=RouteStatus.HEALTHY), + make_route(revision_id=NEW_REV, status=RouteStatus.PROVISIONING), + ] + + result = RollingUpdateStrategy().evaluate_cycle(deployment, routes, spec) + + assert result.sub_step == DeploymentLifecycleSubStep.DEPLOYING_PROVISIONING + + +# =========================================================================== +# 7. Route creator specs validation +# =========================================================================== + + +class TestRouteCreatorSpecs: + """Validate that route creator specs have correct fields.""" + + def test_creator_specs_use_deploying_revision(self) -> None: + """Created routes should use the deploying revision metadata.""" + deployment = make_deployment(desired=1) + spec = RollingUpdateSpec(max_surge=1, max_unavailable=0) + + result = RollingUpdateStrategy().evaluate_cycle(deployment, [], spec) + + assert len(result.route_changes.rollout_specs) == 1 + creator_spec = result.route_changes.rollout_specs[0].spec + assert isinstance(creator_spec, RouteCreatorSpec) + assert creator_spec.revision_id == NEW_REV + assert creator_spec.endpoint_id == ENDPOINT_ID + assert creator_spec.session_owner_id == USER_ID + assert creator_spec.domain == "default" + assert creator_spec.project_id == PROJECT_ID + + +# =========================================================================== +# 8. Realistic multi-step scenario (desired=5) +# =========================================================================== + + +class TestRealisticScenario: + """Simulate a realistic rolling update with desired=5, surge=2, unavail=1.""" + + def test_step_by_step_rolling_update(self) -> None: + """Full simulation of a rolling update across multiple cycles.""" + deployment = make_deployment(desired=5) + spec = RollingUpdateSpec(max_surge=2, max_unavailable=1) + + # Cycle 1: 5 old → create 2, terminate 1 + old_routes = [make_route(revision_id=OLD_REV, status=RouteStatus.HEALTHY) for _ in range(5)] + r1 = RollingUpdateStrategy().evaluate_cycle(deployment, old_routes, spec) + assert len(r1.route_changes.rollout_specs) == 2 + assert len(r1.route_changes.drain_route_ids) == 1 + + # Cycle 2: 4 old, 2 new healthy → create 1, terminate 2 + routes_c2 = [ + *[make_route(revision_id=OLD_REV, status=RouteStatus.HEALTHY) for _ in range(4)], + *[make_route(revision_id=NEW_REV, status=RouteStatus.HEALTHY) for _ in range(2)], + ] + r2 = RollingUpdateStrategy().evaluate_cycle(deployment, routes_c2, spec) + assert len(r2.route_changes.rollout_specs) == 1 + assert len(r2.route_changes.drain_route_ids) == 2 + + # Cycle 3: 2 old, 3 new healthy → create 2, terminate 1 + routes_c3 = [ + *[make_route(revision_id=OLD_REV, status=RouteStatus.HEALTHY) for _ in range(2)], + *[make_route(revision_id=NEW_REV, status=RouteStatus.HEALTHY) for _ in range(3)], + ] + r3 = RollingUpdateStrategy().evaluate_cycle(deployment, routes_c3, spec) + assert len(r3.route_changes.rollout_specs) == 2 + assert len(r3.route_changes.drain_route_ids) == 1 + + # Cycle 4: 1 old, 5 new healthy → create 0, terminate 1 + routes_c4 = [ + make_route(revision_id=OLD_REV, status=RouteStatus.HEALTHY), + *[make_route(revision_id=NEW_REV, status=RouteStatus.HEALTHY) for _ in range(5)], + ] + r4 = RollingUpdateStrategy().evaluate_cycle(deployment, routes_c4, spec) + assert len(r4.route_changes.rollout_specs) == 0 + assert len(r4.route_changes.drain_route_ids) == 1 + assert r4.sub_step == DeploymentLifecycleSubStep.DEPLOYING_PROVISIONING + + # Cycle 5: 0 old, 5 new healthy → completed + routes_c5 = [make_route(revision_id=NEW_REV, status=RouteStatus.HEALTHY) for _ in range(5)] + r5 = RollingUpdateStrategy().evaluate_cycle(deployment, routes_c5, spec) + assert r5.sub_step == DeploymentLifecycleSubStep.DEPLOYING_COMPLETED + + +# =========================================================================== +# 9. Deadlock prevention +# =========================================================================== + + +class TestDeadlockPrevention: + """Test scenarios where the FSM could potentially stall.""" + + def test_surge_0_terminates_first_then_creates(self) -> None: + """surge=0, unavailable=1 → terminate first, next cycle creates.""" + deployment = make_deployment(desired=3) + spec = RollingUpdateSpec(max_surge=0, max_unavailable=1) + + # Cycle 1: 3 old → terminate 1, create 0 + routes = [make_route(revision_id=OLD_REV, status=RouteStatus.HEALTHY) for _ in range(3)] + r1 = RollingUpdateStrategy().evaluate_cycle(deployment, routes, spec) + assert len(r1.route_changes.rollout_specs) == 0 + assert len(r1.route_changes.drain_route_ids) == 1 + + # Cycle 2: 2 old → create 1, terminate 0 + routes_c2 = [make_route(revision_id=OLD_REV, status=RouteStatus.HEALTHY) for _ in range(2)] + r2 = RollingUpdateStrategy().evaluate_cycle(deployment, routes_c2, spec) + assert len(r2.route_changes.rollout_specs) == 1 + assert len(r2.route_changes.drain_route_ids) == 0 + + def test_new_routes_exceed_desired_no_extra_create(self) -> None: + """More new_live than desired → no extra creation (still_needed < 0).""" + deployment = make_deployment(desired=2) + spec = RollingUpdateSpec(max_surge=2, max_unavailable=0) + routes = [ + make_route(revision_id=OLD_REV, status=RouteStatus.HEALTHY), + make_route(revision_id=NEW_REV, status=RouteStatus.HEALTHY), + make_route(revision_id=NEW_REV, status=RouteStatus.HEALTHY), + make_route(revision_id=NEW_REV, status=RouteStatus.HEALTHY), + ] + + result = RollingUpdateStrategy().evaluate_cycle(deployment, routes, spec) + + assert len(result.route_changes.rollout_specs) == 0 + assert len(result.route_changes.drain_route_ids) == 1 + + def test_provisioning_blocks_all_further_actions(self) -> None: + """Any new route in PROVISIONING → wait, even if old can be terminated.""" + deployment = make_deployment(desired=3) + spec = RollingUpdateSpec(max_surge=2, max_unavailable=1) + routes = [ + make_route(revision_id=OLD_REV, status=RouteStatus.HEALTHY), + make_route(revision_id=OLD_REV, status=RouteStatus.HEALTHY), + make_route(revision_id=OLD_REV, status=RouteStatus.HEALTHY), + make_route(revision_id=NEW_REV, status=RouteStatus.PROVISIONING), + ] + + result = RollingUpdateStrategy().evaluate_cycle(deployment, routes, spec) + + assert result.sub_step == DeploymentLifecycleSubStep.DEPLOYING_PROVISIONING + assert len(result.route_changes.rollout_specs) == 0 + assert len(result.route_changes.drain_route_ids) == 0 + + +# =========================================================================== +# 10. desired_replica_count resolution +# =========================================================================== + + +class TestDesiredReplicaCount: + """Test that the correct desired count is used.""" + + def test_desired_replica_count_overrides_replica_count(self) -> None: + """When desired_replica_count is set, it takes precedence.""" + deployment = make_deployment(desired=3) + deployment.replica_spec = ReplicaSpec( + replica_count=1, + desired_replica_count=3, + ) + spec = RollingUpdateSpec(max_surge=1, max_unavailable=0) + routes = [make_route(revision_id=OLD_REV, status=RouteStatus.HEALTHY)] + + result = RollingUpdateStrategy().evaluate_cycle(deployment, routes, spec) + + assert len(result.route_changes.rollout_specs) == 3 + + def test_replica_count_used_when_no_desired(self) -> None: + """When desired_replica_count is None, uses replica_count.""" + deployment = make_deployment(desired=2) + deployment.replica_spec = ReplicaSpec( + replica_count=2, + desired_replica_count=None, + ) + spec = RollingUpdateSpec(max_surge=1, max_unavailable=0) + routes = [ + make_route(revision_id=NEW_REV, status=RouteStatus.HEALTHY), + make_route(revision_id=NEW_REV, status=RouteStatus.HEALTHY), + ] + + result = RollingUpdateStrategy().evaluate_cycle(deployment, routes, spec) + + assert result.sub_step == DeploymentLifecycleSubStep.DEPLOYING_COMPLETED + + +# =========================================================================== +# 11. Scale changes during rolling update +# =========================================================================== + + +class TestScaleChangeDuringRollingUpdate: + """Test behavior when desired changes during rolling update.""" + + def test_desired_reduced_terminates_excess_old(self) -> None: + """If desired is lowered, more old can be terminated.""" + deployment = make_deployment(desired=1) + spec = RollingUpdateSpec(max_surge=1, max_unavailable=0) + routes = [make_route(revision_id=OLD_REV, status=RouteStatus.HEALTHY) for _ in range(3)] + + result = RollingUpdateStrategy().evaluate_cycle(deployment, routes, spec) + + assert len(result.route_changes.rollout_specs) == 0 + assert len(result.route_changes.drain_route_ids) == 2 + + def test_desired_increased_creates_more(self) -> None: + """If desired is raised, more new routes are created.""" + deployment = make_deployment(desired=5) + spec = RollingUpdateSpec(max_surge=2, max_unavailable=0) + routes = [make_route(revision_id=OLD_REV, status=RouteStatus.HEALTHY) for _ in range(2)] + + result = RollingUpdateStrategy().evaluate_cycle(deployment, routes, spec) + + assert len(result.route_changes.rollout_specs) == 5