Skip to content

Commit 9323ac2

Browse files
committed
[iris] Introduce stores layer between transitions.py and db.py
Phase 1 of the transitions -> stores -> db refactor: add ControllerStore bundling typed per-entity stores and fold EndpointRegistry into EndpointStore. Remaining stores (Job, Task, TaskAttempt, Worker, DispatchQueue, Reservation) are skeletons to be filled in subsequent phases.
1 parent e10e140 commit 9323ac2

18 files changed

Lines changed: 294 additions & 169 deletions

lib/iris/scripts/benchmark_db_queries.py

Lines changed: 14 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -72,6 +72,7 @@
7272
_worker_roster,
7373
)
7474
from iris.cluster.controller.schema import EndpointRow
75+
from iris.cluster.controller.stores import ControllerStore
7576
from iris.cluster.controller.transitions import (
7677
Assignment,
7778
ControllerTransitions,
@@ -252,7 +253,8 @@ def benchmark_scheduling(db: ControllerDB) -> None:
252253

253254
# --- Write-path benchmarks (use a lightweight clone) ---
254255
write_db = clone_db(db)
255-
write_txns = ControllerTransitions(write_db)
256+
write_store = ControllerStore(write_db)
257+
write_txns = ControllerTransitions(store=write_store)
256258

257259
try:
258260
# queue_assignments: the main write-lock holder in scheduling.
@@ -542,7 +544,7 @@ def _all_workers_running_tasks():
542544

543545
bench("running_tasks_by_worker", lambda: running_tasks_by_worker(db, worker_ids))
544546

545-
transitions = ControllerTransitions(db)
547+
transitions = ControllerTransitions(store=ControllerStore(db))
546548
bench(
547549
f"get_running_tasks_for_poll ({len(workers)} workers)",
548550
lambda: transitions.get_running_tasks_for_poll(),
@@ -594,7 +596,7 @@ def _all_workers_running_tasks():
594596
)
595597

596598
hb_db = clone_db(db)
597-
hb_transitions = ControllerTransitions(hb_db)
599+
hb_transitions = ControllerTransitions(store=ControllerStore(hb_db))
598600

599601
try:
600602
bench(
@@ -739,14 +741,16 @@ def benchmark_endpoints(db: ControllerDB) -> None:
739741
contention (matches the production Register p95 of 3-4s)
740742
"""
741743
# Read-path queries run against the source DB (cheap, no clone needed).
742-
bench("endpoint_registry.query (all)", lambda: db.endpoints.query())
744+
read_store = ControllerStore(db)
745+
bench("endpoint_store.query (all)", lambda: read_store.endpoints.query())
743746
bench(
744-
"endpoint_registry.query (prefix)",
745-
lambda: db.endpoints.query(EndpointQuery(name_prefix="test")),
747+
"endpoint_store.query (prefix)",
748+
lambda: read_store.endpoints.query(EndpointQuery(name_prefix="test")),
746749
)
747750

748751
write_db = clone_db(db)
749-
write_txns = ControllerTransitions(write_db)
752+
write_store = ControllerStore(write_db)
753+
write_txns = ControllerTransitions(store=write_store)
750754

751755
try:
752756
sample = _active_task_sample(write_db, limit=300)
@@ -762,7 +766,7 @@ def _do_single():
762766

763767
def _reset_single():
764768
write_db.execute("DELETE FROM endpoints WHERE name LIKE '/bench/endpoint/%'")
765-
write_db.endpoints._load_all()
769+
write_store.endpoints._load_all()
766770

767771
bench("add_endpoint (1 write)", _do_single, reset=_reset_single)
768772

@@ -793,7 +797,7 @@ def _do_burst_per_txn(tasks=tasks_for_burst):
793797
def _do_burst_one_txn(tasks=tasks_for_burst):
794798
with write_db.transaction() as cur:
795799
for t in tasks:
796-
write_db.endpoints.add(cur, _make_endpoint(t))
800+
write_store.endpoints.add(cur, _make_endpoint(t))
797801

798802
bench(
799803
f"add_endpoint burst x{burst_n} (1 txn)",
@@ -1365,7 +1369,7 @@ def benchmark_apply_contention(db: ControllerDB) -> None:
13651369
]
13661370

13671371
write_db = clone_db(db)
1368-
write_txns = ControllerTransitions(write_db)
1372+
write_txns = ControllerTransitions(store=ControllerStore(write_db))
13691373
try:
13701374
for scenario in scenarios:
13711375
_run_apply_under_contention(

lib/iris/src/iris/cluster/controller/actor_proxy.py

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -20,7 +20,7 @@
2020
from starlette.requests import Request
2121
from starlette.responses import JSONResponse, Response
2222

23-
from iris.cluster.controller.db import ControllerDB
23+
from iris.cluster.controller.stores import ControllerStore
2424

2525
logger = logging.getLogger(__name__)
2626

@@ -46,10 +46,10 @@
4646

4747

4848
class ActorProxy:
49-
"""Forwards ActorService RPCs to actors resolved from the endpoint registry."""
49+
"""Forwards ActorService RPCs to actors resolved from the endpoint store."""
5050

51-
def __init__(self, db: ControllerDB):
52-
self._db = db
51+
def __init__(self, store: ControllerStore):
52+
self._store = store
5353
self._client = httpx.AsyncClient(timeout=PROXY_TIMEOUT_SECONDS)
5454

5555
async def close(self) -> None:
@@ -97,8 +97,8 @@ async def handle(self, request: Request) -> Response:
9797
)
9898

9999
def _resolve_endpoint(self, name: str) -> str | None:
100-
"""Resolve an endpoint name to an address via the in-memory registry."""
101-
row = self._db.endpoints.resolve(name)
100+
"""Resolve an endpoint name to an address via the in-memory store."""
101+
row = self._store.endpoints.resolve(name)
102102
if row is None:
103103
return None
104104
return row.address

lib/iris/src/iris/cluster/controller/controller.py

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -91,6 +91,7 @@
9191
)
9292
from iris.cluster.controller.auth import ControllerAuth
9393
from iris.cluster.controller.service import ControllerServiceImpl
94+
from iris.cluster.controller.stores import ControllerStore
9495
from iris.cluster.controller.transitions import (
9596
RESERVATION_HOLDER_JOB_NAME,
9697
Assignment,
@@ -1035,6 +1036,7 @@ def __init__(
10351036
self._db = db
10361037
else:
10371038
self._db = ControllerDB(db_dir=config.local_state_dir / "db")
1039+
self._store = ControllerStore(self._db)
10381040

10391041
# ThreadContainer must be initialized before the log service setup
10401042
# because _start_local_log_server spawns a uvicorn thread.
@@ -1072,7 +1074,7 @@ def __init__(
10721074

10731075
self._health = WorkerHealthTracker()
10741076
self._transitions = ControllerTransitions(
1075-
db=self._db,
1077+
store=self._store,
10761078
user_budget_defaults=config.user_budget_defaults,
10771079
health=self._health,
10781080
)
@@ -1082,7 +1084,7 @@ def __init__(
10821084

10831085
self._service = ControllerServiceImpl(
10841086
self._transitions,
1085-
self._db,
1087+
self._store,
10861088
controller=self,
10871089
bundle_store=self._bundle_store,
10881090
log_service=self._remote_log_service,

lib/iris/src/iris/cluster/controller/dashboard.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -299,7 +299,7 @@ def _create_app(self) -> ASGIApp:
299299

300300
rpc_app = WSGIMiddleware(rpc_wsgi_app)
301301

302-
self._actor_proxy = ActorProxy(self._service._db)
302+
self._actor_proxy = ActorProxy(self._service._store)
303303

304304
@requires_auth
305305
async def _proxy_actor_rpc(request: Request) -> Response:

lib/iris/src/iris/cluster/controller/db.py

Lines changed: 11 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -237,7 +237,7 @@ class TransactionCursor:
237237
238238
Post-commit hooks registered via :meth:`on_commit` run after the wrapping
239239
``ControllerDB.transaction()`` block commits successfully. They are used
240-
by caches (e.g. ``EndpointRegistry``) to update in-memory state atomically
240+
by caches (e.g. ``EndpointStore``) to update in-memory state atomically
241241
with the DB write: rollback suppresses the hook so memory never drifts
242242
from disk.
243243
"""
@@ -321,19 +321,14 @@ def __init__(self, db_dir: Path):
321321
self._attr_cache: dict[WorkerId, dict[str, AttributeValue]] | None = None
322322
self._attr_cache_lock = Lock()
323323

324-
# Write-through in-memory cache over the ``endpoints`` table. Imported
325-
# locally to break the ``db -> endpoint_registry -> db`` import cycle;
326-
# this is the single exception to "no local imports" (see AGENTS.md).
327-
from iris.cluster.controller.endpoint_registry import EndpointRegistry
324+
# Callables invoked at the end of ``replace_from`` so callers with
325+
# caches over DB contents (e.g. ``ControllerStore``) can reload them
326+
# after a checkpoint restore. Registered via ``register_reopen_hook``.
327+
self._reopen_hooks: list[Callable[[], None]] = []
328328

329-
t0 = time.monotonic()
330-
self._endpoint_registry = EndpointRegistry(self)
331-
logger.info("EndpointRegistry initialized in %.2fs", time.monotonic() - t0)
332-
333-
@property
334-
def endpoints(self) -> EndpointRegistry: # noqa: F821
335-
"""Process-local cache for the ``endpoints`` table; authoritative for reads."""
336-
return self._endpoint_registry
329+
def register_reopen_hook(self, hook: Callable[[], None]) -> None:
330+
"""Register a no-arg callable to run at the end of ``replace_from``."""
331+
self._reopen_hooks.append(hook)
337332

338333
def _populate_attr_cache(self) -> dict[WorkerId, dict[str, AttributeValue]]:
339334
"""Load all worker attributes from the DB into the cache.
@@ -454,7 +449,7 @@ def transaction(self):
454449
455450
On successful commit, any hooks registered via ``TransactionCursor.on_commit``
456451
fire while the write lock is still held — keeping in-memory caches
457-
(e.g. ``EndpointRegistry``) in sync with the DB without exposing a
452+
(e.g. ``EndpointStore``) in sync with the DB without exposing a
458453
torn snapshot to concurrent readers.
459454
"""
460455
with self._lock:
@@ -751,7 +746,8 @@ def replace_from(self, source_dir: str | Path) -> None:
751746
self._conn.execute("ATTACH DATABASE ? AS profiles", (str(self._profiles_db_path),))
752747
self._init_read_pool()
753748
self.apply_migrations()
754-
self._endpoint_registry._load_all()
749+
for hook in self._reopen_hooks:
750+
hook()
755751

756752
# SQL-canonical read access is exposed through ``snapshot()`` and typed table
757753
# metadata at module scope. Legacy list/get/count helper methods were removed

lib/iris/src/iris/cluster/controller/service.py

Lines changed: 5 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -88,6 +88,7 @@
8888
from iris.cluster.controller.query import execute_raw_query
8989
from iris.rpc import query_pb2
9090
from iris.cluster.controller.scheduler import SchedulingContext
91+
from iris.cluster.controller.stores import ControllerStore
9192
from iris.cluster.controller.transitions import (
9293
TASK_RESOURCE_HISTORY_RETENTION,
9394
ControllerTransitions,
@@ -958,15 +959,16 @@ class ControllerServiceImpl:
958959
def __init__(
959960
self,
960961
transitions: ControllerTransitions,
961-
db: ControllerDB,
962+
store: ControllerStore,
962963
controller: ControllerProtocol,
963964
bundle_store: BundleStore,
964965
log_service: LogServiceImpl | LogServiceProxy,
965966
auth: ControllerAuth | None = None,
966967
system_endpoints: dict[str, str] | None = None,
967968
):
968969
self._transitions = transitions
969-
self._db = db
970+
self._store = store
971+
self._db = store._db
970972
self._controller = controller
971973
self._bundle_store = bundle_store
972974
self._log_service = log_service
@@ -1680,7 +1682,7 @@ def list_endpoints(
16801682
if prefix.startswith("/system/"):
16811683
return self._list_system_endpoints(prefix, exact=request.exact)
16821684

1683-
endpoints = self._db.endpoints.query(
1685+
endpoints = self._store.endpoints.query(
16841686
EndpointQuery(
16851687
exact_name=prefix if request.exact else None,
16861688
name_prefix=None if request.exact else prefix,

0 commit comments

Comments
 (0)