Skip to content
Merged
1 change: 1 addition & 0 deletions changes/10276.feature.md
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
Consolidate deploying handlers and remove unused sub-steps
22 changes: 10 additions & 12 deletions src/ai/backend/manager/data/deployment/types.py
Original file line number Diff line number Diff line change
Expand Up @@ -153,28 +153,22 @@ class DeploymentSubStatus(enum.StrEnum):

Each lifecycle type can define its own sub-status enum by
inheriting from this class. For example, DEPLOYING handlers
use ``DeploymentSubStep`` (provisioning, progressing, …).
use ``DeploymentSubStep`` (provisioning, rolling_back, …).
"""


class DeploymentSubStep(DeploymentSubStatus):
"""Sub-steps for the DEPLOYING lifecycle phase.

Active states:
- PROVISIONING: New revision routes are being provisioned; waiting for readiness.
- PROGRESSING: Actively replacing old routes with new routes.
- ROLLING_BACK: Actively rolling back failed new routes to previous revision.

Terminal markers (no handler execution, trigger transition only):
- COMPLETED: All strategy conditions satisfied; ready for revision swap.
- ROLLED_BACK: Rollback finished; ready for cleanup and transition to READY.
- PROVISIONING: New revision routes are being provisioned and old routes
are being drained. This is the main sub-step for rolling updates.
- ROLLING_BACK: Clearing deploying_revision and transitioning to READY.
- COMPLETED: All strategy conditions satisfied; triggers revision swap.
"""

PROVISIONING = "provisioning"
PROGRESSING = "progressing"
ROLLING_BACK = "rolling_back"
COMPLETED = "completed"
ROLLED_BACK = "rolled_back"


@dataclass(frozen=True)
Expand All @@ -201,7 +195,11 @@ class DeploymentStatusTransitions:

Attributes:
success: Target lifecycle when handler succeeds, None means no change
need_retry: Target lifecycle when handler fails but can retry
need_retry: Target lifecycle when handler fails but can retry, or when
route mutations were executed but the deployment stays in the same
sub-step (e.g. PROVISIONING → PROVISIONING after create/drain).
Items explicitly returned as need_retry by handlers are never
escalated to give_up — they represent normal progress.
expired: Target lifecycle when time elapsed in current state
give_up: Target lifecycle when retry count exceeded
"""
Expand Down
1 change: 1 addition & 0 deletions src/ai/backend/manager/repositories/base/types.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
# QueryCondition now returns a ColumnElement (whereclause) instead of modifying stmt
type QueryCondition = Callable[[], sa.sql.expression.ColumnElement[bool]]


T = TypeVar("T")


Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2519,38 +2519,22 @@ async def search_deployment_policies(

async def apply_strategy_mutations(
self,
assignments: Mapping[uuid.UUID, DeploymentSubStep],
rollout: Sequence[RBACEntityCreator[RoutingRow]],
drain: BatchUpdater[RoutingRow] | None,
completed_ids: set[uuid.UUID],
rolled_back_ids: set[uuid.UUID],
) -> int:
"""Apply all DB mutations from a strategy evaluation cycle in a single transaction.
"""Apply route mutations from a strategy evaluation cycle in a single transaction.

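Creates the ``rollout`` routes, applies the ``drain`` update, and swaps
revisions for ``completed_ids``. Sub-step transitions are handled
exclusively by the coordinator via ``EndpointLifecycleBatchUpdaterSpec``.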

Returns:
Number of deployments whose revision was swapped.
"""
async with self._begin_session_read_committed() as db_sess:
await self._update_sub_steps(db_sess, assignments)
await self._create_routes(db_sess, rollout)
await self._drain_routes(db_sess, drain)
swapped = await self._complete_deployment_revision_swap(db_sess, completed_ids)
await self._clear_deploying_revision(db_sess, rolled_back_ids)
return swapped

@staticmethod
async def _update_sub_steps(
db_sess: SASession,
assignments: Mapping[uuid.UUID, DeploymentSubStep],
) -> None:
"""Update deployment sub-step assignments."""
for endpoint_id, sub_step in assignments.items():
query = (
sa.update(EndpointRow)
.where(EndpointRow.id == endpoint_id)
.values(sub_step=sub_step)
)
await db_sess.execute(query)
return await self._complete_deployment_revision_swap(db_sess, completed_ids)

@staticmethod
async def _create_routes(
Expand Down Expand Up @@ -2593,20 +2577,24 @@ async def _complete_deployment_revision_swap(
result = await db_sess.execute(query)
return cast(CursorResult[Any], result).rowcount

@staticmethod
async def _clear_deploying_revision(
db_sess: SASession,
rolled_back_ids: set[uuid.UUID],
async def clear_deploying_revision(
self,
deployment_ids: set[uuid.UUID],
) -> None:
"""Clear deploying_revision for rolled-back deployments."""
if not rolled_back_ids:
"""Clear deploying_revision and sub_step for rolled-back deployments.

This is called explicitly by ``DeployingRollingBackHandler`` after
rollback completes, NOT automatically by ``apply_strategy_mutations``.
"""
if not deployment_ids:
return
query = (
sa.update(EndpointRow)
.where(EndpointRow.id.in_(rolled_back_ids))
.values(
deploying_revision=None,
sub_step=None,
async with self._begin_session_read_committed() as db_sess:
query = (
sa.update(EndpointRow)
.where(EndpointRow.id.in_(deployment_ids))
.values(
deploying_revision=None,
sub_step=None,
)
)
)
await db_sess.execute(query)
await db_sess.execute(query)
21 changes: 14 additions & 7 deletions src/ai/backend/manager/repositories/deployment/repository.py
Original file line number Diff line number Diff line change
Expand Up @@ -1406,26 +1406,33 @@ async def search_deployment_policies(
"""
return await self._db_source.search_deployment_policies(querier)

@deployment_repository_resilience.apply()
async def apply_strategy_mutations(
self,
assignments: Mapping[UUID, DeploymentSubStep],
rollout: Sequence[RBACEntityCreator[RoutingRow]],
drain: BatchUpdater[RoutingRow] | None,
completed_ids: set[UUID],
Comment on lines 1410 to 1414
rolled_back_ids: set[UUID],
) -> int:
"""Apply all DB mutations from a strategy evaluation cycle.
"""Apply route mutations from a strategy evaluation cycle.

Performs sub-step updates, route rollout/drain, revision swap,
and deploying_revision cleanup in a single transaction.
Performs route rollout/drain and revision swap in a single transaction.
Sub-step transitions are handled by the coordinator via
``EndpointLifecycleBatchUpdaterSpec``.

Returns:
Number of deployments whose revision was swapped.
"""
return await self._db_source.apply_strategy_mutations(
assignments=assignments,
rollout=rollout,
drain=drain,
completed_ids=completed_ids,
rolled_back_ids=rolled_back_ids,
)

@deployment_repository_resilience.apply()
async def clear_deploying_revision(self, deployment_ids: set[UUID]) -> None:
"""Clear deploying_revision and sub_step for rolled-back deployments.

Called explicitly by ``DeployingRollingBackHandler`` after rollback
completes — NOT automatically during strategy mutations.
Comment on lines +1432 to +1436
"""
await self._db_source.clear_deploying_revision(deployment_ids)
30 changes: 17 additions & 13 deletions src/ai/backend/manager/sokovan/deployment/coordinator.py
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,6 @@
from .handlers import (
CheckPendingDeploymentHandler,
CheckReplicaDeploymentHandler,
DeployingProgressingHandler,
DeployingProvisioningHandler,
DeployingRollingBackHandler,
DeploymentHandler,
Expand Down Expand Up @@ -325,21 +324,11 @@ def _init_handlers(
applier=applier,
),
),
(
(DeploymentLifecycleType.DEPLOYING, DeploymentSubStep.PROGRESSING),
DeployingProgressingHandler(
deployment_controller=self._deployment_controller,
route_controller=self._route_controller,
evaluator=evaluator,
applier=applier,
),
),
(
(DeploymentLifecycleType.DEPLOYING, DeploymentSubStep.ROLLING_BACK),
DeployingRollingBackHandler(
deployment_controller=self._deployment_controller,
route_controller=self._route_controller,
evaluator=evaluator,
applier=applier,
),
),
Expand Down Expand Up @@ -437,8 +426,8 @@ async def _handle_status_transitions(

transitions = handler.status_transitions()

# Success transitions (None = stay in current state)
if transitions.success is not None and result.successes:
# Success transitions
if result.successes and transitions.success is not None:
transition = self._build_success_transition(
handler_name=handler_name,
deployments=result.successes,
Expand All @@ -451,6 +440,21 @@ async def _handle_status_transitions(
all_history_specs.extend(transition.history_specs)
notification_events.extend(transition.notification_events)

# Explicit need_retry from handlers (e.g. route mutations in progress).
# These are never escalated to give_up — they represent normal progress,
# so they are applied through the same builder as success transitions.
if result.need_retry and transitions.need_retry is not None:
transition = self._build_success_transition(
handler_name=handler_name,
deployments=result.need_retry,
lifecycle_status=transitions.need_retry,
target_lifecycles=target_statuses,
records=records,
timestamp_now=timestamp_now,
)
batch_updaters.append(transition.updater)
all_history_specs.extend(transition.history_specs)
notification_events.extend(transition.notification_events)

# Failure transitions — classify into need_retry/expired/give_up
if result.errors:
current_dbtime = await self._deployment_repository.get_db_now()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,6 @@

from .base import DeploymentHandler
from .deploying import (
DeployingProgressingHandler,
DeployingProvisioningHandler,
DeployingRollingBackHandler,
)
Expand All @@ -17,7 +16,6 @@
__all__ = [
"CheckPendingDeploymentHandler",
"CheckReplicaDeploymentHandler",
"DeployingProgressingHandler",
"DeployingProvisioningHandler",
"DeployingRollingBackHandler",
"DeploymentHandler",
Expand Down
Loading
Loading