diff --git a/changes/10728.fix.md b/changes/10728.fix.md new file mode 100644 index 00000000000..eca24f1bcb1 --- /dev/null +++ b/changes/10728.fix.md @@ -0,0 +1 @@ +Register appproxy endpoints for deployments entering DEPLOYING via ActivateRevision diff --git a/src/ai/backend/manager/sokovan/deployment/coordinator.py b/src/ai/backend/manager/sokovan/deployment/coordinator.py index f6456123c07..8f10ea504fd 100644 --- a/src/ai/backend/manager/sokovan/deployment/coordinator.py +++ b/src/ai/backend/manager/sokovan/deployment/coordinator.py @@ -307,6 +307,8 @@ def _init_handlers( route_controller=self._route_controller, evaluator=evaluator, applier=applier, + deployment_executor=executor, + deployment_repo=self._deployment_repository, ), ), ( diff --git a/src/ai/backend/manager/sokovan/deployment/executor.py b/src/ai/backend/manager/sokovan/deployment/executor.py index 8ac492ab14b..956fb8cd97c 100644 --- a/src/ai/backend/manager/sokovan/deployment/executor.py +++ b/src/ai/backend/manager/sokovan/deployment/executor.py @@ -103,6 +103,7 @@ def __init__( async def check_pending_deployments( self, deployments: Sequence[DeploymentWithHistory] ) -> DeploymentExecutionResult: + """Register endpoints in appproxy for deployments that need it.""" # Phase 1: Load configuration with DeploymentRecorderContext.shared_phase("load_configuration"): with DeploymentRecorderContext.shared_step("load_proxy_targets"): @@ -132,7 +133,7 @@ async def check_pending_deployments( skipped_deployments.append(deployment) continue registration_tasks.append( - self._register_endpoint(info, targets, info.current_revision_id) + self.register_endpoint(info, targets, info.current_revision_id) ) valid_deployments.append(deployment) @@ -429,7 +430,7 @@ async def destroy_deployment( # Private helper methods - async def _register_endpoint( + async def register_endpoint( self, deployment: DeploymentInfo, scaling_group_target: ScalingGroupProxyTarget, diff --git a/src/ai/backend/manager/sokovan/deployment/handlers/deploying.py b/src/ai/backend/manager/sokovan/deployment/handlers/deploying.py index ad7e04e631b..32c25e02587 100644 --- a/src/ai/backend/manager/sokovan/deployment/handlers/deploying.py +++ b/src/ai/backend/manager/sokovan/deployment/handlers/deploying.py @@ -25,9 +25,12 @@ from __future__ import annotations +import asyncio +import dataclasses import logging -from collections.abc import Sequence -from typing import override +from collections.abc import Coroutine, Sequence +from typing import Any, override +from uuid import UUID from ai.backend.logging import BraceStyleAdapter from ai.backend.manager.data.deployment.types import ( @@ -39,6 +42,7 @@ from ai.backend.manager.defs import LockID from ai.backend.manager.repositories.deployment.repository import DeploymentRepository from ai.backend.manager.sokovan.deployment.deployment_controller import DeploymentController +from ai.backend.manager.sokovan.deployment.executor import DeploymentExecutor from ai.backend.manager.sokovan.deployment.route.route_controller import RouteController from ai.backend.manager.sokovan.deployment.route.types import RouteLifecycleType from ai.backend.manager.sokovan.deployment.strategy.applier import ( @@ -59,6 +63,12 @@ log = BraceStyleAdapter(logging.getLogger(__name__)) +@dataclasses.dataclass(frozen=True) +class _EndpointRegistrationBatch: + deployments: list[DeploymentWithHistory] + coroutines: list[Coroutine[Any, Any, str]] + + # --------------------------------------------------------------------------- # DEPLOYING sub-step handlers # --------------------------------------------------------------------------- @@ -83,11 +93,15 @@ def __init__( route_controller: RouteController, evaluator: DeploymentStrategyEvaluator, applier: StrategyResultApplier, + deployment_executor: DeploymentExecutor, + deployment_repo: DeploymentRepository, ) -> None: self._deployment_controller = deployment_controller self._route_controller = route_controller self._evaluator = evaluator self._applier = applier + self._deployment_executor = deployment_executor + self._deployment_repo = deployment_repo @classmethod @override @@ -127,10 +141,90 @@ def status_transitions(cls) -> DeploymentStatusTransitions: ), ) + async def _ensure_endpoints_registered( + self, deployments: Sequence[DeploymentWithHistory] + ) -> None: + """Register endpoints in appproxy for deployments that have no URL yet. + + Deployments that entered DEPLOYING via ActivateRevision skip + check_pending (which normally registers them), so this method + ensures they are registered before route provisioning begins. + """ + unregistered = [ + d + for d in deployments + if not d.deployment_info.network.url and d.deployment_info.deploying_revision_id + ] + if not unregistered: + return + + batch = await self._build_registration_batch(unregistered) + if not batch.coroutines: + return + + url_updates = await self._execute_registration_batch(batch) + if url_updates: + await self._deployment_repo.update_endpoint_urls_bulk(url_updates) + + async def _build_registration_batch( + self, deployments: Sequence[DeploymentWithHistory] + ) -> _EndpointRegistrationBatch: + """Build registration coroutines for deployments with valid proxy targets.""" + scaling_groups = {d.deployment_info.metadata.resource_group for d in deployments} + scaling_group_targets = await self._deployment_repo.fetch_scaling_group_proxy_targets( + scaling_groups + ) + + valid_deployments: list[DeploymentWithHistory] = [] + coroutines: list[Coroutine[Any, Any, str]] = [] + for deployment in deployments: + info = deployment.deployment_info + target = scaling_group_targets.get(info.metadata.resource_group) + if not target: + log.warning( + "No proxy target for scaling group {}, skipping endpoint registration for {}", + info.metadata.resource_group, + info.id, + ) + continue + if info.deploying_revision_id is None: + # May have been cleared between handler start and registration. + continue + coroutines.append( + self._deployment_executor.register_endpoint( + info, target, info.deploying_revision_id + ) + ) + valid_deployments.append(deployment) + return _EndpointRegistrationBatch(valid_deployments, coroutines) + + @staticmethod + async def _execute_registration_batch( + batch: _EndpointRegistrationBatch, + ) -> dict[UUID, str]: + """Execute registration batch and collect successful URL updates.""" + results = await asyncio.gather(*batch.coroutines, return_exceptions=True) + + url_updates: dict[UUID, str] = {} + for deployment, result in zip(batch.deployments, results, strict=True): + if isinstance(result, BaseException): + log.error( + "Failed to register endpoint for deployment {}: {}", + deployment.deployment_info.id, + result, + ) + else: + url_updates[deployment.deployment_info.id] = result + return url_updates + @override async def execute( self, deployments: Sequence[DeploymentWithHistory] ) -> DeploymentExecutionResult: + # Register endpoints in appproxy for deployments that entered DEPLOYING + # via ActivateRevision without passing through check_pending. + await self._ensure_endpoints_registered(deployments) + deployment_infos = [d.deployment_info for d in deployments] deployment_map = {d.deployment_info.id: d for d in deployments} diff --git a/tests/unit/manager/sokovan/deployment/executor/test_deployment_executor.py b/tests/unit/manager/sokovan/deployment/executor/test_deployment_executor.py index 9cf226a07ca..a6b29709df0 100644 --- a/tests/unit/manager/sokovan/deployment/executor/test_deployment_executor.py +++ b/tests/unit/manager/sokovan/deployment/executor/test_deployment_executor.py @@ -50,10 +50,10 @@ async def test_successful_endpoint_registration( proxy_targets_by_scaling_group ) - # Mock _register_endpoint via patching + # Mock register_endpoint via patching expected_url = "http://endpoint.test/v1" with patch.object( - deployment_executor, "_register_endpoint", return_value=expected_url + deployment_executor, "register_endpoint", return_value=expected_url ) as mock_register: entity_ids = [pending_deployment.deployment_info.id] with DeploymentRecorderContext.scope("test", entity_ids=entity_ids): @@ -91,7 +91,7 @@ async def test_deployment_without_revision_is_skipped( ) expected_url = "http://endpoint.test/v1" - with patch.object(deployment_executor, "_register_endpoint", return_value=expected_url): + with patch.object(deployment_executor, "register_endpoint", return_value=expected_url): entity_ids = [pending_deployment_no_revision.deployment_info.id] with DeploymentRecorderContext.scope("test", entity_ids=entity_ids): # Act @@ -151,7 +151,7 @@ async def test_endpoint_registration_failure_captured( with patch.object( deployment_executor, - "_register_endpoint", + "register_endpoint", side_effect=RuntimeError("Registration failed"), ): entity_ids = [pending_deployment.deployment_info.id] diff --git a/tests/unit/manager/sokovan/deployment/handlers/BUILD b/tests/unit/manager/sokovan/deployment/handlers/BUILD new file mode 100644 index 00000000000..75b8f46de9b --- /dev/null +++ b/tests/unit/manager/sokovan/deployment/handlers/BUILD @@ -0,0 +1 @@ +python_tests(name="tests") diff --git a/tests/unit/manager/sokovan/deployment/handlers/__init__.py b/tests/unit/manager/sokovan/deployment/handlers/__init__.py new file mode 100644 index 00000000000..e69de29bb2d diff --git a/tests/unit/manager/sokovan/deployment/handlers/test_deploying_handler.py b/tests/unit/manager/sokovan/deployment/handlers/test_deploying_handler.py new file mode 100644 index 00000000000..9d64020f1f3 --- /dev/null +++ b/tests/unit/manager/sokovan/deployment/handlers/test_deploying_handler.py @@ -0,0 +1,154 @@ +"""Regression test for BA-5557. + +A deployment created without a revision skips check_pending (which normally +registers the appproxy endpoint). When ActivateRevision later sets +deploying_revision_id and transitions the deployment to DEPLOYING, +execute() must register the endpoint before route provisioning begins. +""" + +from __future__ import annotations + +from datetime import datetime +from unittest.mock import AsyncMock, MagicMock +from uuid import uuid4 + +import pytest +from dateutil.tz import tzutc + +from ai.backend.common.data.endpoint.types import EndpointLifecycle +from ai.backend.common.types import RuntimeVariant +from ai.backend.manager.data.deployment.types import ( + DeploymentInfo, + DeploymentLifecycleSubStep, + DeploymentMetadata, + DeploymentNetworkSpec, + DeploymentState, + ReplicaSpec, +) +from ai.backend.manager.data.resource.types import ScalingGroupProxyTarget +from ai.backend.manager.sokovan.deployment.handlers.deploying import ( + DeployingProvisioningHandler, +) +from ai.backend.manager.sokovan.deployment.strategy.applier import StrategyApplyResult +from ai.backend.manager.sokovan.deployment.strategy.types import StrategyEvaluationSummary +from ai.backend.manager.sokovan.deployment.types import DeploymentWithHistory + + +class TestDeployingProvisioningHandler: + """Tests for DeployingProvisioningHandler.""" + + @pytest.fixture + def mock_deployment_repo(self) -> AsyncMock: + repo = AsyncMock() + repo.fetch_scaling_group_proxy_targets = AsyncMock(return_value={}) + repo.update_endpoint_urls_bulk = AsyncMock(return_value=None) + return repo + + @pytest.fixture + def mock_deployment_executor(self) -> AsyncMock: + executor = AsyncMock() + mock_revision_spec = MagicMock() + mock_revision_spec.execution.runtime_variant = RuntimeVariant.CUSTOM + executor.register_endpoint = AsyncMock(return_value="http://endpoint.test/v1") + return executor + + @pytest.fixture + def mock_evaluator(self) -> AsyncMock: + evaluator = AsyncMock() + evaluator.evaluate.return_value = StrategyEvaluationSummary() + return evaluator + + @pytest.fixture + def mock_applier(self) -> AsyncMock: + applier = AsyncMock() + applier.apply.return_value = StrategyApplyResult() + return applier + + @pytest.fixture + def handler( + self, + mock_deployment_executor: AsyncMock, + mock_deployment_repo: AsyncMock, + mock_evaluator: AsyncMock, + mock_applier: AsyncMock, + ) -> DeployingProvisioningHandler: + return DeployingProvisioningHandler( + deployment_controller=AsyncMock(), + route_controller=AsyncMock(), + evaluator=mock_evaluator, + applier=mock_applier, + deployment_executor=mock_deployment_executor, + deployment_repo=mock_deployment_repo, + ) + + @pytest.fixture + def proxy_target(self) -> ScalingGroupProxyTarget: + return ScalingGroupProxyTarget( + addr="http://proxy:8080", + api_token="test-token", + ) + + @pytest.fixture + def deployment_created_without_revision(self) -> DeploymentWithHistory: + """Deployment created without a revision, then ActivateRevision'd into DEPLOYING. + + current_revision_id is None (no initial revision), deploying_revision_id is set + (ActivateRevision assigned it), and url is None (check_pending was skipped). + """ + deploying_rev_id = uuid4() + revision = MagicMock() + revision.revision_id = deploying_rev_id + + return DeploymentWithHistory( + deployment_info=DeploymentInfo( + id=uuid4(), + metadata=DeploymentMetadata( + name="test-deployment", + domain="default", + project=uuid4(), + resource_group="default", + created_user=uuid4(), + session_owner=uuid4(), + created_at=datetime.now(tzutc()), + revision_history_limit=10, + ), + state=DeploymentState( + lifecycle=EndpointLifecycle.DEPLOYING, + retry_count=0, + ), + replica_spec=ReplicaSpec( + replica_count=1, + desired_replica_count=1, + ), + network=DeploymentNetworkSpec( + open_to_public=False, + url=None, + ), + model_revisions=[revision], + current_revision_id=None, + deploying_revision_id=deploying_rev_id, + sub_step=DeploymentLifecycleSubStep.DEPLOYING_PROVISIONING, + ), + ) + + async def test_registers_endpoint_for_deployment_created_without_revision( + self, + handler: DeployingProvisioningHandler, + mock_deployment_repo: AsyncMock, + mock_deployment_executor: AsyncMock, + deployment_created_without_revision: DeploymentWithHistory, + proxy_target: ScalingGroupProxyTarget, + ) -> None: + """BA-5557: execute() registers appproxy endpoint for a deployment that + was created without a revision and later ActivateRevision'd into DEPLOYING.""" + mock_deployment_repo.fetch_scaling_group_proxy_targets.return_value = { + "default": proxy_target, + } + + await handler.execute([deployment_created_without_revision]) + + info = deployment_created_without_revision.deployment_info + mock_deployment_executor.register_endpoint.assert_awaited_once_with( + info, proxy_target, info.deploying_revision_id + ) + mock_deployment_repo.update_endpoint_urls_bulk.assert_awaited_once()