Skip to content

Commit 9bea281

Browse files
committed
More tweaks.
1 parent 89a1423 commit 9bea281

14 files changed

Lines changed: 269 additions & 185 deletions

lib/finelog/src/finelog/store/duckdb_store.py

Lines changed: 66 additions & 53 deletions
Original file line numberDiff line numberDiff line change
@@ -1,19 +1,31 @@
11
# Copyright The Marin Authors
22
# SPDX-License-Identifier: Apache-2.0
33

4-
"""Namespace registry over the DuckDB-backed log store.
5-
6-
:class:`DuckDBLogStore` is a thin façade over :class:`Catalog`. The
7-
catalog owns every piece of namespace state — schemas, segment rows,
8-
the live ``LogNamespaceProtocol`` dict, and the in-flight drop set —
9-
behind a single mutex; the store routes RPCs through it. The
10-
query-visibility rwlock on the store is held across reads so compaction
11-
can't unlink files mid-scan. Per-namespace ``_insertion_lock``s on each
12-
namespace serialize append/flush/compaction for that namespace only;
13-
writes against different namespaces don't contend. Schemas persist in
14-
the catalog DB and are rehydrated on startup. The ``log`` namespace is
15-
upserted on first boot for back-compat with deployments whose registry
16-
DB pre-dates the namespace registry.
4+
"""Service-implementation layer above :class:`Catalog`.
5+
6+
:class:`DuckDBLogStore` is what the RPC handlers sit on top of. Catalog
7+
owns the namespace state — schemas, segment rows, the live
8+
``LogNamespaceProtocol`` dict, the in-flight drop set — behind a single
9+
mutex. This class owns the I/O machinery and the cross-component
10+
orchestration around it:
11+
12+
* The read-side DuckDB :class:`ConnectionPool` that backs ``query()``.
13+
* The query-visibility :class:`RWLock`, held in read across queries
14+
(DuckDB opens parquet files lazily during scan) and in write briefly
15+
during ``drop_table``'s unlink phase.
16+
* The wire-format pipeline for ``write_rows`` (Arrow IPC decode, size
17+
checks, schema validation).
18+
* The SQL-execution path in ``query`` (snapshot live namespaces, build
19+
DuckDB views over their parquet + RAM Arrow tables, run the user's
20+
SQL, tear views down).
21+
* The five-step ``drop_table`` sequence (Catalog ``begin_drop`` → bg
22+
thread join → catalog row delete → rwlock-write + ``remove_local_storage``
23+
→ ``finish_drop``).
24+
* The factory that picks ``DiskLogNamespace`` vs ``MemoryLogNamespace``
25+
and wires their dependencies.
26+
27+
The ``log`` namespace is upserted on first boot for back-compat with
28+
deployments whose registry DB pre-dates the namespace registry.
1729
"""
1830

1931
from __future__ import annotations
@@ -203,14 +215,18 @@ def _validate_namespace_name(name: str, data_dir: Path | None) -> Path | None:
203215

204216

205217
class DuckDBLogStore:
206-
"""Namespace registry routing log + stats RPCs to per-namespace storage.
218+
"""RPC-handler-facing store wrapping :class:`Catalog` and namespaces.
207219
208-
Concurrency: :class:`Catalog` owns the registry mutex (covering
209-
schemas, segment rows, the live namespace dict, and the in-flight
210-
drop set). Per-namespace insertion mutexes live on each namespace
211-
and serialize append/flush/compaction for that namespace. The
212-
query-visibility rwlock is held across reads so compaction can't
213-
unlink files mid-scan.
220+
Catalog is the data-and-coordination layer (persistent rows + live
221+
registry under one mutex). This class wires it to the outside world:
222+
the read-side connection pool, the query-visibility rwlock, the IPC
223+
decoder, the namespace factory, and the multi-step drop sequence.
224+
See the module docstring for the full breakdown.
225+
226+
The ``catalog`` attribute is part of this class's public surface — RPC
227+
handlers and tests reach through it for live-registry introspection.
228+
Per-namespace insertion mutexes live on each namespace and serialize
229+
append/flush/compaction for that namespace.
214230
215231
Layout: per-namespace under ``{log_dir}/{name}/``; schema sidecar at
216232
``{log_dir}/_finelog_registry.duckdb``. ``log_dir=None`` selects
@@ -242,7 +258,7 @@ def __init__(
242258
threads=duckdb_threads,
243259
temp_directory=pool_tmp,
244260
)
245-
self._catalog = Catalog(self._data_dir)
261+
self.catalog = Catalog(self._data_dir)
246262

247263
# Disk-only kwargs; ignored by memory namespaces. ``duckdb_memory_limit``
248264
# in this dict feeds the per-namespace compaction connection in
@@ -277,24 +293,24 @@ def _make_namespace(self, name: str, schema: Schema, namespace_dir: Path | None)
277293
data_dir=namespace_dir,
278294
query_visibility_lock=self._query_visibility_lock,
279295
read_pool=self._pool,
280-
catalog=self._catalog,
296+
catalog=self.catalog,
281297
**self._disk_namespace_kwargs,
282298
)
283299

284300
def _rehydrate_from_registry(self) -> None:
285-
for name, schema in self._catalog.list_all().items():
301+
for name, schema in self.catalog.list_all().items():
286302
namespace_dir = self._namespace_dir(name)
287303
ns = self._make_namespace(name, schema, namespace_dir)
288-
self._catalog.insert_live(name, ns)
304+
self.catalog.insert_live(name, ns)
289305

290306
def _ensure_log_namespace_registered(self) -> None:
291307
"""First-boot fixup: materialize the ``log`` registry row if missing."""
292-
if LOG_NAMESPACE_NAME in self._catalog:
308+
if LOG_NAMESPACE_NAME in self.catalog:
293309
return
294310
log_dir = self._data_dir / LOG_NAMESPACE_DIR if self._data_dir is not None else None
295311
resolve_key_column(LOG_REGISTERED_SCHEMA)
296312
stored_schema = with_implicit_seq(LOG_REGISTERED_SCHEMA)
297-
self._catalog.register_or_evolve(
313+
self.catalog.register_or_evolve(
298314
LOG_NAMESPACE_NAME,
299315
stored_schema,
300316
lambda schema: self._make_namespace(LOG_NAMESPACE_NAME, schema, log_dir),
@@ -325,45 +341,42 @@ def on_existing(existing_ns: LogNamespaceProtocol) -> Schema:
325341
# merge_schemas raises SchemaConflictError on non-additive change.
326342
effective = merge_schemas(existing_ns.schema, stored_schema)
327343
if effective != existing_ns.schema:
328-
self._catalog.upsert(name, effective)
344+
self.catalog.upsert(name, effective)
329345
existing_ns.update_schema(effective)
330346
return effective
331347

332-
return self._catalog.register_or_evolve(
348+
return self.catalog.register_or_evolve(
333349
name,
334350
stored_schema,
335351
lambda effective_schema: self._make_namespace(name, effective_schema, namespace_dir),
336352
on_existing=on_existing,
337353
)
338354

339-
def list_namespaces(self) -> list[tuple[str, Schema]]:
340-
return [(name, ns.schema) for name, ns in self._catalog.snapshot_live()]
341-
342355
def list_namespaces_with_stats(self) -> list[tuple[str, Schema, NamespaceStats]]:
343-
"""Like :meth:`list_namespaces`, but also returns per-namespace stats.
356+
"""Return ``(name, schema, stats)`` for every live namespace.
344357
345-
Each entry's stats are read from the namespace's in-memory state,
346-
which is held in lockstep with the on-disk segment catalog. This is
347-
the read path that backs ``StatsService.ListNamespaces`` — the
348-
dashboard relies on it to render the namespace summary table without
349-
issuing per-namespace ``count(*)`` queries against parquet.
358+
Backs ``StatsService.ListNamespaces`` — the dashboard relies on
359+
it to render the summary table without issuing per-namespace
360+
``count(*)`` queries against parquet. Stats are read from the
361+
namespace's in-memory state (kept in lockstep with the on-disk
362+
segment catalog).
350363
"""
351364
# ns.stats() takes the per-namespace insertion lock; we want the
352365
# registry snapshot to release the catalog lock before any stats()
353366
# call so a slow namespace can't stall this call for every other.
354-
namespaces = self._catalog.snapshot_live()
367+
namespaces = self.catalog.snapshot_live()
355368
return [(name, ns.schema, ns.stats()) for name, ns in namespaces]
356369

357370
def get_table_schema(self, name: str) -> Schema:
358-
return self._catalog.require_live(name).schema
371+
return self.catalog.require_live(name).schema
359372

360373
def memory_summary(self) -> dict[str, int]:
361374
"""Aggregate ram_bytes / chunk_count across namespaces, for diagnostics.
362375
363376
Used by the periodic pool-diagnostics logger in the standalone server.
364377
``MemoryLogNamespace`` reports zeros (no in-RAM segmented buffer).
365378
"""
366-
namespaces = self._catalog.live_values()
379+
namespaces = self.catalog.live_values()
367380
total_ram_bytes = 0
368381
total_chunks = 0
369382
for ns in namespaces:
@@ -394,12 +407,12 @@ def write_rows(self, name: str, arrow_ipc_bytes: bytes) -> int:
394407
# validate_and_align_batch is the bulk of the CPU work per WriteRows,
395408
# and pinning it under the catalog lock would serialize every writer
396409
# across every namespace. Schema is monotonic-additive — if it
397-
# evolves between snapshot and append, ``_stamp_seq_column``
398-
# NULL-fills the new columns at projection time.
399-
ns = self._catalog.require_live(name)
410+
# evolves between snapshot and append, the stamp step in the
411+
# namespace NULL-fills the new columns at projection time.
412+
ns = self.catalog.require_live(name)
400413
schema = ns.schema
401414
aligned = validate_and_align_batch(batch, schema)
402-
ns.append_record_batch(aligned)
415+
ns.append_aligned_batch(aligned)
403416
return aligned.num_rows
404417

405418
def query(self, sql: str) -> pa.Table:
@@ -419,7 +432,7 @@ def query(self, sql: str) -> pa.Table:
419432
# Snapshot the live registry under the catalog lock so a
420433
# concurrent drop_table can't trigger "dictionary changed
421434
# size during iteration".
422-
ns_snapshot = self._catalog.snapshot_live()
435+
ns_snapshot = self.catalog.snapshot_live()
423436

424437
extra_registered: list[str] = []
425438
try:
@@ -479,26 +492,26 @@ def drop_table(self, name: str) -> None:
479492
if name == LOG_NAMESPACE_NAME:
480493
raise InvalidNamespaceError(f"namespace {name!r} is privileged and cannot be dropped via DropTable")
481494

482-
ns = self._catalog.begin_drop(name)
495+
ns = self.catalog.begin_drop(name)
483496
try:
484497
ns.stop_and_join()
485-
self._catalog.delete(name)
498+
self.catalog.delete(name)
486499

487500
self._query_visibility_lock.write_acquire()
488501
try:
489502
ns.remove_local_storage()
490503
finally:
491504
self._query_visibility_lock.write_release()
492505
finally:
493-
self._catalog.finish_drop(name)
506+
self.catalog.finish_drop(name)
494507

495508
def append(self, key: str, entries: list) -> None:
496509
if not entries:
497510
return
498511
self.append_batch([(key, entries)])
499512

500513
def append_batch(self, items: list[tuple[str, list]]) -> None:
501-
self._catalog[LOG_NAMESPACE_NAME].append_log_batch(items)
514+
self.catalog[LOG_NAMESPACE_NAME].append_log_batch(items)
502515

503516
def get_logs(
504517
self,
@@ -512,7 +525,7 @@ def get_logs(
512525
tail: bool = False,
513526
min_level: str = "",
514527
) -> LogReadResult:
515-
return self._catalog[LOG_NAMESPACE_NAME].get_logs(
528+
return self.catalog[LOG_NAMESPACE_NAME].get_logs(
516529
key,
517530
match_scope=match_scope,
518531
since_ms=since_ms,
@@ -533,16 +546,16 @@ def cursor(self, key: str):
533546
return LogCursor(self, key)
534547

535548
def close(self) -> None:
536-
for ns in self._catalog.live_values():
549+
for ns in self.catalog.live_values():
537550
ns.close()
538551
self._pool.close()
539-
self._catalog.close()
552+
self.catalog.close()
540553

541554
# Test hooks below; forward to the registered "log" namespace.
542555

543556
@property
544557
def _log_namespace(self) -> DiskLogNamespace:
545-
ns = self._catalog[LOG_NAMESPACE_NAME]
558+
ns = self.catalog[LOG_NAMESPACE_NAME]
546559
assert isinstance(ns, DiskLogNamespace), "test hook called on memory-mode store"
547560
return ns
548561

0 commit comments

Comments (0)