[finelog] Durable writes, async server, and catalog/append refactors#5777

Open
rjpower wants to merge 3 commits into main from rjpower/20260515-finelog-tweaks

Conversation

Collaborator

@rjpower rjpower commented May 15, 2026

Make WriteRows/PushLogs block until rows are persisted to an L0 parquet segment via a max_persisted_seq cursor polled on the event loop; the server moves to ASGI handlers and drops the flush interval to 5s. Bundles the prior catalog refactor (live namespace registry lifted into Catalog, leaf types extracted to break the catalog/namespace import cycle) and append-path slim down (AlignedBatch replaces RecordBatch repack). Adds a SlowRpcInterceptor on top of rigging.slow_log.

@rjpower rjpower added the agent-generated Created by automation/agent label May 15, 2026
Contributor

claude Bot commented May 15, 2026

Claude finished @rjpower's task in 7m 18s.


PR Review complete

  • Read github-pr-review skill instructions
  • Gather PR diff and changed files
  • Audit catalog/duckdb_store lock refactor
  • Audit log_namespace/types extraction & AlignedBatch path
  • Audit test changes
  • Post review

Verdict: No issues found. Checked for bugs and CLAUDE.md/AGENTS.md compliance.

Highlights of what I verified:

  • Catalog RLock correctly admits same-thread reentry from the namespace constructor (which calls list_segments / upsert_segment) while serializing the freshly-started bg thread on the foreign side.
  • register_or_evolve ordering (factory → upsert → publish) is safe: no FK on segments.namespace, and the bg thread is gated by the catalog lock until publish completes.
  • _stamp_seq_and_build fast path holds because merge_schemas appends extras (existing column order is preserved), and the slow-path NULL-fill only fires under benign mid-flight additive evolution.
  • TOCTOU window in write_rows during drop_table is intentional — pinned by test_drop_during_concurrent_write_is_safe (writes that race a successful drop are documented to evaporate).
  • L0 sort removal is sound: rows arrive seq-monotonic under the per-namespace _insertion_lock, _SealedBuffer.{min,max}_seq are still computed via pc.min/max, and the L0→L1 COPY sorts by (key_column, seq).
  • ram_bytes / chunk_count now take the per-namespace lock; the bg loop intentionally uses the inner self._buffers.* methods to avoid self-deadlock — the in-code comment calls that out.
  • list_namespaces was removed from DuckDBLogStore; the only remaining call (stats_service.list_namespaces RPC) routes through list_namespaces_with_stats, which is retained.
  • _upload switch to fs.put_file plus makedirs is fine: _sync_step gates on self._remote_namespace_dir before reaching _upload.
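The first highlight, same-thread reentry through the catalog RLock, can be sketched minimally. All names below (register, factory, the list-backed segment store) are illustrative, not finelog's actual API; the point is that threading.RLock admits the constructor's nested calls while serializing foreign threads:

```python
import threading

class Catalog:
    """Toy sketch: one RLock lets a namespace factory re-enter catalog
    methods on the same thread, while any background thread is serialized
    on the foreign side until registration completes."""

    def __init__(self):
        self._lock = threading.RLock()
        self._segments = []

    def list_segments(self):
        with self._lock:  # reentrant: safe even if the caller already holds it
            return list(self._segments)

    def upsert_segment(self, seg):
        with self._lock:
            self._segments.append(seg)

    def register(self, factory):
        with self._lock:  # one critical section for the whole register step
            return factory(self)  # factory may call list_segments/upsert_segment

cat = Catalog()
ns = cat.register(lambda c: (c.upsert_segment("s0"), c.list_segments())[1])
# ns == ["s0"]: the nested calls succeeded under the held lock
```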


@chatgpt-codex-connector chatgpt-codex-connector Bot left a comment


💡 Codex Review

Here are some automated review suggestions for this pull request.

Reviewed commit: 9bea281eac


Comment on lines +412 to +415
ns = self.catalog.require_live(name)
schema = ns.schema
aligned = validate_and_align_batch(batch, schema)
ns.append_aligned_batch(aligned)

P1: Synchronize schema updates with write_rows append

This split write_rows path now snapshots ns.schema, validates, and appends later without any shared lock with register_table/update_schema, so a concurrent additive schema evolution can race the append and cause rows to be stamped with an older Arrow schema, silently dropping values for newly added columns instead of persisting them. The issue appears when register_table updates a live namespace while writes are in flight: validation succeeds against one schema version, but append_aligned_batch may project with another.
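The race Codex describes can be illustrated with a toy write path. Pinning a single schema snapshot across both validate and append (sketch below; ToyNamespace and the dict-based batch are illustrative, not the PR's API) removes the validate/append mismatch, though a real fix must also ensure the append path projects with the same pinned schema:

```python
from dataclasses import dataclass

@dataclass
class ToyNamespace:
    schema: tuple = ("k", "v")  # evolution would swap in a longer tuple

def validate(batch, schema):
    # keep only columns present in the schema snapshot
    return {c: batch[c] for c in schema if c in batch}

def append(ns, aligned, schema):
    # stamp rows against the SAME schema used for validation
    return {c: aligned.get(c, [None]) for c in schema}

def write_rows_pinned(ns, batch):
    schema = ns.schema  # one snapshot used for BOTH steps, so a concurrent
    aligned = validate(batch, schema)  # evolution cannot change the projection
    return append(ns, aligned, schema)  # between validate and append

ns = ToyNamespace()
out = write_rows_pinned(ns, {"k": [1], "v": [2], "extra": [3]})
# out == {"k": [1], "v": [2]}: consistent projection, unknown column dropped
```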


Contributor

claude Bot commented May 15, 2026

Code review

No issues found. Checked for bugs and CLAUDE.md compliance.

PushLogs and WriteRows now block until the rows land in an L0 parquet
segment so callers never observe success on data that a crash could lose.
Each namespace exposes a max_persisted_seq cursor advanced by the bg
flush thread; the now-async ASGI handlers poll it with asyncio.sleep so
the wait parks a coroutine instead of a threadpool worker. Default flush
interval drops from 60s to 5s and every append wakes the bg loop, so the
typical persistence wait is well under one second.

Adds a SlowRpcInterceptor (delegates to rigging.log_setup.slow_log)
wired into both LogService and StatsService so query/write_rows that
exceed the threshold are logged once.
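The slow-RPC idea can be sketched as a timing wrapper; the real SlowRpcInterceptor delegates to rigging.log_setup.slow_log, whereas everything below (the decorator, the logger name) is an illustrative stand-in. Per the PR's tests, a threshold of 0 disables logging:

```python
import logging
import time
from functools import wraps

log = logging.getLogger("finelog.slow_rpc")

def slow_rpc(threshold_ms: float = 7000.0):
    """Log once when the wrapped call exceeds threshold_ms; 0 disables."""
    def deco(fn):
        @wraps(fn)
        def wrapper(*args, **kwargs):
            start = time.monotonic()
            try:
                return fn(*args, **kwargs)
            finally:
                elapsed_ms = (time.monotonic() - start) * 1000.0
                if threshold_ms > 0 and elapsed_ms >= threshold_ms:
                    log.warning("slow rpc %s took %.0fms", fn.__name__, elapsed_ms)
        return wrapper
    return deco

@slow_rpc(threshold_ms=10.0)
def query():
    time.sleep(0.02)  # exceeds the 10ms threshold, so a warning is logged
    return "ok"
```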

Also includes pre-existing branch tweaks: drop the Docker --cpus cap so
finelog isn't subject to CFS bandwidth parking spikes, and tests for the
ConnectionPool query-timeout watchdog.
@rjpower rjpower changed the title [finelog] Move live namespace registry into Catalog and slim the append path [finelog] Durable writes, async server, and catalog/append refactors May 15, 2026
Collaborator Author

rjpower commented May 15, 2026

🤖 Specification (this PR is >500 LOC, per the pull-request skill)

Problem

  • Writes returned as soon as bytes landed in the in-RAM chunk list of DiskLogNamespace.append_aligned_batch (lib/finelog/src/finelog/store/log_namespace.py). The bg loop then flushed to L0 parquet up to flush_interval_sec later (default 60s) or when the buffer crossed segment_target_bytes. A crash in that window lost unflushed rows; callers had no way to know whether their write survived.
  • The server mounted the connectrpc WSGI app via Starlette's WSGIMiddleware. Sync handlers meant every in-flight request occupied a threadpool worker; a per-write durability wait would have parked one thread per caller.
  • The live namespace registry, registration ordinal, and in-flight drop set lived on DuckDBLogStore behind two locks, so register-or-evolve straddled two critical sections. The validate path repacked into a fresh RecordBatch for byte accounting.
  • Slow-RPC detection was reimplemented in iris's RequestTimingInterceptor; finelog had no equivalent, so slow queries went unlogged.

Approach

  • Catalog/types refactor (lib/finelog/src/finelog/store/types.py, lib/finelog/src/finelog/store/catalog.py): SegmentLocation, SegmentRow, NamespaceStats, LocalSegment, and LogNamespaceProtocol moved into the leaf types module to break the catalog↔namespace import cycle structurally. Live LogNamespaceProtocol dict + _dropping set now live on Catalog under one RLock. register_or_evolve is a single critical section.
  • Append path (lib/finelog/src/finelog/store/schema.py, duckdb_store.py): validate_and_align_batch returns a typed AlignedBatch(arrays, fields, num_rows, byte_size); the namespace consumes it directly so the WriteRows path skips the RecordBatch repack and gets a monotone byte size for flush accounting.
  • Durable writes (store/log_namespace.py, store/duckdb_store.py, server/persistence_wait.py):
    • Each namespace tracks _max_persisted_seq: int (atomic int, written under _flush_lock after _write_new_segment succeeds). append_aligned_batch / append_log_batch return the last_seq they allocated. MemoryLogNamespace treats every append as immediately persisted.
    • DEFAULT_FLUSH_INTERVAL_SEC drops from 60.0 to 5.0; every non-empty append now calls _wake.set(), and the bg loop treats an append-driven wake as a flush trigger so writers see L0 within one turnaround.
    • await_persisted() (server/persistence_wait.py) polls the cursor with asyncio.sleep(0.05). A negative target_seq (empty append) returns immediately. Timeout raises ConnectError(DEADLINE_EXCEEDED); buffered rows stay in RAM and the next successful flush picks them up.
  • Async server (server/service.py, server/stats_service.py, server/asgi.py, server/interceptors.py): handlers are async def. CPU/IO-bound store calls go through asyncio.to_thread so the loop never blocks. The build helper mounts LogServiceASGIApplication / StatsServiceASGIApplication directly (no WSGIMiddleware). ConcurrencyLimitInterceptor.intercept_unary wraps the blocking _acquire in asyncio.to_thread.
  • Slow-RPC (server/interceptors.py): SlowRpcInterceptor delegates to rigging.log_setup.slow_log, with per-method thresholds and a default of 7000ms (chosen above the durable-write floor). Wired into both service chains via build_log_server_asgi(slow_rpc_threshold_ms=...).
  • iris consumer (lib/iris/src/iris/cluster/controller/dashboard.py): switched the mounted finelog stubs from WSGI to ASGI; back-compat path aliases /FetchLogs and /PushLogs registered via pre-resolution of _resolved_endpoints.
  • Deploy tweak (lib/finelog/src/finelog/deploy/bootstrap.py): drop the Docker --cpus cap because CFS bandwidth parking caused multi-hundred-ms spikes on parquet encode.
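The AlignedBatch shape from the append-path bullet can be sketched as a frozen dataclass. Field names (arrays, fields, num_rows, byte_size) come from the spec above; the body of validate_and_align_batch here is a toy stand-in, since the real one validates Arrow types and computes buffer sizes:

```python
from dataclasses import dataclass

@dataclass(frozen=True)
class AlignedBatch:
    arrays: tuple    # column data, in schema-aligned order
    fields: tuple    # column names matching `arrays`
    num_rows: int
    byte_size: int   # monotone byte size used for flush accounting

def validate_and_align_batch(columns: dict, schema: tuple) -> AlignedBatch:
    # Toy projection into schema order; raises KeyError on a missing column.
    arrays = tuple(tuple(columns[name]) for name in schema)
    n = len(arrays[0]) if arrays else 0
    size = sum(len(repr(a)) for a in arrays)  # placeholder byte accounting
    return AlignedBatch(arrays=arrays, fields=schema, num_rows=n, byte_size=size)

batch = validate_and_align_batch({"v": [10, 20], "k": ["a", "b"]}, ("k", "v"))
# batch.fields == ("k", "v") and batch.arrays follows that order, regardless
# of the input dict's ordering
```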

Key code

Cursor advance in _flush_step:

# log_namespace.py
try:
    self._write_new_segment(visible)
except Exception:
    self._buffers.restore_flush()
    return
# Atomic int write; service handlers polling max_persisted_seq() will
# observe this on their next tick.
self._max_persisted_seq = visible.max_seq

Async waiter:

# server/persistence_wait.py
async def await_persisted(log_store, namespace, target_seq, *, timeout=30.0,
                          poll_interval=0.05):
    if target_seq < 0:
        return
    deadline = time.monotonic() + timeout
    while log_store.max_persisted_seq(namespace) < target_seq:
        if time.monotonic() >= deadline:
            raise ConnectError(Code.DEADLINE_EXCEEDED, ...)
        await asyncio.sleep(poll_interval)
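The waiter above can be exercised end-to-end with a toy store. FakeStore and the call_later flush below stand in for the real namespace and its bg thread, and the toy raises TimeoutError where the server raises ConnectError(DEADLINE_EXCEEDED); the append-then-await shape matches the handler flow described in the spec:

```python
import asyncio
import time

class FakeStore:
    """Toy store: appends advance last_seq immediately; flush() advances the
    persisted cursor, standing in for the bg flush thread."""
    def __init__(self):
        self._last_seq = -1
        self._persisted = -1
    def append_rows(self, namespace, batch):
        self._last_seq += len(batch)
        return len(batch), self._last_seq
    def max_persisted_seq(self, namespace):
        return self._persisted
    def flush(self):
        self._persisted = self._last_seq

async def await_persisted(store, namespace, target_seq, *, timeout=30.0,
                          poll_interval=0.005):
    if target_seq < 0:  # empty append: nothing to wait for
        return
    deadline = time.monotonic() + timeout
    while store.max_persisted_seq(namespace) < target_seq:
        if time.monotonic() >= deadline:
            raise TimeoutError("persistence wait timed out")
        await asyncio.sleep(poll_interval)

async def write_rows(store, namespace, batch):
    # Blocking store call hops off the event loop, as in the ASGI handlers.
    rows, last_seq = await asyncio.to_thread(store.append_rows, namespace, batch)
    asyncio.get_running_loop().call_later(0.01, store.flush)  # fake bg flush
    await await_persisted(store, namespace, last_seq)
    return rows, last_seq

result = asyncio.run(write_rows(FakeStore(), "ns", [1, 2, 3]))
# result == (3, 2): three rows written, last allocated seq is 2
```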

Wake-driven flush in _bg_loop:

# log_namespace.py
woken_by_append = self._wake.wait(timeout=min(self._flush_rl.time_until_next(), 1.0))
self._wake.clear()
writer_pending = woken_by_append and chunk_count > 0
if force_drain or writer_pending:
    self._flush_step()
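The wake-driven loop can also be run as a self-contained toy. threading.Event stands in for _wake, and the rate limiter and force-drain path are omitted, so this is a sketch of the shape rather than the PR's loop:

```python
import threading
import time

class FlushLoop:
    """Toy wake-driven flush loop: every append sets a wake event, and the
    bg thread flushes on an append-driven wake with pending chunks."""
    def __init__(self, flush_interval=0.05):
        self._wake = threading.Event()
        self._stop = threading.Event()
        self._interval = flush_interval
        self.chunks = []
        self.flushes = 0
        self._thread = threading.Thread(target=self._bg_loop, daemon=True)
        self._thread.start()

    def append(self, chunk):
        self.chunks.append(chunk)
        self._wake.set()  # every non-empty append wakes the bg loop

    def _bg_loop(self):
        while not self._stop.is_set():
            woken = self._wake.wait(timeout=min(self._interval, 1.0))
            self._wake.clear()
            if woken and self.chunks:  # append-driven wake with pending data
                self.chunks.clear()
                self.flushes += 1

    def close(self):
        self._stop.set()
        self._thread.join()

loop = FlushLoop()
loop.append("rows")
time.sleep(0.1)   # give the bg thread time to observe the wake
loop.close()
```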

Tests

  • tests/test_durable_writes.py (new, 6 cases): cursor advances after flush, memory-mode immediate persistence, async waiter early-return on empty / already-persisted, timeout maps to DEADLINE_EXCEEDED with rows still buffered, concurrent writers coalesce into fewer flushes than writers.
  • tests/test_interceptors.py (new, 9 cases incl. parameterized over Query/WriteRows/PushLogs/FetchLogs): slow-vs-fast paths, per-method override, default fallback, threshold=0 disables, async path.
  • Updated tests/test_write_rows.py, tests/test_concurrency.py for the new (rows_written, last_seq) return tuple.
  • Updated tests/test_server.py to await original_* and hop off the loop with asyncio.to_thread(release.wait, ...) so concurrent-cap tests work under ASGI.

Total: 244 tests pass in ~3s in lib/finelog/. iris dashboard/controller still imports cleanly.

@rjpower rjpower requested a review from yonromai May 15, 2026 23:13
@rjpower rjpower enabled auto-merge (squash) May 15, 2026 23:13