-
Notifications
You must be signed in to change notification settings - Fork 172
Expand file tree
/
Copy pathexecutor.py
More file actions
690 lines (618 loc) · 28 KB
/
executor.py
File metadata and controls
690 lines (618 loc) · 28 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
665
666
667
668
669
670
671
672
673
674
675
676
677
678
679
680
681
682
683
684
685
686
687
688
689
690
"""Deployment execution logic."""
import asyncio
import logging
from collections.abc import Coroutine, Mapping, Sequence
from typing import Any, cast
from uuid import UUID
from ai.backend.common.clients.http_client.client_pool import (
ClientKey,
ClientPool,
)
from ai.backend.common.clients.valkey_client.valkey_stat.client import ValkeyStatClient
from ai.backend.common.config import ModelHealthCheck
from ai.backend.common.data.permission.types import RBACElementType
from ai.backend.common.exception import BackendAIError
from ai.backend.common.types import (
RuntimeVariant,
)
from ai.backend.logging import BraceStyleAdapter
from ai.backend.manager.clients.appproxy.client import AppProxyClient
from ai.backend.manager.clients.appproxy.types import (
CreateEndpointRequestBody,
EndpointTagsModel,
SessionTagsModel,
TagsModel,
)
from ai.backend.manager.config.provider import ManagerConfigProvider
from ai.backend.manager.data.deployment.scale import AutoScalingRule
from ai.backend.manager.data.deployment.types import (
DeploymentInfo,
RouteInfo,
RouteStatus,
RouteTrafficStatus,
)
from ai.backend.manager.data.permission.types import RBACElementRef
from ai.backend.manager.data.resource.types import ScalingGroupProxyTarget
from ai.backend.manager.errors.deployment import ReplicaCountMismatch
from ai.backend.manager.models.routing import RoutingRow
from ai.backend.manager.models.routing.conditions import RouteConditions
from ai.backend.manager.repositories.base.rbac.entity_creator import RBACEntityCreator
from ai.backend.manager.repositories.base.updater import BatchUpdater
from ai.backend.manager.repositories.deployment.creators import (
RouteBatchUpdaterSpec,
RouteCreatorSpec,
)
from ai.backend.manager.repositories.deployment.repository import (
AutoScalingMetricsData,
DeploymentRepository,
)
from ai.backend.manager.sokovan.deployment.recorder.context import DeploymentRecorderContext
from ai.backend.manager.sokovan.scheduling_controller import SchedulingController
from .types import (
DeploymentExecutionError,
DeploymentExecutionResult,
DeploymentWithHistory,
)
# Module-level logger using brace-style ("{}") message formatting.
log = BraceStyleAdapter(logging.getLogger(__name__))

# Timeout (seconds) intended for endpoint registration against the app proxy.
# NOTE(review): this constant is not referenced anywhere in this module's
# visible code — confirm external usage or remove.
REGISTER_ENDPOINT_TIMEOUT_SEC = 30
def _extract_error_code(exception: BaseException) -> str | None:
    """Return the error code carried by *exception*, if any.

    Args:
        exception: The exception to inspect.

    Returns:
        The stringified error code when the exception is a ``BackendAIError``,
        otherwise ``None``.
    """
    # Only BackendAIError instances expose an error_code(); anything else
    # yields no code.
    if not isinstance(exception, BackendAIError):
        return None
    return str(exception.error_code())
class DeploymentExecutor:
    """Executor for deployment operations.

    Orchestrates the lifecycle of model-service deployments: registering and
    unregistering endpoints with the app proxy, verifying and applying route
    scaling, and computing desired replica counts from autoscaling rules.
    Each batch operation is instrumented with DeploymentRecorderContext
    phases/steps for observability.
    """

    # Repository for deployment/route persistence and autoscaling metrics.
    _deployment_repo: DeploymentRepository
    # Controller driving session scheduling (injected; not used by the
    # methods visible in this module).
    _scheduling_controller: SchedulingController
    # Provider of manager configuration (injected; not used by the methods
    # visible in this module).
    _config_provider: ManagerConfigProvider
    # Pool of shared HTTP client sessions, keyed by endpoint/domain.
    _client_pool: ClientPool
    # Valkey client for statistics (injected; not used by the methods
    # visible in this module).
    _valkey_stat: ValkeyStatClient

    def __init__(
        self,
        deployment_repo: DeploymentRepository,
        scheduling_controller: SchedulingController,
        config_provider: ManagerConfigProvider,
        client_pool: ClientPool,
        valkey_stat: ValkeyStatClient,
    ) -> None:
        """Initialize the deployment executor."""
        self._deployment_repo = deployment_repo
        self._scheduling_controller = scheduling_controller
        self._config_provider = config_provider
        self._client_pool = client_pool
        self._valkey_stat = valkey_stat

    async def check_pending_deployments(
        self, deployments: Sequence[DeploymentWithHistory]
    ) -> DeploymentExecutionResult:
        """Register endpoints in appproxy for deployments that need it.

        Runs in three phases: load proxy targets per scaling group, register
        each deployment's endpoint concurrently, then bulk-update the stored
        endpoint URLs for the registrations that succeeded.

        Args:
            deployments: Deployments pending endpoint registration.

        Returns:
            Result partitioning the input into successes, failures, and
            skipped deployments (no proxy target, or no current revision).
        """
        # Phase 1: Load configuration
        with DeploymentRecorderContext.shared_phase("load_configuration"):
            with DeploymentRecorderContext.shared_step("load_proxy_targets"):
                scaling_groups = {
                    dep.deployment_info.metadata.resource_group for dep in deployments
                }
                scaling_group_targets = (
                    await self._deployment_repo.fetch_scaling_group_proxy_targets(scaling_groups)
                )
        # Collect registration tasks
        registration_tasks: list[Coroutine[Any, Any, str]] = []
        valid_deployments: list[DeploymentWithHistory] = []
        skipped_deployments: list[DeploymentWithHistory] = []
        for deployment in deployments:
            info = deployment.deployment_info
            # NOTE(review): direct indexing assumes the repository returns an
            # entry (possibly falsy) for every requested scaling group;
            # _unregister_endpoint uses .get() for the same lookup — confirm
            # the repository contract and make the two consistent.
            targets = scaling_group_targets[info.metadata.resource_group]
            if not targets:
                log.warning(
                    "No proxy target found for scaling group {}, skipping deployment {}",
                    info.metadata.resource_group,
                    info.id,
                )
                skipped_deployments.append(deployment)
                continue
            if info.current_revision_id is None:
                # Nothing to register without a revision to resolve.
                skipped_deployments.append(deployment)
                continue
            registration_tasks.append(
                self.register_endpoint(info, targets, info.current_revision_id)
            )
            valid_deployments.append(deployment)
        # Wait for all tasks to complete
        successful_deployments: list[DeploymentWithHistory] = []
        errors: list[DeploymentExecutionError] = []
        url_updates: dict[UUID, str] = {}
        # Phase 2: Register endpoints (per-deployment phase/step in register_endpoint)
        if registration_tasks:
            # return_exceptions=True: collect per-deployment failures instead
            # of aborting the whole batch on the first error.
            results = await asyncio.gather(*registration_tasks, return_exceptions=True)
            for deployment, result in zip(valid_deployments, results, strict=True):
                dep_id = deployment.deployment_info.id
                if isinstance(result, BaseException):
                    log.error(
                        "Failed to register endpoint for deployment {}: {}",
                        dep_id,
                        result,
                    )
                    errors.append(
                        DeploymentExecutionError(
                            deployment_info=deployment,
                            reason=str(result),
                            error_detail="Failed to register endpoint",
                            error_code=_extract_error_code(result),
                        )
                    )
                else:
                    # Result is the endpoint URL string returned from register_endpoint
                    url_updates[dep_id] = result
                    successful_deployments.append(deployment)
                    log.info(
                        "Successfully registered endpoint for deployment {} with URL: {}",
                        dep_id,
                        result,
                    )
        # Phase 3: Update endpoint URLs (only for successful deployments)
        if url_updates:
            with DeploymentRecorderContext.shared_phase(
                "update_endpoint_urls", entity_ids=set(url_updates.keys())
            ):
                with DeploymentRecorderContext.shared_step("sync_endpoint_url"):
                    await self._deployment_repo.update_endpoint_urls_bulk(url_updates)
        return DeploymentExecutionResult(
            successes=successful_deployments,
            failures=errors,
            skipped=skipped_deployments,
        )

    async def check_ready_deployments_that_need_scaling(
        self, deployments: Sequence[DeploymentWithHistory]
    ) -> DeploymentExecutionResult:
        """Check whether ready deployments match their target replica count.

        Args:
            deployments: Ready deployments to verify.

        Returns:
            Result with deployments whose active-route count equals the
            target replica count as successes; mismatches as failures.
        """
        # Phase 1: Load routes
        with DeploymentRecorderContext.shared_phase("load_routes"):
            with DeploymentRecorderContext.shared_step("load_active_routes"):
                endpoint_ids = {dep.deployment_info.id for dep in deployments}
                route_map = await self._deployment_repo.fetch_active_routes_by_endpoint_ids(
                    endpoint_ids
                )
        successes: list[DeploymentWithHistory] = []
        errors: list[DeploymentExecutionError] = []
        # Phase 2: Verify replicas (per-deployment)
        for deployment in deployments:
            try:
                self._verify_deployment_replicas(deployment.deployment_info, route_map)
                successes.append(deployment)
            except ReplicaCountMismatch as e:
                log.warning(
                    "Deployment {} has mismatched active routes: {}",
                    deployment.deployment_info.id,
                    e,
                )
                errors.append(
                    DeploymentExecutionError(
                        deployment_info=deployment,
                        reason="Mismatched active routes",
                        error_detail=str(e),
                        error_code=_extract_error_code(e),
                    )
                )
        return DeploymentExecutionResult(
            successes=successes,
            failures=errors,
        )

    async def scale_deployment(
        self, deployments: Sequence[DeploymentWithHistory]
    ) -> DeploymentExecutionResult:
        """Scale deployments so active routes converge to the target count.

        Evaluates each deployment against its active routes, accumulating
        route creators for scale-out and route IDs for scale-in, then applies
        both in a single repository call.

        Args:
            deployments: Deployments to evaluate for scaling.

        Returns:
            Result with scaled deployments as successes, deployments needing
            no action (or lacking a current revision) as skipped, and
            evaluation errors as failures.
        """
        # Phase 1: Load routes
        with DeploymentRecorderContext.shared_phase("load_routes"):
            with DeploymentRecorderContext.shared_step("load_active_routes"):
                endpoint_ids = {dep.deployment_info.id for dep in deployments}
                route_map = await self._deployment_repo.fetch_active_routes_by_endpoint_ids(
                    endpoint_ids
                )
        scale_out_creators: list[RBACEntityCreator[RoutingRow]] = []
        scale_in_route_ids: list[UUID] = []
        successes: list[DeploymentWithHistory] = []
        skipped: list[DeploymentWithHistory] = []
        errors: list[DeploymentExecutionError] = []
        # Phase 2: Evaluate scaling (per-deployment)
        for deployment in deployments:
            info = deployment.deployment_info
            if info.current_revision_id is None:
                # Cannot create routes without a revision to attach them to.
                skipped.append(deployment)
                continue
            try:
                out_creators, in_route_ids = self._evaluate_deployment_scaling(
                    info, route_map, info.current_revision_id
                )
                if out_creators or in_route_ids:
                    scale_out_creators.extend(out_creators)
                    scale_in_route_ids.extend(in_route_ids)
                    successes.append(deployment)
                else:
                    # No scaling action needed
                    skipped.append(deployment)
            except Exception as e:
                log.warning("Failed to scale deployment {}: {}", deployment.deployment_info.id, e)
                errors.append(
                    DeploymentExecutionError(
                        deployment_info=deployment,
                        reason=str(e),
                        error_detail="Failed to scale deployment",
                        error_code=_extract_error_code(e),
                    )
                )
        # Build BatchUpdater for scale in
        scale_in_updater: BatchUpdater[RoutingRow] | None = None
        if scale_in_route_ids:
            # Scale-in marks routes TERMINATING and drains traffic in one
            # batched update rather than per-route writes.
            scale_in_updater = BatchUpdater(
                spec=RouteBatchUpdaterSpec(
                    status=RouteStatus.TERMINATING,
                    traffic_ratio=0.0,
                    traffic_status=RouteTrafficStatus.INACTIVE,
                ),
                conditions=[RouteConditions.by_ids(scale_in_route_ids)],
            )
        # Phase 3: Apply scaling (only for successful deployments)
        if scale_out_creators or scale_in_updater:
            with DeploymentRecorderContext.shared_phase(
                "apply_scaling", entity_ids={dep.deployment_info.id for dep in successes}
            ):
                with DeploymentRecorderContext.shared_step("scale_routes"):
                    await self._deployment_repo.scale_routes(scale_out_creators, scale_in_updater)
        return DeploymentExecutionResult(
            successes=successes,
            skipped=skipped,
            failures=errors,
        )

    async def calculate_desired_replicas(
        self, deployments: Sequence[DeploymentWithHistory]
    ) -> DeploymentExecutionResult:
        """Compute and persist desired replica counts for deployments.

        Loads autoscaling rules and metrics once up front, computes the
        desired replica count per deployment concurrently, and bulk-saves
        the decisions for deployments whose count changed.

        Args:
            deployments: Deployments to evaluate.

        Returns:
            Result with changed deployments as successes, unchanged ones as
            skipped, and calculation errors as failures.
        """
        # Phase 1: Load autoscaling configuration
        with DeploymentRecorderContext.shared_phase("load_autoscaling_config"):
            with DeploymentRecorderContext.shared_step("load_autoscaling_rules"):
                endpoint_ids = {dep.deployment_info.id for dep in deployments}
                auto_scaling_rules = (
                    await self._deployment_repo.fetch_auto_scaling_rules_by_endpoint_ids(
                        endpoint_ids
                    )
                )
            with DeploymentRecorderContext.shared_step("load_metrics"):
                # Fetch all metrics data upfront
                deployment_infos = [dep.deployment_info for dep in deployments]
                metrics_data = await self._deployment_repo.fetch_metrics_for_autoscaling(
                    deployment_infos, auto_scaling_rules
                )
        successes: list[DeploymentWithHistory] = []
        skipped: list[DeploymentWithHistory] = []
        errors: list[DeploymentExecutionError] = []
        desired_replicas_map: dict[UUID, int] = {}
        # Phase 2: Calculate replicas (per-deployment via asyncio.gather)
        calculation_tasks = [
            self._calculate_deployment_replicas(
                deployment.deployment_info, auto_scaling_rules, metrics_data
            )
            for deployment in deployments
        ]
        results = await asyncio.gather(*calculation_tasks, return_exceptions=True)
        for deployment, result in zip(deployments, results, strict=True):
            dep_id = deployment.deployment_info.id
            if isinstance(result, BaseException):
                log.warning(
                    "Failed to calculate desired replicas for deployment {}: {}",
                    dep_id,
                    result,
                )
                errors.append(
                    DeploymentExecutionError(
                        deployment_info=deployment,
                        reason=str(result),
                        error_detail="Failed to calculate desired replicas",
                        error_code=_extract_error_code(result),
                    )
                )
            elif result is None:
                # None means "no change needed" per _calculate_deployment_replicas.
                skipped.append(deployment)
            else:
                desired_replicas_map[dep_id] = result
                successes.append(deployment)
        # Phase 3: Save scaling decision (only for successful deployments)
        if desired_replicas_map:
            with DeploymentRecorderContext.shared_phase(
                "save_scaling_decision", entity_ids=set(desired_replicas_map.keys())
            ):
                with DeploymentRecorderContext.shared_step("save_target_replicas"):
                    await self._deployment_repo.update_desired_replicas_bulk(desired_replicas_map)
        return DeploymentExecutionResult(
            successes=successes,
            skipped=skipped,
            failures=errors,
        )

    async def destroy_deployment(
        self, deployments: Sequence[DeploymentWithHistory]
    ) -> DeploymentExecutionResult:
        """Tear down deployments: terminate routes and unregister endpoints.

        Marks all active routes as terminating in bulk, then unregisters each
        deployment's endpoint from its scaling group's proxy concurrently.

        Args:
            deployments: Deployments to destroy.

        Returns:
            Result with unregistered deployments as successes and
            unregister failures as failures.
        """
        # Phase 1: Load termination configuration
        with DeploymentRecorderContext.shared_phase("load_termination_config"):
            with DeploymentRecorderContext.shared_step("load_routes"):
                endpoint_ids = {dep.deployment_info.id for dep in deployments}
                routes = await self._deployment_repo.fetch_active_routes_by_endpoint_ids(
                    endpoint_ids
                )
            with DeploymentRecorderContext.shared_step("load_proxy_config"):
                scaling_groups = {
                    dep.deployment_info.metadata.resource_group for dep in deployments
                }
                proxy_targets = await self._deployment_repo.fetch_scaling_group_proxy_targets(
                    scaling_groups
                )
        # Flatten all route IDs across deployments for one bulk status update.
        route_ids: set[UUID] = set()
        for route_list in routes.values():
            for route in route_list:
                route_ids.add(route.route_id)
        # Phase 2: Terminate routes
        with DeploymentRecorderContext.shared_phase("terminate_routes"):
            with DeploymentRecorderContext.shared_step("mark_routes_terminating"):
                await self._deployment_repo.mark_terminating_route_status_bulk(route_ids)
        successes: list[DeploymentWithHistory] = []
        errors: list[DeploymentExecutionError] = []
        # Phase 3: Unregister endpoints (per-deployment via asyncio.gather)
        unregister_tasks = [
            self._unregister_endpoint(deployment.deployment_info, proxy_targets)
            for deployment in deployments
        ]
        results = await asyncio.gather(*unregister_tasks, return_exceptions=True)
        for deployment, result in zip(deployments, results, strict=True):
            if isinstance(result, BaseException):
                log.warning(
                    "Failed to unregister endpoint {}: {}",
                    deployment.deployment_info.id,
                    result,
                )
                errors.append(
                    DeploymentExecutionError(
                        deployment_info=deployment,
                        reason="Failed to unregister endpoint",
                        error_detail=str(result),
                        error_code=_extract_error_code(result),
                    )
                )
            else:
                successes.append(deployment)
        return DeploymentExecutionResult(
            successes=successes,
            failures=errors,
        )

    # Private helper methods
    async def register_endpoint(
        self,
        deployment: DeploymentInfo,
        scaling_group_target: ScalingGroupProxyTarget,
        revision_id: UUID,
    ) -> str:
        """Resolve the target revision's model definition and register the endpoint to the app proxy.

        Args:
            deployment: The deployment whose endpoint is registered.
            scaling_group_target: Proxy address/token for the scaling group.
            revision_id: Revision whose spec supplies runtime and health-check
                configuration.

        Returns:
            The registered endpoint URL.
        """
        pool = DeploymentRecorderContext.current_pool()
        recorder = pool.recorder(deployment.id)
        with recorder.phase("register_endpoint"):
            with recorder.step("check_target_revision"):
                target_revision = deployment.resolve_revision_spec(revision_id)
            with recorder.step("extract_health_check_config"):
                # Health check is optional: only present when the revision's
                # model definition declares one.
                health_check_config = None
                if target_revision.model_definition:
                    health_check_config = target_revision.model_definition.health_check_config()
                    if not health_check_config:
                        log.debug(
                            "No health check configuration found in model definition for deployment {}",
                            deployment.id,
                        )
            with recorder.step("register_to_proxy"):
                return await self._create_endpoint_in_proxy(
                    endpoint_id=deployment.id,
                    endpoint_name=deployment.metadata.name,
                    session_owner_id=deployment.metadata.session_owner,
                    project_id=deployment.metadata.project,
                    domain_name=deployment.metadata.domain,
                    runtime_variant=target_revision.execution.runtime_variant,
                    existing_url=deployment.network.url,
                    open_to_public=deployment.network.open_to_public,
                    health_check_config=health_check_config,
                    app_proxy_addr=scaling_group_target.addr,
                    app_proxy_api_token=scaling_group_target.api_token,
                )

    def _load_app_proxy_client(self, address: str, token: str) -> AppProxyClient:
        """Load or create an App Proxy client for the given address."""
        # Sessions are pooled per (endpoint, domain) key so repeated calls to
        # the same proxy reuse one HTTP session.
        client_session = self._client_pool.load_client_session(
            ClientKey(
                endpoint=address,
                domain="wsproxy",
            )
        )
        return AppProxyClient(client_session, address, token)

    async def _create_endpoint_in_proxy(
        self,
        endpoint_id: UUID,
        endpoint_name: str,
        session_owner_id: UUID,
        project_id: UUID,
        domain_name: str,
        runtime_variant: RuntimeVariant,
        existing_url: str | None,
        open_to_public: bool,
        health_check_config: ModelHealthCheck | None,
        app_proxy_addr: str,
        app_proxy_api_token: str,
    ) -> str:
        """
        Create an endpoint in WSProxy service.

        Args:
            endpoint_id: Endpoint UUID
            endpoint_name: Name of the endpoint
            session_owner_id: UUID of the session owner
            project_id: UUID of the project
            domain_name: Domain name
            runtime_variant: Runtime variant
            existing_url: Existing URL if any
            open_to_public: Public accessibility flag
            health_check_config: Optional health check configuration
            app_proxy_addr: WSProxy service address
            app_proxy_api_token: WSProxy API token

        Returns:
            The endpoint URL reported by the WSProxy service.
        """
        app_proxy_client = self._load_app_proxy_client(app_proxy_addr, app_proxy_api_token)
        # Create request body using Pydantic model
        request_body = CreateEndpointRequestBody(
            version="v2",
            service_name=endpoint_name,
            tags=TagsModel(
                session=SessionTagsModel(
                    user_uuid=str(session_owner_id),
                    group_id=str(project_id),
                    domain_name=domain_name,
                ),
                endpoint=EndpointTagsModel(
                    id=str(endpoint_id),
                    runtime_variant=runtime_variant.value,
                    existing_url=str(existing_url) if existing_url else None,
                ),
            ),
            apps={},
            open_to_public=open_to_public,
            health_check=health_check_config,
        )
        res = await app_proxy_client.create_endpoint(endpoint_id, request_body)
        # The proxy response carries the public URL under the "endpoint" key.
        return cast(str, res["endpoint"])

    async def _delete_endpoint_from_wsproxy(
        self,
        endpoint_id: UUID,
        app_proxy_addr: str,
        app_proxy_api_token: str,
    ) -> None:
        """
        Delete an endpoint from WSProxy service.

        Args:
            endpoint_id: Endpoint UUID to delete
            app_proxy_addr: WSProxy service address
            app_proxy_api_token: WSProxy API token
        """
        app_proxy_client = self._load_app_proxy_client(app_proxy_addr, app_proxy_api_token)
        await app_proxy_client.delete_endpoint(endpoint_id)

    def _verify_deployment_replicas(
        self,
        deployment: DeploymentInfo,
        route_map: Mapping[UUID, Sequence[RouteInfo]],
    ) -> None:
        """Verify that deployment has the expected number of active routes.

        Raises:
            ReplicaCountMismatch: When the active route count differs from
                the target replica count.
        """
        pool = DeploymentRecorderContext.current_pool()
        recorder = pool.recorder(deployment.id)
        with recorder.phase("verify_replicas"):
            with recorder.step("compare_route_count"):
                # NOTE(review): direct indexing raises KeyError if the
                # repository omits endpoints with zero routes;
                # _calculate_deployment_replicas uses .get(..., []) for the
                # same kind of lookup — confirm the repository contract.
                routes = route_map[deployment.id]
                if len(routes) != deployment.replica_spec.target_replica_count:
                    raise ReplicaCountMismatch(
                        expected=deployment.replica_spec.target_replica_count,
                        actual=len(routes),
                    )

    def _evaluate_deployment_scaling(
        self,
        deployment: DeploymentInfo,
        route_map: Mapping[UUID, Sequence[RouteInfo]],
        revision_id: UUID,
    ) -> tuple[list[RBACEntityCreator[RoutingRow]], list[UUID]]:
        """Evaluate scaling action for a deployment and return creators/route IDs.

        Returns:
            A pair of (route creators for scale-out, route IDs for scale-in);
            both are empty when the route count already matches the target.
        """
        pool = DeploymentRecorderContext.current_pool()
        recorder = pool.recorder(deployment.id)
        scale_out_creators: list[RBACEntityCreator[RoutingRow]] = []
        scale_in_route_ids: list[UUID] = []
        with recorder.phase("evaluate_scaling"):
            with recorder.step("calculate_scale_action"):
                target_count = deployment.replica_spec.target_replica_count
                # NOTE(review): direct indexing — see _verify_deployment_replicas.
                routes = route_map[deployment.id]
                if len(routes) < target_count:
                    # Build creators for scale out
                    new_replica_count = target_count - len(routes)
                    for _ in range(new_replica_count):
                        creator_spec = RouteCreatorSpec(
                            endpoint_id=deployment.id,
                            session_owner_id=deployment.metadata.session_owner,
                            domain=deployment.metadata.domain,
                            project_id=deployment.metadata.project,
                            revision_id=revision_id,
                        )
                        # Each new route is scoped under its model deployment
                        # for RBAC purposes.
                        scale_out_creators.append(
                            RBACEntityCreator(
                                spec=creator_spec,
                                element_type=RBACElementType.ROUTING,
                                scope_ref=RBACElementRef(
                                    element_type=RBACElementType.MODEL_DEPLOYMENT,
                                    element_id=str(deployment.id),
                                ),
                            )
                        )
                elif len(routes) > target_count:
                    # Prefer terminating routes with the lowest termination
                    # priority ordering first.
                    termination_route_candidates = sorted(
                        routes, key=lambda r: r.termination_priority
                    )
                    candidates = termination_route_candidates[: len(routes) - target_count]
                    scale_in_route_ids.extend(r.route_id for r in candidates)
        return scale_out_creators, scale_in_route_ids

    async def _calculate_deployment_replicas(
        self,
        deployment: DeploymentInfo,
        auto_scaling_rules: Mapping[UUID, Sequence[AutoScalingRule]],
        metrics_data: AutoScalingMetricsData,
    ) -> int | None:
        """Calculate desired replicas for a single deployment.

        Without autoscaling rules, falls back to the manually configured
        replica count; with rules, delegates to the repository's metric-based
        calculation.

        Returns:
            int: new desired replica count
            None: no change needed (skip)
        Raises exception: on failure
        """
        pool = DeploymentRecorderContext.current_pool()
        recorder = pool.recorder(deployment.id)
        with recorder.phase("calculate_replicas"):
            auto_scaling_rule = auto_scaling_rules.get(deployment.id, [])
            if not auto_scaling_rule:
                with recorder.step("apply_manual_scaling"):
                    routes = metrics_data.routes_by_endpoint.get(deployment.id, [])
                    # Manual mode: only report a change when the configured
                    # count differs from the current route count.
                    if deployment.replica_spec.replica_count != len(routes):
                        return deployment.replica_spec.replica_count
                    return None
            with recorder.step("evaluate_autoscaling_rules"):
                desired_replica = (
                    await self._deployment_repo.calculate_desired_replicas_for_deployment(
                        deployment,
                        auto_scaling_rule,
                        metrics_data,
                    )
                )
                if desired_replica is None:
                    log.debug(
                        "No change in desired replicas for deployment {}, skipping",
                        deployment.id,
                    )
                return desired_replica

    async def _unregister_endpoint(
        self,
        deployment: DeploymentInfo,
        proxy_targets: Mapping[str, ScalingGroupProxyTarget | None],
    ) -> None:
        """Unregister an endpoint from the proxy service.

        Silently skips (with a warning) when the deployment's scaling group
        has no proxy target.
        """
        pool = DeploymentRecorderContext.current_pool()
        recorder = pool.recorder(deployment.id)
        with recorder.phase("unregister_endpoint"):
            with recorder.step("delete_from_proxy"):
                target = proxy_targets.get(deployment.metadata.resource_group)
                if not target:
                    log.warning(
                        "No proxy target found for scaling group {}, skipping unregister for {}",
                        deployment.metadata.resource_group,
                        deployment.id,
                    )
                    return
                await self._delete_endpoint_from_wsproxy(
                    endpoint_id=deployment.id,
                    app_proxy_addr=target.addr,
                    app_proxy_api_token=target.api_token,
                )