
fix(dedup): serialize in-process Delta commits to unwedge dedup sweeps#68

Closed
tonyalaribe wants to merge 1 commit into master from fix/serialize-delta-commits


@tonyalaribe

Contributor

Problem (active prod outage driver)

dedup_partition's replace_where commit carries bare-string timestamp bounds — the only predicate form delta-rs can stringify. Whenever a flush append lands between dedup's snapshot and its commit, the OCC conflict checker re-evaluates that predicate via delta-kernel and errors (arrow_cast should have been simplified to cast), aborting the commit. On a busy table some append always interleaves, so every sweep fails after 4×15 retries — materializing the chunk, uploading parquet to R2, and abandoning it on each attempt, every 5 minutes, per probe-positive project.

Observed on prod today (image ae5a9ab):

  • 165 dedup OCC failure log lines in 20 minutes
  • RSS climbing to the 69.7GB memcg ceiling — 3 kernel OOM kills (16:17/17:22/19:32Z) plus health-check SIGKILLs → the known ~35min restart loop
  • the monoscope span-list query (... ORDER BY timestamp DESC LIMIT 501) hanging 60s+ behind the churn, vs <1s on a freshly restarted container
  • each crash replays WAL → re-mints duplicates → keeps every project probe-positive → self-sustaining

Fix

This PR adds a process-wide delta_commit_lock (tokio Mutex) that serializes the two in-process commit paths: flush appends (insert_batch write loop) and dedup replace_where. With commits unable to interleave, dedup's rebase sees no newer versions, so the conflict checker (and its predicate-eval bug) never runs. The lock is dropped before retry backoff sleeps so peers don't queue behind a sleeping writer. Append-vs-append commits pay a short queue wait (sub-second commits vs the 10-min flush cadence).
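The lock-per-attempt, drop-before-sleep shape described above can be sketched roughly as follows. This is a minimal illustration, not the code in this PR: it uses std::sync::Mutex and thread::sleep in place of tokio so it runs without dependencies, and commit_with_retry, its parameters, and the closure standing in for "snapshot + commit" are all hypothetical names.

```rust
use std::sync::Mutex;
use std::thread;
use std::time::Duration;

/// Hypothetical helper: runs `attempt` (a stand-in for "take snapshot, then
/// commit") up to `max_attempts` times. Returns the 1-based attempt number
/// that succeeded, or the last error (empty if `max_attempts` is 0).
pub fn commit_with_retry<F>(
    commit_lock: &Mutex<()>,
    max_attempts: u32,
    backoff: Duration,
    mut attempt: F,
) -> Result<u32, String>
where
    F: FnMut() -> Result<(), String>,
{
    let mut last_err = String::new();
    for n in 1..=max_attempts {
        // Acquire inside the loop: every attempt takes its snapshot and
        // commits while holding the lock, so no peer commit can interleave.
        let guard = commit_lock.lock().unwrap();
        let result = attempt();
        // Release before any backoff so peers never wait on a sleeping writer.
        drop(guard);

        match result {
            Ok(()) => return Ok(n),
            Err(e) => last_err = e,
        }
        if n < max_attempts {
            thread::sleep(backoff);
        }
    }
    Err(last_err)
}
```

Both commit paths would call a helper like this with the same lock, which is what keeps an append from landing between a dedup snapshot and its commit.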

Test

dedup_commits_despite_concurrent_appends: runs dedup_partition while a task commits appends continuously into the same partition date. Fails on master (dedup_partition write failed: ... arrow_cast should have been simplified), passes deterministically with the lock.

Full dedup_compaction_test suite + lib tests pass (test_batch_queue_under_load pre-existing failure on master).

dedup_partition's replace_where commit carries bare-string timestamp
bounds — the only predicate form delta-rs can stringify. Whenever a flush
append lands between dedup's snapshot and commit, the OCC checker
re-evaluates that predicate with delta-kernel and errors ('arrow_cast
should have been simplified to cast'), aborting the commit. On a busy
table some append always interleaves, so every sweep failed after 4x15
retries, materializing and uploading chunk parquet each attempt and
abandoning it — every 5 minutes, per probe-positive project. Observed in
prod as 165 OCC failures/20min, RSS climbing to the 69.7GB memcg ceiling
(3 kernel OOM kills today) and the ~35min restart loop; queries hung
behind the churn (60s+ for a 10-min-window LIMIT 501 that runs in <1s on
a fresh container).

Fix: a process-wide delta_commit_lock serializing flush appends and dedup
replace_where commits. With no commit able to interleave, the rebase sees
no newer versions and the conflict checker (and its predicate-eval bug)
never runs. Lock is dropped before retry backoff sleeps so peers don't
queue behind a sleeping writer.

Regression test runs dedup under continuous concurrent appends: fails on
master, deterministic pass with the lock.
@claude

claude Bot commented Jun 11, 2026


Code Review — fix(dedup): serialize in-process Delta commits

Summary: The fix is sound — a per-Database tokio::sync::Mutex<()> acquired inside both the insert_batch and dedup_partition retry loops serializes all in-process Delta commits, eliminating the race window where delta-kernel's OCC checker evaluates the bare-string replace_where predicate against a concurrent append and errors. The regression test (dedup_commits_despite_concurrent_appends) correctly reproduces the failure and verifies the fix.


🔴 Critical — Accidental root-level file pollution

The diff adds two large files at the repository root that should not be there:

  • database.rs (5,767 lines) — a full copy of src/database.rs
  • dedup_compaction_test.rs (173 lines) — a full copy of tests/dedup_compaction_test.rs

These appear to be working copies accidentally committed (perhaps from a cp or editor save outside the src/ tree). They are confirmed present on the current checkout. They must be removed before merge: Cargo will not build them (it auto-discovers targets only under src/, tests/, examples/, and benches/), but they duplicate nearly 6,000 lines, pollute git log, and mislead future readers.

git rm database.rs dedup_compaction_test.rs

🟡 Moderate — Lock granularity serializes all tables, not just the racing pair

delta_commit_lock lives on the Database struct. Since a Database manages all tables for a project (or process), every commit to any table — otel_logs_and_spans, metrics, etc. — now queues behind every other commit, even when the tables are completely independent.

The OCC predicate bug is specific to dedup_partition's replace_where commits racing with flush appends to the same table. A per-table lock (e.g. Arc<Mutex<()>> stored per entry in unified_tables) would confine the serialization to the actual conflict scope and preserve cross-table write parallelism.

This matters most if the process manages several actively-written tables simultaneously.
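A per-table variant could look roughly like the sketch below. It is an assumption-laden illustration rather than a patch against this repository: CommitLocks and for_table are hypothetical names, the registry is keyed by table name instead of living inside unified_tables, and std::sync::Mutex stands in for the tokio mutex.

```rust
use std::collections::HashMap;
use std::sync::{Arc, Mutex};

/// Hypothetical registry handing out one commit lock per table name.
#[derive(Default)]
pub struct CommitLocks {
    by_table: Mutex<HashMap<String, Arc<Mutex<()>>>>,
}

impl CommitLocks {
    /// Returns the lock for `table`, creating it on first use.
    /// The registry mutex is held only for the map lookup, never
    /// while a commit is in flight.
    pub fn for_table(&self, table: &str) -> Arc<Mutex<()>> {
        let mut map = self.by_table.lock().unwrap();
        Arc::clone(map.entry(table.to_string()).or_default())
    }
}
```

With this shape, a dedup commit and a flush append on the same table contend for the same lock, while commits to different tables proceed in parallel.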


🟢 Lock correctness — looks right

The critical detail is that commit_guard is acquired at the top of the while loop in insert_batch (not outside it), so every retry re-acquires the lock after the sleep. The drop(commit_guard) before tokio::time::sleep(...) is a deliberate optimization — peers don't queue behind a sleeping writer — but the next iteration immediately re-acquires before taking a new snapshot. The pattern in dedup_partition is identical: lock-per-attempt, drop-before-sleep. Both paths are consistent. ✓


🟡 Moderate — Test race assertion could be fragile

assert!(appended > 0, "appender must have raced at least one commit");

On a heavily-loaded CI runner with worker_threads = 4, dedup might complete before the spawned appender task gets scheduled for its first commit. The assertion then fails for the wrong reason (a scheduling artifact, not a correctness bug).

Consider a small pre-flight: wait until the appender has committed at least one row before starting dedup, or insert a tokio::task::yield_now().await after spawning the appender to give it a head-start before calling dedup_partition. Adding a tokio::time::sleep(Duration::from_millis(5)).await between spawning the appender and calling dedup is another pragmatic option.
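The pre-flight idea can be sketched as below. This is a std-only illustration with threads standing in for tokio tasks; run_with_racing_appender is a hypothetical helper, the counter increment stands in for one append commit, and `work` stands in for the dedup_partition call.

```rust
use std::sync::atomic::{AtomicBool, AtomicUsize, Ordering};
use std::sync::Arc;
use std::thread;
use std::time::Duration;

/// Hypothetical harness: starts an appender, waits until it has committed
/// at least once, runs `work`, then stops the appender and returns how many
/// appends it made in total.
pub fn run_with_racing_appender<F: FnOnce()>(work: F) -> usize {
    let appended = Arc::new(AtomicUsize::new(0));
    let stop = Arc::new(AtomicBool::new(false));

    let appender = {
        let (appended, stop) = (Arc::clone(&appended), Arc::clone(&stop));
        thread::spawn(move || {
            while !stop.load(Ordering::Acquire) {
                // Stand-in for one append commit.
                appended.fetch_add(1, Ordering::Release);
                thread::sleep(Duration::from_millis(1));
            }
        })
    };

    // Pre-flight: do not start the work under test until the appender has
    // demonstrably been scheduled and committed once.
    while appended.load(Ordering::Acquire) == 0 {
        thread::yield_now();
    }

    work();

    stop.store(true, Ordering::Release);
    appender.join().expect("appender thread panicked");
    appended.load(Ordering::Acquire)
}
```

Because the harness blocks until the first append is observed, an `appended > 0` assertion afterwards can no longer fail from scheduling alone. It does not by itself prove that an append overlapped the work, so the test's correctness assertions still carry that weight.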


🟢 Minor — Atomic ordering on the stop flag

stop.load(std::sync::atomic::Ordering::Relaxed)
stop.store(true, std::sync::atomic::Ordering::Relaxed)

Relaxed is technically safe here (the appender.await? provides the sequencing guarantee via the join), but using Release/Acquire makes the intent clearer and is the idiomatic choice for stop-flags in Rust.


🟢 Minor — Missing failure message on post-dedup assertion

assert_eq!(post[0].column(0).as_primitive::<Int64Type>().value(0), 1);

The earlier assertions in the same test pass a message string. This one doesn't, making CI failures harder to diagnose. Suggest:

assert_eq!(
    post[0].column(0).as_primitive::<Int64Type>().value(0), 1,
    "post-dedup: dup_id row should be collapsed to 1"
);

✅ Positives

  • The comment on the struct field is unusually thorough and directly explains the delta-kernel OCC bug — anyone debugging this in the future will understand immediately why the lock exists.
  • Dropping the lock before the backoff sleep (both paths) is the right call; holding a mutex across an exponential sleep is a classic anti-pattern.
  • The regression test structure (duplicate pair in a sealed partition + continuous append fire from a separate task + deterministic assertion on row count) is exactly the right way to gate this class of fix.
  • The dedup_commits_despite_concurrent_appends test correctly uses #[serial] and flavor = "multi_thread" — the serial attribute prevents other #[serial] tests from contaminating shared storage, and multi-thread gives the appender real concurrency.

Verdict: The core fix is correct and well-tested. The root-level file pollution is the only blocker — everything else is improvement feedback. Clean up those two stray files and this is good to merge.

@tonyalaribe

Contributor Author

Closing as superseded. The identical delta_commit_lock serialization (struct field, init, both retry-loop guards) and the dedup_commits_despite_concurrent_appends regression test already landed on master via #69 (commit c28f2bf, "insert backpressure + 5-min buckets"). Both PRs were developed in parallel from ae5a9ab and converged on the same fix.

Rebasing this branch onto current master produces an empty diff (identical tree, zero net commits) — there is nothing left to merge. The fix is live in master.
