Skip to content
Open
1 change: 1 addition & 0 deletions changes/10728.fix.md
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
Register appproxy endpoints for deployments entering DEPLOYING via ActivateRevision
2 changes: 2 additions & 0 deletions src/ai/backend/manager/sokovan/deployment/coordinator.py
Original file line number Diff line number Diff line change
Expand Up @@ -307,6 +307,8 @@ def _init_handlers(
route_controller=self._route_controller,
evaluator=evaluator,
applier=applier,
deployment_executor=executor,
deployment_repo=self._deployment_repository,
),
),
(
Expand Down
5 changes: 3 additions & 2 deletions src/ai/backend/manager/sokovan/deployment/executor.py
Original file line number Diff line number Diff line change
Expand Up @@ -103,6 +103,7 @@ def __init__(
async def check_pending_deployments(
self, deployments: Sequence[DeploymentWithHistory]
) -> DeploymentExecutionResult:
"""Register endpoints in appproxy for deployments that need it."""
# Phase 1: Load configuration
with DeploymentRecorderContext.shared_phase("load_configuration"):
with DeploymentRecorderContext.shared_step("load_proxy_targets"):
Expand Down Expand Up @@ -132,7 +133,7 @@ async def check_pending_deployments(
skipped_deployments.append(deployment)
continue
registration_tasks.append(
self._register_endpoint(info, targets, info.current_revision_id)
self.register_endpoint(info, targets, info.current_revision_id)
)
valid_deployments.append(deployment)

Expand Down Expand Up @@ -429,7 +430,7 @@ async def destroy_deployment(

# Private helper methods

async def _register_endpoint(
async def register_endpoint(
self,
deployment: DeploymentInfo,
scaling_group_target: ScalingGroupProxyTarget,
Expand Down
98 changes: 96 additions & 2 deletions src/ai/backend/manager/sokovan/deployment/handlers/deploying.py
Original file line number Diff line number Diff line change
Expand Up @@ -25,9 +25,12 @@

from __future__ import annotations

import asyncio
import dataclasses
import logging
from collections.abc import Sequence
from typing import override
from collections.abc import Coroutine, Sequence
from typing import Any, override
from uuid import UUID

from ai.backend.logging import BraceStyleAdapter
from ai.backend.manager.data.deployment.types import (
Expand All @@ -39,6 +42,7 @@
from ai.backend.manager.defs import LockID
from ai.backend.manager.repositories.deployment.repository import DeploymentRepository
from ai.backend.manager.sokovan.deployment.deployment_controller import DeploymentController
from ai.backend.manager.sokovan.deployment.executor import DeploymentExecutor
from ai.backend.manager.sokovan.deployment.route.route_controller import RouteController
from ai.backend.manager.sokovan.deployment.route.types import RouteLifecycleType
from ai.backend.manager.sokovan.deployment.strategy.applier import (
Expand All @@ -59,6 +63,12 @@
log = BraceStyleAdapter(logging.getLogger(__name__))


@dataclasses.dataclass(frozen=True)
class _EndpointRegistrationBatch:
deployments: list[DeploymentWithHistory]
coroutines: list[Coroutine[Any, Any, str]]


# ---------------------------------------------------------------------------
# DEPLOYING sub-step handlers
# ---------------------------------------------------------------------------
Expand All @@ -83,11 +93,15 @@ def __init__(
route_controller: RouteController,
evaluator: DeploymentStrategyEvaluator,
applier: StrategyResultApplier,
deployment_executor: DeploymentExecutor,
deployment_repo: DeploymentRepository,
) -> None:
self._deployment_controller = deployment_controller
self._route_controller = route_controller
self._evaluator = evaluator
self._applier = applier
self._deployment_executor = deployment_executor
self._deployment_repo = deployment_repo

@classmethod
@override
Expand Down Expand Up @@ -127,10 +141,90 @@ def status_transitions(cls) -> DeploymentStatusTransitions:
),
)

async def _ensure_endpoints_registered(
self, deployments: Sequence[DeploymentWithHistory]
) -> None:
"""Register endpoints in appproxy for deployments that have no URL yet.

Deployments that entered DEPLOYING via ActivateRevision skip
check_pending (which normally registers them), so this method
ensures they are registered before route provisioning begins.
"""
unregistered = [
d
for d in deployments
if not d.deployment_info.network.url and d.deployment_info.deploying_revision_id
]
if not unregistered:
return

batch = await self._build_registration_batch(unregistered)
if not batch.coroutines:
return

url_updates = await self._execute_registration_batch(batch)
if url_updates:
await self._deployment_repo.update_endpoint_urls_bulk(url_updates)

async def _build_registration_batch(
self, deployments: Sequence[DeploymentWithHistory]
) -> _EndpointRegistrationBatch:
"""Build registration coroutines for deployments with valid proxy targets."""
scaling_groups = {d.deployment_info.metadata.resource_group for d in deployments}
scaling_group_targets = await self._deployment_repo.fetch_scaling_group_proxy_targets(
scaling_groups
)

valid_deployments: list[DeploymentWithHistory] = []
coroutines: list[Coroutine[Any, Any, str]] = []
for deployment in deployments:
info = deployment.deployment_info
target = scaling_group_targets.get(info.metadata.resource_group)
if not target:
log.warning(
"No proxy target for scaling group {}, skipping endpoint registration for {}",
info.metadata.resource_group,
info.id,
)
continue
if info.deploying_revision_id is None:
# May have been cleared between handler start and registration.
continue
coroutines.append(
self._deployment_executor.register_endpoint(
info, target, info.deploying_revision_id
)
)
valid_deployments.append(deployment)
return _EndpointRegistrationBatch(valid_deployments, coroutines)

@staticmethod
async def _execute_registration_batch(
batch: _EndpointRegistrationBatch,
) -> dict[UUID, str]:
"""Execute registration batch and collect successful URL updates."""
results = await asyncio.gather(*batch.coroutines, return_exceptions=True)

url_updates: dict[UUID, str] = {}
for deployment, result in zip(batch.deployments, results, strict=True):
if isinstance(result, BaseException):
log.error(
"Failed to register endpoint for deployment {}: {}",
deployment.deployment_info.id,
result,
)
else:
url_updates[deployment.deployment_info.id] = result
return url_updates

@override
async def execute(
self, deployments: Sequence[DeploymentWithHistory]
) -> DeploymentExecutionResult:
# Register endpoints in appproxy for deployments that entered DEPLOYING
# via ActivateRevision without passing through check_pending.
await self._ensure_endpoints_registered(deployments)

deployment_infos = [d.deployment_info for d in deployments]
deployment_map = {d.deployment_info.id: d for d in deployments}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -50,10 +50,10 @@ async def test_successful_endpoint_registration(
proxy_targets_by_scaling_group
)

# Mock _register_endpoint via patching
# Mock register_endpoint via patching
expected_url = "http://endpoint.test/v1"
with patch.object(
deployment_executor, "_register_endpoint", return_value=expected_url
deployment_executor, "register_endpoint", return_value=expected_url
) as mock_register:
entity_ids = [pending_deployment.deployment_info.id]
with DeploymentRecorderContext.scope("test", entity_ids=entity_ids):
Expand Down Expand Up @@ -91,7 +91,7 @@ async def test_deployment_without_revision_is_skipped(
)

expected_url = "http://endpoint.test/v1"
with patch.object(deployment_executor, "_register_endpoint", return_value=expected_url):
with patch.object(deployment_executor, "register_endpoint", return_value=expected_url):
entity_ids = [pending_deployment_no_revision.deployment_info.id]
with DeploymentRecorderContext.scope("test", entity_ids=entity_ids):
# Act
Expand Down Expand Up @@ -151,7 +151,7 @@ async def test_endpoint_registration_failure_captured(

with patch.object(
deployment_executor,
"_register_endpoint",
"register_endpoint",
side_effect=RuntimeError("Registration failed"),
):
entity_ids = [pending_deployment.deployment_info.id]
Expand Down
1 change: 1 addition & 0 deletions tests/unit/manager/sokovan/deployment/handlers/BUILD
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
python_tests(name="tests")
Empty file.
Original file line number Diff line number Diff line change
@@ -0,0 +1,154 @@
"""Regression test for BA-5557.

A deployment created without a revision skips check_pending (which normally
registers the appproxy endpoint). When ActivateRevision later sets
deploying_revision_id and transitions the deployment to DEPLOYING,
execute() must register the endpoint before route provisioning begins.
"""

from __future__ import annotations

from datetime import datetime
from unittest.mock import AsyncMock, MagicMock
from uuid import uuid4

import pytest
from dateutil.tz import tzutc

from ai.backend.common.data.endpoint.types import EndpointLifecycle
from ai.backend.common.types import RuntimeVariant
from ai.backend.manager.data.deployment.types import (
DeploymentInfo,
DeploymentLifecycleSubStep,
DeploymentMetadata,
DeploymentNetworkSpec,
DeploymentState,
ReplicaSpec,
)
from ai.backend.manager.data.resource.types import ScalingGroupProxyTarget
from ai.backend.manager.sokovan.deployment.handlers.deploying import (
DeployingProvisioningHandler,
)
from ai.backend.manager.sokovan.deployment.strategy.applier import StrategyApplyResult
from ai.backend.manager.sokovan.deployment.strategy.types import StrategyEvaluationSummary
from ai.backend.manager.sokovan.deployment.types import DeploymentWithHistory


class TestDeployingProvisioningHandler:
"""Tests for DeployingProvisioningHandler."""

@pytest.fixture
def mock_deployment_repo(self) -> AsyncMock:
repo = AsyncMock()
repo.fetch_scaling_group_proxy_targets = AsyncMock(return_value={})
repo.update_endpoint_urls_bulk = AsyncMock(return_value=None)
return repo

@pytest.fixture
def mock_deployment_executor(self) -> AsyncMock:
executor = AsyncMock()
mock_revision_spec = MagicMock()
mock_revision_spec.execution.runtime_variant = RuntimeVariant.CUSTOM
executor.register_endpoint = AsyncMock(return_value="http://endpoint.test/v1")
return executor

@pytest.fixture
def mock_evaluator(self) -> AsyncMock:
evaluator = AsyncMock()
evaluator.evaluate.return_value = StrategyEvaluationSummary()
return evaluator

@pytest.fixture
def mock_applier(self) -> AsyncMock:
applier = AsyncMock()
applier.apply.return_value = StrategyApplyResult()
return applier

@pytest.fixture
def handler(
self,
mock_deployment_executor: AsyncMock,
mock_deployment_repo: AsyncMock,
mock_evaluator: AsyncMock,
mock_applier: AsyncMock,
) -> DeployingProvisioningHandler:
return DeployingProvisioningHandler(
deployment_controller=AsyncMock(),
route_controller=AsyncMock(),
evaluator=mock_evaluator,
applier=mock_applier,
deployment_executor=mock_deployment_executor,
deployment_repo=mock_deployment_repo,
)

@pytest.fixture
def proxy_target(self) -> ScalingGroupProxyTarget:
return ScalingGroupProxyTarget(
addr="http://proxy:8080",
api_token="test-token",
)

@pytest.fixture
def deployment_created_without_revision(self) -> DeploymentWithHistory:
"""Deployment created without a revision, then ActivateRevision'd into DEPLOYING.

current_revision_id is None (no initial revision), deploying_revision_id is set
(ActivateRevision assigned it), and url is None (check_pending was skipped).
"""
deploying_rev_id = uuid4()
revision = MagicMock()
revision.revision_id = deploying_rev_id

return DeploymentWithHistory(
deployment_info=DeploymentInfo(
id=uuid4(),
metadata=DeploymentMetadata(
name="test-deployment",
domain="default",
project=uuid4(),
resource_group="default",
created_user=uuid4(),
session_owner=uuid4(),
created_at=datetime.now(tzutc()),
revision_history_limit=10,
),
state=DeploymentState(
lifecycle=EndpointLifecycle.DEPLOYING,
retry_count=0,
),
replica_spec=ReplicaSpec(
replica_count=1,
desired_replica_count=1,
),
network=DeploymentNetworkSpec(
open_to_public=False,
url=None,
),
model_revisions=[revision],
current_revision_id=None,
deploying_revision_id=deploying_rev_id,
sub_step=DeploymentLifecycleSubStep.DEPLOYING_PROVISIONING,
),
)

async def test_registers_endpoint_for_deployment_created_without_revision(
self,
handler: DeployingProvisioningHandler,
mock_deployment_repo: AsyncMock,
mock_deployment_executor: AsyncMock,
deployment_created_without_revision: DeploymentWithHistory,
proxy_target: ScalingGroupProxyTarget,
) -> None:
"""BA-5557: execute() registers appproxy endpoint for a deployment that
was created without a revision and later ActivateRevision'd into DEPLOYING."""
mock_deployment_repo.fetch_scaling_group_proxy_targets.return_value = {
"default": proxy_target,
}

await handler.execute([deployment_created_without_revision])

info = deployment_created_without_revision.deployment_info
mock_deployment_executor.register_endpoint.assert_awaited_once_with(
info, proxy_target, info.deploying_revision_id
)
mock_deployment_repo.update_endpoint_urls_bulk.assert_awaited_once()
Loading