From fb226c2a580222acd632335813ca2a37814ebf85 Mon Sep 17 00:00:00 2001 From: jopemachine Date: Tue, 17 Mar 2026 14:58:56 +0900 Subject: [PATCH 01/12] refactor: Remove sub_step writes from strategy applier Remove assignments parameter and _update_sub_steps calls from apply_strategy_mutations so that sub_step transitions are handled exclusively by the coordinator's status transition mechanism. Extract clear_deploying_revision as a separate public method on both the repository and db_source layers. Update tests to reflect that assignments-only summaries no longer trigger DB mutations. This is a prerequisite refactoring for the rolling update deployment strategy (BEP-1049). --- .../deployment/db_source/db_source.py | 58 ++++++++----------- .../repositories/deployment/repository.py | 19 +++--- .../sokovan/deployment/strategy/applier.py | 27 +++++---- .../deployment/strategy/test_applier.py | 34 ++++++----- 4 files changed, 69 insertions(+), 69 deletions(-) diff --git a/src/ai/backend/manager/repositories/deployment/db_source/db_source.py b/src/ai/backend/manager/repositories/deployment/db_source/db_source.py index 359799094c4..ace64cf7efc 100644 --- a/src/ai/backend/manager/repositories/deployment/db_source/db_source.py +++ b/src/ai/backend/manager/repositories/deployment/db_source/db_source.py @@ -2519,38 +2519,22 @@ async def search_deployment_policies( async def apply_strategy_mutations( self, - assignments: Mapping[uuid.UUID, DeploymentSubStep], rollout: Sequence[RBACEntityCreator[RoutingRow]], drain: BatchUpdater[RoutingRow] | None, completed_ids: set[uuid.UUID], - rolled_back_ids: set[uuid.UUID], ) -> int: - """Apply all DB mutations from a strategy evaluation cycle in a single transaction. + """Apply route mutations from a strategy evaluation cycle in a single transaction. + + Sub-step transitions are handled exclusively by the coordinator + via ``EndpointLifecycleBatchUpdaterSpec``. Returns: Number of deployments whose revision was swapped. 
""" async with self._begin_session_read_committed() as db_sess: - await self._update_sub_steps(db_sess, assignments) await self._create_routes(db_sess, rollout) await self._drain_routes(db_sess, drain) - swapped = await self._complete_deployment_revision_swap(db_sess, completed_ids) - await self._clear_deploying_revision(db_sess, rolled_back_ids) - return swapped - - @staticmethod - async def _update_sub_steps( - db_sess: SASession, - assignments: Mapping[uuid.UUID, DeploymentSubStep], - ) -> None: - """Update deployment sub-step assignments.""" - for endpoint_id, sub_step in assignments.items(): - query = ( - sa.update(EndpointRow) - .where(EndpointRow.id == endpoint_id) - .values(sub_step=sub_step) - ) - await db_sess.execute(query) + return await self._complete_deployment_revision_swap(db_sess, completed_ids) @staticmethod async def _create_routes( @@ -2593,20 +2577,24 @@ async def _complete_deployment_revision_swap( result = await db_sess.execute(query) return cast(CursorResult[Any], result).rowcount - @staticmethod - async def _clear_deploying_revision( - db_sess: SASession, - rolled_back_ids: set[uuid.UUID], + async def clear_deploying_revision( + self, + deployment_ids: set[uuid.UUID], ) -> None: - """Clear deploying_revision for rolled-back deployments.""" - if not rolled_back_ids: + """Clear deploying_revision and sub_step for rolled-back deployments. + + This is called explicitly by the RollingBackHandler after rollback + completes, NOT automatically by apply_strategy_mutations. 
+ """ + if not deployment_ids: return - query = ( - sa.update(EndpointRow) - .where(EndpointRow.id.in_(rolled_back_ids)) - .values( - deploying_revision=None, - sub_step=None, + async with self._begin_session_read_committed() as db_sess: + query = ( + sa.update(EndpointRow) + .where(EndpointRow.id.in_(deployment_ids)) + .values( + deploying_revision=None, + sub_step=None, + ) ) - ) - await db_sess.execute(query) + await db_sess.execute(query) diff --git a/src/ai/backend/manager/repositories/deployment/repository.py b/src/ai/backend/manager/repositories/deployment/repository.py index 2e6f9a7a4ab..1267dfbe974 100644 --- a/src/ai/backend/manager/repositories/deployment/repository.py +++ b/src/ai/backend/manager/repositories/deployment/repository.py @@ -1408,24 +1408,29 @@ async def search_deployment_policies( async def apply_strategy_mutations( self, - assignments: Mapping[UUID, DeploymentSubStep], rollout: Sequence[RBACEntityCreator[RoutingRow]], drain: BatchUpdater[RoutingRow] | None, completed_ids: set[UUID], - rolled_back_ids: set[UUID], ) -> int: - """Apply all DB mutations from a strategy evaluation cycle. + """Apply route mutations from a strategy evaluation cycle. - Performs sub-step updates, route rollout/drain, revision swap, - and deploying_revision cleanup in a single transaction. + Performs route rollout/drain and revision swap in a single transaction. + Sub-step transitions are handled by the coordinator via + ``EndpointLifecycleBatchUpdaterSpec``. Returns: Number of deployments whose revision was swapped. """ return await self._db_source.apply_strategy_mutations( - assignments=assignments, rollout=rollout, drain=drain, completed_ids=completed_ids, - rolled_back_ids=rolled_back_ids, ) + + async def clear_deploying_revision(self, deployment_ids: set[UUID]) -> None: + """Clear deploying_revision and sub_step for rolled-back deployments. 
+ + Called explicitly by ``DeployingRollingBackHandler`` after rollback + completes — NOT automatically during strategy mutations. + """ + await self._db_source.clear_deploying_revision(deployment_ids) diff --git a/src/ai/backend/manager/sokovan/deployment/strategy/applier.py b/src/ai/backend/manager/sokovan/deployment/strategy/applier.py index d73630dfe84..28f55dc02e6 100644 --- a/src/ai/backend/manager/sokovan/deployment/strategy/applier.py +++ b/src/ai/backend/manager/sokovan/deployment/strategy/applier.py @@ -31,7 +31,7 @@ class StrategyApplyResult: """Deployment IDs that completed and had their revision swapped.""" rolled_back_ids: set[UUID] = field(default_factory=set) - """Deployment IDs that rolled back and had their deploying_revision cleared.""" + """Deployment IDs whose strategy evaluator determined rollback.""" routes_created: int = 0 """Number of new routes rolled out.""" @@ -47,7 +47,10 @@ class StrategyResultApplier: 1. Sub-step assignment updates 2. Route rollout (create) and drain (terminate) 3. Revision swap for COMPLETED deployments - 4. Clear deploying_revision for ROLLED_BACK deployments + + Note: Clearing ``deploying_revision`` for rolled-back deployments is NOT + done here — it is the responsibility of ``DeployingRollingBackHandler`` + to explicitly call ``clear_deploying_revision`` after rollback completes. 
""" def __init__(self, deployment_repo: DeploymentRepository) -> None: @@ -83,20 +86,17 @@ async def apply(self, summary: StrategyEvaluationSummary) -> StrategyApplyResult rollout = changes.rollout_specs - if not (summary.assignments or rollout or drain or completed_ids or rolled_back_ids): + if not (rollout or drain or completed_ids): return result swapped = await self._deployment_repo.apply_strategy_mutations( - assignments=summary.assignments, rollout=rollout, drain=drain, completed_ids=completed_ids, - rolled_back_ids=rolled_back_ids, ) log.debug( - "Applied evaluation: {} assignments, {} routes created, {} routes drained", - len(summary.assignments), + "Applied evaluation: {} routes created, {} routes drained", result.routes_created, result.routes_drained, ) @@ -106,10 +106,13 @@ async def apply(self, summary: StrategyEvaluationSummary) -> StrategyApplyResult swapped, len(completed_ids), ) - if rolled_back_ids: - log.info( - "Cleared deploying_revision for {} rolled-back deployments", - len(rolled_back_ids), - ) return result + + async def clear_deploying_revision(self, deployment_ids: set[UUID]) -> None: + """Clear deploying_revision and sub_step for rolled-back deployments. + + Called explicitly by ``DeployingRollingBackHandler`` after rollback + completes — NOT automatically during ``apply()``. 
+ """ + await self._deployment_repo.clear_deploying_revision(deployment_ids) diff --git a/tests/unit/manager/sokovan/deployment/strategy/test_applier.py b/tests/unit/manager/sokovan/deployment/strategy/test_applier.py index faa398b3016..9c7e19bfb16 100644 --- a/tests/unit/manager/sokovan/deployment/strategy/test_applier.py +++ b/tests/unit/manager/sokovan/deployment/strategy/test_applier.py @@ -135,20 +135,19 @@ async def test_empty_summary_skips_all_db_calls( assert result.routes_created == 0 assert result.routes_drained == 0 - async def test_assignments_only( + async def test_assignments_only_no_db_call( self, applier: StrategyResultApplier, mock_deployment_repo: AsyncMock, provisioning_summary: StrategyEvaluationSummary, ) -> None: + """Assignments without route changes should not trigger DB mutations. + + Sub-step transitions are handled exclusively by the coordinator. + """ result = await applier.apply(provisioning_summary) - mock_deployment_repo.apply_strategy_mutations.assert_called_once() - kwargs = mock_deployment_repo.apply_strategy_mutations.call_args.kwargs - assert kwargs["completed_ids"] == set() - assert kwargs["rolled_back_ids"] == set() - assert kwargs["drain"] is None - assert not kwargs["rollout"] + mock_deployment_repo.apply_strategy_mutations.assert_not_called() assert result.routes_created == 0 assert result.routes_drained == 0 @@ -182,9 +181,13 @@ async def test_no_drain_routes_passes_none( self, applier: StrategyResultApplier, mock_deployment_repo: AsyncMock, - provisioning_summary: StrategyEvaluationSummary, ) -> None: - await applier.apply(provisioning_summary) + """Summary with only rollout should pass drain=None.""" + summary = _build_summary( + {uuid4(): DeploymentSubStep.PROVISIONING}, + route_changes=RouteChanges(rollout_specs=[MagicMock()]), + ) + await applier.apply(summary) kwargs = mock_deployment_repo.apply_strategy_mutations.call_args.kwargs assert kwargs["drain"] is None @@ -202,23 +205,25 @@ async def 
test_completed_passes_completed_ids( kwargs = mock_deployment_repo.apply_strategy_mutations.call_args.kwargs assert kwargs["completed_ids"] == completed_ids - assert kwargs["rolled_back_ids"] == set() assert result.completed_ids == completed_ids - async def test_rolled_back_passes_rolled_back_ids( + async def test_rolled_back_sets_result_ids( self, applier: StrategyResultApplier, mock_deployment_repo: AsyncMock, rolled_back_summary: tuple[StrategyEvaluationSummary, set[UUID]], ) -> None: + """Rolled-back IDs are tracked in result but not passed to DB mutations. + + The RollingBackHandler is responsible for clearing deploying_revision. + """ summary, rolled_back_ids = rolled_back_summary result = await applier.apply(summary) - kwargs = mock_deployment_repo.apply_strategy_mutations.call_args.kwargs - assert kwargs["rolled_back_ids"] == rolled_back_ids - assert kwargs["completed_ids"] == set() assert result.rolled_back_ids == rolled_back_ids + # No route mutations, so apply_strategy_mutations should not be called + mock_deployment_repo.apply_strategy_mutations.assert_not_called() async def test_mixed_assignments_handles_all( self, @@ -234,7 +239,6 @@ async def test_mixed_assignments_handles_all( mock_deployment_repo.apply_strategy_mutations.assert_called_once() kwargs = mock_deployment_repo.apply_strategy_mutations.call_args.kwargs assert kwargs["completed_ids"] == {completed_id} - assert kwargs["rolled_back_ids"] == {rolled_back_id} assert len(kwargs["rollout"]) == 1 assert kwargs["drain"] is not None assert result.completed_ids == {completed_id} From a239c0a23a08fd13f3b98e2fe3b606e899900300 Mon Sep 17 00:00:00 2001 From: jopemachine Date: Tue, 17 Mar 2026 15:09:34 +0900 Subject: [PATCH 02/12] feat: Add rewind transition and QueryCondition combinators MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Add rewind as a new transition category in DeploymentStatusTransitions and DeploymentExecutionResult. 
Unlike need_retry, rewind does not increment phase_attempts — it represents normal forward progress (e.g. progressing → provisioning for the next batch in rolling update). Also add or_conditions/and_conditions utilities for combining QueryConditions, useful for building complex route filters. Co-Authored-By: Claude Opus 4.6 (1M context) --- .../backend/manager/data/deployment/types.py | 5 ++ .../manager/repositories/base/__init__.py | 4 + .../manager/repositories/base/types.py | 33 ++++++++ .../manager/sokovan/deployment/coordinator.py | 15 +++- .../manager/sokovan/deployment/types.py | 1 + .../deployment/test_coordinator_history.py | 77 +++++++++++++++++++ 6 files changed, 131 insertions(+), 4 deletions(-) diff --git a/src/ai/backend/manager/data/deployment/types.py b/src/ai/backend/manager/data/deployment/types.py index 8087ae091f9..9583e018299 100644 --- a/src/ai/backend/manager/data/deployment/types.py +++ b/src/ai/backend/manager/data/deployment/types.py @@ -204,12 +204,17 @@ class DeploymentStatusTransitions: need_retry: Target lifecycle when handler fails but can retry expired: Target lifecycle when time elapsed in current state give_up: Target lifecycle when retry count exceeded + rewind: Target lifecycle for normal forward progress that returns to a + previous sub-step (e.g. progressing → provisioning for the next + batch in rolling update). Unlike need_retry, rewind does NOT + increment phase_attempts. 
""" success: DeploymentLifecycleStatus | None = None need_retry: DeploymentLifecycleStatus | None = None expired: DeploymentLifecycleStatus | None = None give_up: DeploymentLifecycleStatus | None = None + rewind: DeploymentLifecycleStatus | None = None @dataclass(frozen=True) diff --git a/src/ai/backend/manager/repositories/base/__init__.py b/src/ai/backend/manager/repositories/base/__init__.py index a473c3a840a..59ed85cd00b 100644 --- a/src/ai/backend/manager/repositories/base/__init__.py +++ b/src/ai/backend/manager/repositories/base/__init__.py @@ -62,6 +62,8 @@ QueryCondition, QueryOrder, SearchScope, + and_conditions, + or_conditions, ) from .updater import ( BatchUpdater, @@ -169,4 +171,6 @@ # Utils "combine_conditions_or", "negate_conditions", + "or_conditions", + "and_conditions", ] diff --git a/src/ai/backend/manager/repositories/base/types.py b/src/ai/backend/manager/repositories/base/types.py index 217d80a25eb..3b153ade241 100644 --- a/src/ai/backend/manager/repositories/base/types.py +++ b/src/ai/backend/manager/repositories/base/types.py @@ -19,6 +19,39 @@ # QueryCondition now returns a ColumnElement (whereclause) instead of modifying stmt type QueryCondition = Callable[[], sa.sql.expression.ColumnElement[bool]] + +def or_conditions(*conditions: QueryCondition) -> QueryCondition: + """Combine multiple QueryConditions with OR. + + Args: + *conditions: Two or more QueryCondition callables to combine. + + Returns: + A single QueryCondition that evaluates to the OR of all inputs. + """ + + def inner() -> sa.sql.expression.ColumnElement[bool]: + return sa.or_(*[c() for c in conditions]) + + return inner + + +def and_conditions(*conditions: QueryCondition) -> QueryCondition: + """Combine multiple QueryConditions with AND. + + Args: + *conditions: Two or more QueryCondition callables to combine. + + Returns: + A single QueryCondition that evaluates to the AND of all inputs. 
+ """ + + def inner() -> sa.sql.expression.ColumnElement[bool]: + return sa.and_(*[c() for c in conditions]) + + return inner + + T = TypeVar("T") diff --git a/src/ai/backend/manager/sokovan/deployment/coordinator.py b/src/ai/backend/manager/sokovan/deployment/coordinator.py index 56f55c8167c..1fc3613e13f 100644 --- a/src/ai/backend/manager/sokovan/deployment/coordinator.py +++ b/src/ai/backend/manager/sokovan/deployment/coordinator.py @@ -437,12 +437,19 @@ async def _handle_status_transitions( transitions = handler.status_transitions() - # Success transitions (None = stay in current state) - if transitions.success is not None and result.successes: + # Success and rewind both advance without incrementing phase_attempts. + # Rewind returns to a previous sub-step as normal forward progress + # (e.g. progressing → provisioning for the next batch of routes). + for deployments, lifecycle_status in ( + (result.successes, transitions.success), + (result.rewind, transitions.rewind), + ): + if not deployments or lifecycle_status is None: + continue transition = self._build_success_transition( handler_name=handler_name, - deployments=result.successes, - lifecycle_status=transitions.success, + deployments=deployments, + lifecycle_status=lifecycle_status, target_lifecycles=target_statuses, records=records, timestamp_now=timestamp_now, diff --git a/src/ai/backend/manager/sokovan/deployment/types.py b/src/ai/backend/manager/sokovan/deployment/types.py index 60547bbbce8..497ab165a3d 100644 --- a/src/ai/backend/manager/sokovan/deployment/types.py +++ b/src/ai/backend/manager/sokovan/deployment/types.py @@ -41,6 +41,7 @@ class DeploymentExecutionResult: successes: list[DeploymentWithHistory] = field(default_factory=list) errors: list[DeploymentExecutionError] = field(default_factory=list) skipped: list[DeploymentWithHistory] = field(default_factory=list) + rewind: list[DeploymentWithHistory] = field(default_factory=list) @dataclass diff --git 
a/tests/unit/manager/sokovan/deployment/test_coordinator_history.py b/tests/unit/manager/sokovan/deployment/test_coordinator_history.py index 30b97dd71ce..b18ab78f1db 100644 --- a/tests/unit/manager/sokovan/deployment/test_coordinator_history.py +++ b/tests/unit/manager/sokovan/deployment/test_coordinator_history.py @@ -381,3 +381,80 @@ async def test_skips_history_when_handler_returns_empty( ) mock_deployment_repository.update_endpoint_lifecycle_bulk_with_history.assert_not_called() + + async def test_records_history_on_rewind( + self, + coordinator_with_pending_deployments: DeploymentCoordinator, + mock_deployment_repository: AsyncMock, + sample_deployment_with_history: DeploymentWithHistory, + ) -> None: + """History is recorded when handler returns rewind result.""" + rewind_status = DeploymentLifecycleStatus(lifecycle=EndpointLifecycle.DEPLOYING) + mock_handler = MagicMock(spec=DeploymentHandler) + mock_handler.name = MagicMock(return_value="deploying_progressing") + mock_handler.lock_id = None + mock_handler.target_statuses = MagicMock( + return_value=[DeploymentLifecycleStatus(lifecycle=EndpointLifecycle.DEPLOYING)] + ) + mock_handler.status_transitions = MagicMock( + return_value=DeploymentStatusTransitions( + success=DeploymentLifecycleStatus(lifecycle=EndpointLifecycle.READY), + rewind=rewind_status, + ) + ) + mock_handler.execute = AsyncMock( + return_value=DeploymentExecutionResult( + successes=[], + errors=[], + rewind=[sample_deployment_with_history], + ) + ) + mock_handler.post_process = AsyncMock() + + coordinator_with_pending_deployments._registry.handlers = { + (DeploymentLifecycleType.CHECK_PENDING, None): mock_handler + } + + await coordinator_with_pending_deployments.process_deployment_lifecycle( + DeploymentLifecycleType.CHECK_PENDING + ) + + mock_deployment_repository.update_endpoint_lifecycle_bulk_with_history.assert_called_once() + + async def test_rewind_without_transition_does_not_record_history( + self, + 
coordinator_with_pending_deployments: DeploymentCoordinator, + mock_deployment_repository: AsyncMock, + sample_deployment_with_history: DeploymentWithHistory, + ) -> None: + """No history recorded when rewind result exists but transitions.rewind is None.""" + mock_handler = MagicMock(spec=DeploymentHandler) + mock_handler.name = MagicMock(return_value="deploying_progressing") + mock_handler.lock_id = None + mock_handler.target_statuses = MagicMock( + return_value=[DeploymentLifecycleStatus(lifecycle=EndpointLifecycle.DEPLOYING)] + ) + mock_handler.status_transitions = MagicMock( + return_value=DeploymentStatusTransitions( + success=DeploymentLifecycleStatus(lifecycle=EndpointLifecycle.READY), + rewind=None, + ) + ) + mock_handler.execute = AsyncMock( + return_value=DeploymentExecutionResult( + successes=[], + errors=[], + rewind=[sample_deployment_with_history], + ) + ) + mock_handler.post_process = AsyncMock() + + coordinator_with_pending_deployments._registry.handlers = { + (DeploymentLifecycleType.CHECK_PENDING, None): mock_handler + } + + await coordinator_with_pending_deployments.process_deployment_lifecycle( + DeploymentLifecycleType.CHECK_PENDING + ) + + mock_deployment_repository.update_endpoint_lifecycle_bulk_with_history.assert_not_called() From bec490aed58842d1c2303add04056d60f6ee8ab4 Mon Sep 17 00:00:00 2001 From: jopemachine Date: Tue, 17 Mar 2026 15:16:03 +0900 Subject: [PATCH 03/12] docs: Add news fragment --- changes/10276.feature.md | 1 + 1 file changed, 1 insertion(+) create mode 100644 changes/10276.feature.md diff --git a/changes/10276.feature.md b/changes/10276.feature.md new file mode 100644 index 00000000000..ef21cc97956 --- /dev/null +++ b/changes/10276.feature.md @@ -0,0 +1 @@ +Move `sub_step` transitions to coordinator with rewind support \ No newline at end of file From abced25be2e3a65bfc5cf767ac69e5e2de3ffab5 Mon Sep 17 00:00:00 2001 From: jopemachine Date: Tue, 17 Mar 2026 15:23:13 +0900 Subject: [PATCH 04/12] wip --- 
.../deployment/db_source/db_source.py | 4 ++-- .../repositories/deployment/repository.py | 2 ++ .../sokovan/deployment/strategy/applier.py | 16 ++++++++-------- .../sokovan/deployment/strategy/test_applier.py | 3 ++- 4 files changed, 14 insertions(+), 11 deletions(-) diff --git a/src/ai/backend/manager/repositories/deployment/db_source/db_source.py b/src/ai/backend/manager/repositories/deployment/db_source/db_source.py index ace64cf7efc..17d7582410f 100644 --- a/src/ai/backend/manager/repositories/deployment/db_source/db_source.py +++ b/src/ai/backend/manager/repositories/deployment/db_source/db_source.py @@ -2583,8 +2583,8 @@ async def clear_deploying_revision( ) -> None: """Clear deploying_revision and sub_step for rolled-back deployments. - This is called explicitly by the RollingBackHandler after rollback - completes, NOT automatically by apply_strategy_mutations. + This is called explicitly by ``DeployingRollingBackHandler`` after + rollback completes, NOT automatically by apply_strategy_mutations. """ if not deployment_ids: return diff --git a/src/ai/backend/manager/repositories/deployment/repository.py b/src/ai/backend/manager/repositories/deployment/repository.py index 1267dfbe974..c2aff57e249 100644 --- a/src/ai/backend/manager/repositories/deployment/repository.py +++ b/src/ai/backend/manager/repositories/deployment/repository.py @@ -1406,6 +1406,7 @@ async def search_deployment_policies( """ return await self._db_source.search_deployment_policies(querier) + @deployment_repository_resilience.apply() async def apply_strategy_mutations( self, rollout: Sequence[RBACEntityCreator[RoutingRow]], @@ -1427,6 +1428,7 @@ async def apply_strategy_mutations( completed_ids=completed_ids, ) + @deployment_repository_resilience.apply() async def clear_deploying_revision(self, deployment_ids: set[UUID]) -> None: """Clear deploying_revision and sub_step for rolled-back deployments. 
diff --git a/src/ai/backend/manager/sokovan/deployment/strategy/applier.py b/src/ai/backend/manager/sokovan/deployment/strategy/applier.py index 28f55dc02e6..861a12805ab 100644 --- a/src/ai/backend/manager/sokovan/deployment/strategy/applier.py +++ b/src/ai/backend/manager/sokovan/deployment/strategy/applier.py @@ -43,14 +43,14 @@ class StrategyApplyResult: class StrategyResultApplier: """Applies a ``StrategyEvaluationSummary`` to the database. - Handles all DB mutations from a strategy evaluation cycle: - 1. Sub-step assignment updates - 2. Route rollout (create) and drain (terminate) - 3. Revision swap for COMPLETED deployments - - Note: Clearing ``deploying_revision`` for rolled-back deployments is NOT - done here — it is the responsibility of ``DeployingRollingBackHandler`` - to explicitly call ``clear_deploying_revision`` after rollback completes. + Handles route mutations from a strategy evaluation cycle: + 1. Route rollout (create) and drain (terminate) + 2. Revision swap for COMPLETED deployments + + Sub-step transitions are handled exclusively by the coordinator. + Clearing ``deploying_revision`` for rolled-back deployments is the + responsibility of ``DeployingRollingBackHandler``, which explicitly + calls ``clear_deploying_revision`` after rollback completes. 
""" def __init__(self, deployment_repo: DeploymentRepository) -> None: diff --git a/tests/unit/manager/sokovan/deployment/strategy/test_applier.py b/tests/unit/manager/sokovan/deployment/strategy/test_applier.py index 9c7e19bfb16..11872c064a2 100644 --- a/tests/unit/manager/sokovan/deployment/strategy/test_applier.py +++ b/tests/unit/manager/sokovan/deployment/strategy/test_applier.py @@ -189,6 +189,7 @@ async def test_no_drain_routes_passes_none( ) await applier.apply(summary) + mock_deployment_repo.apply_strategy_mutations.assert_called_once() kwargs = mock_deployment_repo.apply_strategy_mutations.call_args.kwargs assert kwargs["drain"] is None @@ -215,7 +216,7 @@ async def test_rolled_back_sets_result_ids( ) -> None: """Rolled-back IDs are tracked in result but not passed to DB mutations. - The RollingBackHandler is responsible for clearing deploying_revision. + The DeployingRollingBackHandler is responsible for clearing deploying_revision. """ summary, rolled_back_ids = rolled_back_summary From b4298031680db3e4b9239e09000823a39da4ca01 Mon Sep 17 00:00:00 2001 From: jopemachine Date: Tue, 17 Mar 2026 17:12:46 +0900 Subject: [PATCH 05/12] wip --- .../deployment/db_source/db_source.py | 5 +- .../scheduling_history/creators.py | 1 + .../manager/sokovan/deployment/coordinator.py | 25 +- .../sokovan/deployment/handlers/__init__.py | 2 - .../sokovan/deployment/handlers/deploying.py | 298 ++++-------------- 5 files changed, 81 insertions(+), 250 deletions(-) diff --git a/src/ai/backend/manager/repositories/deployment/db_source/db_source.py b/src/ai/backend/manager/repositories/deployment/db_source/db_source.py index 17d7582410f..e27790a2173 100644 --- a/src/ai/backend/manager/repositories/deployment/db_source/db_source.py +++ b/src/ai/backend/manager/repositories/deployment/db_source/db_source.py @@ -728,10 +728,11 @@ async def update_endpoint_lifecycle_bulk_with_history( merge_ids: list[uuid.UUID] = [] create_rows: list[DeploymentHistoryRow] = [] - for new_row in 
new_rows: + for spec, new_row in zip(bulk_creator.specs, new_rows, strict=True): + allow_merge = getattr(spec, "allow_merge", True) last_row = last_records.get(new_row.deployment_id) - if last_row is not None and last_row.should_merge_with(new_row): + if allow_merge and last_row is not None and last_row.should_merge_with(new_row): merge_ids.append(last_row.id) else: create_rows.append(new_row) diff --git a/src/ai/backend/manager/repositories/scheduling_history/creators.py b/src/ai/backend/manager/repositories/scheduling_history/creators.py index 949b1727333..474fdd4e913 100644 --- a/src/ai/backend/manager/repositories/scheduling_history/creators.py +++ b/src/ai/backend/manager/repositories/scheduling_history/creators.py @@ -99,6 +99,7 @@ class DeploymentHistoryCreatorSpec(CreatorSpec[DeploymentHistoryRow]): to_status: EndpointLifecycle | None = None error_code: str | None = None sub_steps: list[SubStepResult] = field(default_factory=list) + allow_merge: bool = True @override def build_row(self) -> DeploymentHistoryRow: diff --git a/src/ai/backend/manager/sokovan/deployment/coordinator.py b/src/ai/backend/manager/sokovan/deployment/coordinator.py index 1fc3613e13f..cbd085f0e99 100644 --- a/src/ai/backend/manager/sokovan/deployment/coordinator.py +++ b/src/ai/backend/manager/sokovan/deployment/coordinator.py @@ -61,7 +61,6 @@ from .handlers import ( CheckPendingDeploymentHandler, CheckReplicaDeploymentHandler, - DeployingProgressingHandler, DeployingProvisioningHandler, DeployingRollingBackHandler, DeploymentHandler, @@ -325,21 +324,11 @@ def _init_handlers( applier=applier, ), ), - ( - (DeploymentLifecycleType.DEPLOYING, DeploymentSubStep.PROGRESSING), - DeployingProgressingHandler( - deployment_controller=self._deployment_controller, - route_controller=self._route_controller, - evaluator=evaluator, - applier=applier, - ), - ), ( (DeploymentLifecycleType.DEPLOYING, DeploymentSubStep.ROLLING_BACK), DeployingRollingBackHandler( 
deployment_controller=self._deployment_controller, route_controller=self._route_controller, - evaluator=evaluator, applier=applier, ), ), @@ -438,11 +427,12 @@ async def _handle_status_transitions( transitions = handler.status_transitions() # Success and rewind both advance without incrementing phase_attempts. - # Rewind returns to a previous sub-step as normal forward progress - # (e.g. progressing → provisioning for the next batch of routes). - for deployments, lifecycle_status in ( - (result.successes, transitions.success), - (result.rewind, transitions.rewind), + # Rewind records route mutations within the same phase (e.g. + # PROVISIONING → PROVISIONING) — each rewind creates a separate + # history record (allow_merge=False) so progress is visible. + for deployments, lifecycle_status, allow_merge in ( + (result.successes, transitions.success, True), + (result.rewind, transitions.rewind, False), ): if not deployments or lifecycle_status is None: continue @@ -453,6 +443,7 @@ async def _handle_status_transitions( target_lifecycles=target_statuses, records=records, timestamp_now=timestamp_now, + allow_merge=allow_merge, ) batch_updaters.append(transition.updater) all_history_specs.extend(transition.history_specs) @@ -532,6 +523,7 @@ def _build_success_transition( target_lifecycles: list[DeploymentLifecycleStatus], records: Mapping[UUID, ExecutionRecord], timestamp_now: str, + allow_merge: bool = True, ) -> _TransitionResult: next_lifecycle = lifecycle_status.lifecycle target_lifecycle_stages = [status.lifecycle for status in target_lifecycles] @@ -544,6 +536,7 @@ def _build_success_transition( message=f"{handler_name} completed successfully", from_status=deployment.deployment_info.state.lifecycle, to_status=next_lifecycle, + allow_merge=allow_merge, sub_steps=self._build_history_sub_steps( deployment.deployment_info.id, records, diff --git a/src/ai/backend/manager/sokovan/deployment/handlers/__init__.py 
b/src/ai/backend/manager/sokovan/deployment/handlers/__init__.py index 489070e9143..961e4544307 100644 --- a/src/ai/backend/manager/sokovan/deployment/handlers/__init__.py +++ b/src/ai/backend/manager/sokovan/deployment/handlers/__init__.py @@ -4,7 +4,6 @@ from .base import DeploymentHandler from .deploying import ( - DeployingProgressingHandler, DeployingProvisioningHandler, DeployingRollingBackHandler, ) @@ -17,7 +16,6 @@ __all__ = [ "CheckPendingDeploymentHandler", "CheckReplicaDeploymentHandler", - "DeployingProgressingHandler", "DeployingProvisioningHandler", "DeployingRollingBackHandler", "DeploymentHandler", diff --git a/src/ai/backend/manager/sokovan/deployment/handlers/deploying.py b/src/ai/backend/manager/sokovan/deployment/handlers/deploying.py index f95886e90e4..b7336b34c26 100644 --- a/src/ai/backend/manager/sokovan/deployment/handlers/deploying.py +++ b/src/ai/backend/manager/sokovan/deployment/handlers/deploying.py @@ -1,29 +1,26 @@ """Handlers for DEPLOYING sub-steps (BEP-1049). -Three DEPLOYING handlers are registered flat in the coordinator's HandlerRegistry -alongside other lifecycle handlers, keyed by ``(lifecycle_type, sub_step)``. -Each handler calls the strategy evaluator and applier directly in ``execute()``. +Two DEPLOYING handlers are registered in the coordinator's HandlerRegistry: + +- **DeployingProvisioningHandler**: Runs the strategy FSM each cycle to + create/drain routes and check for completion. +- **DeployingRollingBackHandler**: Clears ``deploying_revision`` and + transitions directly to READY. 
Sub-step flow:: - PROVISIONING ──(success)──▸ PROGRESSING - │ │ - │ (expired/give_up) ┌──────┴──────┐ - ▼ ▼ ▼ - ROLLING_BACK COMPLETED ROLLING_BACK - │ │ │ - │ (success) │ (success) │ (success) - ▼ ▼ ▼ - ROLLED_BACK READY ROLLED_BACK - │ │ - │ (success, via Progressing) │ (success, via Progressing) - ▼ ▼ - READY READY + PROVISIONING ──(rewind)──▸ PROVISIONING (route mutations, logged) + │ + │ (success) + ▼ + READY (completed — all routes replaced) + + PROVISIONING ──(timeout)──▸ ROLLING_BACK ──(success)──▸ READY The evaluator determines sub-step assignments and route mutations; the applier persists them to DB atomically. Each handler classifies -deployments into successes (transition forward), errors (failure path), -and skipped (still in progress — no state transition). +deployments into successes (transition forward), rewind (route mutations +with history logged), and skipped (no change — waiting). """ from __future__ import annotations @@ -67,12 +64,18 @@ class DeployingProvisioningHandler(DeploymentHandler): - """Handler for DEPLOYING / PROVISIONING sub-step. + """Handler for the DEPLOYING lifecycle (PROVISIONING + ROLLED_BACK sub-steps). + + Runs the strategy FSM each cycle to create/drain routes and check + for completion. Classification: + + - **Route mutations executed** (create/drain): rewind — stays in + PROVISIONING with a new history record for progress tracking. + - **No changes** (routes still warming up): skipped — no history. + - **Completed** (all old routes replaced): success → READY. - New-revision routes are being created; waiting for them to become HEALTHY. - The evaluator assigns sub-steps; when all new routes are healthy the - deployment advances to PROGRESSING (success), otherwise it stays in - PROVISIONING (skipped — no state transition). + ROLLED_BACK deployments (cleared by RollingBackHandler) are + transitioned to READY directly without FSM evaluation. 
""" def __init__( @@ -104,7 +107,7 @@ def target_statuses(cls) -> list[DeploymentLifecycleStatus]: DeploymentLifecycleStatus( lifecycle=EndpointLifecycle.DEPLOYING, sub_status=DeploymentSubStep.PROVISIONING, - ) + ), ] @classmethod @@ -112,155 +115,30 @@ def target_statuses(cls) -> list[DeploymentLifecycleStatus]: def status_transitions(cls) -> DeploymentStatusTransitions: return DeploymentStatusTransitions( success=DeploymentLifecycleStatus( - lifecycle=EndpointLifecycle.DEPLOYING, - sub_status=DeploymentSubStep.PROGRESSING, - ), - need_retry=None, - expired=DeploymentLifecycleStatus( - lifecycle=EndpointLifecycle.DEPLOYING, - sub_status=DeploymentSubStep.ROLLING_BACK, + lifecycle=EndpointLifecycle.READY, + sub_status=None, ), - give_up=DeploymentLifecycleStatus( + rewind=DeploymentLifecycleStatus( lifecycle=EndpointLifecycle.DEPLOYING, - sub_status=DeploymentSubStep.ROLLING_BACK, - ), - ) - - @override - async def execute( - self, deployments: Sequence[DeploymentWithHistory] - ) -> DeploymentExecutionResult: - deployment_infos = [deployment.deployment_info for deployment in deployments] - deployment_map = {deployment.deployment_info.id: deployment for deployment in deployments} - - summary = await self._evaluator.evaluate(deployment_infos) - await self._applier.apply(summary) - - successes: list[DeploymentWithHistory] = [] - skipped: list[DeploymentWithHistory] = [] - - # Classify by assigned sub_step - for deployment in deployments: - endpoint_id = deployment.deployment_info.id - assigned = summary.assignments.get(endpoint_id) - if assigned is None: - # Evaluation error — handled below - continue - if assigned == DeploymentSubStep.PROGRESSING: - # Advanced to PROGRESSING → success (coordinator transitions) - successes.append(deployment) - else: - # Still PROVISIONING → skip (no state transition) - skipped.append(deployment) - - # Evaluation errors → execution errors - errors = [ - DeploymentExecutionError( - 
deployment_info=deployment_map[evaluation_error.deployment.id], - reason=evaluation_error.reason, - error_detail=evaluation_error.reason, - ) - for evaluation_error in summary.errors - if evaluation_error.deployment.id in deployment_map - ] - - return DeploymentExecutionResult(successes=successes, errors=errors, skipped=skipped) - - @override - async def post_process(self, result: DeploymentExecutionResult) -> None: - await self._deployment_controller.mark_lifecycle_needed( - DeploymentLifecycleType.DEPLOYING, sub_step=DeploymentSubStep.PROVISIONING - ) - await self._route_controller.mark_lifecycle_needed(RouteLifecycleType.PROVISIONING) - - -class DeployingProgressingHandler(DeploymentHandler): - """Handler for DEPLOYING / PROGRESSING (+ COMPLETED, ROLLED_BACK). - - This single handler processes three sub-steps: - - - **PROGRESSING**: Still replacing routes — re-evaluate next cycle. - - **COMPLETED**: Applier has swapped revision → returned as success - → coordinator transitions to READY. - - **ROLLED_BACK**: Applier has cleared deploying_revision → returned - as error → coordinator transitions to READY. 
- """ - - def __init__( - self, - deployment_controller: DeploymentController, - route_controller: RouteController, - evaluator: DeploymentStrategyEvaluator, - applier: StrategyResultApplier, - ) -> None: - self._deployment_controller = deployment_controller - self._route_controller = route_controller - self._evaluator = evaluator - self._applier = applier - - @classmethod - @override - def name(cls) -> str: - return "deploying-progressing" - - @property - @override - def lock_id(self) -> LockID | None: - return LockID.LOCKID_DEPLOYMENT_DEPLOYING - - @classmethod - @override - def target_statuses(cls) -> list[DeploymentLifecycleStatus]: - return [ - DeploymentLifecycleStatus( - lifecycle=EndpointLifecycle.DEPLOYING, - sub_status=DeploymentSubStep.PROGRESSING, - ), - DeploymentLifecycleStatus( - lifecycle=EndpointLifecycle.DEPLOYING, - sub_status=DeploymentSubStep.COMPLETED, - ), - DeploymentLifecycleStatus( - lifecycle=EndpointLifecycle.DEPLOYING, - sub_status=DeploymentSubStep.ROLLED_BACK, + sub_status=DeploymentSubStep.PROVISIONING, ), - ] - - @classmethod - @override - def status_transitions(cls) -> DeploymentStatusTransitions: - ready = DeploymentLifecycleStatus( - lifecycle=EndpointLifecycle.READY, - sub_status=None, - ) - rolling_back = DeploymentLifecycleStatus( - lifecycle=EndpointLifecycle.DEPLOYING, - sub_status=DeploymentSubStep.ROLLING_BACK, - ) - return DeploymentStatusTransitions( - success=ready, - need_retry=None, - expired=rolling_back, - give_up=rolling_back, ) @override async def execute( self, deployments: Sequence[DeploymentWithHistory] ) -> DeploymentExecutionResult: - deployment_infos = [deployment.deployment_info for deployment in deployments] - deployment_map = {deployment.deployment_info.id: deployment for deployment in deployments} + deployment_infos = [d.deployment_info for d in deployments] + deployment_map = {d.deployment_info.id: d for d in deployments} summary = await self._evaluator.evaluate(deployment_infos) apply_result = await 
self._applier.apply(summary) - # Filter out deployments that have been marked for destruction during DEPLOYING. - # Without this guard, a COMPLETED sub_step would swap revisions and - # transition the deployment back to READY, resurrecting it. + # Filter out deployments marked for destruction during DEPLOYING. destroying_ids = { - deployment.deployment_info.id - for deployment in deployments - if deployment.deployment_info.state.lifecycle + d.deployment_info.id + for d in deployments + if d.deployment_info.state.lifecycle in (EndpointLifecycle.DESTROYING, EndpointLifecycle.DESTROYED) } if destroying_ids: @@ -272,11 +150,9 @@ async def execute( successes: list[DeploymentWithHistory] = [] errors: list[DeploymentExecutionError] = [] skipped: list[DeploymentWithHistory] = [] + rewind: list[DeploymentWithHistory] = [] - terminal_ids = apply_result.completed_ids | apply_result.rolled_back_ids - evaluation_error_ids = {e.deployment.id for e in summary.errors} - - # COMPLETED → successes (coordinator transitions to READY) + # COMPLETED → success (coordinator transitions to READY) for endpoint_id in apply_result.completed_ids: if endpoint_id in destroying_ids: continue @@ -284,20 +160,6 @@ async def execute( if deployment is not None: successes.append(deployment) - # ROLLED_BACK → errors (coordinator transitions to READY) - for endpoint_id in apply_result.rolled_back_ids: - if endpoint_id in destroying_ids: - continue - deployment = deployment_map.get(endpoint_id) - if deployment is not None: - errors.append( - DeploymentExecutionError( - deployment_info=deployment, - reason="Deployment rolled back — all new routes failed", - error_detail="Strategy FSM determined rollback", - ) - ) - # Evaluation errors → execution errors for error_data in summary.errors: deployment = deployment_map.get(error_data.deployment.id) @@ -310,22 +172,31 @@ async def execute( ) ) - # Still PROGRESSING → skipped (no state transition) - for deployment in deployments: + # Classify remaining: route 
mutations happened → rewind (history logged), + # no changes → skipped (no history). + completed_or_error_ids = apply_result.completed_ids | { + e.deployment.id for e in summary.errors + } + has_route_mutations = bool(apply_result.routes_created or apply_result.routes_drained) + for deployment in remaining: endpoint_id = deployment.deployment_info.id - if ( - endpoint_id not in terminal_ids - and endpoint_id not in destroying_ids - and endpoint_id not in evaluation_error_ids - ): + if endpoint_id in completed_or_error_ids or endpoint_id in destroying_ids: + continue + if has_route_mutations: + # Routes were created/drained this cycle → rewind to log progress + rewind.append(deployment) + else: + # No changes, still waiting → skipped skipped.append(deployment) - return DeploymentExecutionResult(successes=successes, errors=errors, skipped=skipped) + return DeploymentExecutionResult( + successes=successes, errors=errors, skipped=skipped, rewind=rewind + ) @override async def post_process(self, result: DeploymentExecutionResult) -> None: await self._deployment_controller.mark_lifecycle_needed( - DeploymentLifecycleType.DEPLOYING, sub_step=DeploymentSubStep.PROGRESSING + DeploymentLifecycleType.DEPLOYING, sub_step=DeploymentSubStep.PROVISIONING ) await self._route_controller.mark_lifecycle_needed(RouteLifecycleType.PROVISIONING) @@ -333,23 +204,18 @@ async def post_process(self, result: DeploymentExecutionResult) -> None: class DeployingRollingBackHandler(DeploymentHandler): """Handler for DEPLOYING / ROLLING_BACK sub-step. - Actively rolling back failed new-revision routes to the previous revision. - The evaluator re-evaluates the deployment (which is now in ROLLING_BACK) - and the applier drains new-revision routes and restores old-revision routes. - Once rollback is complete, the evaluator assigns ROLLED_BACK, which the - ProgressingHandler will pick up and transition to READY. + Clears ``deploying_revision`` and transitions directly to READY. 
+ This is a cleanup-only operation — no FSM evaluation needed. """ def __init__( self, deployment_controller: DeploymentController, route_controller: RouteController, - evaluator: DeploymentStrategyEvaluator, applier: StrategyResultApplier, ) -> None: self._deployment_controller = deployment_controller self._route_controller = route_controller - self._evaluator = evaluator self._applier = applier @classmethod @@ -377,17 +243,8 @@ def target_statuses(cls) -> list[DeploymentLifecycleStatus]: def status_transitions(cls) -> DeploymentStatusTransitions: return DeploymentStatusTransitions( success=DeploymentLifecycleStatus( - lifecycle=EndpointLifecycle.DEPLOYING, - sub_status=DeploymentSubStep.ROLLED_BACK, - ), - need_retry=None, - expired=DeploymentLifecycleStatus( - lifecycle=EndpointLifecycle.DEPLOYING, - sub_status=DeploymentSubStep.ROLLED_BACK, - ), - give_up=DeploymentLifecycleStatus( - lifecycle=EndpointLifecycle.DEPLOYING, - sub_status=DeploymentSubStep.ROLLED_BACK, + lifecycle=EndpointLifecycle.READY, + sub_status=None, ), ) @@ -395,32 +252,13 @@ def status_transitions(cls) -> DeploymentStatusTransitions: async def execute( self, deployments: Sequence[DeploymentWithHistory] ) -> DeploymentExecutionResult: - deployment_infos = [deployment.deployment_info for deployment in deployments] - deployment_map = {deployment.deployment_info.id: deployment for deployment in deployments} - - summary = await self._evaluator.evaluate(deployment_infos) - await self._applier.apply(summary) - - # Successfully evaluated deployments → successes (coordinator transitions to ROLLED_BACK) - evaluated_ids = set(summary.assignments.keys()) - successes = [ - deployment - for deployment in deployments - if deployment.deployment_info.id in evaluated_ids - ] - - # Evaluation errors → execution errors - errors = [ - DeploymentExecutionError( - deployment_info=deployment_map[evaluation_error.deployment.id], - reason=evaluation_error.reason, - error_detail=evaluation_error.reason, - ) - for 
evaluation_error in summary.errors - if evaluation_error.deployment.id in deployment_map - ] - - return DeploymentExecutionResult(successes=successes, errors=errors) + all_deployment_ids = {deployment.deployment_info.id for deployment in deployments} + await self._applier.clear_deploying_revision(all_deployment_ids) + log.info( + "Cleared deploying_revision for {} rolling-back deployments", + len(all_deployment_ids), + ) + return DeploymentExecutionResult(successes=list(deployments)) @override async def post_process(self, result: DeploymentExecutionResult) -> None: From ddbac2725d736bab905f637c2c3972653dd665a6 Mon Sep 17 00:00:00 2001 From: jopemachine Date: Tue, 17 Mar 2026 18:26:27 +0900 Subject: [PATCH 06/12] feat: Consolidate deploying handlers and remove unused sub-steps MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit - Remove DeployingProgressingHandler — ProvisioningHandler handles the entire DEPLOYING lifecycle - Remove PROGRESSING and ROLLED_BACK from DeploymentSubStep enum - Simplify RollingBackHandler to transition directly to READY - Remove rolled_back_ids from StrategyApplyResult - Add allow_merge flag to DeploymentHistoryCreatorSpec so rewind history records are never merged (each route mutation cycle is visible as a separate record) Co-Authored-By: Claude Opus 4.6 (1M context) --- .../backend/manager/data/deployment/types.py | 25 +++++------- .../sokovan/deployment/handlers/deploying.py | 11 ++---- .../sokovan/deployment/strategy/applier.py | 7 ---- .../sokovan/deployment/strategy/types.py | 5 +-- .../deployment/strategy/test_applier.py | 39 +++---------------- 5 files changed, 20 insertions(+), 67 deletions(-) diff --git a/src/ai/backend/manager/data/deployment/types.py b/src/ai/backend/manager/data/deployment/types.py index 9583e018299..b8e99d68c2c 100644 --- a/src/ai/backend/manager/data/deployment/types.py +++ b/src/ai/backend/manager/data/deployment/types.py @@ -153,28 +153,22 @@ class 
DeploymentSubStatus(enum.StrEnum): Each lifecycle type can define its own sub-status enum by inheriting from this class. For example, DEPLOYING handlers - use ``DeploymentSubStep`` (provisioning, progressing, …). + use ``DeploymentSubStep`` (provisioning, rolling_back, …). """ class DeploymentSubStep(DeploymentSubStatus): """Sub-steps for the DEPLOYING lifecycle phase. - Active states: - - PROVISIONING: New revision routes are being provisioned; waiting for readiness. - - PROGRESSING: Actively replacing old routes with new routes. - - ROLLING_BACK: Actively rolling back failed new routes to previous revision. - - Terminal markers (no handler execution, trigger transition only): - - COMPLETED: All strategy conditions satisfied; ready for revision swap. - - ROLLED_BACK: Rollback finished; ready for cleanup and transition to READY. + - PROVISIONING: New revision routes are being provisioned and old routes + are being drained. The main handler for rolling updates. + - ROLLING_BACK: Clearing deploying_revision and transitioning to READY. + - COMPLETED: All strategy conditions satisfied; triggers revision swap. """ PROVISIONING = "provisioning" - PROGRESSING = "progressing" ROLLING_BACK = "rolling_back" COMPLETED = "completed" - ROLLED_BACK = "rolled_back" @dataclass(frozen=True) @@ -204,10 +198,11 @@ class DeploymentStatusTransitions: need_retry: Target lifecycle when handler fails but can retry expired: Target lifecycle when time elapsed in current state give_up: Target lifecycle when retry count exceeded - rewind: Target lifecycle for normal forward progress that returns to a - previous sub-step (e.g. progressing → provisioning for the next - batch in rolling update). Unlike need_retry, rewind does NOT - increment phase_attempts. + rewind: Target lifecycle when route mutations were executed but + the deployment stays in the same sub-step (e.g. PROVISIONING + → PROVISIONING after create/drain). Unlike need_retry, rewind + does NOT increment phase_attempts. 
Each rewind creates a + separate history record (allow_merge=False) for progress tracking. """ success: DeploymentLifecycleStatus | None = None diff --git a/src/ai/backend/manager/sokovan/deployment/handlers/deploying.py b/src/ai/backend/manager/sokovan/deployment/handlers/deploying.py index b7336b34c26..e2b332b92ce 100644 --- a/src/ai/backend/manager/sokovan/deployment/handlers/deploying.py +++ b/src/ai/backend/manager/sokovan/deployment/handlers/deploying.py @@ -64,7 +64,7 @@ class DeployingProvisioningHandler(DeploymentHandler): - """Handler for the DEPLOYING lifecycle (PROVISIONING + ROLLED_BACK sub-steps). + """Handler for the DEPLOYING / PROVISIONING sub-step. Runs the strategy FSM each cycle to create/drain routes and check for completion. Classification: @@ -73,9 +73,6 @@ class DeployingProvisioningHandler(DeploymentHandler): PROVISIONING with a new history record for progress tracking. - **No changes** (routes still warming up): skipped — no history. - **Completed** (all old routes replaced): success → READY. - - ROLLED_BACK deployments (cleared by RollingBackHandler) are - transitioned to READY directly without FSM evaluation. """ def __init__( @@ -172,21 +169,19 @@ async def execute( ) ) - # Classify remaining: route mutations happened → rewind (history logged), + # Classify rest: route mutations happened → rewind (history logged), # no changes → skipped (no history). 
completed_or_error_ids = apply_result.completed_ids | { e.deployment.id for e in summary.errors } has_route_mutations = bool(apply_result.routes_created or apply_result.routes_drained) - for deployment in remaining: + for deployment in deployments: endpoint_id = deployment.deployment_info.id if endpoint_id in completed_or_error_ids or endpoint_id in destroying_ids: continue if has_route_mutations: - # Routes were created/drained this cycle → rewind to log progress rewind.append(deployment) else: - # No changes, still waiting → skipped skipped.append(deployment) return DeploymentExecutionResult( diff --git a/src/ai/backend/manager/sokovan/deployment/strategy/applier.py b/src/ai/backend/manager/sokovan/deployment/strategy/applier.py index 861a12805ab..c7abcf5c315 100644 --- a/src/ai/backend/manager/sokovan/deployment/strategy/applier.py +++ b/src/ai/backend/manager/sokovan/deployment/strategy/applier.py @@ -30,9 +30,6 @@ class StrategyApplyResult: completed_ids: set[UUID] = field(default_factory=set) """Deployment IDs that completed and had their revision swapped.""" - rolled_back_ids: set[UUID] = field(default_factory=set) - """Deployment IDs whose strategy evaluator determined rollback.""" - routes_created: int = 0 """Number of new routes rolled out.""" @@ -59,16 +56,12 @@ def __init__(self, deployment_repo: DeploymentRepository) -> None: async def apply(self, summary: StrategyEvaluationSummary) -> StrategyApplyResult: changes = summary.route_changes completed_ids: set[UUID] = set() - rolled_back_ids: set[UUID] = set() for endpoint_id, sub_step in summary.assignments.items(): if sub_step == DeploymentSubStep.COMPLETED: completed_ids.add(endpoint_id) - elif sub_step == DeploymentSubStep.ROLLED_BACK: - rolled_back_ids.add(endpoint_id) result = StrategyApplyResult( completed_ids=completed_ids, - rolled_back_ids=rolled_back_ids, routes_created=len(changes.rollout_specs), routes_drained=len(changes.drain_route_ids), ) diff --git 
a/src/ai/backend/manager/sokovan/deployment/strategy/types.py b/src/ai/backend/manager/sokovan/deployment/strategy/types.py index c22c2f2ad72..213d23d5eba 100644 --- a/src/ai/backend/manager/sokovan/deployment/strategy/types.py +++ b/src/ai/backend/manager/sokovan/deployment/strategy/types.py @@ -31,8 +31,7 @@ class RouteChanges: class StrategyCycleResult: """Result of evaluating a single deployment's strategy cycle. - ``sub_step`` indicates the next state: PROVISIONING, PROGRESSING, - ROLLING_BACK, COMPLETED, or ROLLED_BACK. + ``sub_step`` indicates the next state: PROVISIONING or COMPLETED. """ sub_step: DeploymentSubStep @@ -53,7 +52,7 @@ class StrategyEvaluationSummary: The evaluator classifies each deployment into a sub_step and records the mapping so the applier can bulk-update the DB column. - All outcomes — including ROLLING_BACK, COMPLETED, and ROLLED_BACK — are expressed + All outcomes — including ROLLING_BACK and COMPLETED — are expressed as sub_step values and persisted to the DB by the applier. 
""" diff --git a/tests/unit/manager/sokovan/deployment/strategy/test_applier.py b/tests/unit/manager/sokovan/deployment/strategy/test_applier.py index 11872c064a2..7853d05bea5 100644 --- a/tests/unit/manager/sokovan/deployment/strategy/test_applier.py +++ b/tests/unit/manager/sokovan/deployment/strategy/test_applier.py @@ -70,7 +70,7 @@ def summary_with_rollout() -> StrategyEvaluationSummary: @pytest.fixture def summary_with_drain() -> StrategyEvaluationSummary: return _build_summary( - {uuid4(): DeploymentSubStep.PROGRESSING}, + {uuid4(): DeploymentSubStep.PROVISIONING}, route_changes=RouteChanges(drain_route_ids=[uuid4()]), ) @@ -88,29 +88,20 @@ def completed_summary() -> tuple[StrategyEvaluationSummary, set[UUID]]: @pytest.fixture -def rolled_back_summary() -> tuple[StrategyEvaluationSummary, set[UUID]]: - ep_id = uuid4() - summary = _build_summary({ep_id: DeploymentSubStep.ROLLED_BACK}) - return summary, {ep_id} - - -@pytest.fixture -def mixed_summary() -> tuple[StrategyEvaluationSummary, UUID, UUID, UUID]: +def mixed_summary() -> tuple[StrategyEvaluationSummary, UUID, UUID]: provisioning_id = uuid4() completed_id = uuid4() - rolled_back_id = uuid4() summary = _build_summary( { provisioning_id: DeploymentSubStep.PROVISIONING, completed_id: DeploymentSubStep.COMPLETED, - rolled_back_id: DeploymentSubStep.ROLLED_BACK, }, route_changes=RouteChanges( rollout_specs=[MagicMock()], drain_route_ids=[uuid4()], ), ) - return summary, provisioning_id, completed_id, rolled_back_id + return summary, provisioning_id, completed_id # ============================================================================= @@ -131,7 +122,6 @@ async def test_empty_summary_skips_all_db_calls( mock_deployment_repo.apply_strategy_mutations.assert_not_called() assert result.completed_ids == set() - assert result.rolled_back_ids == set() assert result.routes_created == 0 assert result.routes_drained == 0 @@ -208,31 +198,13 @@ async def test_completed_passes_completed_ids( assert 
kwargs["completed_ids"] == completed_ids assert result.completed_ids == completed_ids - async def test_rolled_back_sets_result_ids( - self, - applier: StrategyResultApplier, - mock_deployment_repo: AsyncMock, - rolled_back_summary: tuple[StrategyEvaluationSummary, set[UUID]], - ) -> None: - """Rolled-back IDs are tracked in result but not passed to DB mutations. - - The DeployingRollingBackHandler is responsible for clearing deploying_revision. - """ - summary, rolled_back_ids = rolled_back_summary - - result = await applier.apply(summary) - - assert result.rolled_back_ids == rolled_back_ids - # No route mutations, so apply_strategy_mutations should not be called - mock_deployment_repo.apply_strategy_mutations.assert_not_called() - async def test_mixed_assignments_handles_all( self, applier: StrategyResultApplier, mock_deployment_repo: AsyncMock, - mixed_summary: tuple[StrategyEvaluationSummary, UUID, UUID, UUID], + mixed_summary: tuple[StrategyEvaluationSummary, UUID, UUID], ) -> None: - summary, _provisioning_id, completed_id, rolled_back_id = mixed_summary + summary, _provisioning_id, completed_id = mixed_summary mock_deployment_repo.apply_strategy_mutations.return_value = 1 result = await applier.apply(summary) @@ -243,7 +215,6 @@ async def test_mixed_assignments_handles_all( assert len(kwargs["rollout"]) == 1 assert kwargs["drain"] is not None assert result.completed_ids == {completed_id} - assert result.rolled_back_ids == {rolled_back_id} assert result.routes_created == 1 assert result.routes_drained == 1 From cb225a6ab312dbbf388f67ab6e6b43e9330fd629 Mon Sep 17 00:00:00 2001 From: jopemachine Date: Wed, 18 Mar 2026 11:13:38 +0900 Subject: [PATCH 07/12] wip --- .../manager/repositories/deployment/db_source/db_source.py | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/src/ai/backend/manager/repositories/deployment/db_source/db_source.py b/src/ai/backend/manager/repositories/deployment/db_source/db_source.py index e27790a2173..23fc9081d75 
100644 --- a/src/ai/backend/manager/repositories/deployment/db_source/db_source.py +++ b/src/ai/backend/manager/repositories/deployment/db_source/db_source.py @@ -148,6 +148,7 @@ ImageContext, UserContext, ) +from ai.backend.manager.repositories.scheduling_history.creators import DeploymentHistoryCreatorSpec from ai.backend.manager.utils import query_userinfo_from_session @@ -729,7 +730,7 @@ async def update_endpoint_lifecycle_bulk_with_history( create_rows: list[DeploymentHistoryRow] = [] for spec, new_row in zip(bulk_creator.specs, new_rows, strict=True): - allow_merge = getattr(spec, "allow_merge", True) + allow_merge = cast(DeploymentHistoryCreatorSpec, spec).allow_merge last_row = last_records.get(new_row.deployment_id) if allow_merge and last_row is not None and last_row.should_merge_with(new_row): From 16c64f729247bb4b84440a03f3e32545c040516d Mon Sep 17 00:00:00 2001 From: jopemachine Date: Wed, 18 Mar 2026 11:15:06 +0900 Subject: [PATCH 08/12] wip --- src/ai/backend/manager/data/deployment/types.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/ai/backend/manager/data/deployment/types.py b/src/ai/backend/manager/data/deployment/types.py index b8e99d68c2c..f01b9811712 100644 --- a/src/ai/backend/manager/data/deployment/types.py +++ b/src/ai/backend/manager/data/deployment/types.py @@ -198,7 +198,7 @@ class DeploymentStatusTransitions: need_retry: Target lifecycle when handler fails but can retry expired: Target lifecycle when time elapsed in current state give_up: Target lifecycle when retry count exceeded - rewind: Target lifecycle when route mutations were executed but + rewind: Target lifecycle when route mutations were executed but the deployment stays in the same sub-step (e.g. PROVISIONING → PROVISIONING after create/drain). Unlike need_retry, rewind does NOT increment phase_attempts. 
Each rewind creates a From 728a7b5cbc6cbdd0c37ab716918e73b0ad2e3c18 Mon Sep 17 00:00:00 2001 From: jopemachine Date: Wed, 18 Mar 2026 17:34:00 +0900 Subject: [PATCH 09/12] wip --- .../backend/manager/data/deployment/types.py | 12 +++---- .../deployment/db_source/db_source.py | 6 ++-- .../scheduling_history/creators.py | 1 - .../manager/sokovan/deployment/coordinator.py | 34 +++++++++++-------- .../sokovan/deployment/handlers/deploying.py | 17 +++++----- .../manager/sokovan/deployment/types.py | 2 +- .../deployment/test_coordinator_history.py | 18 +++++----- 7 files changed, 45 insertions(+), 45 deletions(-) diff --git a/src/ai/backend/manager/data/deployment/types.py b/src/ai/backend/manager/data/deployment/types.py index f01b9811712..f4e3c60dd72 100644 --- a/src/ai/backend/manager/data/deployment/types.py +++ b/src/ai/backend/manager/data/deployment/types.py @@ -195,21 +195,19 @@ class DeploymentStatusTransitions: Attributes: success: Target lifecycle when handler succeeds, None means no change - need_retry: Target lifecycle when handler fails but can retry + need_retry: Target lifecycle when handler fails but can retry, or when + route mutations were executed but the deployment stays in the same + sub-step (e.g. PROVISIONING → PROVISIONING after create/drain). + Items explicitly returned as need_retry by handlers are never + escalated to give_up — they represent normal progress. expired: Target lifecycle when time elapsed in current state give_up: Target lifecycle when retry count exceeded - rewind: Target lifecycle when route mutations were executed but - the deployment stays in the same sub-step (e.g. PROVISIONING - → PROVISIONING after create/drain). Unlike need_retry, rewind - does NOT increment phase_attempts. Each rewind creates a - separate history record (allow_merge=False) for progress tracking. 
""" success: DeploymentLifecycleStatus | None = None need_retry: DeploymentLifecycleStatus | None = None expired: DeploymentLifecycleStatus | None = None give_up: DeploymentLifecycleStatus | None = None - rewind: DeploymentLifecycleStatus | None = None @dataclass(frozen=True) diff --git a/src/ai/backend/manager/repositories/deployment/db_source/db_source.py b/src/ai/backend/manager/repositories/deployment/db_source/db_source.py index 23fc9081d75..17d7582410f 100644 --- a/src/ai/backend/manager/repositories/deployment/db_source/db_source.py +++ b/src/ai/backend/manager/repositories/deployment/db_source/db_source.py @@ -148,7 +148,6 @@ ImageContext, UserContext, ) -from ai.backend.manager.repositories.scheduling_history.creators import DeploymentHistoryCreatorSpec from ai.backend.manager.utils import query_userinfo_from_session @@ -729,11 +728,10 @@ async def update_endpoint_lifecycle_bulk_with_history( merge_ids: list[uuid.UUID] = [] create_rows: list[DeploymentHistoryRow] = [] - for spec, new_row in zip(bulk_creator.specs, new_rows, strict=True): - allow_merge = cast(DeploymentHistoryCreatorSpec, spec).allow_merge + for new_row in new_rows: last_row = last_records.get(new_row.deployment_id) - if allow_merge and last_row is not None and last_row.should_merge_with(new_row): + if last_row is not None and last_row.should_merge_with(new_row): merge_ids.append(last_row.id) else: create_rows.append(new_row) diff --git a/src/ai/backend/manager/repositories/scheduling_history/creators.py b/src/ai/backend/manager/repositories/scheduling_history/creators.py index 474fdd4e913..949b1727333 100644 --- a/src/ai/backend/manager/repositories/scheduling_history/creators.py +++ b/src/ai/backend/manager/repositories/scheduling_history/creators.py @@ -99,7 +99,6 @@ class DeploymentHistoryCreatorSpec(CreatorSpec[DeploymentHistoryRow]): to_status: EndpointLifecycle | None = None error_code: str | None = None sub_steps: list[SubStepResult] = field(default_factory=list) - allow_merge: bool 
= True @override def build_row(self) -> DeploymentHistoryRow: diff --git a/src/ai/backend/manager/sokovan/deployment/coordinator.py b/src/ai/backend/manager/sokovan/deployment/coordinator.py index cbd085f0e99..98488ccd05d 100644 --- a/src/ai/backend/manager/sokovan/deployment/coordinator.py +++ b/src/ai/backend/manager/sokovan/deployment/coordinator.py @@ -426,24 +426,30 @@ async def _handle_status_transitions( transitions = handler.status_transitions() - # Success and rewind both advance without incrementing phase_attempts. - # Rewind records route mutations within the same phase (e.g. - # PROVISIONING → PROVISIONING) — each rewind creates a separate - # history record (allow_merge=False) so progress is visible. - for deployments, lifecycle_status, allow_merge in ( - (result.successes, transitions.success, True), - (result.rewind, transitions.rewind, False), - ): - if not deployments or lifecycle_status is None: - continue + # Success transitions + if result.successes and transitions.success is not None: + transition = self._build_success_transition( + handler_name=handler_name, + deployments=result.successes, + lifecycle_status=transitions.success, + target_lifecycles=target_statuses, + records=records, + timestamp_now=timestamp_now, + ) + batch_updaters.append(transition.updater) + all_history_specs.extend(transition.history_specs) + notification_events.extend(transition.notification_events) + + # Explicit need_retry from handlers (e.g. route mutations in progress). + # These are never escalated to give_up — they represent normal progress. 
+ if result.need_retry and transitions.need_retry is not None: transition = self._build_success_transition( handler_name=handler_name, - deployments=deployments, - lifecycle_status=lifecycle_status, + deployments=result.need_retry, + lifecycle_status=transitions.need_retry, target_lifecycles=target_statuses, records=records, timestamp_now=timestamp_now, - allow_merge=allow_merge, ) batch_updaters.append(transition.updater) all_history_specs.extend(transition.history_specs) @@ -523,7 +529,6 @@ def _build_success_transition( target_lifecycles: list[DeploymentLifecycleStatus], records: Mapping[UUID, ExecutionRecord], timestamp_now: str, - allow_merge: bool = True, ) -> _TransitionResult: next_lifecycle = lifecycle_status.lifecycle target_lifecycle_stages = [status.lifecycle for status in target_lifecycles] @@ -536,7 +541,6 @@ def _build_success_transition( message=f"{handler_name} completed successfully", from_status=deployment.deployment_info.state.lifecycle, to_status=next_lifecycle, - allow_merge=allow_merge, sub_steps=self._build_history_sub_steps( deployment.deployment_info.id, records, diff --git a/src/ai/backend/manager/sokovan/deployment/handlers/deploying.py b/src/ai/backend/manager/sokovan/deployment/handlers/deploying.py index e2b332b92ce..8f07354daba 100644 --- a/src/ai/backend/manager/sokovan/deployment/handlers/deploying.py +++ b/src/ai/backend/manager/sokovan/deployment/handlers/deploying.py @@ -9,7 +9,7 @@ Sub-step flow:: - PROVISIONING ──(rewind)──▸ PROVISIONING (route mutations, logged) + PROVISIONING ──(need_retry)──▸ PROVISIONING (route mutations, logged) │ │ (success) ▼ @@ -19,7 +19,7 @@ The evaluator determines sub-step assignments and route mutations; the applier persists them to DB atomically. Each handler classifies -deployments into successes (transition forward), rewind (route mutations +deployments into successes (transition forward), need_retry (route mutations with history logged), and skipped (no change — waiting). 
""" @@ -69,8 +69,9 @@ class DeployingProvisioningHandler(DeploymentHandler): Runs the strategy FSM each cycle to create/drain routes and check for completion. Classification: - - **Route mutations executed** (create/drain): rewind — stays in + - **Route mutations executed** (create/drain): need_retry — stays in PROVISIONING with a new history record for progress tracking. + Never escalated to give_up (normal progress). - **No changes** (routes still warming up): skipped — no history. - **Completed** (all old routes replaced): success → READY. """ @@ -115,7 +116,7 @@ def status_transitions(cls) -> DeploymentStatusTransitions: lifecycle=EndpointLifecycle.READY, sub_status=None, ), - rewind=DeploymentLifecycleStatus( + need_retry=DeploymentLifecycleStatus( lifecycle=EndpointLifecycle.DEPLOYING, sub_status=DeploymentSubStep.PROVISIONING, ), @@ -147,7 +148,7 @@ async def execute( successes: list[DeploymentWithHistory] = [] errors: list[DeploymentExecutionError] = [] skipped: list[DeploymentWithHistory] = [] - rewind: list[DeploymentWithHistory] = [] + need_retry: list[DeploymentWithHistory] = [] # COMPLETED → success (coordinator transitions to READY) for endpoint_id in apply_result.completed_ids: @@ -169,7 +170,7 @@ async def execute( ) ) - # Classify rest: route mutations happened → rewind (history logged), + # Classify rest: route mutations happened → need_retry (never give_up), # no changes → skipped (no history). 
completed_or_error_ids = apply_result.completed_ids | { e.deployment.id for e in summary.errors @@ -180,12 +181,12 @@ async def execute( if endpoint_id in completed_or_error_ids or endpoint_id in destroying_ids: continue if has_route_mutations: - rewind.append(deployment) + need_retry.append(deployment) else: skipped.append(deployment) return DeploymentExecutionResult( - successes=successes, errors=errors, skipped=skipped, rewind=rewind + successes=successes, errors=errors, skipped=skipped, need_retry=need_retry ) @override diff --git a/src/ai/backend/manager/sokovan/deployment/types.py b/src/ai/backend/manager/sokovan/deployment/types.py index 497ab165a3d..f4e56c0a39a 100644 --- a/src/ai/backend/manager/sokovan/deployment/types.py +++ b/src/ai/backend/manager/sokovan/deployment/types.py @@ -41,7 +41,7 @@ class DeploymentExecutionResult: successes: list[DeploymentWithHistory] = field(default_factory=list) errors: list[DeploymentExecutionError] = field(default_factory=list) skipped: list[DeploymentWithHistory] = field(default_factory=list) - rewind: list[DeploymentWithHistory] = field(default_factory=list) + need_retry: list[DeploymentWithHistory] = field(default_factory=list) @dataclass diff --git a/tests/unit/manager/sokovan/deployment/test_coordinator_history.py b/tests/unit/manager/sokovan/deployment/test_coordinator_history.py index b18ab78f1db..10c465a331f 100644 --- a/tests/unit/manager/sokovan/deployment/test_coordinator_history.py +++ b/tests/unit/manager/sokovan/deployment/test_coordinator_history.py @@ -382,14 +382,14 @@ async def test_skips_history_when_handler_returns_empty( mock_deployment_repository.update_endpoint_lifecycle_bulk_with_history.assert_not_called() - async def test_records_history_on_rewind( + async def test_records_history_on_need_retry( self, coordinator_with_pending_deployments: DeploymentCoordinator, mock_deployment_repository: AsyncMock, sample_deployment_with_history: DeploymentWithHistory, ) -> None: - """History is recorded when 
handler returns rewind result.""" - rewind_status = DeploymentLifecycleStatus(lifecycle=EndpointLifecycle.DEPLOYING) + """History is recorded when handler returns need_retry result.""" + need_retry_status = DeploymentLifecycleStatus(lifecycle=EndpointLifecycle.DEPLOYING) mock_handler = MagicMock(spec=DeploymentHandler) mock_handler.name = MagicMock(return_value="deploying_progressing") mock_handler.lock_id = None @@ -399,14 +399,14 @@ async def test_records_history_on_rewind( mock_handler.status_transitions = MagicMock( return_value=DeploymentStatusTransitions( success=DeploymentLifecycleStatus(lifecycle=EndpointLifecycle.READY), - rewind=rewind_status, + need_retry=need_retry_status, ) ) mock_handler.execute = AsyncMock( return_value=DeploymentExecutionResult( successes=[], errors=[], - rewind=[sample_deployment_with_history], + need_retry=[sample_deployment_with_history], ) ) mock_handler.post_process = AsyncMock() @@ -421,13 +421,13 @@ async def test_records_history_on_rewind( mock_deployment_repository.update_endpoint_lifecycle_bulk_with_history.assert_called_once() - async def test_rewind_without_transition_does_not_record_history( + async def test_need_retry_without_transition_does_not_record_history( self, coordinator_with_pending_deployments: DeploymentCoordinator, mock_deployment_repository: AsyncMock, sample_deployment_with_history: DeploymentWithHistory, ) -> None: - """No history recorded when rewind result exists but transitions.rewind is None.""" + """No history recorded when need_retry result exists but transitions.need_retry is None.""" mock_handler = MagicMock(spec=DeploymentHandler) mock_handler.name = MagicMock(return_value="deploying_progressing") mock_handler.lock_id = None @@ -437,14 +437,14 @@ async def test_rewind_without_transition_does_not_record_history( mock_handler.status_transitions = MagicMock( return_value=DeploymentStatusTransitions( success=DeploymentLifecycleStatus(lifecycle=EndpointLifecycle.READY), - rewind=None, + 
need_retry=None, ) ) mock_handler.execute = AsyncMock( return_value=DeploymentExecutionResult( successes=[], errors=[], - rewind=[sample_deployment_with_history], + need_retry=[sample_deployment_with_history], ) ) mock_handler.post_process = AsyncMock() From 51af136e044955080cd9f0e9e706c01dffbb9085 Mon Sep 17 00:00:00 2001 From: jopemachine Date: Wed, 18 Mar 2026 17:38:26 +0900 Subject: [PATCH 10/12] refactor: Remove or_conditions and and_conditions helpers --- .../manager/repositories/base/__init__.py | 4 --- .../manager/repositories/base/types.py | 32 ------------------- 2 files changed, 36 deletions(-) diff --git a/src/ai/backend/manager/repositories/base/__init__.py b/src/ai/backend/manager/repositories/base/__init__.py index 59ed85cd00b..a473c3a840a 100644 --- a/src/ai/backend/manager/repositories/base/__init__.py +++ b/src/ai/backend/manager/repositories/base/__init__.py @@ -62,8 +62,6 @@ QueryCondition, QueryOrder, SearchScope, - and_conditions, - or_conditions, ) from .updater import ( BatchUpdater, @@ -171,6 +169,4 @@ # Utils "combine_conditions_or", "negate_conditions", - "or_conditions", - "and_conditions", ] diff --git a/src/ai/backend/manager/repositories/base/types.py b/src/ai/backend/manager/repositories/base/types.py index 3b153ade241..89cbb39f354 100644 --- a/src/ai/backend/manager/repositories/base/types.py +++ b/src/ai/backend/manager/repositories/base/types.py @@ -20,38 +20,6 @@ type QueryCondition = Callable[[], sa.sql.expression.ColumnElement[bool]] -def or_conditions(*conditions: QueryCondition) -> QueryCondition: - """Combine multiple QueryConditions with OR. - - Args: - *conditions: Two or more QueryCondition callables to combine. - - Returns: - A single QueryCondition that evaluates to the OR of all inputs. - """ - - def inner() -> sa.sql.expression.ColumnElement[bool]: - return sa.or_(*[c() for c in conditions]) - - return inner - - -def and_conditions(*conditions: QueryCondition) -> QueryCondition: - """Combine multiple QueryConditions with AND.
- - Args: - *conditions: Two or more QueryCondition callables to combine. - - Returns: - A single QueryCondition that evaluates to the AND of all inputs. - """ - - def inner() -> sa.sql.expression.ColumnElement[bool]: - return sa.and_(*[c() for c in conditions]) - - return inner - - T = TypeVar("T") From 29b63ac5d75d489f35659eb59d2ce76badd72337 Mon Sep 17 00:00:00 2001 From: jopemachine Date: Wed, 18 Mar 2026 17:40:05 +0900 Subject: [PATCH 11/12] refactor: Add StrategyApplyResult.has_mutations and use it in apply --- .../manager/sokovan/deployment/strategy/applier.py | 10 +++++++++- 1 file changed, 9 insertions(+), 1 deletion(-) diff --git a/src/ai/backend/manager/sokovan/deployment/strategy/applier.py b/src/ai/backend/manager/sokovan/deployment/strategy/applier.py index c7abcf5c315..dea1c908005 100644 --- a/src/ai/backend/manager/sokovan/deployment/strategy/applier.py +++ b/src/ai/backend/manager/sokovan/deployment/strategy/applier.py @@ -36,6 +36,14 @@ class StrategyApplyResult: routes_drained: int = 0 """Number of routes marked for draining.""" + def has_mutations(self) -> bool: + """Check if there are any route mutations to persist. + + Returns True when at least one of the following is present: + new routes to roll out, routes to drain, or deployments completed. + """ + return bool(self.completed_ids or self.routes_created or self.routes_drained) + class StrategyResultApplier: """Applies a ``StrategyEvaluationSummary`` to the database.
@@ -79,7 +87,7 @@ async def apply(self, summary: StrategyEvaluationSummary) -> StrategyApplyResult rollout = changes.rollout_specs - if not (rollout or drain or completed_ids): + if not result.has_mutations(): return result swapped = await self._deployment_repo.apply_strategy_mutations( From 3ae0da9665a914cb1ce74ddec10f4ed464a91000 Mon Sep 17 00:00:00 2001 From: jopemachine Date: Wed, 18 Mar 2026 17:41:42 +0900 Subject: [PATCH 12/12] docs: Add news fragment --- changes/10276.feature.md | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/changes/10276.feature.md b/changes/10276.feature.md index ef21cc97956..9fc16b2c1b1 100644 --- a/changes/10276.feature.md +++ b/changes/10276.feature.md @@ -1 +1 @@ -Move `sub_step` transitions to coordinator with rewind support \ No newline at end of file +Consolidate deploying handlers and remove unused sub-steps