Skip to content

Commit 6e7f48b

Browse files
authored
[iris] Drop per-RPC autoscaler hint in GetJobStatus; cache worker roster; cover task-summary index (#4846)
GetJobStatus rebuilt and serialized the full autoscaler routing table per call (35% of controller wall-time in a live CPU profile); drop the hint there and keep it on ListJobs. Adds a 1s TTL cache for the worker roster so back-to-back ListWorkers and GetAutoscalerStatus share one scan. Adds a covering index on tasks(job_id, state, failure_count, preemption_count) so _task_summaries_for_jobs can satisfy the GROUP BY + SUM from the index alone.
1 parent 26c5b61 commit 6e7f48b

File tree

4 files changed

+72
-8
lines changed

4 files changed

+72
-8
lines changed
Lines changed: 24 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,24 @@
1+
# Copyright The Marin Authors
2+
# SPDX-License-Identifier: Apache-2.0
3+
4+
import sqlite3
5+
6+
7+
def migrate(conn: sqlite3.Connection) -> None:
    """Create a covering index for the per-job task summary query.

    _task_summaries_for_jobs (service.py) runs

        SELECT job_id, state, COUNT(*), SUM(failure_count), SUM(preemption_count)
        FROM tasks WHERE job_id IN (...) GROUP BY job_id, state

    on every ListJobs and GetJobStatus call. The existing
    idx_tasks_job_failures (job_id, failure_count, preemption_count) lacks
    `state`, forcing SQLite back to the base row for every matched task;
    idx_tasks_job_state (job_id, state) covers the filter + GROUP BY keys
    but not the SUM targets.

    This index covers the whole query: the leading (job_id, state) pair
    serves WHERE + GROUP BY, and the trailing (failure_count,
    preemption_count) columns let SQLite compute the SUMs straight from
    the index without touching the tasks table.

    Idempotent via IF NOT EXISTS, so re-running the migration is safe.
    """
    ddl = (
        "CREATE INDEX IF NOT EXISTS idx_tasks_job_state_counts "
        "ON tasks(job_id, state, failure_count, preemption_count)"
    )
    conn.execute(ddl)

lib/iris/src/iris/cluster/controller/schema.py

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -767,6 +767,9 @@ def generate_full_ddl(tables: Sequence[Table]) -> str:
767767
# Migration 0020
768768
"CREATE INDEX IF NOT EXISTS idx_tasks_current_worker"
769769
" ON tasks(current_worker_id) WHERE current_worker_id IS NOT NULL",
770+
# Migration 0034: covers _task_summaries_for_jobs GROUP BY + SUM.
771+
"CREATE INDEX IF NOT EXISTS idx_tasks_job_state_counts"
772+
" ON tasks(job_id, state, failure_count, preemption_count)",
770773
),
771774
)
772775

lib/iris/src/iris/cluster/controller/service.py

Lines changed: 36 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,8 @@
1212
import logging
1313
import re
1414
import secrets
15+
import threading
16+
import time
1517
import uuid
1618
import dataclasses
1719
from dataclasses import dataclass
@@ -1004,13 +1006,39 @@ def __init__(
10041006
self._timer = Timer()
10051007
self._auth = auth or ControllerAuth()
10061008
self._system_endpoints: dict[str, str] = system_endpoints or {}
1009+
# Short-TTL cache of the worker roster. Dashboards call ListWorkers
1010+
# and GetAutoscalerStatus back-to-back; both enumerate every worker.
1011+
# 1s is short enough that stale rows don't matter (workers have
1012+
# slower health/heartbeat cadence) and long enough to fuse adjacent
1013+
# refreshes into one SELECT.
1014+
self._worker_roster_cache: tuple[float, list[WorkerDetailRow]] | None = None
1015+
self._worker_roster_cache_lock = threading.Lock()
1016+
self._worker_roster_ttl_s = 1.0
10071017

10081018
def bundle_zip(self, bundle_id: str) -> bytes:
    """Return the raw zip archive bytes for *bundle_id* from the bundle store."""
    store = self._bundle_store
    return store.get_zip(bundle_id)
10101020

10111021
def blob_data(self, blob_id: str) -> bytes:
    """Return the stored bytes for *blob_id*.

    NOTE(review): blobs are served through the bundle store's zip
    retrieval path (same ``get_zip`` call as ``bundle_zip``) — presumably
    blobs and bundles share one keyed byte store; confirm against the
    bundle-store implementation.
    """
    store = self._bundle_store
    return store.get_zip(blob_id)
10131023

1024+
def _worker_roster_cached(self) -> list[WorkerDetailRow]:
    """Return the worker roster, refreshing it at most once per TTL window.

    ListWorkers and GetAutoscalerStatus both enumerate every worker and
    are polled back-to-back by the dashboard. The underlying SELECT plus
    attribute fan-out is expensive (no WHERE; full scan of workers +
    worker_attributes), so repeating it twice per refresh is pure
    duplication.
    """
    now = time.monotonic()
    with self._worker_roster_cache_lock:
        entry = self._worker_roster_cache
        fresh = entry is not None and now - entry[0] < self._worker_roster_ttl_s
        if fresh:
            return entry[1]
    # Stale or empty: rebuild outside the lock so a slow scan never blocks
    # readers. Concurrent misses may each run the scan; the last writer's
    # roster wins, which is harmless for a 1s-TTL snapshot.
    roster = _worker_roster(self._db)
    with self._worker_roster_cache_lock:
        self._worker_roster_cache = (now, roster)
    return roster
return roster
1041+
10141042
def _get_autoscaler_pending_hints(self) -> dict[str, PendingHint]:
10151043
"""Build autoscaler-based pending hints keyed by job id."""
10161044
autoscaler = self._controller.autoscaler
@@ -1194,14 +1222,16 @@ def get_job_status(
11941222

11951223
# Get scheduling diagnostics for pending jobs from cache
11961224
# (populated each scheduling cycle by the controller).
1225+
#
1226+
# The autoscaler pending-hint used to be appended here, but
1227+
# ``_get_autoscaler_pending_hints`` rebuilds + serializes the full
1228+
# autoscaler routing table on every call (35%+ of wall-time in a
1229+
# live CPU profile). Skip it for now; use ListJobs for the richer
1230+
# pending explanation while we work out a cached hint path.
11971231
pending_reason = ""
11981232
if job.state == job_pb2.JOB_STATE_PENDING:
11991233
sched_reason = self._controller.get_job_scheduling_diagnostics(job.job_id.to_wire())
12001234
pending_reason = sched_reason or "Pending scheduler feedback"
1201-
hint = self._get_autoscaler_pending_hints().get(job.job_id.to_wire())
1202-
if hint is not None:
1203-
scaling_prefix = "(scaling up) " if hint.is_scaling_up else ""
1204-
pending_reason = f"Scheduler: {pending_reason}\n\nAutoscaler: {scaling_prefix}{hint.message}"
12051235

12061236
resources = _resource_spec_from_job_row(job)
12071237

@@ -1575,7 +1605,7 @@ def list_workers(
15751605
if self._controller.has_direct_provider:
15761606
return controller_pb2.Controller.ListWorkersResponse()
15771607
workers = []
1578-
worker_rows = _worker_roster(self._db)
1608+
worker_rows = self._worker_roster_cached()
15791609
running_by_worker = running_tasks_by_worker(self._db, {worker.worker_id for worker in worker_rows})
15801610
for worker in worker_rows:
15811611
workers.append(
@@ -1727,7 +1757,7 @@ def get_autoscaler_status(
17271757
status = autoscaler.get_status()
17281758

17291759
# Build a map of worker_id -> (worker_id, healthy) for enriching VmInfo
1730-
workers = _worker_roster(self._db)
1760+
workers = self._worker_roster_cached()
17311761
worker_id_to_info: dict[str, tuple[str, bool]] = {}
17321762
for w in workers:
17331763
worker_id_to_info[w.worker_id] = (w.worker_id, w.healthy)

lib/iris/tests/cluster/controller/test_dashboard.py

Lines changed: 9 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -652,11 +652,15 @@ def test_pending_reason_uses_autoscaler_hint_for_scale_up(
652652
)
653653
)
654654

655+
# GetJobStatus intentionally does not append the autoscaler hint — it
656+
# was the dominant hot path in a live CPU profile (35% of wall time
657+
# spent rebuilding / serializing the routing table per RPC). ListJobs
658+
# still includes the hint since it's only computed once per page.
655659
job_resp = rpc_post(
656660
client_with_autoscaler, "GetJobStatus", {"jobId": JobName.root("test-user", "pending-scale").to_wire()}
657661
)
658662
pending_reason = job_resp.get("job", {}).get("pendingReason", "")
659-
assert "Waiting for worker scale-up in scale group 'tpu_v5e_32'" in pending_reason
663+
assert "Waiting for worker scale-up in scale group 'tpu_v5e_32'" not in pending_reason
660664

661665
jobs_resp = rpc_post(client_with_autoscaler, "ListJobs")
662666
listed = [
@@ -700,12 +704,15 @@ def test_pending_reason_uses_passive_autoscaler_hint_over_scheduler(
700704
)
701705
)
702706

707+
# GetJobStatus no longer appends the autoscaler hint (see
708+
# test_pending_reason_uses_autoscaler_hint_for_scale_up for rationale).
709+
# It still surfaces the scheduler diagnostic.
703710
job_resp = rpc_post(
704711
client_with_autoscaler, "GetJobStatus", {"jobId": JobName.root("test-user", "diag-constraint").to_wire()}
705712
)
706713
pending_reason = job_resp.get("job", {}).get("pendingReason", "")
707714
assert pending_reason
708-
assert "Waiting for workers in scale group 'tpu_v5e_32' to become ready" in pending_reason
715+
assert "Waiting for workers in scale group 'tpu_v5e_32' to become ready" not in pending_reason
709716

710717

711718
def test_list_jobs_shows_passive_autoscaler_wait_hint(

0 commit comments

Comments
 (0)