Skip to content

Commit 3a2e498

Browse files
[iris] Cache autoscaler pending hints per evaluate() cycle
GetJobStatus rebuilt the full AutoscalerStatus proto on every dashboard poll, re-running routing_decision_to_proto over every demand entry and unmet entry. Cache the routing-decision proto and the derived pending-hint dict on the Autoscaler, invalidating both in evaluate(). Fixes #4844
1 parent c342900 commit 3a2e498

File tree

4 files changed

+95
-37
lines changed

4 files changed

+95
-37
lines changed

lib/iris/src/iris/cluster/controller/autoscaler/runtime.py

Lines changed: 39 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -47,7 +47,7 @@
4747
)
4848
from iris.cluster.controller.autoscaler.routing import job_feasibility, route_demand
4949
from iris.cluster.controller.autoscaler.scaling_group import ScalingGroup
50-
from iris.cluster.controller.autoscaler.status import routing_decision_to_proto
50+
from iris.cluster.controller.autoscaler.status import PendingHint, build_job_pending_hints, routing_decision_to_proto
5151
from iris.cluster.controller.autoscaler.worker_registry import TrackedWorker, WorkerRegistry
5252
from iris.cluster.controller.db import ControllerDB
5353
from iris.cluster.types import WorkerStatusMap
@@ -118,6 +118,13 @@ def __init__(
118118
self._last_scale_plan: ScalePlan | None = None
119119
self._last_evaluation: Timestamp = Timestamp.from_ms(0)
120120

121+
# Derived views of _last_scale_plan, built lazily and invalidated by
122+
# evaluate(). Dashboard polls (GetJobStatus, ListJobs) hit these on
123+
# every pending job; building them per request was the bottleneck
124+
# described in #4844.
125+
self._last_routing_decision_proto: vm_pb2.RoutingDecision | None = None
126+
self._last_pending_hints: dict[str, PendingHint] | None = None
127+
121128
# Thread management
122129
self._threads = threads if threads is not None else get_thread_container()
123130

@@ -246,6 +253,8 @@ def evaluate(
246253
routing_decision = route_demand(list(self._groups.values()), demand_entries, ts)
247254
scale_plan = build_scale_plan(self._groups, routing_decision, ts)
248255
self._last_scale_plan = scale_plan
256+
self._last_routing_decision_proto = None
257+
self._last_pending_hints = None
249258

250259
if routing_decision.unmet_entries:
251260
logger.debug(
@@ -555,6 +564,32 @@ def job_feasibility(
555564
result = job_feasibility(self._groups.values(), constraints, replicas=replicas)
556565
return result.reason
557566

567+
def get_last_routing_decision_proto(self) -> vm_pb2.RoutingDecision | None:
568+
"""Return the last routing decision as a proto, lazily built and cached.
569+
570+
The routing decision only changes in evaluate(); intermediate callers
571+
(GetJobStatus, ListJobs) reuse the cached proto without paying the
572+
per-entry conversion cost.
573+
"""
574+
if self._last_scale_plan is None:
575+
return None
576+
if self._last_routing_decision_proto is None:
577+
self._last_routing_decision_proto = routing_decision_to_proto(
578+
self._last_scale_plan.routing_decision,
579+
group_to_launch=self._last_scale_plan.launch_counts(),
580+
)
581+
return self._last_routing_decision_proto
582+
583+
def get_pending_hints(self) -> dict[str, PendingHint]:
584+
"""Return autoscaler pending hints keyed by job id.
585+
586+
Cached per evaluate() cycle so repeated GetJobStatus calls don't
587+
rebuild the hint dict (see #4844).
588+
"""
589+
if self._last_pending_hints is None:
590+
self._last_pending_hints = build_job_pending_hints(self.get_last_routing_decision_proto())
591+
return self._last_pending_hints
592+
558593
def get_status(self) -> vm_pb2.AutoscalerStatus:
559594
"""Build status for the status API."""
560595
status = vm_pb2.AutoscalerStatus(
@@ -563,13 +598,9 @@ def get_status(self) -> vm_pb2.AutoscalerStatus:
563598
last_evaluation=timestamp_to_proto(self._last_evaluation),
564599
recent_actions=list(self._action_log),
565600
)
566-
if self._last_scale_plan is not None:
567-
status.last_routing_decision.CopyFrom(
568-
routing_decision_to_proto(
569-
self._last_scale_plan.routing_decision,
570-
group_to_launch=self._last_scale_plan.launch_counts(),
571-
)
572-
)
601+
routing_proto = self.get_last_routing_decision_proto()
602+
if routing_proto is not None:
603+
status.last_routing_decision.CopyFrom(routing_proto)
573604
return status
574605

575606
def get_group(self, name: str) -> ScalingGroup | None:

lib/iris/src/iris/cluster/controller/service.py

Lines changed: 9 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -86,7 +86,7 @@
8686
WorkerRow,
8787
tasks_with_attempts,
8888
)
89-
from iris.cluster.controller.autoscaler.status import PendingHint, build_job_pending_hints
89+
from iris.cluster.controller.autoscaler.status import PendingHint
9090
from iris.cluster.controller.query import execute_raw_query
9191
from iris.rpc import query_pb2
9292
from iris.cluster.controller.scheduler import SchedulingContext
@@ -900,6 +900,10 @@ def get_status(self) -> vm_pb2.AutoscalerStatus:
900900
"""Get autoscaler status."""
901901
...
902902

903+
def get_pending_hints(self) -> dict[str, PendingHint]:
904+
"""Get cached pending-hint dict keyed by job id."""
905+
...
906+
903907
def get_vm(self, vm_id: str) -> vm_pb2.VmInfo | None:
904908
"""Get info for a specific VM."""
905909
...
@@ -1044,10 +1048,10 @@ def _get_autoscaler_pending_hints(self) -> dict[str, PendingHint]:
10441048
autoscaler = self._controller.autoscaler
10451049
if autoscaler is None:
10461050
return {}
1047-
status = autoscaler.get_status()
1048-
if not status.HasField("last_routing_decision"):
1049-
return {}
1050-
return build_job_pending_hints(status.last_routing_decision)
1051+
# Autoscaler caches the hint dict per evaluate() cycle; this avoids
1052+
# rebuilding the full AutoscalerStatus proto on every GetJobStatus
1053+
# RPC (#4844).
1054+
return autoscaler.get_pending_hints()
10511055

10521056
def _authorize_job_owner(self, job_id: JobName) -> None:
10531057
"""Raise PERMISSION_DENIED if the authenticated user doesn't own this job.

lib/iris/tests/cluster/controller/test_autoscaler.py

Lines changed: 27 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -645,6 +645,33 @@ def test_get_status_includes_last_routing_decision(self):
645645
assert status.HasField("last_routing_decision")
646646
assert "test-group" in status.last_routing_decision.routed_entries
647647

648+
def test_pending_hints_and_routing_proto_are_cached_between_evaluates(self):
649+
"""Dashboard polls reuse one proto + hint dict per evaluate() (#4844).
650+
651+
get_job_status calls this per pending job on every dashboard refresh.
652+
Rebuilding the status proto each time was measurably slow on busy
653+
clusters; repeated calls should return the same cached objects, and a
654+
new evaluate() must invalidate the cache.
655+
"""
656+
config = make_scale_group_config(name="test-group", buffer_slices=0, max_slices=5)
657+
group = ScalingGroup(config, make_mock_platform())
658+
autoscaler = make_autoscaler({"test-group": group})
659+
660+
autoscaler.evaluate(make_demand_entries(2, device_type=DeviceType.TPU, device_variant="v5p-8"))
661+
662+
# Cached: repeated reads return the same objects without rebuilding.
663+
proto_first = autoscaler.get_last_routing_decision_proto()
664+
hints_first = autoscaler.get_pending_hints()
665+
assert proto_first is autoscaler.get_last_routing_decision_proto()
666+
assert hints_first is autoscaler.get_pending_hints()
667+
# get_status() reuses the same cached routing-decision proto.
668+
assert autoscaler.get_status().last_routing_decision == proto_first
669+
670+
# Invalidated on next evaluate().
671+
autoscaler.evaluate(make_demand_entries(3, device_type=DeviceType.TPU, device_variant="v5p-8"))
672+
assert autoscaler.get_last_routing_decision_proto() is not proto_first
673+
assert autoscaler.get_pending_hints() is not hints_first
674+
648675

649676
class TestAutoscalerBootstrapLogs:
650677
"""Tests for bootstrap log reporting."""

lib/iris/tests/cluster/controller/test_dashboard.py

Lines changed: 20 additions & 24 deletions
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,7 @@
1414
from starlette.testclient import TestClient
1515

1616
from iris.cluster.bundle import BundleStore
17+
from iris.cluster.controller.autoscaler.status import PendingHint
1718
from iris.cluster.controller.codec import constraints_from_json, resource_spec_from_scalars
1819
from iris.cluster.controller.dashboard import ControllerDashboard
1920
from iris.log_server.server import LogServiceImpl
@@ -532,6 +533,7 @@ def test_get_autoscaler_status_returns_disabled_when_no_autoscaler(client):
532533
def mock_autoscaler():
533534
"""Create a mock autoscaler that returns a status proto."""
534535
autoscaler = Mock()
536+
autoscaler.get_pending_hints.return_value = {}
535537
autoscaler.get_status.return_value = vm_pb2.AutoscalerStatus(
536538
groups=[
537539
vm_pb2.ScaleGroupStatus(
@@ -642,15 +644,13 @@ def test_pending_reason_uses_autoscaler_hint_for_scale_up(
642644
"""Pending jobs surface autoscaler scale-up wait hints in job/detail APIs."""
643645
submit_job(state, "pending-scale", job_request)
644646

645-
task_id = JobName.root("test-user", "pending-scale").task(0).to_wire()
646-
mock_autoscaler.get_status.return_value = vm_pb2.AutoscalerStatus(
647-
last_routing_decision=vm_pb2.RoutingDecision(
648-
group_to_launch={"tpu_v5e_32": 1},
649-
routed_entries={
650-
"tpu_v5e_32": vm_pb2.DemandEntryStatusList(entries=[vm_pb2.DemandEntryStatus(task_ids=[task_id])])
651-
},
647+
job_wire = JobName.root("test-user", "pending-scale").to_wire()
648+
mock_autoscaler.get_pending_hints.return_value = {
649+
job_wire: PendingHint(
650+
message="Waiting for worker scale-up in scale group 'tpu_v5e_32' (1 slice(s) requested)",
651+
is_scaling_up=True,
652652
)
653-
)
653+
}
654654

655655
# GetJobStatus intentionally does not append the autoscaler hint — it
656656
# was the dominant hot path in a live CPU profile (35% of wall time
@@ -693,16 +693,14 @@ def test_pending_reason_uses_passive_autoscaler_hint_over_scheduler(
693693
],
694694
)
695695
submit_job(state, "diag-constraint", request)
696-
task_id = JobName.root("test-user", "diag-constraint").task(0).to_wire()
696+
job_wire = JobName.root("test-user", "diag-constraint").to_wire()
697697

698-
mock_autoscaler.get_status.return_value = vm_pb2.AutoscalerStatus(
699-
last_routing_decision=vm_pb2.RoutingDecision(
700-
group_to_launch={"tpu_v5e_32": 0},
701-
routed_entries={
702-
"tpu_v5e_32": vm_pb2.DemandEntryStatusList(entries=[vm_pb2.DemandEntryStatus(task_ids=[task_id])])
703-
},
698+
mock_autoscaler.get_pending_hints.return_value = {
699+
job_wire: PendingHint(
700+
message="Waiting for workers in scale group 'tpu_v5e_32' to become ready",
701+
is_scaling_up=False,
704702
)
705-
)
703+
}
706704

707705
# GetJobStatus no longer appends the autoscaler hint (see
708706
# test_pending_reason_uses_autoscaler_hint_for_scale_up for rationale).
@@ -723,16 +721,14 @@ def test_list_jobs_shows_passive_autoscaler_wait_hint(
723721
):
724722
"""ListJobs should show passive autoscaler wait hints for pending jobs."""
725723
submit_job(state, "pending-no-launch", job_request)
726-
task_id = JobName.root("test-user", "pending-no-launch").task(0).to_wire()
724+
job_wire = JobName.root("test-user", "pending-no-launch").to_wire()
727725

728-
mock_autoscaler.get_status.return_value = vm_pb2.AutoscalerStatus(
729-
last_routing_decision=vm_pb2.RoutingDecision(
730-
group_to_launch={"tpu_v5e_32": 0},
731-
routed_entries={
732-
"tpu_v5e_32": vm_pb2.DemandEntryStatusList(entries=[vm_pb2.DemandEntryStatus(task_ids=[task_id])])
733-
},
726+
mock_autoscaler.get_pending_hints.return_value = {
727+
job_wire: PendingHint(
728+
message="Waiting for workers in scale group 'tpu_v5e_32' to become ready",
729+
is_scaling_up=False,
734730
)
735-
)
731+
}
736732

737733
jobs_resp = rpc_post(client_with_autoscaler, "ListJobs")
738734
listed = [

0 commit comments

Comments
 (0)