Skip to content

Commit b0b1d10

Browse files
[iris] Cache autoscaler pending hints per evaluate() cycle
GetJobStatus rebuilt the full AutoscalerStatus proto on every dashboard poll, re-running routing_decision_to_proto over every demand entry and unmet entry. Cache the routing-decision proto and derived pending-hint dict on Autoscaler, invalidated in evaluate(). Fixes #4844
1 parent bb36920 commit b0b1d10

File tree

4 files changed

+95
-37
lines changed

4 files changed

+95
-37
lines changed

lib/iris/src/iris/cluster/controller/autoscaler/runtime.py

Lines changed: 39 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -47,7 +47,7 @@
4747
)
4848
from iris.cluster.controller.autoscaler.routing import job_feasibility, route_demand
4949
from iris.cluster.controller.autoscaler.scaling_group import ScalingGroup
50-
from iris.cluster.controller.autoscaler.status import routing_decision_to_proto
50+
from iris.cluster.controller.autoscaler.status import PendingHint, build_job_pending_hints, routing_decision_to_proto
5151
from iris.cluster.controller.autoscaler.worker_registry import TrackedWorker, WorkerRegistry
5252
from iris.cluster.controller.db import ControllerDB
5353
from iris.cluster.types import WorkerStatusMap
@@ -118,6 +118,13 @@ def __init__(
118118
self._last_scale_plan: ScalePlan | None = None
119119
self._last_evaluation: Timestamp = Timestamp.from_ms(0)
120120

121+
# Derived views of _last_scale_plan, built lazily and invalidated by
122+
# evaluate(). Dashboard polls (GetJobStatus, ListJobs) hit these on
123+
# every pending job; building them per request was the bottleneck
124+
# described in #4844.
125+
self._last_routing_decision_proto: vm_pb2.RoutingDecision | None = None
126+
self._last_pending_hints: dict[str, PendingHint] | None = None
127+
121128
# Thread management
122129
self._threads = threads if threads is not None else get_thread_container()
123130

@@ -246,6 +253,8 @@ def evaluate(
246253
routing_decision = route_demand(list(self._groups.values()), demand_entries, ts)
247254
scale_plan = build_scale_plan(self._groups, routing_decision, ts)
248255
self._last_scale_plan = scale_plan
256+
self._last_routing_decision_proto = None
257+
self._last_pending_hints = None
249258

250259
if routing_decision.unmet_entries:
251260
logger.debug(
@@ -555,6 +564,32 @@ def job_feasibility(
555564
result = job_feasibility(self._groups.values(), constraints, replicas=replicas)
556565
return result.reason
557566

567+
def get_last_routing_decision_proto(self) -> vm_pb2.RoutingDecision | None:
    """Lazily convert the most recent routing decision into proto form.

    The conversion result is memoized until the next evaluate() pass, so
    dashboard reads (GetJobStatus, ListJobs) share one cached proto rather
    than re-running the per-entry conversion on every poll.
    """
    plan = self._last_scale_plan
    if plan is None:
        # Nothing has been evaluated yet -> no decision to report.
        return None
    cached = self._last_routing_decision_proto
    if cached is None:
        cached = routing_decision_to_proto(
            plan.routing_decision,
            group_to_launch=plan.launch_counts(),
        )
        self._last_routing_decision_proto = cached
    return cached
582+
583+
def get_pending_hints(self) -> dict[str, PendingHint]:
    """Return autoscaler pending hints keyed by job id.

    Cached per evaluate() cycle so repeated GetJobStatus calls don't
    rebuild the hint dict (see #4844).
    """
    if self._last_pending_hints is None:
        routing_proto = self.get_last_routing_decision_proto()
        # Before the first evaluate() there is no routing decision. The
        # service-layer code this replaced guarded with
        # HasField("last_routing_decision") and returned {}; keep that
        # contract here instead of passing None into build_job_pending_hints.
        if routing_proto is None:
            self._last_pending_hints = {}
        else:
            self._last_pending_hints = build_job_pending_hints(routing_proto)
    return self._last_pending_hints
592+
558593
def get_status(self) -> vm_pb2.AutoscalerStatus:
559594
"""Build status for the status API."""
560595
status = vm_pb2.AutoscalerStatus(
@@ -563,13 +598,9 @@ def get_status(self) -> vm_pb2.AutoscalerStatus:
563598
last_evaluation=timestamp_to_proto(self._last_evaluation),
564599
recent_actions=list(self._action_log),
565600
)
566-
if self._last_scale_plan is not None:
567-
status.last_routing_decision.CopyFrom(
568-
routing_decision_to_proto(
569-
self._last_scale_plan.routing_decision,
570-
group_to_launch=self._last_scale_plan.launch_counts(),
571-
)
572-
)
601+
routing_proto = self.get_last_routing_decision_proto()
602+
if routing_proto is not None:
603+
status.last_routing_decision.CopyFrom(routing_proto)
573604
return status
574605

575606
def get_group(self, name: str) -> ScalingGroup | None:

lib/iris/src/iris/cluster/controller/service.py

Lines changed: 9 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -84,7 +84,7 @@
8484
WorkerRow,
8585
tasks_with_attempts,
8686
)
87-
from iris.cluster.controller.autoscaler.status import PendingHint, build_job_pending_hints
87+
from iris.cluster.controller.autoscaler.status import PendingHint
8888
from iris.cluster.controller.query import execute_raw_query
8989
from iris.rpc import query_pb2
9090
from iris.cluster.controller.scheduler import SchedulingContext
@@ -898,6 +898,10 @@ def get_status(self) -> vm_pb2.AutoscalerStatus:
898898
"""Get autoscaler status."""
899899
...
900900

901+
def get_pending_hints(self) -> dict[str, PendingHint]:
    """Return the pending-hint dict keyed by wire-format job id.

    Implementations cache this per evaluate() cycle, so callers may invoke
    it on every status RPC without rebuilding the hints (#4844).
    """
    ...
904+
901905
def get_vm(self, vm_id: str) -> vm_pb2.VmInfo | None:
902906
"""Get info for a specific VM."""
903907
...
@@ -1016,10 +1020,10 @@ def _get_autoscaler_pending_hints(self) -> dict[str, PendingHint]:
10161020
autoscaler = self._controller.autoscaler
10171021
if autoscaler is None:
10181022
return {}
1019-
status = autoscaler.get_status()
1020-
if not status.HasField("last_routing_decision"):
1021-
return {}
1022-
return build_job_pending_hints(status.last_routing_decision)
1023+
# Autoscaler caches the hint dict per evaluate() cycle; this avoids
1024+
# rebuilding the full AutoscalerStatus proto on every GetJobStatus
1025+
# RPC (#4844).
1026+
return autoscaler.get_pending_hints()
10231027

10241028
def _authorize_job_owner(self, job_id: JobName) -> None:
10251029
"""Raise PERMISSION_DENIED if the authenticated user doesn't own this job.

lib/iris/tests/cluster/controller/test_autoscaler.py

Lines changed: 27 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -645,6 +645,33 @@ def test_get_status_includes_last_routing_decision(self):
645645
assert status.HasField("last_routing_decision")
646646
assert "test-group" in status.last_routing_decision.routed_entries
647647

648+
def test_pending_hints_and_routing_proto_are_cached_between_evaluates(self):
    """One proto + one hint dict are built per evaluate() cycle (#4844).

    Dashboard refreshes hit these accessors once per pending job; rebuilding
    the status proto on every call was measurably slow on busy clusters.
    Repeated reads must hand back the identical cached objects, and a
    subsequent evaluate() must invalidate them.
    """
    cfg = make_scale_group_config(name="test-group", buffer_slices=0, max_slices=5)
    scaler = make_autoscaler({"test-group": ScalingGroup(cfg, make_mock_platform())})

    scaler.evaluate(make_demand_entries(2, device_type=DeviceType.TPU, device_variant="v5p-8"))

    # First reads populate the cache; later reads must return the very same
    # objects (identity, not just equality).
    first_proto = scaler.get_last_routing_decision_proto()
    first_hints = scaler.get_pending_hints()
    assert scaler.get_last_routing_decision_proto() is first_proto
    assert scaler.get_pending_hints() is first_hints
    # get_status() is served from the same cached routing-decision proto.
    assert scaler.get_status().last_routing_decision == first_proto

    # A fresh evaluate() drops the cache so new objects get built.
    scaler.evaluate(make_demand_entries(3, device_type=DeviceType.TPU, device_variant="v5p-8"))
    assert scaler.get_last_routing_decision_proto() is not first_proto
    assert scaler.get_pending_hints() is not first_hints
674+
648675

649676
class TestAutoscalerBootstrapLogs:
650677
"""Tests for bootstrap log reporting."""

lib/iris/tests/cluster/controller/test_dashboard.py

Lines changed: 20 additions & 24 deletions
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,7 @@
1414
from starlette.testclient import TestClient
1515

1616
from iris.cluster.bundle import BundleStore
17+
from iris.cluster.controller.autoscaler.status import PendingHint
1718
from iris.cluster.controller.codec import constraints_from_json, resource_spec_from_scalars
1819
from iris.cluster.controller.dashboard import ControllerDashboard
1920
from iris.log_server.server import LogServiceImpl
@@ -532,6 +533,7 @@ def test_get_autoscaler_status_returns_disabled_when_no_autoscaler(client):
532533
def mock_autoscaler():
533534
"""Create a mock autoscaler that returns a status proto."""
534535
autoscaler = Mock()
536+
autoscaler.get_pending_hints.return_value = {}
535537
autoscaler.get_status.return_value = vm_pb2.AutoscalerStatus(
536538
groups=[
537539
vm_pb2.ScaleGroupStatus(
@@ -642,15 +644,13 @@ def test_pending_reason_uses_autoscaler_hint_for_scale_up(
642644
"""Pending jobs surface autoscaler scale-up wait hints in job/detail APIs."""
643645
submit_job(state, "pending-scale", job_request)
644646

645-
task_id = JobName.root("test-user", "pending-scale").task(0).to_wire()
646-
mock_autoscaler.get_status.return_value = vm_pb2.AutoscalerStatus(
647-
last_routing_decision=vm_pb2.RoutingDecision(
648-
group_to_launch={"tpu_v5e_32": 1},
649-
routed_entries={
650-
"tpu_v5e_32": vm_pb2.DemandEntryStatusList(entries=[vm_pb2.DemandEntryStatus(task_ids=[task_id])])
651-
},
647+
job_wire = JobName.root("test-user", "pending-scale").to_wire()
648+
mock_autoscaler.get_pending_hints.return_value = {
649+
job_wire: PendingHint(
650+
message="Waiting for worker scale-up in scale group 'tpu_v5e_32' (1 slice(s) requested)",
651+
is_scaling_up=True,
652652
)
653-
)
653+
}
654654

655655
job_resp = rpc_post(
656656
client_with_autoscaler, "GetJobStatus", {"jobId": JobName.root("test-user", "pending-scale").to_wire()}
@@ -689,16 +689,14 @@ def test_pending_reason_uses_passive_autoscaler_hint_over_scheduler(
689689
],
690690
)
691691
submit_job(state, "diag-constraint", request)
692-
task_id = JobName.root("test-user", "diag-constraint").task(0).to_wire()
692+
job_wire = JobName.root("test-user", "diag-constraint").to_wire()
693693

694-
mock_autoscaler.get_status.return_value = vm_pb2.AutoscalerStatus(
695-
last_routing_decision=vm_pb2.RoutingDecision(
696-
group_to_launch={"tpu_v5e_32": 0},
697-
routed_entries={
698-
"tpu_v5e_32": vm_pb2.DemandEntryStatusList(entries=[vm_pb2.DemandEntryStatus(task_ids=[task_id])])
699-
},
694+
mock_autoscaler.get_pending_hints.return_value = {
695+
job_wire: PendingHint(
696+
message="Waiting for workers in scale group 'tpu_v5e_32' to become ready",
697+
is_scaling_up=False,
700698
)
701-
)
699+
}
702700

703701
job_resp = rpc_post(
704702
client_with_autoscaler, "GetJobStatus", {"jobId": JobName.root("test-user", "diag-constraint").to_wire()}
@@ -716,16 +714,14 @@ def test_list_jobs_shows_passive_autoscaler_wait_hint(
716714
):
717715
"""ListJobs should show passive autoscaler wait hints for pending jobs."""
718716
submit_job(state, "pending-no-launch", job_request)
719-
task_id = JobName.root("test-user", "pending-no-launch").task(0).to_wire()
717+
job_wire = JobName.root("test-user", "pending-no-launch").to_wire()
720718

721-
mock_autoscaler.get_status.return_value = vm_pb2.AutoscalerStatus(
722-
last_routing_decision=vm_pb2.RoutingDecision(
723-
group_to_launch={"tpu_v5e_32": 0},
724-
routed_entries={
725-
"tpu_v5e_32": vm_pb2.DemandEntryStatusList(entries=[vm_pb2.DemandEntryStatus(task_ids=[task_id])])
726-
},
719+
mock_autoscaler.get_pending_hints.return_value = {
720+
job_wire: PendingHint(
721+
message="Waiting for workers in scale group 'tpu_v5e_32' to become ready",
722+
is_scaling_up=False,
727723
)
728-
)
724+
}
729725

730726
jobs_resp = rpc_post(client_with_autoscaler, "ListJobs")
731727
listed = [

0 commit comments

Comments
 (0)