| 1 | +# Iris: Introduce Stores Layer Between `transitions.py` and `db.py` |
| 2 | + |
| 3 | +## Context |
| 4 | + |
| 5 | +`lib/iris/src/iris/cluster/controller/transitions.py` has grown to ~3,350 lines containing ~174 inline SQL queries that operate directly against `ControllerDB`. SQL and domain logic are tangled, there's no typed API boundary around the DB, and write-through caches (like `EndpointRegistry`) live either inside `db.py` or floating beside `transitions.py`. |
| 6 | + |
| 7 | +Goal: introduce a **stores layer** so the dependency chain is: |
| 8 | + |
| 9 | +``` |
| 10 | +db.py — connections, migrations, transaction/snapshot context managers, no schema knowledge |
| 11 | +schema.py — table DDL, row dataclasses, projections (unchanged) |
| 12 | +stores.py — depends on { db, schema }; typed per-entity stores + ControllerStore wrapper |
| 13 | +transitions.py — depends on stores; NO direct db.py SQL |
| 14 | +``` |
| 15 | + |
| 16 | +All store operations take a transaction (read or write) explicitly, e.g. `JobStore.list_descendants(tx, job_id)`. Stores own their write-through caches (e.g. `EndpointStore` absorbs today's `EndpointRegistry`). |
| 17 | + |
| 18 | +This refactor is **low-risk and phased**: we do not try to migrate all ~174 queries at once. Phase 1 scaffolds the layer and moves `EndpointRegistry` (the natural first candidate, since it's already a write-through cache). Subsequent phases migrate one entity at a time. |
| 19 | + |
| 20 | +## Current State (verified during exploration) |
| 21 | + |
| 22 | +- **DB access**: `ControllerDB.transaction()` yields a `TransactionCursor`; `ControllerDB.read_snapshot()` yields a `QuerySnapshot` (from a 32-connection pool). Post-commit hooks via `cur.on_commit(fn)` already underpin cache coherence. |
| 23 | +- **Existing typed rows/projections in [schema.py](lib/iris/src/iris/cluster/controller/schema.py)**: `JobRow`, `JobSchedulingRow`, `JobDetailRow`, `TaskRow`, `TaskDetailRow`, `WorkerRow`, `WorkerDetailRow`, `AttemptRow`, `EndpointRow`, `ApiKeyRow`, `UserBudgetRow`, with matching `*_PROJECTION` objects. These are the return types we'll lean on. |
| 24 | +- **Existing write-through cache**: [`EndpointRegistry`](lib/iris/src/iris/cluster/controller/endpoint_registry.py:43) — loads all rows at init, mutates memory in `cur.on_commit(...)` hooks. This is exactly the pattern stores will use. |
| 25 | +- **Existing cache inside `ControllerDB`**: `ControllerDB._attr_cache` ([db.py:321](lib/iris/src/iris/cluster/controller/db.py:321)) is a worker-attribute map that belongs semantically in `WorkerStore`. It is still actively used by `healthy_active_workers_with_attributes` ([db.py:908](lib/iris/src/iris/cluster/controller/db.py:908)) from multiple controller.py call sites and written from transitions.py on worker register/remove, so it stays put in Phase 1 and relocates in Phase 5. |
| 26 | +- **SQL entities in transitions.py** (count of queries per entity): jobs (~20), tasks (~35), task_attempts (~12), workers (~15), dispatch_queue (~5), endpoints (delegated to EndpointRegistry), task_resource_history (~6), worker_resource_history / worker_task_history (~6), reservation_claims (~2), meta (~3), users / user_budgets (~2). Full inventory captured during exploration. |
| 27 | + |
| 28 | +## Design |
| 29 | + |
| 30 | +### New file: `lib/iris/src/iris/cluster/controller/stores.py` |
| 31 | + |
| 32 | +```python |
| 33 | +from iris.cluster.controller.db import ControllerDB, TransactionCursor, QuerySnapshot |
| 34 | +from iris.cluster.controller.schema import JobRow, TaskRow, WorkerRow, AttemptRow, EndpointRow  # ... plus other row types as needed |
| 35 | + |
| 36 | +# Type used by read methods that accept either a write cursor or a read snapshot. |
| 37 | +# Writes require TransactionCursor explicitly. |
| 38 | +Tx = TransactionCursor | QuerySnapshot |
| 39 | + |
| 40 | + |
| 41 | +class JobStore: |
| 42 | + def __init__(self, db: ControllerDB) -> None: |
| 43 | + self._db = db # opaque handle for the rare case a store needs the connection itself |
| 44 | + |
| 45 | + # reads |
| 46 | + def get(self, tx: Tx, job_id: JobName) -> JobRow | None: ... |
| 47 | + def get_config(self, tx: Tx, job_id: JobName) -> JobConfigRow | None: ... |
| 48 | + def list_descendants(self, tx: Tx, job_id: JobName) -> list[JobName]: ... |
| 49 | + def list_terminal_ids(self, tx: Tx) -> list[JobName]: ... |
| 50 | + |
| 51 | + # writes |
| 52 | + def insert(self, tx: TransactionCursor, job: JobInsert) -> None: ... |
| 53 | + def update_state(self, tx: TransactionCursor, job_id: JobName, state: JobState, |
| 54 | + error: str | None, finished_at_ms: int | None) -> None: ... |
| 55 | + def delete(self, tx: TransactionCursor, job_id: JobName) -> None: ... |
| 56 | + |
| 57 | + |
| 58 | +class TaskStore: ... # tasks table + task_resource_history |
| 59 | +class TaskAttemptStore: ... # task_attempts |
| 60 | +class WorkerStore: ... # workers + worker_attributes (+ the attr cache currently in db.py) |
| 61 | +class EndpointStore: ... # former EndpointRegistry, renamed and relocated |
| 62 | +class DispatchQueueStore: ... # dispatch_queue |
| 63 | +class ReservationStore: ... # reservation_claims + meta(last_submission_ms) |
| 64 | + |
| 65 | + |
| 66 | +class ControllerStore: |
| 67 | + """Bundle of per-entity stores with direct access to transactions/snapshots.""" |
| 68 | + def __init__(self, db: ControllerDB) -> None: |
| 69 | + self._db = db |
| 70 | + self.jobs = JobStore(db) |
| 71 | + self.tasks = TaskStore(db) |
| 72 | + self.attempts = TaskAttemptStore(db) |
| 73 | + self.workers = WorkerStore(db) |
| 74 | + self.endpoints = EndpointStore(db) |
| 75 | + self.dispatch = DispatchQueueStore(db) |
| 76 | + self.reservations = ReservationStore(db) |
| 77 | + |
| 78 | + def transaction(self): return self._db.transaction() |
| 79 | + def read_snapshot(self): return self._db.read_snapshot() |
| 80 | +``` |
| 81 | + |
| 82 | +### Transaction rule |
| 83 | + |
| 84 | +- **Reads**: accept `Tx = TransactionCursor | QuerySnapshot`. Store methods internally call `tx.fetchall(...)` / `tx.fetchone(...)`. (Both types already expose these; where signatures diverge, the store normalizes.) |
| 85 | +- **Writes**: require `TransactionCursor` specifically. Static typing enforces the invariant. |
| 86 | +- **No store method opens its own transaction.** Callers are responsible for transaction scope. This matches today's pattern in `transitions.py` and keeps batching/atomicity in caller control. |
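The transaction rule above can be sketched with stand-in types. This is a minimal illustration, not the real `db.py` API: the `QuerySnapshot` and `TransactionCursor` classes below are hypothetical reductions (only `fetchone` and `execute`), `transaction()` is a hypothetical helper over a plain `sqlite3` connection, and the `jobs` table is invented for the demo.

```python
from __future__ import annotations

import sqlite3
from contextlib import contextmanager
from typing import Iterator, Union


class QuerySnapshot:
    """Hypothetical read-only handle standing in for the class in db.py."""

    def __init__(self, conn: sqlite3.Connection) -> None:
        self._conn = conn

    def fetchone(self, sql: str, params: tuple = ()) -> tuple | None:
        return self._conn.execute(sql, params).fetchone()


class TransactionCursor:
    """Hypothetical write handle: same read method, plus execute()."""

    def __init__(self, conn: sqlite3.Connection) -> None:
        self._conn = conn

    def fetchone(self, sql: str, params: tuple = ()) -> tuple | None:
        return self._conn.execute(sql, params).fetchone()

    def execute(self, sql: str, params: tuple = ()) -> None:
        self._conn.execute(sql, params)


# Reads accept either handle; writes name TransactionCursor explicitly.
Tx = Union[TransactionCursor, QuerySnapshot]


class JobStore:
    """Never opens a transaction; every method receives one from the caller."""

    def get_state(self, tx: Tx, job_id: str) -> str | None:
        row = tx.fetchone("SELECT state FROM jobs WHERE job_id = ?", (job_id,))
        return row[0] if row else None

    def update_state(self, tx: TransactionCursor, job_id: str, state: str) -> None:
        tx.execute("UPDATE jobs SET state = ? WHERE job_id = ?", (state, job_id))


@contextmanager
def transaction(conn: sqlite3.Connection) -> Iterator[TransactionCursor]:
    """Caller-owned scope: commit on success, roll back on any exception."""
    try:
        yield TransactionCursor(conn)
        conn.commit()
    except Exception:
        conn.rollback()
        raise


conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE jobs (job_id TEXT PRIMARY KEY, state TEXT)")
conn.execute("INSERT INTO jobs VALUES ('j1', 'PENDING')")
conn.commit()

jobs = JobStore()
with transaction(conn) as tx:
    jobs.update_state(tx, "j1", "RUNNING")
    state_inside = jobs.get_state(tx, "j1")  # a write cursor also serves reads

state_after = jobs.get_state(QuerySnapshot(conn), "j1")

# The caller decides atomicity: an exception in the scope undoes the write.
try:
    with transaction(conn) as tx:
        jobs.update_state(tx, "j1", "FAILED")
        raise RuntimeError("caller aborts")
except RuntimeError:
    pass
state_rolled_back = jobs.get_state(QuerySnapshot(conn), "j1")
```

Under this sketch, passing a `QuerySnapshot` to `update_state` is a type error that a checker such as pyrefly should flag, while at runtime it would fail only because the stand-in lacks `execute`.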
| 87 | + |
| 88 | +### Validation in stores |
| 89 | + |
| 90 | +Light and unambitious for phase 1: |
| 91 | +- Reject writes with invalid values (e.g., `update_state` asserts that the new state is a member of `JobState`). |
| 92 | +- Decode rows into the existing `*Row` dataclasses at the boundary — callers never see `sqlite3.Row`. |
| 93 | +- No business rules (retry counts, cascade logic) — those stay in `transitions.py`. |
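A rough sketch of what this light validation could look like. Everything here is illustrative: the `JobState` members, the two-field `JobRow`, and the `jobs` table layout are hypothetical, and the `tx` argument is a bare `sqlite3.Connection` rather than the real cursor types.

```python
from __future__ import annotations

import sqlite3
from dataclasses import dataclass
from enum import IntEnum


class JobState(IntEnum):
    # Hypothetical members; the real enum is defined elsewhere in Iris.
    PENDING = 1
    RUNNING = 2
    SUCCEEDED = 3


@dataclass(frozen=True)
class JobRow:
    # Hypothetical two-field row type for this sketch only.
    job_id: str
    state: JobState


class JobStore:
    def get(self, tx: sqlite3.Connection, job_id: str) -> JobRow | None:
        raw = tx.execute(
            "SELECT job_id, state FROM jobs WHERE job_id = ?", (job_id,)
        ).fetchone()
        if raw is None:
            return None
        # Decode at the boundary so callers never handle sqlite3.Row.
        return JobRow(job_id=raw["job_id"], state=JobState(raw["state"]))

    def update_state(self, tx: sqlite3.Connection, job_id: str, state: int) -> None:
        # JobState(...) raises ValueError for values outside the enum.
        valid = JobState(state)
        tx.execute(
            "UPDATE jobs SET state = ? WHERE job_id = ?", (int(valid), job_id)
        )


conn = sqlite3.connect(":memory:")
conn.row_factory = sqlite3.Row
conn.execute("CREATE TABLE jobs (job_id TEXT PRIMARY KEY, state INTEGER)")
conn.execute("INSERT INTO jobs VALUES ('j1', 1)")

store = JobStore()
store.update_state(conn, "j1", 2)
row = store.get(conn, "j1")

try:
    store.update_state(conn, "j1", 99)  # not a JobState member
    rejected = False
except ValueError:
    rejected = True
```

The store checks only that the value is well-formed. Whether a transition from one state to another is allowed remains a business rule for `transitions.py`.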
| 94 | + |
| 95 | +### EndpointStore (rename of EndpointRegistry) |
| 96 | + |
| 97 | +Move [`endpoint_registry.py`](lib/iris/src/iris/cluster/controller/endpoint_registry.py) → `EndpointStore` inside `stores.py`. Semantically identical: write-through cache keyed by id/name/task with post-commit hooks. |
| 98 | + |
| 99 | +- Replace `db.endpoints` accessor in [db.py:334](lib/iris/src/iris/cluster/controller/db.py:334) with `store.endpoints`. |
| 100 | +- Delete `endpoint_registry.py`; migrate its test file [test_endpoint_registry.py](lib/iris/tests/cluster/controller/test_endpoint_registry.py) to construct `EndpointStore` directly. |
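For readers unfamiliar with the pattern, here is a minimal sketch of a write-through cache driven by post-commit hooks. It is not the actual `EndpointRegistry` code: the `TransactionCursor` stand-in, the `transaction()` helper, the `endpoints` table layout, and the `add`/`remove`/`resolve` method names are all assumptions made for the example.

```python
from __future__ import annotations

import sqlite3
from contextlib import contextmanager
from typing import Callable, Iterator


class TransactionCursor:
    """Hypothetical stand-in for the db.py cursor: execute() plus on_commit()."""

    def __init__(self, conn: sqlite3.Connection) -> None:
        self._conn = conn
        self.hooks: list[Callable[[], object]] = []

    def execute(self, sql: str, params: tuple = ()) -> None:
        self._conn.execute(sql, params)

    def on_commit(self, fn: Callable[[], object]) -> None:
        self.hooks.append(fn)


@contextmanager
def transaction(conn: sqlite3.Connection) -> Iterator[TransactionCursor]:
    cur = TransactionCursor(conn)
    try:
        yield cur
        conn.commit()
    except Exception:
        conn.rollback()  # hooks are dropped, so the cache never sees this write
        raise
    for hook in cur.hooks:  # runs only after a successful commit
        hook()


class EndpointStore:
    def __init__(self, conn: sqlite3.Connection) -> None:
        # Load every row once at init; later reads are served from memory.
        self._by_name: dict[str, str] = dict(
            conn.execute("SELECT name, address FROM endpoints")
        )

    def add(self, tx: TransactionCursor, name: str, address: str) -> None:
        tx.execute("INSERT OR REPLACE INTO endpoints VALUES (?, ?)", (name, address))
        tx.on_commit(lambda: self._by_name.__setitem__(name, address))

    def remove(self, tx: TransactionCursor, name: str) -> None:
        tx.execute("DELETE FROM endpoints WHERE name = ?", (name,))
        tx.on_commit(lambda: self._by_name.pop(name, None))

    def resolve(self, name: str) -> str | None:
        return self._by_name.get(name)


conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE endpoints (name TEXT PRIMARY KEY, address TEXT)")
endpoints = EndpointStore(conn)

with transaction(conn) as tx:
    endpoints.add(tx, "svc", "10.0.0.1:80")
    visible_before_commit = endpoints.resolve("svc")  # cache not yet updated

visible_after_commit = endpoints.resolve("svc")

try:
    with transaction(conn) as tx:
        endpoints.remove(tx, "svc")
        raise RuntimeError("abort")
except RuntimeError:
    pass
visible_after_rollback = endpoints.resolve("svc")
rows_in_db = conn.execute("SELECT COUNT(*) FROM endpoints").fetchone()[0]
```

Because memory is mutated only in hooks, a rolled-back transaction leaves the cache and the table in agreement without any explicit undo logic.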
| 101 | + |
| 102 | +### transitions.py integration |
| 103 | + |
| 104 | +Change the `ControllerTransitions` constructor: |
| 105 | + |
| 106 | +```python |
| 107 | +# before |
| 108 | +def __init__(self, db: ControllerDB, ...): self._db = db |
| 109 | + |
| 110 | +# after |
| 111 | +def __init__(self, store: ControllerStore, ...): |
| 112 | + self._store = store |
| 113 | + self._db = store._db # phased: kept only while unmigrated queries remain |
| 114 | +``` |
| 115 | + |
| 116 | +The `self._db` escape hatch exists **only during the phased migration** and is deleted at the end. This lets each phase move a subset of queries without breaking the file. |
| 117 | + |
| 118 | +## Phasing |
| 119 | + |
| 120 | +Low-risk means small, verifiable PRs. Proposed sequence: |
| 121 | + |
| 122 | +**Phase 1 (this PR) — scaffolding + EndpointStore** |
| 123 | +1. Create `stores.py` with empty `JobStore`, `TaskStore`, `TaskAttemptStore`, `WorkerStore`, `DispatchQueueStore`, `ReservationStore` skeletons. |
| 124 | +2. Fold `EndpointRegistry` into `stores.py` as `EndpointStore`. |
| 125 | +3. Add `ControllerStore`; instantiate in [controller.py:~1035](lib/iris/src/iris/cluster/controller/controller.py:1035) and pass to `ControllerTransitions`. |
| 126 | +4. Update transitions.py to take `store: ControllerStore`; route endpoint calls through `self._store.endpoints`; keep `self._db` as temporary escape hatch. |
| 127 | +5. Update tests: `make_controller_state` in [conftest.py:186](lib/iris/tests/cluster/controller/conftest.py:186) constructs `ControllerStore`; update [test_endpoint_registry.py](lib/iris/tests/cluster/controller/test_endpoint_registry.py). |
| 128 | + |
| 129 | +**Phase 2 — JobStore migration** |
| 130 | +Move the ~20 jobs/job_config/users/user_budgets queries from transitions.py into `JobStore`/`ReservationStore`. Prefer one method per call site; collapse duplicates only when obvious. |
| 131 | + |
| 132 | +**Phase 3 — TaskStore migration** |
| 133 | +Move ~35 task and task_resource_history queries. |
| 134 | + |
| 135 | +**Phase 4 — TaskAttemptStore migration** |
| 136 | +Move ~12 task_attempts queries. |
| 137 | + |
| 138 | +**Phase 5 — WorkerStore migration** |
| 139 | +Move ~15 worker queries + `worker_attributes` + `_attr_cache` from `ControllerDB` into `WorkerStore`. Remove `_attr_cache` / `get_worker_attributes` / `set_worker_attributes` / `remove_worker_from_attr_cache` from [db.py](lib/iris/src/iris/cluster/controller/db.py:338). |
| 140 | + |
| 141 | +**Phase 6 — Cleanup** |
| 142 | +DispatchQueueStore + ReservationStore remaining queries. Drop `self._db` escape hatch on `ControllerTransitions`. Confirm transitions.py has zero `self._db.transaction()` / `self._db.fetchone()` calls — only `self._store.transaction()` + store method calls. |
| 143 | + |
| 144 | +Out of scope for now: [service.py](lib/iris/src/iris/cluster/controller/service.py) (47 queries), [controller.py](lib/iris/src/iris/cluster/controller/controller.py) (22), [checkpoint.py](lib/iris/src/iris/cluster/controller/checkpoint.py), autoscaler. They keep using `ControllerDB` directly; the layering rule "transitions → stores, never db" is a per-file invariant, not global. We can migrate them later if the pattern proves out. |
| 145 | + |
| 146 | +## Files to Modify (Phase 1 only) |
| 147 | + |
| 148 | +- **New**: `lib/iris/src/iris/cluster/controller/stores.py` |
| 149 | +- **Delete**: `lib/iris/src/iris/cluster/controller/endpoint_registry.py` (folded into stores.py as `EndpointStore`) |
| 150 | +- **Edit**: [lib/iris/src/iris/cluster/controller/db.py](lib/iris/src/iris/cluster/controller/db.py) — remove `endpoints` property and `EndpointRegistry` import (lines ~327-334); `_attr_cache` stays for now (moves in Phase 5). |
| 151 | +- **Edit**: [lib/iris/src/iris/cluster/controller/transitions.py](lib/iris/src/iris/cluster/controller/transitions.py) — constructor signature, route `add_endpoint`/`remove_endpoint`/`delete_task_endpoints` through `self._store.endpoints`. |
| 152 | +- **Edit**: [lib/iris/src/iris/cluster/controller/controller.py:~1035](lib/iris/src/iris/cluster/controller/controller.py:1035) — construct `ControllerStore(db)`, pass to transitions. |
| 153 | +- **Edit**: [lib/iris/src/iris/cluster/controller/service.py:1683](lib/iris/src/iris/cluster/controller/service.py:1683). Replace `db.endpoints.query(...)` with `store.endpoints.query(...)`. If service still takes only `db`, the alternative is to keep going through `db`, which requires leaving the `db.endpoints` accessor in place for now; decide based on simplest diff. |
| 154 | +- **Edit**: [lib/iris/tests/cluster/controller/conftest.py:186](lib/iris/tests/cluster/controller/conftest.py:186) — `make_controller_state` wires `ControllerStore`. |
| 155 | +- **Edit**: [lib/iris/tests/cluster/controller/test_endpoint_registry.py](lib/iris/tests/cluster/controller/test_endpoint_registry.py) — rename or retarget to `EndpointStore`. |
| 156 | + |
| 157 | +## Reuse Notes |
| 158 | + |
| 159 | +- Row dataclasses + projections in [schema.py](lib/iris/src/iris/cluster/controller/schema.py) are the return types — do not invent parallel types. |
| 160 | +- Post-commit hook pattern (`cur.on_commit(fn)`) already powers `EndpointRegistry` — reuse it verbatim for any write-through caches inside stores. |
| 161 | +- `ProtoCache` ([schema.py:33](lib/iris/src/iris/cluster/controller/schema.py:33)) remains where it is; stores don't need to touch it. |
| 162 | +- Predicate helpers (`task_is_finished`, `attempt_is_terminal`, etc. in [db.py:111-185](lib/iris/src/iris/cluster/controller/db.py:111)) stay as top-level functions; stores can call them when validating. |
| 163 | + |
| 164 | +## Verification |
| 165 | + |
| 166 | +**Phase 1:** |
| 167 | +- `uv run pytest lib/iris/tests/cluster/controller/test_endpoint_registry.py` — existing behavior preserved under new class name. |
| 168 | +- `uv run pytest lib/iris/tests/cluster/controller/test_transitions.py` — endpoint paths through `ControllerTransitions` work. |
| 169 | +- `uv run pytest lib/iris/tests/cluster/controller/test_service.py` — RPC `list_endpoints` still serves from cache. |
| 170 | +- `uv run pytest lib/iris/tests/cluster/controller/` — full controller test suite green. |
| 171 | +- `./infra/pre-commit.py --all-files --fix` + `uv run pyrefly`. |
| 172 | + |
| 173 | +**Later phases (each phase):** |
| 174 | +- Same pytest targets — behavioral parity is the bar. |
| 175 | +- Spot-check that migrated SQL in the store matches original semantics (diff review). |
| 176 | +- Grep transitions.py after each phase: the count of `self._db.transaction()` / direct SQL strings should decrease monotonically. Final state: zero. |
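The per-phase grep could be scripted along these lines. This is a rough sketch: the regex counts any attribute access on `self._db`, which is an approximation of "direct DB use" and does not detect raw SQL strings; the sample text is invented for the demo.

```python
import re

# Matches attribute access such as self._db.transaction or self._db.fetchone.
# The assignment `self._db = store._db` is not counted.
DIRECT_DB = re.compile(r"\bself\._db\.\w+")


def count_direct_db_uses(source: str) -> int:
    """Count direct ControllerDB uses remaining in a module's source text."""
    return len(DIRECT_DB.findall(source))


sample = '''
self._db = store._db
with self._db.transaction() as cur:
    cur.execute("UPDATE jobs SET state = ?", (s,))
row = self._db.fetchone("SELECT 1")
with self._store.transaction() as tx:
    self._store.jobs.get(tx, job_id)
'''

remaining = count_direct_db_uses(sample)

# Against the real file (path taken from this plan):
#   from pathlib import Path
#   path = Path("lib/iris/src/iris/cluster/controller/transitions.py")
#   print(count_direct_db_uses(path.read_text()))
```

Recording this number in each phase's PR description would make the "decreases monotonically" check easy to audit.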