Merged
130 changes: 130 additions & 0 deletions docs/rfcs/0012-compaction-correctness-gate.md
@@ -0,0 +1,130 @@
# RFC: Compaction Correctness Gate

- Status: Draft
- Authors: Tonbo team
- Created: 2026-02-03
- Area: Compaction, Read Path, Testing

## Summary

Define a correctness gate for compaction that validates read consistency and MVCC semantics before and after compaction. The gate starts from a minimal phase built on a small reference oracle and deterministic scenarios, and provides a structure for later expansion into model-based randomized sequences and broader invariants.

## Motivation

Compaction rewrites physical layout and can silently break logical correctness. A dedicated gate is needed to prove that compaction does not change visible results for a fixed snapshot and does not resurrect deleted or superseded values. This RFC establishes the semantic contract, the minimal gate, and a path to expand coverage as the compaction pipeline evolves.

## Goals

- Specify compaction correctness invariants in terms of visible read results.
- Deliver a minimal, deterministic gate that is reproducible and debuggable.
- Provide an oracle model that aligns with Tonbo MVCC semantics.
- Define a path to expand from deterministic scenarios to model-based randomized tests.

## Non-Goals

- Performance benchmarking or tuning.
- CI gating or long-running fuzzing (added later once validated).
- Full coverage of crash recovery or manifest/GC interactions (follow-on work).
- Guaranteeing correctness for future range tombstones or remote compaction (follow-on work).

## Design

### Invariants

The correctness gate validates that **logical results are unchanged by compaction** for a fixed snapshot. Invariants are grouped by phase to allow a minimal starting point.

**Phase 0 (minimal gate):**

1. **Snapshot consistency**
For a fixed logical snapshot, reads before and after compaction return identical results.

2. **No resurrection + last-write-wins**
A key deleted (or overwritten) before a snapshot does not reappear, and the latest visible version at that snapshot is returned.

3. **Range completeness**
Range scans at a fixed snapshot return the same key set and values before and after compaction.

**Phase 1+ (expanded invariants):**

4. **Tombstone pruning safety**
Pruning tombstones does not expose older versions that should remain hidden.

5. **Iterator/seek stability**
Iterator ordering and seek semantics remain stable across compaction.

6. **Reopen + snapshot durability**
Snapshot consistency remains valid across DB reopen boundaries.

### Oracle Model

The gate uses a small in-memory MVCC oracle to represent logical truth:

- **State model**: key -> ordered versions. Each version stores `(commit_ts, tombstone, value)`.
- **Visibility**: for snapshot `ts`, the visible version is the highest `commit_ts <= ts`.
- **Tie-breaks**: delete wins on equal `commit_ts` (consistent with SST merge semantics).
- **Read semantics**:
- `get(key, ts)` returns the visible version or not-found.
- `scan(range, ts)` returns all visible keys in order.

The oracle does not model compaction mechanics; it models only logical results so the compaction pipeline can change freely as long as results match.
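As a concrete illustration, the oracle described above fits in a few dozen lines. This is a sketch, not the actual Tonbo code; the names (`Oracle`, `Version`) and the use of `String` keys/values are illustrative simplifications:

```rust
use std::collections::BTreeMap;

/// One logical version of a key, as described in the state model above.
#[derive(Clone, Debug, PartialEq)]
struct Version {
    commit_ts: u64,
    tombstone: bool,
    value: Option<String>,
}

/// In-memory MVCC oracle: key -> versions ordered by insertion.
#[derive(Default)]
struct Oracle {
    state: BTreeMap<String, Vec<Version>>,
}

impl Oracle {
    fn put(&mut self, key: &str, commit_ts: u64, value: &str) {
        self.state.entry(key.to_string()).or_default().push(Version {
            commit_ts,
            tombstone: false,
            value: Some(value.to_string()),
        });
    }

    fn delete(&mut self, key: &str, commit_ts: u64) {
        self.state.entry(key.to_string()).or_default().push(Version {
            commit_ts,
            tombstone: true,
            value: None,
        });
    }

    /// Visible version at snapshot `ts`: highest `commit_ts <= ts`,
    /// with delete winning on equal `commit_ts` (SST merge tie-break).
    fn get(&self, key: &str, ts: u64) -> Option<&str> {
        let versions = self.state.get(key)?;
        versions
            .iter()
            .filter(|v| v.commit_ts <= ts)
            // `(commit_ts, tombstone)` orders tombstones after values on ties.
            .max_by_key(|v| (v.commit_ts, v.tombstone))
            .and_then(|v| if v.tombstone { None } else { v.value.as_deref() })
    }

    /// All visible keys in key order at snapshot `ts`.
    fn scan(&self, ts: u64) -> Vec<(&str, &str)> {
        self.state
            .keys()
            .filter_map(|k| self.get(k, ts).map(|v| (k.as_str(), v)))
            .collect()
    }
}
```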

### Test Types

**Deterministic scenarios (Phase 0):**

| Scenario | Purpose | Invariants |
| --- | --- | --- |
| Overwrite chain | Validate last-write-wins at fixed snapshot | 1, 2 |
| Delete-heavy | Validate no resurrection | 1, 2 |
| Range scan with deletes | Validate range completeness | 1, 3 |
| Cross-segment overlap | Validate compaction rewrite invariance | 1, 2, 3 |

Each scenario:
1. Build a controlled write/delete sequence.
2. Take snapshot `ts`.
3. Read (get/scan) before compaction; compare to oracle.
4. Force compaction.
5. Read after compaction at the same `ts`; compare to oracle.
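The five steps above can be sketched as a generic scenario driver. The `Engine` trait here is a hypothetical stand-in for the real engine handle (not the actual Tonbo API), and results are assumed to be materialized as sorted `(key, value)` pairs:

```rust
/// Hypothetical engine surface needed by the gate: snapshot scans and a
/// way to force compaction. Stand-in for the real Tonbo handle.
trait Engine {
    fn scan_at(&self, ts: u64) -> Vec<(String, String)>;
    fn force_compaction(&mut self);
}

/// Steps 3-5 of a scenario: compare engine to oracle before and after
/// compaction at the same snapshot `ts`.
fn run_scenario<E: Engine>(
    engine: &mut E,
    oracle: &[(String, String)],
    ts: u64,
) -> Result<(), String> {
    let before = engine.scan_at(ts);
    if before.as_slice() != oracle {
        return Err(format!("pre-compaction mismatch at ts={ts}"));
    }
    engine.force_compaction();
    let after = engine.scan_at(ts);
    if after.as_slice() != oracle {
        return Err(format!("post-compaction mismatch at ts={ts}"));
    }
    Ok(())
}
```

Steps 1-2 (building the write/delete sequence and picking `ts`) stay scenario-specific, so the driver only owns the invariant checks.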

**Model-based randomized sequences (Phase 1+):**

- Generate sequences of operations: put, delete, flush, compact, get, scan.
- Use seeded RNG; log seed and operation trace for reproduction.
- Validate after each read, and at periodic pre/post compaction checkpoints.
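Seeded generation can be sketched with a small inline LCG so a trace is reproducible from the logged seed alone; the real gate would likely use a proper RNG crate, and the `Op` enum here is illustrative:

```rust
/// Illustrative operation alphabet for randomized sequences.
#[derive(Debug, Clone, PartialEq)]
enum Op {
    Put(u8, u8),
    Delete(u8),
    Flush,
    Compact,
    Get(u8),
    Scan,
}

/// Minimal linear congruential generator (constants from PCG reference
/// code); deterministic for a given seed.
struct Lcg(u64);

impl Lcg {
    fn next(&mut self) -> u64 {
        self.0 = self
            .0
            .wrapping_mul(6364136223846793005)
            .wrapping_add(1442695040888963407);
        self.0
    }
}

/// Generate `n` operations from `seed`; logging `seed` is enough to replay.
fn gen_ops(seed: u64, n: usize) -> Vec<Op> {
    let mut rng = Lcg(seed);
    (0..n)
        .map(|_| {
            let r = rng.next();
            match r % 6 {
                0 => Op::Put((r >> 8) as u8, (r >> 16) as u8),
                1 => Op::Delete((r >> 8) as u8),
                2 => Op::Flush,
                3 => Op::Compact,
                4 => Op::Get((r >> 8) as u8),
                _ => Op::Scan,
            }
        })
        .collect()
}
```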

### Comparison with Other Systems

- TigerBeetle uses a model-based fuzzer for its LSM tree and compares scan/get results against a reference model.
- LevelDB maintains a ModelDB (in-memory map) and compares iterator results at intervals and across snapshots.
- RocksDB stress tests maintain an expected-state oracle that verifies latest values across runs.
- Turso uses proptest and differential oracles to compare results against SQLite, plus explicit property checks in concurrency simulation.

Tonbo adopts the same principle: compare logical results to a minimal oracle, then expand via randomized sequences once the baseline gate is validated.

### Reproducibility and Debuggability

The gate must emit enough context to replay failures:

- RNG seed (if randomized).
- Operation trace (human-readable and machine-parsable).
- Snapshot timestamps used for assertions.
- Diff of expected vs actual results (keys and versions).
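The expected-vs-actual diff could be produced by a helper along these lines, assuming results are materialized as `(key, value)` pairs; the function name and report format are illustrative:

```rust
use std::collections::BTreeMap;

/// Produce a human-readable report of differences between oracle-expected
/// and engine-actual scan results. Empty report means the gate passes.
fn diff_report(
    expected: &[(String, String)],
    actual: &[(String, String)],
) -> Vec<String> {
    let exp: BTreeMap<_, _> = expected.iter().cloned().collect();
    let act: BTreeMap<_, _> = actual.iter().cloned().collect();
    let mut report = Vec::new();
    for (k, v) in &exp {
        match act.get(k) {
            None => report.push(format!("missing key {k} (expected {v})")),
            Some(a) if a != v => report.push(format!("key {k}: expected {v}, got {a}")),
            _ => {}
        }
    }
    for (k, v) in &act {
        if !exp.contains_key(k) {
            report.push(format!("unexpected key {k} = {v} (possible resurrection)"));
        }
    }
    report
}
```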

### Scope and Coverage

Initial coverage targets **minor compaction** and the current read path across mutable + immutable + SST layers. Major compaction, manifest/GC, and remote execution are validated in Phase 1+ after the minimal gate is stable.

## References

- [Model-based testing](https://en.wikipedia.org/wiki/Model-based_testing)
- [Property-based testing](https://en.wikipedia.org/wiki/Property_testing)
- [Differential testing](https://en.wikipedia.org/wiki/Differential_testing)
- [Test oracle](https://en.wikipedia.org/wiki/Test_oracle)

## Future Work

- Add tombstone pruning safety tests and MVCC watermark checks.
- Extend coverage to major compaction, manifest updates, and GC integration.
- Enable CI gating and nightly fuzzing once test stability is confirmed.
- Add crash/reopen validation as part of the gate.
- Add range tombstone semantics when supported.
10 changes: 8 additions & 2 deletions src/compaction/driver.rs
@@ -686,10 +686,16 @@ where
let mut sleep = runtime.sleep(interval).fuse();
futures::select_biased! {
_ = sleep => Some(CompactionTriggerReason::Periodic),
msg = tick_rx.next() => msg.map(|_| CompactionTriggerReason::Kick),
msg = tick_rx.next() => match msg {
Some(CompactionTrigger::Kick) => Some(CompactionTriggerReason::Kick),
Some(CompactionTrigger::Shutdown) | None => None,
},
}
} else {
tick_rx.next().await.map(|_| CompactionTriggerReason::Kick)
match tick_rx.next().await {
Some(CompactionTrigger::Kick) => Some(CompactionTriggerReason::Kick),
Some(CompactionTrigger::Shutdown) | None => None,
}
}
}

34 changes: 30 additions & 4 deletions src/compaction/handle.rs
@@ -3,6 +3,10 @@
use std::marker::PhantomData;

use fusio::executor::Executor;
#[cfg(test)]
use fusio::executor::JoinHandle;
#[cfg(test)]
use futures::SinkExt;
use futures::{channel::mpsc, future::AbortHandle};

/// Handle to a background compaction worker.
@@ -12,10 +16,12 @@ use futures::{channel::mpsc, future::AbortHandle};
#[derive(Debug)]
pub(crate) enum CompactionTrigger {
Kick,
Shutdown,
}

pub(crate) struct CompactionHandle<E: Executor> {
abort: AbortHandle,
abort: Option<AbortHandle>,
join: Option<E::JoinHandle<()>>,
trigger: Option<mpsc::Sender<CompactionTrigger>>,
_marker: PhantomData<E>,
}
@@ -24,11 +30,12 @@ impl<E: Executor> CompactionHandle<E> {
/// Create a new compaction handle.
pub(crate) fn new(
abort: AbortHandle,
_join: Option<E::JoinHandle<()>>,
join: Option<E::JoinHandle<()>>,
trigger: Option<mpsc::Sender<CompactionTrigger>>,
) -> Self {
Self {
abort,
abort: Some(abort),
join,
trigger,
_marker: PhantomData,
}
@@ -41,10 +48,29 @@ impl<E: Executor> CompactionHandle<E> {
let _ = sender.try_send(CompactionTrigger::Kick);
}
}

/// Gracefully stop the compaction worker and wait for it to exit.
#[cfg(test)]
pub(crate) async fn shutdown(mut self) {
if let Some(mut sender) = self.trigger.take() {
let _ = sender.send(CompactionTrigger::Shutdown).await;
}
if let Some(join) = self.join.take() {
let _ = join.join().await;
}
self.abort.take();
}
}

impl<E: Executor> Drop for CompactionHandle<E> {
fn drop(&mut self) {
self.abort.abort();
if let Some(sender) = &self.trigger {
let mut sender = sender.clone();
let _ = sender.try_send(CompactionTrigger::Shutdown);
}
if let Some(abort) = self.abort.take() {
abort.abort();
}
let _ = self.join.take();
}
}
82 changes: 76 additions & 6 deletions src/db/tests/core/compaction.rs
@@ -1431,12 +1431,13 @@ async fn compaction_self_kick_advances_without_periodic_tick()
let id_allocator = Arc::new(AtomicU64::new(10));
let executor = LocalCompactionExecutor::with_id_allocator(Arc::clone(&sst_cfg), id_allocator);
let driver = Arc::new(db.compaction_driver());
let worker_config = CompactionWorkerConfig::new(None, 1, 1, CascadeConfig::default());
let worker_config =
CompactionWorkerConfig::new(None, 1, 1, CascadeConfig::new(0, Duration::from_millis(0)));
let handle = driver.spawn_worker(Arc::clone(&db.executor), planner, executor, worker_config);

handle.kick();

let deadline = Instant::now() + Duration::from_secs(2);
let deadline = Instant::now() + Duration::from_secs(10);
loop {
let snapshot = db.manifest.snapshot_latest(db.manifest_table).await?;
if let Some(version) = snapshot.latest_version.as_ref() {
@@ -1457,6 +1458,7 @@
"expected L2 compaction to complete without new writes"
);

handle.shutdown().await;
fs::remove_dir_all(&db_root)?;
Ok(())
}
@@ -2386,7 +2388,7 @@ async fn compaction_periodic_trigger_records_metrics() -> Result<(), Box<dyn std
wait_for_executions(&executed, 1).await,
"expected compaction execution"
);
drop(handle);
handle.shutdown().await;

let snapshot = metrics.snapshot();
assert!(snapshot.trigger_periodic >= 1);
@@ -2473,7 +2475,7 @@ async fn compaction_kick_triggers_without_periodic_tick() -> Result<(), Box<dyn
wait_for_executions(&executed, 1).await,
"expected compaction work after kick"
);
drop(handle);
handle.shutdown().await;

let snapshot = metrics.snapshot();
assert_eq!(snapshot.trigger_kick, 1);
@@ -2482,6 +2484,74 @@
Ok(())
}

#[tokio::test(flavor = "current_thread")]
async fn cascade_scheduling_queue_size_one() -> Result<(), Box<dyn std::error::Error>> {
let schema = Arc::new(Schema::new(vec![
Field::new("id", DataType::Utf8, false),
Field::new("v", DataType::Int32, false),
]));
let mode_cfg = SchemaBuilder::from_schema(Arc::clone(&schema))
.primary_key("id")
.build()
.expect("schema builder");
let executor = Arc::new(TokioExecutor::default());
let mut inner: DbInner<InMemoryFs, TokioExecutor> =
DB::new(mode_cfg, Arc::clone(&executor)).await?.into_inner();
let metrics = Arc::new(CompactionMetrics::new());
inner.compaction_metrics = Some(Arc::clone(&metrics));

let entry = SstEntry::new(
SsTableId::new(1),
None,
None,
Path::from("L0/001.parquet"),
None,
);
inner
.manifest
.apply_version_edits(
inner.manifest_table,
&[VersionEdit::AddSsts {
level: 0,
entries: vec![entry],
}],
)
.await?;

let executed = Arc::new(StdMutex::new(Vec::new()));
let planner = CascadePlanner;
let executor = CountingExecutor::new(Arc::clone(&executed));
let driver = Arc::new(inner.compaction_driver());
let worker_config =
CompactionWorkerConfig::new(None, 1, 1, CascadeConfig::new(1, Duration::from_millis(0)));
let handle = driver.spawn_worker(
Arc::clone(&inner.executor),
planner,
executor,
worker_config,
);

handle.kick();
assert!(
wait_for_executions(&executed, 2).await,
"expected L0->L1 and L1->L2 executions with queue size 1"
);
handle.shutdown().await;

let tasks = executed.lock().expect("executed lock").clone();
let l0_to_l1 = tasks
.iter()
.filter(|(source, target)| *source == 0 && *target == 1)
.count();
let l1_to_l2 = tasks
.iter()
.filter(|(source, target)| *source == 1 && *target == 2)
.count();
assert_eq!(l0_to_l1, 1);
assert_eq!(l1_to_l2, 1);
Ok(())
}

#[tokio::test(flavor = "current_thread")]
async fn cascade_scheduling_respects_budget() -> Result<(), Box<dyn std::error::Error>> {
let schema = Arc::new(Schema::new(vec![
@@ -2534,7 +2604,7 @@ async fn cascade_scheduling_respects_budget() -> Result<(), Box<dyn std::error::
wait_for_executions(&executed, 1).await,
"expected initial compaction execution"
);
drop(handle);
handle.shutdown().await;

let tasks = executed.lock().expect("executed lock").clone();
let l0_to_l1 = tasks
@@ -2617,7 +2687,7 @@ async fn cascade_scheduling_respects_cooldown() -> Result<(), Box<dyn std::error
"expected two L0->L1 executions and one cascade"
);

drop(handle);
handle.shutdown().await;

let tasks = executed.lock().expect("executed lock").clone();
let l0_to_l1 = tasks