Skip to content

Commit f8e2714

Browse files
rjpowerclaude
andcommitted
[iris] tracker stores WorkerLiveness natively; drop hydrate overlay
Worker liveness state (healthy/active/consecutive_failures/last_heartbeat, committed_*) now lives natively on a single public WorkerLiveness dataclass inside WorkerHealthTracker. Readers query the tracker via tracker.liveness() or tracker.liveness_many() instead of relying on the _hydrate_worker_detail overlay seam. WorkerCommitTracker is folded into WorkerHealthTracker so a single mutex covers every per-worker transient signal. Stale ExtraField defaults for healthy / active / consecutive_failures / last_heartbeat / committed_* / available_* are removed from WORKER_ROW_PROJECTION and WORKER_DETAIL_PROJECTION; WorkerRow / WorkerDetailRow now carry only durable identity columns plus the one real ExtraField (attributes, sourced from worker_attributes). A new SchedulableWorker dataclass (in db.py) bundles the durable WorkerRow columns with the live committed totals for the scheduler hot path, replacing the hydrate-then-decorate dance in healthy_active_workers_with_attributes. Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
1 parent 10aa760 commit f8e2714

10 files changed

Lines changed: 312 additions & 466 deletions

File tree

lib/iris/src/iris/cluster/controller/controller.py

Lines changed: 20 additions & 24 deletions
Original file line numberDiff line numberDiff line change
@@ -67,6 +67,7 @@
6767
from iris.cluster.controller.dashboard import ControllerDashboard
6868
from iris.cluster.controller.db import (
6969
ControllerDB,
70+
SchedulableWorker,
7071
healthy_active_workers_with_attributes,
7172
insert_task_profile,
7273
job_scheduling_deadline,
@@ -80,7 +81,6 @@
8081
Scheduler,
8182
SchedulingContext,
8283
WorkerCapacity,
83-
WorkerSnapshot,
8484
)
8585
from iris.cluster.controller.schema import (
8686
ATTEMPT_PROJECTION,
@@ -96,7 +96,6 @@
9696
TaskDetailRow,
9797
TaskRow,
9898
WorkerDetailRow,
99-
WorkerRow,
10099
proto_decoder,
101100
tasks_with_attempts,
102101
)
@@ -114,7 +113,7 @@
114113
TaskUpdate,
115114
log_event,
116115
)
117-
from iris.cluster.controller.worker_health import WorkerCommitTracker, WorkerHealthTracker
116+
from iris.cluster.controller.worker_health import WorkerHealthTracker
118117
from iris.cluster.log_store_helpers import CONTROLLER_LOG_KEY
119118
from iris.cluster.providers.k8s.tasks import K8sTaskProvider
120119
from iris.cluster.providers.types import find_free_port, resolve_external_host
@@ -201,7 +200,7 @@ class _SchedulingStateRead:
201200
"""Snapshot of pending tasks and workers read at the start of a scheduling cycle."""
202201

203202
pending_tasks: list[TaskRow]
204-
workers: list[WorkerRow]
203+
workers: list[SchedulableWorker]
205204
state_read_ms: int
206205

207206

@@ -245,7 +244,7 @@ def job_requirements_from_job(job: JobSchedulingRow) -> JobRequirements:
245244
def compute_demand_entries(
246245
queries: ControllerDB,
247246
scheduler: Scheduler | None = None,
248-
workers: list[WorkerSnapshot] | None = None,
247+
workers: list[SchedulableWorker] | None = None,
249248
reservation_claims: dict[WorkerId, ReservationClaim] | None = None,
250249
) -> list[DemandEntry]:
251250
"""Compute demand entries for the autoscaler from controller state.
@@ -708,7 +707,7 @@ def _tasks_by_ids_with_attempts(queries: ControllerDB, task_ids: set[JobName]) -
708707
return {task.task_id: task for task in tasks_with_attempts(tasks, attempts)}
709708

710709

711-
def _building_counts(queries: ControllerDB, workers: list[WorkerRow]) -> dict[WorkerId, int]:
710+
def _building_counts(queries: ControllerDB, workers: list[SchedulableWorker]) -> dict[WorkerId, int]:
712711
"""Count tasks in BUILDING or ASSIGNED state per worker, excluding reservation-holder jobs."""
713712
if not workers:
714713
return {}
@@ -763,7 +762,7 @@ def _task_worker_mapping(queries: ControllerDB, task_ids: set[JobName]) -> dict[
763762

764763

765764
def _worker_matches_reservation_entry(
766-
worker: WorkerRow,
765+
worker: SchedulableWorker,
767766
res_entry: job_pb2.ReservationEntry,
768767
) -> bool:
769768
"""Check if a worker is eligible for a reservation entry.
@@ -785,9 +784,9 @@ def _worker_matches_reservation_entry(
785784

786785

787786
def _inject_reservation_taints(
788-
workers: list[WorkerRow],
787+
workers: list[SchedulableWorker],
789788
claims: dict[WorkerId, ReservationClaim],
790-
) -> list[WorkerRow]:
789+
) -> list[SchedulableWorker]:
791790
"""Create modified worker copies with reservation taints and prioritization.
792791
793792
Claimed workers receive a ``reservation-job`` attribute set to the claiming
@@ -800,8 +799,8 @@ def _inject_reservation_taints(
800799
if not claims:
801800
return workers
802801

803-
claimed: list[WorkerRow] = []
804-
unclaimed: list[WorkerRow] = []
802+
claimed: list[SchedulableWorker] = []
803+
unclaimed: list[SchedulableWorker] = []
805804
for worker in workers:
806805
claim = claims.get(worker.worker_id)
807806
if claim is not None:
@@ -882,7 +881,6 @@ def _reservation_region_constraints(
882881
claims: dict[WorkerId, ReservationClaim],
883882
queries: ControllerDB,
884883
health: WorkerHealthTracker,
885-
committed: WorkerCommitTracker,
886884
existing_constraints: list[Constraint],
887885
) -> list[Constraint]:
888886
"""Derive region constraints from claimed reservation workers.
@@ -899,7 +897,7 @@ def _reservation_region_constraints(
899897
claimed_worker_ids = {worker_id for worker_id, claim in claims.items() if claim.job_id == job_id_wire}
900898
workers_by_id = {
901899
worker.worker_id: worker
902-
for worker in healthy_active_workers_with_attributes(queries, health, committed)
900+
for worker in healthy_active_workers_with_attributes(queries, health)
903901
if worker.worker_id in claimed_worker_ids
904902
}
905903
regions: set[str] = set()
@@ -1632,13 +1630,13 @@ def _profile_all_running_tasks(self) -> None:
16321630
Memory profiling via memray is currently disabled because memray attach
16331631
has been triggering segfaults in target processes.
16341632
"""
1635-
workers = healthy_active_workers_with_attributes(self._db, self._health, self._store.committed)
1633+
workers = healthy_active_workers_with_attributes(self._db, self._health)
16361634
if not workers:
16371635
return
16381636
workers_by_id = {w.worker_id: w for w in workers}
16391637
tasks_by_worker = running_tasks_by_worker(self._db, set(workers_by_id.keys()))
16401638

1641-
profile_targets: list[tuple[JobName, WorkerRow]] = []
1639+
profile_targets: list[tuple[JobName, SchedulableWorker]] = []
16421640
for worker_id, task_ids in tasks_by_worker.items():
16431641
worker = workers_by_id[worker_id]
16441642
for task_id in task_ids:
@@ -1656,7 +1654,7 @@ def _profile_all_running_tasks(self) -> None:
16561654

16571655
def _dispatch_profiles(
16581656
self,
1659-
targets: list[tuple[JobName, WorkerRow]],
1657+
targets: list[tuple[JobName, SchedulableWorker]],
16601658
profile_type: job_pb2.ProfileType,
16611659
profile_kind: str,
16621660
duration: int,
@@ -1674,7 +1672,7 @@ def _dispatch_profiles(
16741672
def _capture_one_profile(
16751673
self,
16761674
task_id: JobName,
1677-
worker: WorkerRow,
1675+
worker: SchedulableWorker,
16781676
profile_type: job_pb2.ProfileType,
16791677
profile_kind: str,
16801678
duration: int,
@@ -1776,7 +1774,7 @@ def _claim_workers_for_reservations(self, claims: dict[WorkerId, ReservationClai
17761774
persisted = True
17771775
claimed_entries: set[tuple[str, int]] = {(c.job_id, c.entry_idx) for c in claims.values()}
17781776
claimed_worker_ids: set[WorkerId] = set(claims.keys())
1779-
all_workers = healthy_active_workers_with_attributes(self._db, self._health, self._store.committed)
1777+
all_workers = healthy_active_workers_with_attributes(self._db, self._health)
17801778
changed = False
17811779

17821780
reservable_states = (
@@ -1794,8 +1792,6 @@ def _claim_workers_for_reservations(self, claims: dict[WorkerId, ReservationClai
17941792
for worker in all_workers:
17951793
if worker.worker_id in claimed_worker_ids:
17961794
continue
1797-
if not worker.healthy:
1798-
continue
17991795
if not _worker_matches_reservation_entry(worker, res_entry):
18001796
continue
18011797

@@ -1914,7 +1910,7 @@ def _read_scheduling_state(self) -> _SchedulingStateRead:
19141910
timer = Timer()
19151911
with slow_log(logger, "scheduling state reads", threshold_ms=50):
19161912
pending_tasks = _schedulable_tasks(self._db)
1917-
workers = healthy_active_workers_with_attributes(self._db, self._health, self._store.committed)
1913+
workers = healthy_active_workers_with_attributes(self._db, self._health)
19181914
return _SchedulingStateRead(
19191915
pending_tasks=pending_tasks,
19201916
workers=workers,
@@ -2238,7 +2234,7 @@ def _mark_task_unschedulable(self, task: TaskRow) -> None:
22382234
if result.tasks_to_kill:
22392235
self.kill_tasks_on_workers(result.tasks_to_kill, result.task_kill_workers)
22402236

2241-
def create_scheduling_context(self, workers: list[WorkerRow]) -> SchedulingContext:
2237+
def create_scheduling_context(self, workers: list[SchedulableWorker]) -> SchedulingContext:
22422238
"""Create a scheduling context for the given workers."""
22432239
building_counts = _building_counts(self._db, workers)
22442240
return self._scheduler.create_scheduling_context(
@@ -2376,7 +2372,7 @@ def _stop_tasks_direct(
23762372

23772373
def _get_active_worker_addresses(self) -> list[tuple[WorkerId, str | None]]:
23782374
"""Get healthy active workers as (worker_id, address) tuples for ping."""
2379-
workers = healthy_active_workers_with_attributes(self._db, self._health, self._store.committed)
2375+
workers = healthy_active_workers_with_attributes(self._db, self._health)
23802376
return [(w.worker_id, w.address) for w in workers]
23812377

23822378
def _run_ping_loop(self, stop_event: threading.Event) -> None:
@@ -2531,7 +2527,7 @@ def _run_autoscaler_once(self) -> None:
25312527

25322528
worker_status_map = self._build_worker_status_map()
25332529
self._autoscaler.refresh(worker_status_map)
2534-
workers = healthy_active_workers_with_attributes(self._db, self._health, self._store.committed)
2530+
workers = healthy_active_workers_with_attributes(self._db, self._health)
25352531
demand_entries = compute_demand_entries(
25362532
self._db,
25372533
self._scheduler,

lib/iris/src/iris/cluster/controller/db.py

Lines changed: 54 additions & 30 deletions
Original file line numberDiff line numberDiff line change
@@ -11,7 +11,6 @@
1111
from collections.abc import Callable, Iterable, Iterator, Mapping, Sequence
1212
from contextlib import contextmanager
1313
from dataclasses import dataclass, field
14-
from dataclasses import replace as dc_replace
1514
from pathlib import Path
1615
from threading import Lock, RLock
1716
from typing import Any
@@ -20,7 +19,7 @@
2019

2120
from iris.cluster.constraints import AttributeValue
2221
from iris.cluster.controller.schema import decode_timestamp_ms, decode_worker_id
23-
from iris.cluster.controller.worker_health import WorkerCommitTracker, WorkerHealthTracker
22+
from iris.cluster.controller.worker_health import WorkerHealthTracker
2423
from iris.cluster.types import TERMINAL_TASK_STATES, JobName, WorkerId
2524
from iris.rpc import job_pb2
2625

@@ -917,16 +916,43 @@ def _worker_row_select() -> str:
917916
return WORKER_ROW_PROJECTION.select_clause()
918917

919918

919+
@dataclass(frozen=True, slots=True)
920+
class SchedulableWorker:
921+
"""Worker shape consumed by the scheduler.
922+
923+
Combines the durable :class:`WorkerRow` columns with the live committed-resource
924+
totals from :class:`WorkerHealthTracker`. Returned by
925+
:func:`healthy_active_workers_with_attributes` already filtered to healthy+active.
926+
Mirrors the field names in the :class:`scheduler.WorkerSnapshot` protocol so it
927+
can flow straight into ``Scheduler.create_scheduling_context`` without adapters.
928+
"""
929+
930+
worker_id: WorkerId
931+
address: str
932+
total_cpu_millicores: int
933+
total_memory_bytes: int
934+
total_gpu_count: int
935+
total_tpu_count: int
936+
device_type: str
937+
device_variant: str
938+
attributes: dict[str, AttributeValue]
939+
committed_cpu_millicores: int
940+
committed_mem: int
941+
committed_gpu: int
942+
committed_tpu: int
943+
healthy: bool = True
944+
945+
920946
def healthy_active_workers_with_attributes(
921947
db: ControllerDB,
922948
health: WorkerHealthTracker,
923-
committed: WorkerCommitTracker,
924-
) -> list:
925-
"""Fetch all healthy, active workers with their attributes populated.
949+
) -> list[SchedulableWorker]:
950+
"""Fetch all healthy, active workers with their attributes and committed totals populated.
926951
927-
Returns WorkerRow (scalar-only) so the scheduling loop avoids loading metadata columns.
928-
Health/active filtering reads the in-memory tracker; committed-resource
929-
arithmetic reads the in-memory commit tracker.
952+
Health/active filtering reads the in-memory tracker. The returned
953+
:class:`SchedulableWorker` rows include the live ``committed_*`` totals from
954+
the same tracker so the scheduler can compute ``available_*`` without a
955+
second lookup per worker.
930956
"""
931957
from iris.cluster.controller.schema import WORKER_ROW_PROJECTION
932958

@@ -936,38 +962,36 @@ def healthy_active_workers_with_attributes(
936962
return []
937963
placeholders = ",".join("?" for _ in healthy_active)
938964
with db.read_snapshot() as q:
939-
workers = WORKER_ROW_PROJECTION.decode(
965+
rows = WORKER_ROW_PROJECTION.decode(
940966
q.fetchall(
941967
f"SELECT {_worker_row_select()} FROM workers w WHERE w.worker_id IN ({placeholders})",
942968
tuple(str(wid) for wid in healthy_active),
943969
),
944970
)
945-
if not workers:
971+
if not rows:
946972
return []
947973
attrs_by_worker = db.get_worker_attributes()
948-
hydrated = []
949-
for w in workers:
950-
commit = committed.get(w.worker_id)
951-
l = liveness.get(w.worker_id)
952-
hydrated.append(
953-
dc_replace(
954-
w,
955-
healthy=True,
956-
active=True,
957-
consecutive_failures=l.consecutive_ping_failures if l is not None else 0,
958-
last_heartbeat=Timestamp.from_ms(l.last_heartbeat_ms) if l is not None else w.last_heartbeat,
959-
committed_cpu_millicores=commit.cpu_millicores,
960-
committed_mem=commit.memory_bytes,
961-
committed_gpu=commit.gpu,
962-
committed_tpu=commit.tpu,
974+
out: list[SchedulableWorker] = []
975+
for w in rows:
976+
l = liveness[w.worker_id]
977+
out.append(
978+
SchedulableWorker(
979+
worker_id=w.worker_id,
980+
address=w.address,
981+
total_cpu_millicores=w.total_cpu_millicores,
982+
total_memory_bytes=w.total_memory_bytes,
983+
total_gpu_count=w.total_gpu_count,
984+
total_tpu_count=w.total_tpu_count,
985+
device_type=w.device_type,
986+
device_variant=w.device_variant,
963987
attributes=attrs_by_worker.get(w.worker_id, {}),
964-
available_cpu_millicores=w.total_cpu_millicores - commit.cpu_millicores,
965-
available_memory=w.total_memory_bytes - commit.memory_bytes,
966-
available_gpus=w.total_gpu_count - commit.gpu,
967-
available_tpus=w.total_tpu_count - commit.tpu,
988+
committed_cpu_millicores=l.committed_cpu_millicores,
989+
committed_mem=l.committed_mem,
990+
committed_gpu=l.committed_gpu,
991+
committed_tpu=l.committed_tpu,
968992
)
969993
)
970-
return hydrated
994+
return out
971995

972996

973997
def insert_task_profile(

0 commit comments

Comments
 (0)