diff --git a/changes/10276.feature.md b/changes/10276.feature.md new file mode 100644 index 00000000000..9fc16b2c1b1 --- /dev/null +++ b/changes/10276.feature.md @@ -0,0 +1 @@ +Consolidate deploying handlers and remove unused sub-steps diff --git a/src/ai/backend/manager/data/deployment/types.py b/src/ai/backend/manager/data/deployment/types.py index 8087ae091f9..f4e3c60dd72 100644 --- a/src/ai/backend/manager/data/deployment/types.py +++ b/src/ai/backend/manager/data/deployment/types.py @@ -153,28 +153,22 @@ class DeploymentSubStatus(enum.StrEnum): Each lifecycle type can define its own sub-status enum by inheriting from this class. For example, DEPLOYING handlers - use ``DeploymentSubStep`` (provisioning, progressing, …). + use ``DeploymentSubStep`` (provisioning, rolling_back, …). """ class DeploymentSubStep(DeploymentSubStatus): """Sub-steps for the DEPLOYING lifecycle phase. - Active states: - - PROVISIONING: New revision routes are being provisioned; waiting for readiness. - - PROGRESSING: Actively replacing old routes with new routes. - - ROLLING_BACK: Actively rolling back failed new routes to previous revision. - - Terminal markers (no handler execution, trigger transition only): - - COMPLETED: All strategy conditions satisfied; ready for revision swap. - - ROLLED_BACK: Rollback finished; ready for cleanup and transition to READY. + - PROVISIONING: New revision routes are being provisioned and old routes + are being drained. The main handler for rolling updates. + - ROLLING_BACK: Clearing deploying_revision and transitioning to READY. + - COMPLETED: All strategy conditions satisfied; triggers revision swap. """ PROVISIONING = "provisioning" - PROGRESSING = "progressing" ROLLING_BACK = "rolling_back" COMPLETED = "completed" - ROLLED_BACK = "rolled_back" @dataclass(frozen=True) @@ -201,7 +195,11 @@ class DeploymentStatusTransitions: Attributes: success: Target lifecycle when handler succeeds, None means no change - need_retry: Target lifecycle when handler fails but can retry + need_retry: Target lifecycle when handler fails but can retry, or when + route mutations were executed but the deployment stays in the same + sub-step (e.g. PROVISIONING → PROVISIONING after create/drain). + Items explicitly returned as need_retry by handlers are never + escalated to give_up — they represent normal progress. expired: Target lifecycle when time elapsed in current state give_up: Target lifecycle when retry count exceeded """ diff --git a/src/ai/backend/manager/repositories/base/types.py b/src/ai/backend/manager/repositories/base/types.py index 217d80a25eb..89cbb39f354 100644 --- a/src/ai/backend/manager/repositories/base/types.py +++ b/src/ai/backend/manager/repositories/base/types.py @@ -19,6 +19,7 @@ # QueryCondition now returns a ColumnElement (whereclause) instead of modifying stmt type QueryCondition = Callable[[], sa.sql.expression.ColumnElement[bool]] + T = TypeVar("T") diff --git a/src/ai/backend/manager/repositories/deployment/db_source/db_source.py b/src/ai/backend/manager/repositories/deployment/db_source/db_source.py index 359799094c4..17d7582410f 100644 --- a/src/ai/backend/manager/repositories/deployment/db_source/db_source.py +++ b/src/ai/backend/manager/repositories/deployment/db_source/db_source.py @@ -2519,38 +2519,22 @@ async def search_deployment_policies( async def apply_strategy_mutations( self, - assignments: Mapping[uuid.UUID, DeploymentSubStep], rollout: Sequence[RBACEntityCreator[RoutingRow]], drain: BatchUpdater[RoutingRow] | None, completed_ids: set[uuid.UUID], - rolled_back_ids: set[uuid.UUID], ) -> int: - """Apply all DB mutations from a strategy evaluation cycle in a single transaction. + """Apply route mutations from a strategy evaluation cycle in a single transaction. + + Sub-step transitions are handled exclusively by the coordinator + via ``EndpointLifecycleBatchUpdaterSpec``. Returns: Number of deployments whose revision was swapped. """ async with self._begin_session_read_committed() as db_sess: - await self._update_sub_steps(db_sess, assignments) await self._create_routes(db_sess, rollout) await self._drain_routes(db_sess, drain) - swapped = await self._complete_deployment_revision_swap(db_sess, completed_ids) - await self._clear_deploying_revision(db_sess, rolled_back_ids) - return swapped - - @staticmethod - async def _update_sub_steps( - db_sess: SASession, - assignments: Mapping[uuid.UUID, DeploymentSubStep], - ) -> None: - """Update deployment sub-step assignments.""" - for endpoint_id, sub_step in assignments.items(): - query = ( - sa.update(EndpointRow) - .where(EndpointRow.id == endpoint_id) - .values(sub_step=sub_step) - ) - await db_sess.execute(query) + return await self._complete_deployment_revision_swap(db_sess, completed_ids) @staticmethod async def _create_routes( @@ -2593,20 +2577,24 @@ async def _complete_deployment_revision_swap( result = await db_sess.execute(query) return cast(CursorResult[Any], result).rowcount - @staticmethod - async def _clear_deploying_revision( - db_sess: SASession, - rolled_back_ids: set[uuid.UUID], + async def clear_deploying_revision( + self, + deployment_ids: set[uuid.UUID], ) -> None: - """Clear deploying_revision for rolled-back deployments.""" - if not rolled_back_ids: + """Clear deploying_revision and sub_step for rolled-back deployments. + + This is called explicitly by ``DeployingRollingBackHandler`` after + rollback completes, NOT automatically by apply_strategy_mutations. + """ + if not deployment_ids: return - query = ( - sa.update(EndpointRow) - .where(EndpointRow.id.in_(rolled_back_ids)) - .values( - deploying_revision=None, - sub_step=None, + async with self._begin_session_read_committed() as db_sess: + query = ( + sa.update(EndpointRow) + .where(EndpointRow.id.in_(deployment_ids)) + .values( + deploying_revision=None, + sub_step=None, + ) ) - ) - await db_sess.execute(query) + await db_sess.execute(query) diff --git a/src/ai/backend/manager/repositories/deployment/repository.py b/src/ai/backend/manager/repositories/deployment/repository.py index 2e6f9a7a4ab..c2aff57e249 100644 --- a/src/ai/backend/manager/repositories/deployment/repository.py +++ b/src/ai/backend/manager/repositories/deployment/repository.py @@ -1406,26 +1406,33 @@ async def search_deployment_policies( """ return await self._db_source.search_deployment_policies(querier) + @deployment_repository_resilience.apply() async def apply_strategy_mutations( self, - assignments: Mapping[UUID, DeploymentSubStep], rollout: Sequence[RBACEntityCreator[RoutingRow]], drain: BatchUpdater[RoutingRow] | None, completed_ids: set[UUID], - rolled_back_ids: set[UUID], ) -> int: - """Apply all DB mutations from a strategy evaluation cycle. + """Apply route mutations from a strategy evaluation cycle. - Performs sub-step updates, route rollout/drain, revision swap, - and deploying_revision cleanup in a single transaction. + Performs route rollout/drain and revision swap in a single transaction. + Sub-step transitions are handled by the coordinator via + ``EndpointLifecycleBatchUpdaterSpec``. Returns: Number of deployments whose revision was swapped. """ return await self._db_source.apply_strategy_mutations( - assignments=assignments, rollout=rollout, drain=drain, completed_ids=completed_ids, - rolled_back_ids=rolled_back_ids, ) + + @deployment_repository_resilience.apply() + async def clear_deploying_revision(self, deployment_ids: set[UUID]) -> None: + """Clear deploying_revision and sub_step for rolled-back deployments. + + Called explicitly by ``DeployingRollingBackHandler`` after rollback + completes — NOT automatically during strategy mutations. + """ + await self._db_source.clear_deploying_revision(deployment_ids) diff --git a/src/ai/backend/manager/sokovan/deployment/coordinator.py b/src/ai/backend/manager/sokovan/deployment/coordinator.py index 56f55c8167c..98488ccd05d 100644 --- a/src/ai/backend/manager/sokovan/deployment/coordinator.py +++ b/src/ai/backend/manager/sokovan/deployment/coordinator.py @@ -61,7 +61,6 @@ from .handlers import ( CheckPendingDeploymentHandler, CheckReplicaDeploymentHandler, - DeployingProgressingHandler, DeployingProvisioningHandler, DeployingRollingBackHandler, DeploymentHandler, @@ -325,21 +324,11 @@ def _init_handlers( applier=applier, ), ), - ( - (DeploymentLifecycleType.DEPLOYING, DeploymentSubStep.PROGRESSING), - DeployingProgressingHandler( - deployment_controller=self._deployment_controller, - route_controller=self._route_controller, - evaluator=evaluator, - applier=applier, - ), - ), ( (DeploymentLifecycleType.DEPLOYING, DeploymentSubStep.ROLLING_BACK), DeployingRollingBackHandler( deployment_controller=self._deployment_controller, route_controller=self._route_controller, - evaluator=evaluator, applier=applier, ), ), @@ -437,8 +426,8 @@ async def _handle_status_transitions( transitions = handler.status_transitions() - # Success transitions (None = stay in current state) - if transitions.success is not None and result.successes: + # Success transitions + if result.successes and transitions.success is not None: transition = self._build_success_transition( handler_name=handler_name, deployments=result.successes, @@ -451,6 +440,21 @@ async def _handle_status_transitions( all_history_specs.extend(transition.history_specs) notification_events.extend(transition.notification_events) + # Explicit need_retry from handlers (e.g. route mutations in progress). + # These are never escalated to give_up — they represent normal progress. + if result.need_retry and transitions.need_retry is not None: + transition = self._build_success_transition( + handler_name=handler_name, + deployments=result.need_retry, + lifecycle_status=transitions.need_retry, + target_lifecycles=target_statuses, + records=records, + timestamp_now=timestamp_now, + ) + batch_updaters.append(transition.updater) + all_history_specs.extend(transition.history_specs) + notification_events.extend(transition.notification_events) + # Failure transitions — classify into need_retry/expired/give_up if result.errors: current_dbtime = await self._deployment_repository.get_db_now() diff --git a/src/ai/backend/manager/sokovan/deployment/handlers/__init__.py b/src/ai/backend/manager/sokovan/deployment/handlers/__init__.py index 489070e9143..961e4544307 100644 --- a/src/ai/backend/manager/sokovan/deployment/handlers/__init__.py +++ b/src/ai/backend/manager/sokovan/deployment/handlers/__init__.py @@ -4,7 +4,6 @@ from .base import DeploymentHandler from .deploying import ( - DeployingProgressingHandler, DeployingProvisioningHandler, DeployingRollingBackHandler, ) @@ -17,7 +16,6 @@ __all__ = [ "CheckPendingDeploymentHandler", "CheckReplicaDeploymentHandler", - "DeployingProgressingHandler", "DeployingProvisioningHandler", "DeployingRollingBackHandler", "DeploymentHandler", diff --git a/src/ai/backend/manager/sokovan/deployment/handlers/deploying.py b/src/ai/backend/manager/sokovan/deployment/handlers/deploying.py index f95886e90e4..8f07354daba 100644 --- a/src/ai/backend/manager/sokovan/deployment/handlers/deploying.py +++ b/src/ai/backend/manager/sokovan/deployment/handlers/deploying.py @@ -1,29 +1,26 @@ """Handlers for DEPLOYING sub-steps (BEP-1049). -Three DEPLOYING handlers are registered flat in the coordinator's HandlerRegistry -alongside other lifecycle handlers, keyed by ``(lifecycle_type, sub_step)``. -Each handler calls the strategy evaluator and applier directly in ``execute()``. +Two DEPLOYING handlers are registered in the coordinator's HandlerRegistry: + +- **DeployingProvisioningHandler**: Runs the strategy FSM each cycle to + create/drain routes and check for completion. +- **DeployingRollingBackHandler**: Clears ``deploying_revision`` and + transitions directly to READY. Sub-step flow:: - PROVISIONING ──(success)──▸ PROGRESSING - │ │ - │ (expired/give_up) ┌──────┴──────┐ - ▼ ▼ ▼ - ROLLING_BACK COMPLETED ROLLING_BACK - │ │ │ - │ (success) │ (success) │ (success) - ▼ ▼ ▼ - ROLLED_BACK READY ROLLED_BACK - │ │ - │ (success, via Progressing) │ (success, via Progressing) - ▼ ▼ - READY READY + PROVISIONING ──(need_retry)──▸ PROVISIONING (route mutations, logged) + │ + │ (success) + ▼ + READY (completed — all routes replaced) + + PROVISIONING ──(timeout)──▸ ROLLING_BACK ──(success)──▸ READY The evaluator determines sub-step assignments and route mutations; the applier persists them to DB atomically. Each handler classifies -deployments into successes (transition forward), errors (failure path), -and skipped (still in progress — no state transition). +deployments into successes (transition forward), need_retry (route mutations +with history logged), and skipped (no change — waiting). """ from __future__ import annotations @@ -67,12 +64,16 @@ class DeployingProvisioningHandler(DeploymentHandler): - """Handler for DEPLOYING / PROVISIONING sub-step. + """Handler for the DEPLOYING / PROVISIONING sub-step. - New-revision routes are being created; waiting for them to become HEALTHY. - The evaluator assigns sub-steps; when all new routes are healthy the - deployment advances to PROGRESSING (success), otherwise it stays in - PROVISIONING (skipped — no state transition). + Runs the strategy FSM each cycle to create/drain routes and check + for completion. Classification: + + - **Route mutations executed** (create/drain): need_retry — stays in + PROVISIONING with a new history record for progress tracking. + Never escalated to give_up (normal progress). + - **No changes** (routes still warming up): skipped — no history. + - **Completed** (all old routes replaced): success → READY. """ def __init__( @@ -104,7 +105,7 @@ def target_statuses(cls) -> list[DeploymentLifecycleStatus]: DeploymentLifecycleStatus( lifecycle=EndpointLifecycle.DEPLOYING, sub_status=DeploymentSubStep.PROVISIONING, - ) + ), ] @classmethod @@ -112,155 +113,30 @@ def target_statuses(cls) -> list[DeploymentLifecycleStatus]: def status_transitions(cls) -> DeploymentStatusTransitions: return DeploymentStatusTransitions( success=DeploymentLifecycleStatus( - lifecycle=EndpointLifecycle.DEPLOYING, - sub_status=DeploymentSubStep.PROGRESSING, - ), - need_retry=None, - expired=DeploymentLifecycleStatus( - lifecycle=EndpointLifecycle.DEPLOYING, - sub_status=DeploymentSubStep.ROLLING_BACK, + lifecycle=EndpointLifecycle.READY, + sub_status=None, ), - give_up=DeploymentLifecycleStatus( + need_retry=DeploymentLifecycleStatus( lifecycle=EndpointLifecycle.DEPLOYING, - sub_status=DeploymentSubStep.ROLLING_BACK, - ), - ) - - @override - async def execute( - self, deployments: Sequence[DeploymentWithHistory] - ) -> DeploymentExecutionResult: - deployment_infos = [deployment.deployment_info for deployment in deployments] - deployment_map = {deployment.deployment_info.id: deployment for deployment in deployments} - - summary = await self._evaluator.evaluate(deployment_infos) - await self._applier.apply(summary) - - successes: list[DeploymentWithHistory] = [] - skipped: list[DeploymentWithHistory] = [] - - # Classify by assigned sub_step - for deployment in deployments: - endpoint_id = deployment.deployment_info.id - assigned = summary.assignments.get(endpoint_id) - if assigned is None: - # Evaluation error — handled below - continue - if assigned == DeploymentSubStep.PROGRESSING: - # Advanced to PROGRESSING → success (coordinator transitions) - successes.append(deployment) - else: - # Still PROVISIONING → skip (no state transition) - skipped.append(deployment) - - # Evaluation errors → execution errors - errors = [ - DeploymentExecutionError( - deployment_info=deployment_map[evaluation_error.deployment.id], - reason=evaluation_error.reason, - error_detail=evaluation_error.reason, - ) - for evaluation_error in summary.errors - if evaluation_error.deployment.id in deployment_map - ] - - return DeploymentExecutionResult(successes=successes, errors=errors, skipped=skipped) - - @override - async def post_process(self, result: DeploymentExecutionResult) -> None: - await self._deployment_controller.mark_lifecycle_needed( - DeploymentLifecycleType.DEPLOYING, sub_step=DeploymentSubStep.PROVISIONING - ) - await self._route_controller.mark_lifecycle_needed(RouteLifecycleType.PROVISIONING) - - -class DeployingProgressingHandler(DeploymentHandler): - """Handler for DEPLOYING / PROGRESSING (+ COMPLETED, ROLLED_BACK). - - This single handler processes three sub-steps: - - - **PROGRESSING**: Still replacing routes — re-evaluate next cycle. - - **COMPLETED**: Applier has swapped revision → returned as success - → coordinator transitions to READY. - - **ROLLED_BACK**: Applier has cleared deploying_revision → returned - as error → coordinator transitions to READY. - """ - - def __init__( - self, - deployment_controller: DeploymentController, - route_controller: RouteController, - evaluator: DeploymentStrategyEvaluator, - applier: StrategyResultApplier, - ) -> None: - self._deployment_controller = deployment_controller - self._route_controller = route_controller - self._evaluator = evaluator - self._applier = applier - - @classmethod - @override - def name(cls) -> str: - return "deploying-progressing" - - @property - @override - def lock_id(self) -> LockID | None: - return LockID.LOCKID_DEPLOYMENT_DEPLOYING - - @classmethod - @override - def target_statuses(cls) -> list[DeploymentLifecycleStatus]: - return [ - DeploymentLifecycleStatus( - lifecycle=EndpointLifecycle.DEPLOYING, - sub_status=DeploymentSubStep.PROGRESSING, - ), - DeploymentLifecycleStatus( - lifecycle=EndpointLifecycle.DEPLOYING, - sub_status=DeploymentSubStep.COMPLETED, - ), - DeploymentLifecycleStatus( - lifecycle=EndpointLifecycle.DEPLOYING, - sub_status=DeploymentSubStep.ROLLED_BACK, + sub_status=DeploymentSubStep.PROVISIONING, ), - ] - - @classmethod - @override - def status_transitions(cls) -> DeploymentStatusTransitions: - ready = DeploymentLifecycleStatus( - lifecycle=EndpointLifecycle.READY, - sub_status=None, - ) - rolling_back = DeploymentLifecycleStatus( - lifecycle=EndpointLifecycle.DEPLOYING, - sub_status=DeploymentSubStep.ROLLING_BACK, - ) - return DeploymentStatusTransitions( - success=ready, - need_retry=None, - expired=rolling_back, - give_up=rolling_back, ) @override async def execute( self, deployments: Sequence[DeploymentWithHistory] ) -> DeploymentExecutionResult: - deployment_infos = [deployment.deployment_info for deployment in deployments] - deployment_map = {deployment.deployment_info.id: deployment for deployment in deployments} + deployment_infos = [d.deployment_info for d in deployments] + deployment_map = {d.deployment_info.id: d for d in deployments} summary = await self._evaluator.evaluate(deployment_infos) apply_result = await self._applier.apply(summary) - # Filter out deployments that have been marked for destruction during DEPLOYING. - # Without this guard, a COMPLETED sub_step would swap revisions and - # transition the deployment back to READY, resurrecting it. + # Filter out deployments marked for destruction during DEPLOYING. destroying_ids = { - deployment.deployment_info.id - for deployment in deployments - if deployment.deployment_info.state.lifecycle + d.deployment_info.id + for d in deployments + if d.deployment_info.state.lifecycle in (EndpointLifecycle.DESTROYING, EndpointLifecycle.DESTROYED) } if destroying_ids: @@ -272,11 +148,9 @@ async def execute( successes: list[DeploymentWithHistory] = [] errors: list[DeploymentExecutionError] = [] skipped: list[DeploymentWithHistory] = [] + need_retry: list[DeploymentWithHistory] = [] - terminal_ids = apply_result.completed_ids | apply_result.rolled_back_ids - evaluation_error_ids = {e.deployment.id for e in summary.errors} - - # COMPLETED → successes (coordinator transitions to READY) + # COMPLETED → success (coordinator transitions to READY) for endpoint_id in apply_result.completed_ids: if endpoint_id in destroying_ids: continue @@ -284,20 +158,6 @@ async def execute( if deployment is not None: successes.append(deployment) - # ROLLED_BACK → errors (coordinator transitions to READY) - for endpoint_id in apply_result.rolled_back_ids: - if endpoint_id in destroying_ids: - continue - deployment = deployment_map.get(endpoint_id) - if deployment is not None: - errors.append( - DeploymentExecutionError( - deployment_info=deployment, - reason="Deployment rolled back — all new routes failed", - error_detail="Strategy FSM determined rollback", - ) - ) - # Evaluation errors → execution errors for error_data in summary.errors: deployment = deployment_map.get(error_data.deployment.id) @@ -310,22 +170,29 @@ async def execute( ) ) - # Still PROGRESSING → skipped (no state transition) + # Classify rest: route mutations happened → need_retry (never give_up), + # no changes → skipped (no history). + completed_or_error_ids = apply_result.completed_ids | { + e.deployment.id for e in summary.errors + } + has_route_mutations = bool(apply_result.routes_created or apply_result.routes_drained) for deployment in deployments: endpoint_id = deployment.deployment_info.id - if ( - endpoint_id not in terminal_ids - and endpoint_id not in destroying_ids - and endpoint_id not in evaluation_error_ids - ): + if endpoint_id in completed_or_error_ids or endpoint_id in destroying_ids: + continue + if has_route_mutations: + need_retry.append(deployment) + else: skipped.append(deployment) - return DeploymentExecutionResult(successes=successes, errors=errors, skipped=skipped) + return DeploymentExecutionResult( + successes=successes, errors=errors, skipped=skipped, need_retry=need_retry + ) @override async def post_process(self, result: DeploymentExecutionResult) -> None: await self._deployment_controller.mark_lifecycle_needed( - DeploymentLifecycleType.DEPLOYING, sub_step=DeploymentSubStep.PROGRESSING + DeploymentLifecycleType.DEPLOYING, sub_step=DeploymentSubStep.PROVISIONING ) await self._route_controller.mark_lifecycle_needed(RouteLifecycleType.PROVISIONING) @@ -333,23 +200,18 @@ async def post_process(self, result: DeploymentExecutionResult) -> None: class DeployingRollingBackHandler(DeploymentHandler): """Handler for DEPLOYING / ROLLING_BACK sub-step. - Actively rolling back failed new-revision routes to the previous revision. - The evaluator re-evaluates the deployment (which is now in ROLLING_BACK) - and the applier drains new-revision routes and restores old-revision routes. - Once rollback is complete, the evaluator assigns ROLLED_BACK, which the - ProgressingHandler will pick up and transition to READY. + Clears ``deploying_revision`` and transitions directly to READY. + This is a cleanup-only operation — no FSM evaluation needed. """ def __init__( self, deployment_controller: DeploymentController, route_controller: RouteController, - evaluator: DeploymentStrategyEvaluator, applier: StrategyResultApplier, ) -> None: self._deployment_controller = deployment_controller self._route_controller = route_controller - self._evaluator = evaluator self._applier = applier @classmethod @@ -377,17 +239,8 @@ def target_statuses(cls) -> list[DeploymentLifecycleStatus]: def status_transitions(cls) -> DeploymentStatusTransitions: return DeploymentStatusTransitions( success=DeploymentLifecycleStatus( - lifecycle=EndpointLifecycle.DEPLOYING, - sub_status=DeploymentSubStep.ROLLED_BACK, - ), - need_retry=None, - expired=DeploymentLifecycleStatus( - lifecycle=EndpointLifecycle.DEPLOYING, - sub_status=DeploymentSubStep.ROLLED_BACK, - ), - give_up=DeploymentLifecycleStatus( - lifecycle=EndpointLifecycle.DEPLOYING, - sub_status=DeploymentSubStep.ROLLED_BACK, + lifecycle=EndpointLifecycle.READY, + sub_status=None, ), ) @@ -395,32 +248,13 @@ def status_transitions(cls) -> DeploymentStatusTransitions: async def execute( self, deployments: Sequence[DeploymentWithHistory] ) -> DeploymentExecutionResult: - deployment_infos = [deployment.deployment_info for deployment in deployments] - deployment_map = {deployment.deployment_info.id: deployment for deployment in deployments} - - summary = await self._evaluator.evaluate(deployment_infos) - await self._applier.apply(summary) - - # Successfully evaluated deployments → successes (coordinator transitions to ROLLED_BACK) - evaluated_ids = set(summary.assignments.keys()) - successes = [ - deployment - for deployment in deployments - if deployment.deployment_info.id in evaluated_ids - ] - - # Evaluation errors → execution errors - errors = [ - DeploymentExecutionError( - deployment_info=deployment_map[evaluation_error.deployment.id], - reason=evaluation_error.reason, - error_detail=evaluation_error.reason, - ) - for evaluation_error in summary.errors - if evaluation_error.deployment.id in deployment_map - ] - - return DeploymentExecutionResult(successes=successes, errors=errors) + all_deployment_ids = {deployment.deployment_info.id for deployment in deployments} + await self._applier.clear_deploying_revision(all_deployment_ids) + log.info( + "Cleared deploying_revision for {} rolling-back deployments", + len(all_deployment_ids), + ) + return DeploymentExecutionResult(successes=list(deployments)) @override async def post_process(self, result: DeploymentExecutionResult) -> None: diff --git a/src/ai/backend/manager/sokovan/deployment/strategy/applier.py b/src/ai/backend/manager/sokovan/deployment/strategy/applier.py index d73630dfe84..dea1c908005 100644 --- a/src/ai/backend/manager/sokovan/deployment/strategy/applier.py +++ b/src/ai/backend/manager/sokovan/deployment/strategy/applier.py @@ -30,24 +30,32 @@ class StrategyApplyResult: completed_ids: set[UUID] = field(default_factory=set) """Deployment IDs that completed and had their revision swapped.""" - rolled_back_ids: set[UUID] = field(default_factory=set) - """Deployment IDs that rolled back and had their deploying_revision cleared.""" - routes_created: int = 0 """Number of new routes rolled out.""" routes_drained: int = 0 """Number of routes marked for draining.""" + def has_mutations(self) -> bool: + """Check if there are any route mutations to persist. + + Returns True when at least one of the following is present: + new routes to roll out, routes to drain, or deployments completed. + """ + return bool(self.completed_ids or self.routes_created or self.routes_drained) + class StrategyResultApplier: """Applies a ``StrategyEvaluationSummary`` to the database. - Handles all DB mutations from a strategy evaluation cycle: - 1. Sub-step assignment updates - 2. Route rollout (create) and drain (terminate) - 3. Revision swap for COMPLETED deployments - 4. Clear deploying_revision for ROLLED_BACK deployments + Handles route mutations from a strategy evaluation cycle: + 1. Route rollout (create) and drain (terminate) + 2. Revision swap for COMPLETED deployments + + Sub-step transitions are handled exclusively by the coordinator. + Clearing ``deploying_revision`` for rolled-back deployments is the + responsibility of ``DeployingRollingBackHandler``, which explicitly + calls ``clear_deploying_revision`` after rollback completes. """ def __init__(self, deployment_repo: DeploymentRepository) -> None: @@ -56,16 +64,12 @@ def __init__(self, deployment_repo: DeploymentRepository) -> None: async def apply(self, summary: StrategyEvaluationSummary) -> StrategyApplyResult: changes = summary.route_changes completed_ids: set[UUID] = set() - rolled_back_ids: set[UUID] = set() for endpoint_id, sub_step in summary.assignments.items(): if sub_step == DeploymentSubStep.COMPLETED: completed_ids.add(endpoint_id) - elif sub_step == DeploymentSubStep.ROLLED_BACK: - rolled_back_ids.add(endpoint_id) result = StrategyApplyResult( completed_ids=completed_ids, - rolled_back_ids=rolled_back_ids, routes_created=len(changes.rollout_specs), routes_drained=len(changes.drain_route_ids), ) @@ -83,20 +87,17 @@ async def apply(self, summary: StrategyEvaluationSummary) -> StrategyApplyResult rollout = changes.rollout_specs - if not (summary.assignments or rollout or drain or completed_ids or rolled_back_ids): + if not result.has_mutations(): return result swapped = await self._deployment_repo.apply_strategy_mutations( - assignments=summary.assignments, rollout=rollout, drain=drain, completed_ids=completed_ids, - rolled_back_ids=rolled_back_ids, ) log.debug( - "Applied evaluation: {} assignments, {} routes created, {} routes drained", - len(summary.assignments), + "Applied evaluation: {} routes created, {} routes drained", result.routes_created, result.routes_drained, ) @@ -106,10 +107,13 @@ async def apply(self, summary: StrategyEvaluationSummary) -> StrategyApplyResult swapped, len(completed_ids), ) - if rolled_back_ids: - log.info( - "Cleared deploying_revision for {} rolled-back deployments", - len(rolled_back_ids), - ) return result + + async def clear_deploying_revision(self, deployment_ids: set[UUID]) -> None: + """Clear deploying_revision and sub_step for rolled-back deployments. + + Called explicitly by ``DeployingRollingBackHandler`` after rollback + completes — NOT automatically during ``apply()``. + """ + await self._deployment_repo.clear_deploying_revision(deployment_ids) diff --git a/src/ai/backend/manager/sokovan/deployment/strategy/types.py b/src/ai/backend/manager/sokovan/deployment/strategy/types.py index c22c2f2ad72..213d23d5eba 100644 --- a/src/ai/backend/manager/sokovan/deployment/strategy/types.py +++ b/src/ai/backend/manager/sokovan/deployment/strategy/types.py @@ -31,8 +31,7 @@ class RouteChanges: class StrategyCycleResult: """Result of evaluating a single deployment's strategy cycle. - ``sub_step`` indicates the next state: PROVISIONING, PROGRESSING, - ROLLING_BACK, COMPLETED, or ROLLED_BACK. + ``sub_step`` indicates the next state: PROVISIONING or COMPLETED. """ sub_step: DeploymentSubStep @@ -53,7 +52,7 @@ class StrategyEvaluationSummary: The evaluator classifies each deployment into a sub_step and records the mapping so the applier can bulk-update the DB column. - All outcomes — including ROLLING_BACK, COMPLETED, and ROLLED_BACK — are expressed + All outcomes — including ROLLING_BACK and COMPLETED — are expressed as sub_step values and persisted to the DB by the applier. """ diff --git a/src/ai/backend/manager/sokovan/deployment/types.py b/src/ai/backend/manager/sokovan/deployment/types.py index 60547bbbce8..f4e56c0a39a 100644 --- a/src/ai/backend/manager/sokovan/deployment/types.py +++ b/src/ai/backend/manager/sokovan/deployment/types.py @@ -41,6 +41,7 @@ class DeploymentExecutionResult: successes: list[DeploymentWithHistory] = field(default_factory=list) errors: list[DeploymentExecutionError] = field(default_factory=list) skipped: list[DeploymentWithHistory] = field(default_factory=list) + need_retry: list[DeploymentWithHistory] = field(default_factory=list) @dataclass diff --git a/tests/unit/manager/sokovan/deployment/strategy/test_applier.py b/tests/unit/manager/sokovan/deployment/strategy/test_applier.py index faa398b3016..7853d05bea5 100644 --- a/tests/unit/manager/sokovan/deployment/strategy/test_applier.py +++ b/tests/unit/manager/sokovan/deployment/strategy/test_applier.py @@ -70,7 +70,7 @@ def summary_with_rollout() -> StrategyEvaluationSummary: @pytest.fixture def summary_with_drain() -> StrategyEvaluationSummary: return _build_summary( - {uuid4(): DeploymentSubStep.PROGRESSING}, + {uuid4(): DeploymentSubStep.PROVISIONING}, route_changes=RouteChanges(drain_route_ids=[uuid4()]), ) @@ -88,29 +88,20 @@ def completed_summary() -> tuple[StrategyEvaluationSummary, set[UUID]]: @pytest.fixture -def rolled_back_summary() -> tuple[StrategyEvaluationSummary, set[UUID]]: - ep_id = uuid4() - summary = _build_summary({ep_id: DeploymentSubStep.ROLLED_BACK}) - return summary, {ep_id} - - -@pytest.fixture -def mixed_summary() -> tuple[StrategyEvaluationSummary, UUID, UUID, UUID]: +def mixed_summary() -> tuple[StrategyEvaluationSummary, UUID, UUID]: provisioning_id = uuid4() completed_id = uuid4() - rolled_back_id = uuid4() summary = _build_summary( { provisioning_id: DeploymentSubStep.PROVISIONING, completed_id: DeploymentSubStep.COMPLETED, - rolled_back_id: DeploymentSubStep.ROLLED_BACK, }, route_changes=RouteChanges( rollout_specs=[MagicMock()], drain_route_ids=[uuid4()], ), ) - return summary, provisioning_id, completed_id, rolled_back_id + return summary, provisioning_id, completed_id # ============================================================================= @@ -131,24 +122,22 @@ async def test_empty_summary_skips_all_db_calls( mock_deployment_repo.apply_strategy_mutations.assert_not_called() assert result.completed_ids == set() - assert result.rolled_back_ids == set() assert result.routes_created == 0 assert result.routes_drained == 0 - async def test_assignments_only( + async def test_assignments_only_no_db_call( self, applier: StrategyResultApplier, mock_deployment_repo: AsyncMock, provisioning_summary: StrategyEvaluationSummary, ) -> None: + """Assignments without route changes should not trigger DB mutations. + + Sub-step transitions are handled exclusively by the coordinator. + """ result = await applier.apply(provisioning_summary) - mock_deployment_repo.apply_strategy_mutations.assert_called_once() - kwargs = mock_deployment_repo.apply_strategy_mutations.call_args.kwargs - assert kwargs["completed_ids"] == set() - assert kwargs["rolled_back_ids"] == set() - assert kwargs["drain"] is None - assert not kwargs["rollout"] + mock_deployment_repo.apply_strategy_mutations.assert_not_called() assert result.routes_created == 0 assert result.routes_drained == 0 @@ -182,10 +171,15 @@ async def test_no_drain_routes_passes_none( self, applier: StrategyResultApplier, mock_deployment_repo: AsyncMock, - provisioning_summary: StrategyEvaluationSummary, ) -> None: - await applier.apply(provisioning_summary) + """Summary with only rollout should pass drain=None.""" + summary = _build_summary( + {uuid4(): DeploymentSubStep.PROVISIONING}, + route_changes=RouteChanges(rollout_specs=[MagicMock()]), + ) + await applier.apply(summary) + mock_deployment_repo.apply_strategy_mutations.assert_called_once() kwargs = mock_deployment_repo.apply_strategy_mutations.call_args.kwargs assert kwargs["drain"] is None @@ -202,31 +196,15 @@ async def test_completed_passes_completed_ids( kwargs = mock_deployment_repo.apply_strategy_mutations.call_args.kwargs assert kwargs["completed_ids"] == completed_ids - assert kwargs["rolled_back_ids"] == set() assert result.completed_ids == completed_ids - async def test_rolled_back_passes_rolled_back_ids( - self, - applier: StrategyResultApplier, - mock_deployment_repo: AsyncMock, - rolled_back_summary: tuple[StrategyEvaluationSummary, set[UUID]], - ) -> None: - summary, rolled_back_ids = rolled_back_summary - - result = await applier.apply(summary) - - kwargs = mock_deployment_repo.apply_strategy_mutations.call_args.kwargs - assert kwargs["rolled_back_ids"] == rolled_back_ids - assert kwargs["completed_ids"] == set() - assert result.rolled_back_ids == rolled_back_ids - async def test_mixed_assignments_handles_all( self, applier: StrategyResultApplier, mock_deployment_repo: AsyncMock, - mixed_summary: tuple[StrategyEvaluationSummary, UUID, UUID, UUID], + mixed_summary: tuple[StrategyEvaluationSummary, UUID, UUID], ) -> None: - summary, _provisioning_id, completed_id, rolled_back_id = mixed_summary + summary, _provisioning_id, completed_id = mixed_summary mock_deployment_repo.apply_strategy_mutations.return_value = 1 result = await applier.apply(summary) @@ -234,11 +212,9 @@ async def test_mixed_assignments_handles_all( mock_deployment_repo.apply_strategy_mutations.assert_called_once() kwargs = mock_deployment_repo.apply_strategy_mutations.call_args.kwargs assert kwargs["completed_ids"] == {completed_id} - assert kwargs["rolled_back_ids"] == {rolled_back_id} assert len(kwargs["rollout"]) == 1 assert kwargs["drain"] is not None assert result.completed_ids == {completed_id} - assert result.rolled_back_ids == {rolled_back_id} assert result.routes_created == 1 assert result.routes_drained == 1 diff --git a/tests/unit/manager/sokovan/deployment/test_coordinator_history.py b/tests/unit/manager/sokovan/deployment/test_coordinator_history.py index 30b97dd71ce..10c465a331f 100644 --- a/tests/unit/manager/sokovan/deployment/test_coordinator_history.py +++ b/tests/unit/manager/sokovan/deployment/test_coordinator_history.py @@ -381,3 +381,80 @@ async def test_skips_history_when_handler_returns_empty( ) mock_deployment_repository.update_endpoint_lifecycle_bulk_with_history.assert_not_called() + + async def test_records_history_on_need_retry( + self, + coordinator_with_pending_deployments: DeploymentCoordinator, + mock_deployment_repository: AsyncMock, + sample_deployment_with_history: DeploymentWithHistory, + ) -> None: + """History is recorded when handler returns need_retry result.""" + need_retry_status = DeploymentLifecycleStatus(lifecycle=EndpointLifecycle.DEPLOYING) + mock_handler = MagicMock(spec=DeploymentHandler) + mock_handler.name = MagicMock(return_value="deploying_progressing") + mock_handler.lock_id = None + mock_handler.target_statuses = MagicMock( + return_value=[DeploymentLifecycleStatus(lifecycle=EndpointLifecycle.DEPLOYING)] + ) + mock_handler.status_transitions = MagicMock( + return_value=DeploymentStatusTransitions( + success=DeploymentLifecycleStatus(lifecycle=EndpointLifecycle.READY), + need_retry=need_retry_status, + ) + ) + mock_handler.execute = AsyncMock( + return_value=DeploymentExecutionResult( + successes=[], + errors=[], + need_retry=[sample_deployment_with_history], + ) + ) + mock_handler.post_process = AsyncMock() + + coordinator_with_pending_deployments._registry.handlers = { + (DeploymentLifecycleType.CHECK_PENDING, None): mock_handler + } + + await coordinator_with_pending_deployments.process_deployment_lifecycle( + DeploymentLifecycleType.CHECK_PENDING + ) + + mock_deployment_repository.update_endpoint_lifecycle_bulk_with_history.assert_called_once() + + async def test_need_retry_without_transition_does_not_record_history( + self, + coordinator_with_pending_deployments: DeploymentCoordinator, + mock_deployment_repository: AsyncMock, + sample_deployment_with_history: DeploymentWithHistory, + ) -> None: + """No history recorded when need_retry result exists but transitions.need_retry is None.""" + mock_handler = MagicMock(spec=DeploymentHandler) + mock_handler.name = MagicMock(return_value="deploying_progressing") + mock_handler.lock_id = None + mock_handler.target_statuses = MagicMock( + return_value=[DeploymentLifecycleStatus(lifecycle=EndpointLifecycle.DEPLOYING)] + ) + mock_handler.status_transitions = MagicMock( + return_value=DeploymentStatusTransitions( + success=DeploymentLifecycleStatus(lifecycle=EndpointLifecycle.READY), + need_retry=None, + ) + ) + mock_handler.execute = AsyncMock( + return_value=DeploymentExecutionResult( + successes=[], + errors=[], + need_retry=[sample_deployment_with_history], + ) + ) + mock_handler.post_process = AsyncMock() + + coordinator_with_pending_deployments._registry.handlers = { + (DeploymentLifecycleType.CHECK_PENDING, None): mock_handler + } + + await coordinator_with_pending_deployments.process_deployment_lifecycle( + DeploymentLifecycleType.CHECK_PENDING + ) + + mock_deployment_repository.update_endpoint_lifecycle_bulk_with_history.assert_not_called()