Skip to content

Commit 5ceef81

Browse files
committed
Cleanups.
1 parent 90b6cfb commit 5ceef81

13 files changed

Lines changed: 292 additions & 631 deletions

File tree

lib/iris/src/iris/cluster/controller/controller.py

Lines changed: 47 additions & 20 deletions
Original file line number | Diff line number | Diff line change
@@ -75,6 +75,7 @@
7575
from iris.cluster.controller.provider import TaskProvider
7676
from iris.cluster.controller.reads import SchedulableWorker
7777
from iris.cluster.controller.scheduler import (
78+
DEFAULT_MAX_ASSIGNMENTS_PER_WORKER,
7879
DEFAULT_MAX_BUILDING_TASKS_PER_WORKER,
7980
JobRequirements,
8081
Scheduler,
@@ -328,13 +329,24 @@ def compute_demand_entries(
328329
dry_run_workers = _inject_reservation_taints(snapshots, claims)
329330
dry_run_jobs = _inject_taint_constraints(jobs, has_reservation, has_direct_reservation)
330331

331-
context = scheduler.create_scheduling_context(
332-
dry_run_workers,
332+
# Dry-run scheduling context — only the per-(task, worker) matching loop
333+
# consumes capacities/jobs/pending_tasks, so the raw-read fields stay
334+
# empty. Building/assignment limits are disabled so big workers can
335+
# absorb multiple tasks (prevents false demand on idle clusters).
336+
context = SchedulingContext(
337+
workers=dry_run_workers,
333338
building_counts=building_counts,
334-
pending_tasks=task_ids,
335-
jobs=dry_run_jobs,
336339
max_building_tasks=_UNLIMITED,
337340
max_assignments_per_worker=_UNLIMITED,
341+
pending_tasks=task_ids,
342+
jobs=dry_run_jobs,
343+
pending_task_rows=[],
344+
user_spend={},
345+
user_budget_limits={},
346+
requested_bands={},
347+
reserved_job_ids=frozenset(),
348+
reservation_entry_counts={},
349+
user_budget_defaults=UserBudgetDefaults(),
338350
)
339351
result = scheduler.find_assignments(context)
340352
for task_id, _ in result.assignments:
@@ -1032,10 +1044,13 @@ def build_scheduling_context(
10321044

10331045
snapshots = [worker_snapshot_from_row(w, usage_by_worker.get(w.worker_id)) for w in workers]
10341046
sorted_pending = _sort_pending_tasks_by_resolved_band(pending, requested_bands)
1035-
return SchedulingContext.from_workers(
1036-
snapshots,
1047+
return SchedulingContext(
1048+
workers=snapshots,
10371049
building_counts=building_counts,
10381050
max_building_tasks=max_building_tasks,
1051+
max_assignments_per_worker=DEFAULT_MAX_ASSIGNMENTS_PER_WORKER,
1052+
pending_tasks=[],
1053+
jobs={},
10391054
pending_task_rows=sorted_pending,
10401055
user_spend=user_spend,
10411056
user_budget_limits=user_budget_limits,
@@ -1468,6 +1483,12 @@ def __init__(
14681483
self._scheduling_diagnostics: dict[str, str] = {}
14691484
self._scheduling_round: int = 0
14701485

1486+
# Last completed scheduling context — None until the first tick runs.
1487+
# The dashboard diagnostics path reads this instead of rebuilding from
1488+
# the DB. This is the only ``| None`` attribute on Controller: it is
1489+
# genuinely None before the first scheduling tick has run.
1490+
self._last_scheduling_context: SchedulingContext | None = None
1491+
14711492
# Set to True once start() is called. Used to gate operations that
14721493
# are only valid before the controller loops begin (e.g. LoadCheckpoint).
14731494
self._started = False
@@ -1974,6 +1995,7 @@ def _run_scheduling(self) -> SchedulingOutcome:
19741995

19751996
if not ctx.pending_task_rows:
19761997
self._scheduling_diagnostics = {}
1998+
self._last_scheduling_context = ctx
19771999
return SchedulingOutcome.NO_PENDING_TASKS
19782000

19792001
gated = apply_scheduling_gates(
@@ -1989,6 +2011,7 @@ def _run_scheduling(self) -> SchedulingOutcome:
19892011

19902012
if not gated.schedulable_task_ids:
19912013
self._scheduling_diagnostics = {}
2014+
self._last_scheduling_context = ctx
19922015
return SchedulingOutcome.NO_PENDING_TASKS
19932016

19942017
order = compute_scheduling_order(ctx, gated, trace=trace)
@@ -1998,6 +2021,9 @@ def _run_scheduling(self) -> SchedulingOutcome:
19982021
preemptions = self._apply_preemptions(order, tainted_jobs, all_assignments, claims, context)
19992022

20002023
self._cache_scheduling_diagnostics(context, tainted_jobs, all_assignments, order.ordered_task_ids)
2024+
# Post-taint context (or the un-tainted ctx when no claims were active)
2025+
# — exposed via ``last_scheduling_context`` for dashboard diagnostics.
2026+
self._last_scheduling_context = context
20012027

20022028
if all_assignments or preemptions:
20032029
log_event(
@@ -2048,11 +2074,12 @@ def _run_scheduler_pass(
20482074
if claims:
20492075
modified_workers = _inject_reservation_taints(list(ctx.workers), claims)
20502076
building_counts = {wid: cap.building_task_count for wid, cap in ctx.capacities.items()}
2051-
context = self._scheduler.create_scheduling_context(
2052-
modified_workers,
2053-
building_counts=building_counts,
2054-
pending_tasks=order.ordered_task_ids,
2077+
ctx.pending_tasks = list(order.ordered_task_ids)
2078+
context = ctx.evolve_with_workers(
2079+
workers=modified_workers,
20552080
jobs=modified_jobs,
2081+
building_counts=building_counts,
2082+
max_building_tasks=self._scheduler.max_building_tasks_per_worker,
20562083
)
20572084
else:
20582085
ctx.pending_tasks = list(order.ordered_task_ids)
@@ -2274,16 +2301,16 @@ def _mark_task_unschedulable(self, task: Any) -> None:
22742301
reason=f"Scheduling timeout exceeded ({timeout})",
22752302
)
22762303

2277-
def create_scheduling_context(self, workers: list[SchedulableWorker]) -> SchedulingContext:
2278-
"""Create a worker-only scheduling context for diagnostics and dashboard RPCs."""
2279-
with self._db.read_snapshot() as snap:
2280-
building_counts = reads.building_counts(snap, [w.worker_id for w in workers])
2281-
usage_by_worker = reads.resource_usage_by_worker(snap)
2282-
snapshots = [worker_snapshot_from_row(w, usage_by_worker.get(w.worker_id)) for w in workers]
2283-
return self._scheduler.create_scheduling_context(
2284-
snapshots,
2285-
building_counts=building_counts,
2286-
)
2304+
@property
2305+
def last_scheduling_context(self) -> "SchedulingContext | None":
2306+
"""Return the most recent finalized scheduling context.
2307+
2308+
``None`` before the first scheduling tick has run; otherwise the
2309+
post-taint context from the last completed ``_run_scheduling`` pass.
2310+
Consumed by dashboard diagnostics that need a snapshot of capacities
2311+
and pending tasks without rebuilding from the DB.
2312+
"""
2313+
return self._last_scheduling_context
22872314

22882315
# =========================================================================
22892316
# Worker lifecycle RPC dispatch (Reconcile / Ping)

lib/iris/src/iris/cluster/controller/reads.py

Lines changed: 2 additions & 2 deletions
Original file line number | Diff line number | Diff line change
@@ -1074,8 +1074,8 @@ class SchedulableWorker:
10741074
"""Worker shape consumed by the scheduler.
10751075
10761076
Field names mirror the :class:`scheduler.WorkerSnapshot` protocol so
1077-
instances flow into ``Scheduler.create_scheduling_context`` without
1078-
an adapter.
1077+
instances flow through ``worker_snapshot_from_row`` into
1078+
``SchedulingContext`` without an adapter.
10791079
"""
10801080

10811081
worker_id: WorkerId

lib/iris/src/iris/cluster/controller/scheduler.py

Lines changed: 78 additions & 106 deletions
Original file line number | Diff line number | Diff line change
@@ -11,8 +11,8 @@
1111
The scheduler operates exclusively on scheduler-owned types (JobRequirements,
1212
WorkerCapacity, SchedulingContext) and has ZERO runtime imports from controller
1313
state. Callers project worker rows into ``WorkerSnapshot`` (via
14-
``worker_snapshot_from_row``) at the boundary before invoking
15-
``create_scheduling_context``.
14+
``worker_snapshot_from_row``) at the boundary before constructing
15+
``SchedulingContext`` directly.
1616
1717
"""
1818

@@ -328,100 +328,96 @@ class SchedulingContext:
328328
gates/order pipeline and derived structures (``index``, ``capacities``) for
329329
the per-(task, worker) matching hot loop.
330330
331+
Construction is direct: callers supply ``workers``, ``building_counts``,
332+
``max_building_tasks``, and the raw-read fields; ``__post_init__`` derives
333+
``capacities``, ``index``, and ``_str_to_wid`` once. To rebuild the index
334+
after taint injection mid-tick, use :meth:`evolve_with_workers` which reuses
335+
the raw-read fields and only redoes the per-worker derivation.
336+
331337
Posting lists are read-only after construction; capacity deductions don't
332338
touch them. Workers are tracked via ``assignment_counts`` to bound tasks
333339
per worker per cycle.
334340
"""
335341

336-
index: ConstraintIndex
337-
338-
# Worker capacities indexed by worker ID
339-
capacities: dict[WorkerId, WorkerCapacity]
340-
341-
# Reverse map from string ID back to WorkerId
342-
_str_to_wid: dict[str, WorkerId]
343-
344-
assignment_counts: dict[WorkerId, int] = field(default_factory=dict)
345-
max_assignments_per_worker: int = DEFAULT_MAX_ASSIGNMENTS_PER_WORKER
346-
347-
# Task IDs in scheduling priority order; populated after gates+order resolve.
348-
pending_tasks: list[JobName] = field(default_factory=list)
349-
jobs: dict[JobName, JobRequirements] = field(default_factory=dict)
350-
351-
# Raw per-tick reads — consumed by gates/order helpers; empty for diagnostics/dry-run.
352-
pending_task_rows: list[PendingTask] = field(default_factory=list)
353-
workers: list[WorkerSnapshot] = field(default_factory=list)
354-
user_spend: dict[str, int] = field(default_factory=dict)
355-
user_budget_limits: dict[str, int] = field(default_factory=dict)
356-
requested_bands: dict[JobName, int] = field(default_factory=dict)
357-
reserved_job_ids: frozenset[JobName] = field(default_factory=frozenset)
358-
reservation_entry_counts: dict[JobName, int] = field(default_factory=dict)
359-
user_budget_defaults: UserBudgetDefaults | None = None
342+
workers: list[WorkerSnapshot]
343+
building_counts: dict[WorkerId, int]
344+
max_building_tasks: int
345+
max_assignments_per_worker: int
346+
pending_tasks: list[JobName]
347+
jobs: dict[JobName, JobRequirements]
348+
pending_task_rows: list[PendingTask]
349+
user_spend: dict[str, int]
350+
user_budget_limits: dict[str, int]
351+
requested_bands: dict[JobName, int]
352+
reserved_job_ids: frozenset[JobName]
353+
reservation_entry_counts: dict[JobName, int]
354+
user_budget_defaults: UserBudgetDefaults
355+
356+
# Derived from ``workers`` in __post_init__.
357+
capacities: dict[WorkerId, WorkerCapacity] = field(init=False)
358+
index: ConstraintIndex = field(init=False)
359+
_str_to_wid: dict[str, WorkerId] = field(init=False)
360+
361+
# Per-cycle mutable state; always starts empty.
362+
assignment_counts: dict[WorkerId, int] = field(init=False)
360363

361364
# Scores memoized per (worker, soft-constraints) tuple; worker attributes are
362365
# stable within a tick so the same pair always yields the same score.
363-
_soft_score_cache: dict[tuple[WorkerId, tuple[Constraint, ...]], int] = field(default_factory=dict)
364-
365-
@property
366-
def all_worker_ids(self) -> set[WorkerId]:
367-
return {self._str_to_wid[s] for s in self.index._all_ids}
366+
_soft_score_cache: dict[tuple[WorkerId, tuple[Constraint, ...]], int] = field(init=False)
368367

369-
@classmethod
370-
def from_workers(
371-
cls,
372-
workers: list[WorkerSnapshot],
373-
building_counts: dict[WorkerId, int] | None = None,
374-
max_building_tasks: int = DEFAULT_MAX_BUILDING_TASKS_PER_WORKER,
375-
pending_tasks: list[JobName] | None = None,
376-
jobs: dict[JobName, JobRequirements] | None = None,
377-
max_assignments_per_worker: int = DEFAULT_MAX_ASSIGNMENTS_PER_WORKER,
378-
pending_task_rows: list[PendingTask] | None = None,
379-
user_spend: dict[str, int] | None = None,
380-
user_budget_limits: dict[str, int] | None = None,
381-
requested_bands: dict[JobName, int] | None = None,
382-
reserved_job_ids: frozenset[JobName] | None = None,
383-
reservation_entry_counts: dict[JobName, int] | None = None,
384-
user_budget_defaults: UserBudgetDefaults | None = None,
385-
) -> "SchedulingContext":
386-
"""Build scheduling context from worker list.
387-
388-
Creates capacity snapshots and a ConstraintIndex for fast attribute matching.
389-
"""
390-
building_counts = building_counts or {}
391-
392-
capacities = {
368+
def __post_init__(self) -> None:
369+
self.capacities = {
393370
w.worker_id: WorkerCapacity.from_worker(
394371
w,
395-
building_count=building_counts.get(w.worker_id, 0),
396-
max_building_tasks=max_building_tasks,
372+
building_count=self.building_counts.get(w.worker_id, 0),
373+
max_building_tasks=self.max_building_tasks,
397374
)
398-
for w in workers
375+
for w in self.workers
399376
}
400-
401377
str_to_wid: dict[str, WorkerId] = {}
402378
entity_attrs: dict[str, dict[str, AttributeValue]] = {}
403-
for wid, cap in capacities.items():
379+
for wid, cap in self.capacities.items():
404380
key = str(wid)
405381
str_to_wid[key] = wid
406382
entity_attrs[key] = dict(cap.attributes)
383+
self._str_to_wid = str_to_wid
384+
self.index = ConstraintIndex.build(entity_attrs)
385+
self.assignment_counts = {}
386+
self._soft_score_cache = {}
407387

408-
index = ConstraintIndex.build(entity_attrs)
409-
410-
return cls(
411-
index=index,
412-
capacities=capacities,
413-
_str_to_wid=str_to_wid,
414-
pending_tasks=pending_tasks or [],
415-
jobs=jobs or {},
416-
max_assignments_per_worker=max_assignments_per_worker,
417-
pending_task_rows=list(pending_task_rows or []),
418-
workers=list(workers),
419-
user_spend=dict(user_spend or {}),
420-
user_budget_limits=dict(user_budget_limits or {}),
421-
requested_bands=dict(requested_bands or {}),
422-
reserved_job_ids=frozenset(reserved_job_ids or ()),
423-
reservation_entry_counts=dict(reservation_entry_counts or {}),
424-
user_budget_defaults=user_budget_defaults,
388+
@property
389+
def all_worker_ids(self) -> set[WorkerId]:
390+
return {self._str_to_wid[s] for s in self.index._all_ids}
391+
392+
def evolve_with_workers(
393+
self,
394+
workers: list[WorkerSnapshot],
395+
jobs: dict[JobName, JobRequirements],
396+
building_counts: dict[WorkerId, int],
397+
max_building_tasks: int,
398+
) -> "SchedulingContext":
399+
"""Rebuild capacities/index for taint-injected workers.
400+
401+
Reuses all raw-read fields (``pending_task_rows``, ``user_spend``, etc.)
402+
verbatim. The caller supplies updated ``workers``/``jobs`` (e.g. after
403+
reservation taint injection) and fresh ``building_counts``. The
404+
returned context starts a fresh placement pass with empty
405+
``assignment_counts`` and an empty soft-score cache.
406+
"""
407+
return SchedulingContext(
408+
workers=workers,
409+
building_counts=building_counts,
410+
max_building_tasks=max_building_tasks,
411+
max_assignments_per_worker=self.max_assignments_per_worker,
412+
pending_tasks=self.pending_tasks,
413+
jobs=jobs,
414+
pending_task_rows=self.pending_task_rows,
415+
user_spend=self.user_spend,
416+
user_budget_limits=self.user_budget_limits,
417+
requested_bands=self.requested_bands,
418+
reserved_job_ids=self.reserved_job_ids,
419+
reservation_entry_counts=self.reservation_entry_counts,
420+
user_budget_defaults=self.user_budget_defaults,
425421
)
426422

427423
def matching_workers(self, constraints: Sequence[Constraint]) -> set[WorkerId]:
@@ -618,6 +614,11 @@ def __init__(
618614
):
619615
self._max_building_tasks_per_worker = max_building_tasks_per_worker
620616

617+
@property
618+
def max_building_tasks_per_worker(self) -> int:
619+
"""Per-worker BUILDING-state limit applied to fresh scheduling contexts."""
620+
return self._max_building_tasks_per_worker
621+
621622
def find_assignments(
622623
self,
623624
context: SchedulingContext,
@@ -816,35 +817,6 @@ def _group_soft_score(group_worker_ids: list[WorkerId]) -> int:
816817
)
817818
return None
818819

819-
def create_scheduling_context(
820-
self,
821-
workers: list[WorkerSnapshot],
822-
building_counts: dict[WorkerId, int] | None = None,
823-
pending_tasks: list[JobName] | None = None,
824-
jobs: dict[JobName, JobRequirements] | None = None,
825-
max_building_tasks: int | None = None,
826-
max_assignments_per_worker: int | None = None,
827-
) -> SchedulingContext:
828-
"""Create a scheduling context for the given workers.
829-
830-
Convenience wrapper for tests, diagnostics, and the autoscaler dry-run
831-
path. Does not populate the raw read fields (``pending_task_rows``,
832-
``user_spend``, etc.); use ``build_scheduling_context`` for the full
833-
scheduling-loop construction.
834-
"""
835-
limit = max_building_tasks if max_building_tasks is not None else self._max_building_tasks_per_worker
836-
assignments_limit = (
837-
max_assignments_per_worker if max_assignments_per_worker is not None else DEFAULT_MAX_ASSIGNMENTS_PER_WORKER
838-
)
839-
return SchedulingContext.from_workers(
840-
workers,
841-
building_counts=building_counts,
842-
max_building_tasks=limit,
843-
pending_tasks=pending_tasks,
844-
jobs=jobs,
845-
max_assignments_per_worker=assignments_limit,
846-
)
847-
848820
def get_job_scheduling_diagnostics(
849821
self,
850822
req: JobRequirements,

0 commit comments

Comments (0)