From 77af6e3ae3fab4bf17ab70be37a1bbd8ea7deb9f Mon Sep 17 00:00:00 2001 From: Roberto Bayardo Date: Tue, 21 Apr 2026 13:38:54 -0700 Subject: [PATCH] fix bitmap batch parent chain growth with rwlock Shares the committed bitmap behind Arc> so live batches can hold a reference while apply_batch mutates in place under the write lock. Removes Db::flatten, drops BitmapBatch's Layer chain in favor of apply-in-place, and adds valid_targets staleness checks. --- storage/src/qmdb/current/batch.rs | 437 +++++++++++++++++++-------- storage/src/qmdb/current/db.rs | 161 +++++----- storage/src/qmdb/current/mod.rs | 226 +++++++++----- storage/src/qmdb/current/sync/mod.rs | 2 +- 4 files changed, 538 insertions(+), 288 deletions(-) diff --git a/storage/src/qmdb/current/batch.rs b/storage/src/qmdb/current/batch.rs index 48358958e1f..6becb7120c6 100644 --- a/storage/src/qmdb/current/batch.rs +++ b/storage/src/qmdb/current/batch.rs @@ -28,7 +28,10 @@ use crate::{ use ahash::AHasher; use commonware_codec::Codec; use commonware_cryptography::{Digest, Hasher}; -use commonware_utils::bitmap::{Prunable as BitMap, Readable as BitmapReadable}; +use commonware_utils::{ + bitmap::{Prunable as BitMap, Readable as BitmapReadable}, + sync::{RwLock, RwLockReadGuard, RwLockWriteGuard}, +}; use std::{ collections::{BTreeSet, HashMap}, hash::BuildHasherDefault, @@ -86,11 +89,12 @@ impl ChunkOverlay { chunk[rel / 8] |= 1 << (rel % 8); } - /// Clear a single bit (used for superseded locations). - /// Skips locations in pruned chunks — those bits are already inactive. - fn clear_bit>(&mut self, base: &B, loc: u64) { + /// Clear a single bit (used for superseded locations). `pruned_chunks` is passed in by the + /// caller so the hot loop in `build_chunk_overlay` reads it once rather than per call. + /// Skips locations in pruned chunks since those bits are already inactive. + fn clear_bit>(&mut self, base: &B, pruned_chunks: usize, loc: u64) { let idx = BitMap::::to_chunk_index(loc); - if idx < base.pruned_chunks() { + if idx < pruned_chunks { return; } let rel = (loc % Self::CHUNK_BITS) as usize; @@ -279,11 +283,43 @@ where bitmap_parent: BitmapBatch, } -/// A speculative batch of operations whose root digest has been computed, -/// in contrast to [`UnmerkleizedBatch`]. +/// A speculative batch of operations whose root digest has been computed, in contrast to +/// [`UnmerkleizedBatch`]. +/// +/// Wraps an [`any::batch::MerkleizedBatch`] and adds the bitmap and grafted MMR state needed to +/// compute the canonical root. +/// +/// # Branch validity +/// +/// A `MerkleizedBatch` is a branch-scoped view rooted at a specific committed prefix of the DB. It +/// is not an immutable snapshot. +/// +/// Internally, the batch chain terminates in the DB's committed bitmap via `BitmapBatch::Base`. +/// That committed bitmap evolves in place as [`Db::apply_batch`](super::db::Db::apply_batch), +/// [`Db::prune`](super::db::Db::prune), and [`Db::rewind`](super::db::Db::rewind) update the DB. +/// +/// Reads through this batch's chain, constructing child batches from it, and applying it later are +/// only semantically correct while its ancestor chain is still the committed prefix of the DB. In +/// other words, every successful [`apply_batch`](super::db::Db::apply_batch) since this batch was +/// merkleized must have applied an ancestor of this batch. +/// +/// Once a non-ancestor batch is applied, this batch and all of its descendants become invalid +/// objects. The library does not guard against continued use after that point. /// -/// Wraps an [`any::batch::MerkleizedBatch`] and adds the bitmap and grafted MMR state needed -/// to compute the canonical root. +/// Applying an invalid batch is caught by the any-layer staleness check and returns +/// [`Error::StaleBatch`] without mutating committed state, so `apply_batch` itself cannot corrupt +/// the DB. The one exception is equal-size sibling branches (where both branches have the same +/// total operation count): the staleness check is size-based and cannot distinguish them, so +/// applying a descendant of one sibling after the other was already applied can silently corrupt +/// snapshot/log state. Callers must not apply batches from an orphaned branch. +/// +/// Rules of thumb: +/// - Drop any `Arc` you no longer intend to apply. +/// - Extending a batch after `apply_batch` has consumed it (building a child off the just-applied +/// parent) is safe. The committed bitmap now equals the parent's post-apply state, so child reads +/// are consistent. +/// - Extending a batch after a different branch has been applied is not safe. Do not call `get`, +/// `new_batch`, or `apply_batch` on that branch again. pub struct MerkleizedBatch where Operation: Send + Sync, @@ -446,13 +482,14 @@ where { let total_bits = base.len() + batch_len as u64; let mut overlay = ChunkOverlay::new(total_bits); + let pruned_chunks = base.pruned_chunks(); // 1. CommitFloor (last op) is always active. let commit_loc = batch_base + batch_len as u64 - 1; overlay.set_bit(base, commit_loc); // 2. Inactivate previous CommitFloor. - overlay.clear_bit(base, batch_base - 1); + overlay.clear_bit(base, pruned_chunks, batch_base - 1); // 3. Set active bits + clear superseded locations from the diff. for (key, entry) in diff { @@ -473,7 +510,7 @@ where } } if let Some(old) = prev_loc { - overlay.clear_bit(base, *old); + overlay.clear_bit(base, pruned_chunks, *old); } } @@ -595,9 +632,9 @@ where // compute_db_root sees newly completed chunks. Using bitmap_parent alone would miss chunks // that transitioned from partial to complete in this batch. let bitmap_batch = BitmapBatch::Layer(Arc::new(BitmapBatchLayer { - pruned_chunks: bitmap_parent.pruned_chunks(), parent: bitmap_parent.clone(), overlay: Arc::new(overlay), + shared: Arc::clone(bitmap_parent.shared()), })); // Compute canonical root. The grafted batch alone cannot resolve committed nodes, @@ -637,13 +674,95 @@ where })) } -/// Immutable bitmap state at any point in a batch chain. +/// The committed bitmap shared between the [`Db`](super::db::Db) and live batches. +/// +/// Wrapped in a [`RwLock`] so that [`Db::apply_batch`](super::db::Db::apply_batch), +/// [`Db::prune`](super::db::Db::prune), and [`Db::rewind`](super::db::Db::rewind) can mutate +/// the bitmap in place while live batches concurrently read through it. +/// +/// # Why in-place mutation under a lock +/// +/// Snapshot-based alternatives (per-apply clone, page-level copy-on-write, etc.) all require +/// cloning at least the bitmap's top-level pointer structure on every apply. For large DBs that +/// cost grows linearly with the total bit count and every live batch retains its snapshot's +/// memory until dropped, so memory use would grow with both bitmap size and batch lifetime. +/// Mutating in place keeps memory bounded by the actual bitmap size regardless of how many +/// batches are alive or how long they live. The per-call read lock is the cost we pay for that. +/// +/// # Reading through invalid batches +/// +/// The bitmap behind this lock represents *committed* state. If a caller holds a +/// [`MerkleizedBatch`] that has become invalid (see its "Branch validity" docs for the +/// conditions), reads through that batch's chain will silently return inconsistent data (the +/// chain's overlays mixed with post-divergence committed chunks). The library does not guard +/// against this; callers must avoid reading through invalid batches. +pub(crate) struct SharedBitmap { + inner: RwLock>, +} + +impl SharedBitmap { + pub(crate) const fn new(bitmap: BitMap) -> Self { + Self { + inner: RwLock::new(bitmap), + } + } + + /// Acquire a shared read guard over the committed bitmap. Kept private so external callers + /// go through [`BitmapReadable`] (which doesn't expose a guard across `.await`). + fn read(&self) -> RwLockReadGuard<'_, BitMap> { + self.inner.read() + } + + /// Acquire an exclusive write guard. By convention only + /// [`Db::apply_batch`](super::db::Db::apply_batch), [`Db::prune`](super::db::Db::prune), and + /// [`Db::rewind`](super::db::Db::rewind) mutate the shared bitmap. + pub(super) fn write(&self) -> RwLockWriteGuard<'_, BitMap> { + self.inner.write() + } +} + +impl std::fmt::Debug for SharedBitmap { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + f.debug_struct("SharedBitmap") + .field("bitmap_len", &BitmapReadable::::len(&*self.read())) + .finish() + } +} + +/// [`BitmapReadable`] over the DB's committed bitmap. Each call acquires the read lock briefly. +impl BitmapReadable for SharedBitmap { + fn complete_chunks(&self) -> usize { + self.read().complete_chunks() + } + + fn get_chunk(&self, idx: usize) -> [u8; N] { + *self.read().get_chunk(idx) + } + + fn last_chunk(&self) -> ([u8; N], u64) { + let guard = self.read(); + let (chunk, bits) = guard.last_chunk(); + (*chunk, bits) + } + + fn pruned_chunks(&self) -> usize { + self.read().pruned_chunks() + } + + fn len(&self) -> u64 { + BitmapReadable::::len(&*self.read()) + } +} + +/// A view of the committed bitmap plus zero or more speculative overlay `Layer`s. /// -/// Mirrors the [`crate::merkle::batch::MerkleizedBatch`] pattern. +/// The chain terminates in a `Base` that references the shared committed bitmap. No validity +/// check is performed. Callers must ensure they only read through batches whose chains are +/// still valid prefixes of committed state (see [`SharedBitmap`]'s docs). #[derive(Clone, Debug)] pub(crate) enum BitmapBatch { - /// Committed bitmap (chain terminal). - Base(Arc>), + /// Chain terminal: shared reference to the committed bitmap. + Base(Arc>), /// Speculative layer on top of a parent batch. Layer(Arc>), } @@ -654,14 +773,47 @@ pub(crate) struct BitmapBatchLayer { pub(crate) parent: BitmapBatch, /// Chunk-level overlay: materialized bytes for every chunk that differs from parent. pub(crate) overlay: Arc>, - /// Pruned-chunk count, copied from `parent` at construction. Invariant across the whole - /// layer chain (pruning only happens on the committed base), so caching here lets - /// `BitmapBatch::pruned_chunks` return in O(1) instead of walking to the Base. - pub(crate) pruned_chunks: usize, + /// Cached terminal [`SharedBitmap`] so [`BitmapBatch::shared`] and + /// [`BitmapBatch::pruned_chunks`] answer in O(1) instead of walking the chain. + pub(crate) shared: Arc>, } impl BitmapBatch { const CHUNK_SIZE_BITS: u64 = BitMap::::CHUNK_SIZE_BITS; + + /// Return the terminal [`SharedBitmap`] at the bottom of the chain. + fn shared(&self) -> &Arc> { + match self { + Self::Base(s) => s, + Self::Layer(layer) => &layer.shared, + } + } + + /// Return a chain equivalent to `self` with any `Layer` whose overlay is now fully committed + /// replaced by a direct reference to the committed bitmap. Since `apply_batch` commits + /// contiguous prefixes, committed `Layer`s are always at the bottom of the chain. + fn trim_committed(&self) -> Self { + let shared = self.shared(); + let committed = shared.read().len(); + let mut kept = Vec::new(); + let mut current = self; + while let Self::Layer(layer) = current { + if layer.overlay.len <= committed { + break; + } + kept.push(Arc::clone(&layer.overlay)); + current = &layer.parent; + } + let mut result = Self::Base(Arc::clone(shared)); + for overlay in kept.into_iter().rev() { + result = Self::Layer(Arc::new(BitmapBatchLayer { + parent: result, + overlay, + shared: Arc::clone(shared), + })); + } + result + } } impl BitmapReadable for BitmapBatch { @@ -675,7 +827,7 @@ impl BitmapReadable for BitmapBatch { let mut current = self; loop { match current { - Self::Base(bm) => return *bm.get_chunk(idx), + Self::Base(shared) => return shared.get_chunk(idx), Self::Layer(layer) => { if let Some(&chunk) = layer.overlay.get(idx) { return chunk; @@ -702,96 +854,17 @@ impl BitmapReadable for BitmapBatch { } fn pruned_chunks(&self) -> usize { - match self { - Self::Base(bm) => bm.pruned_chunks(), - Self::Layer(layer) => layer.pruned_chunks, - } + self.shared().pruned_chunks() } fn len(&self) -> u64 { match self { - Self::Base(bm) => BitmapReadable::::len(bm.as_ref()), + Self::Base(shared) => BitmapReadable::::len(shared.as_ref()), Self::Layer(layer) => layer.overlay.len, } } } -impl BitmapBatch { - /// Apply a chunk overlay to this bitmap. When `self` is `Base` with sole ownership, writes - /// overlay chunks directly into the bitmap. Otherwise creates a new `Layer`. - pub(super) fn apply_overlay(&mut self, overlay: Arc>) { - // Fast path: write overlay chunks directly into the Base bitmap. - if let Self::Base(base) = self { - if let Some(bitmap) = Arc::get_mut(base) { - // Extend bitmap to the overlay's length. - bitmap.extend_to(overlay.len); - // Overwrite dirty chunks. - for (&idx, chunk_bytes) in &overlay.chunks { - if idx >= bitmap.pruned_chunks() { - bitmap.set_chunk_by_index(idx, chunk_bytes); - } - } - return; - } - } - - // Slow path: create a new layer. - let pruned_chunks = self.pruned_chunks(); - let parent = self.clone(); - *self = Self::Layer(Arc::new(BitmapBatchLayer { - parent, - overlay, - pruned_chunks, - })); - } - - /// Flatten all layers back to a single `Base(Arc>)`. - /// - /// After flattening, the new `Base` Arc has refcount 1 (assuming no external clones - /// are held). - pub(super) fn flatten(&mut self) { - if matches!(self, Self::Base(_)) { - return; - } - - // Take ownership of the chain so that Arc refcounts are not - // artificially inflated by a clone. - let mut owned = std::mem::replace(self, Self::Base(Arc::new(BitMap::default()))); - - // Collect overlays from tip to base. - let mut overlays = Vec::new(); - let base = loop { - match owned { - Self::Base(bm) => break bm, - Self::Layer(layer) => match Arc::try_unwrap(layer) { - Ok(inner) => { - overlays.push(inner.overlay); - owned = inner.parent; - } - Err(arc) => { - overlays.push(arc.overlay.clone()); - owned = arc.parent.clone(); - } - }, - } - }; - - // Apply overlays from base to tip. - let mut bitmap = Arc::try_unwrap(base).unwrap_or_else(|arc| (*arc).clone()); - for overlay in overlays.into_iter().rev() { - // Extend bitmap to the overlay's length. - bitmap.extend_to(overlay.len); - // Apply dirty chunks. - for (&idx, chunk_bytes) in &overlay.chunks { - if idx >= bitmap.pruned_chunks() { - bitmap.set_chunk_by_index(idx, chunk_bytes); - } - } - } - *self = Self::Base(Arc::new(bitmap)); - } -} - impl MerkleizedBatch where @@ -818,6 +891,10 @@ where /// All uncommitted ancestors in the chain must be kept alive until the child (or any /// descendant) is merkleized. Dropping an uncommitted ancestor causes data /// loss detected at `apply_batch` time. + /// + /// This is only valid while `self` is still on the winning branch. If a different branch has + /// been applied since `self` was created, `self` is no longer a valid parent and must not be + /// extended. pub fn new_batch(self: &Arc) -> UnmerkleizedBatch where H: Hasher, @@ -825,11 +902,14 @@ where UnmerkleizedBatch::new( self.inner.new_batch::(), Arc::clone(&self.grafted), - self.bitmap.clone(), + self.bitmap.trim_committed(), ) } /// Read through: local diff -> ancestor diffs -> committed DB. + /// + /// This is only valid while `self` remains on the committed prefix. If a non-ancestor batch + /// has been applied since `self` was merkleized, do not read through it. pub async fn get( &self, key: &U::Key, @@ -855,13 +935,17 @@ where H: Hasher, Operation: Codec, { - /// Create an initial [`MerkleizedBatch`] from the committed DB state. + /// Create an initial [`MerkleizedBatch`] from the current committed DB state. + /// + /// The returned batch is rooted at the current committed prefix, but it is not a persistent + /// snapshot across later divergent commits. If some other branch is applied afterward, this + /// batch is no longer valid and must not be read through, extended, or applied. pub fn to_batch(&self) -> Arc> { let grafted = self.grafted_snapshot(); Arc::new(MerkleizedBatch { inner: self.any.to_batch(), grafted, - bitmap: self.status.clone(), + bitmap: BitmapBatch::Base(Arc::clone(&self.status)), canonical_root: self.root, }) } @@ -1302,32 +1386,123 @@ mod tests { assert_eq!(scan.next_candidate(Location::new(0), 0), None); } + // ---- trim_committed tests ---- + // + // `trim_committed` is called from `MerkleizedBatch::new_batch` to strip any `Layer`s whose + // overlays have already been absorbed into the shared committed bitmap by a prior apply. + // The implementation is a single loop that collects uncommitted overlays top-down and + // rebuilds a fresh chain rooted at `Base`. These tests cover distinct input shapes directly, + // without going through the full Db/batch machinery, so the function's structural output + // can be asserted. + + /// Build a chain `Base(shared) -> Layer(len=L1) -> Layer(len=L2) -> ...` from a list of + /// overlay lengths (bottom to top). Each constructed `Layer` caches `shared` per the + /// struct's invariant. + fn make_chain(shared: &Arc>, overlay_lens: &[u64]) -> BitmapBatch { + let mut chain = BitmapBatch::Base(Arc::clone(shared)); + for &len in overlay_lens { + chain = BitmapBatch::Layer(Arc::new(BitmapBatchLayer { + parent: chain, + overlay: Arc::new(ChunkOverlay::new(len)), + shared: Arc::clone(shared), + })); + } + chain + } + + /// Walk a chain and return its overlay lengths in bottom-to-top order. Used to assert the + /// structural output of `trim_committed` without touching private fields. Panics if the + /// chain isn't terminated by a single `Base` at the bottom. + fn chain_overlays(batch: &BitmapBatch) -> Vec { + let mut lens = Vec::new(); + let mut current = batch; + while let BitmapBatch::Layer(layer) = current { + lens.push(layer.overlay.len); + current = &layer.parent; + } + assert!(matches!(current, BitmapBatch::Base(_))); + lens.reverse(); + lens + } + + /// Input is already a bare `Base` with no speculative layers on top — the loop body never + /// runs, `kept` stays empty, and the result is a freshly constructed `Base` pointing at the + /// same `SharedBitmap`. Real-world trigger: `MerkleizedBatch::new_batch` on a batch whose + /// chain was previously trimmed flat (e.g., immediately after an apply collapsed everything). #[test] - fn test_apply_overlay() { - // Base: 8 bits all set, sole owner -> fast path. - let base = make_bitmap(&[true; 8]); - let mut batch = BitmapBatch::Base(Arc::new(base)); - - let mut overlay = ChunkOverlay::new(12); - let mut c0 = [0u8; N]; - c0[0] = 0b1111_0111; // bits 0-7 set except bit 3 - c0[1] = 0b0000_0100; // bit 10 set - overlay.chunks.insert(0, c0); - batch.apply_overlay(Arc::new(overlay)); - - // Fast path keeps Base, extends length, applies chunks. - assert!(matches!(batch, BitmapBatch::Base(_))); - assert_eq!(batch.len(), 12); - assert_eq!(batch.get_chunk(0)[0] & (1 << 3), 0); - assert_ne!(batch.get_chunk(0)[1] & (1 << 2), 0); - - // Shared Arc -> slow path creates Layer. - let BitmapBatch::Base(ref base_arc) = batch else { - panic!("expected Base"); - }; - let _shared = Arc::clone(base_arc); - let overlay2 = ChunkOverlay::new(12); - batch.apply_overlay(Arc::new(overlay2)); - assert!(matches!(batch, BitmapBatch::Layer(_))); + fn trim_committed_already_base() { + let shared = Arc::new(SharedBitmap::::new(make_bitmap(&[true; 64]))); + let base = BitmapBatch::Base(Arc::clone(&shared)); + let result = base.trim_committed(); + // Still `Base`, pointing at the same shared terminal. + match result { + BitmapBatch::Base(s) => assert!(Arc::ptr_eq(&s, &shared)), + BitmapBatch::Layer(_) => panic!("expected Base"), + } + } + + /// Every layer has been absorbed by prior applies — the loop breaks on the first iteration + /// and `kept` stays empty, so the result is a bare `Base`. This is the steady-state + /// "extend a just-applied batch" flow: after `apply_batch(A)`, `A`'s own layer has + /// `overlay.len == committed` and the next `new_batch` call should start from a clean + /// terminal. + #[test] + fn trim_committed_all_committed() { + // `shared.len() == 64`; the single layer's `overlay.len == 32 (<= 64)`, so it's committed. + let shared = Arc::new(SharedBitmap::::new(make_bitmap(&[true; 64]))); + let chain = make_chain(&shared, &[32]); + let result = chain.trim_committed(); + // Collapsed to a bare Base, pointing at the original shared. + match result { + BitmapBatch::Base(s) => assert!(Arc::ptr_eq(&s, &shared)), + BitmapBatch::Layer(_) => panic!("expected Base after full trim"), + } + } + + /// Every layer is still speculative — the loop walks all the way to `Base` without + /// breaking, and `kept` holds every overlay. The rebuilt chain is structurally equivalent + /// to the input (same overlay lens, same shared terminal). Real-world trigger: speculating + /// multiple batches deep (A, then B off A, then C off B) without `apply_batch` in between. + #[test] + fn trim_committed_none_committed() { + // `shared.len() == 32`; both overlays have `len > 32`, so neither is committed. + let shared = Arc::new(SharedBitmap::::new(make_bitmap(&[true; 32]))); + let chain = make_chain(&shared, &[64, 96]); + let result = chain.trim_committed(); + // Structure must be preserved in bottom-to-top order. + assert_eq!(chain_overlays(&result), vec![64, 96]); + } + + /// Exactly one layer is uncommitted (the newest) on top of a committed prefix — the + /// dominant pattern in chained growth. The loop collects the one uncommitted overlay, and + /// the rebuild produces `Layer(Base, overlay_B)`. Also verifies the rebuilt layer carries + /// the cached `shared` reference correctly. Real-world trigger: apply parent A, then B + /// held alive off A, then `B.new_batch()` to build C. + #[test] + fn trim_committed_exactly_one_uncommitted() { + // `shared.len() == 64`; committed layer (`overlay.len == 64`) + uncommitted (`96`). + let shared = Arc::new(SharedBitmap::::new(make_bitmap(&[true; 64]))); + let chain = make_chain(&shared, &[64, 96]); + let result = chain.trim_committed(); + // The committed layer is gone; only the uncommitted overlay remains. + assert_eq!(chain_overlays(&result), vec![96]); + // And the rebuilt layer's `shared` field still points at the original terminal. + assert!(Arc::ptr_eq(result.shared(), &shared)); + } + + /// Two or more uncommitted layers on top of a committed prefix — exercises the loop's + /// iterated `kept.push` and the rebuild's iterated `Arc::new(BitmapBatchLayer)`, including + /// the cached `shared` wire-through on every reconstructed layer. Real-world trigger: + /// build A, then B off A, then C off B; apply only A; then call `C.new_batch()`. + #[test] + fn trim_committed_multiple_uncommitted() { + // `shared.len() == 64`; committed layer (64), then two uncommitted (96, 128). + let shared = Arc::new(SharedBitmap::::new(make_bitmap(&[true; 64]))); + let chain = make_chain(&shared, &[64, 96, 128]); + let result = chain.trim_committed(); + // Committed layer dropped; uncommitted pair preserved in order. + assert_eq!(chain_overlays(&result), vec![96, 128]); + // Every reconstructed layer must still cache the original shared terminal. + assert!(Arc::ptr_eq(result.shared(), &shared)); } } diff --git a/storage/src/qmdb/current/db.rs b/storage/src/qmdb/current/db.rs index 60c68812158..5ee5fc5ea6a 100644 --- a/storage/src/qmdb/current/db.rs +++ b/storage/src/qmdb/current/db.rs @@ -19,7 +19,7 @@ use crate::{ operation::{update::Update, Operation}, }, current::{ - batch::BitmapBatch, + batch::{BitmapBatch, SharedBitmap}, grafting, proof::{OperationProof, OpsRootWitness, RangeProof}, }, @@ -65,9 +65,10 @@ pub struct Db< /// The bitmap over the activity status of each operation. Supports augmenting [Db] proofs in /// order to further prove whether a key _currently_ has a specific value. /// - /// Stored as a [`BitmapBatch`] so that `apply_batch` can - /// push layers in O(batch) instead of deep-cloning. - pub(super) status: BitmapBatch, + /// Shared behind an `Arc>` so that live batches can hold a reference to the + /// committed bitmap while [`Db::apply_batch`] mutates it in place under the write lock. See + /// [`SharedBitmap`]'s doc for the branch-validity caveat that callers must respect. + pub(super) status: Arc>, /// Each leaf corresponds to a complete bitmap chunk at the grafting height. /// See the [grafted leaf formula](super) in the module documentation. @@ -186,8 +187,8 @@ where ) -> Result, Error> { let storage = self.grafted_storage(); let grafted_root = - compute_grafted_root::(hasher, &self.status, &storage).await?; - let partial_chunk = partial_chunk::<_, N>(&self.status) + compute_grafted_root::(hasher, self.status.as_ref(), &storage).await?; + let partial_chunk = partial_chunk::<_, N>(self.status.as_ref()) .map(|(chunk, next_bit)| (next_bit, hasher.digest(&chunk))); Ok(OpsRootWitness { grafted_root, @@ -205,7 +206,7 @@ where super::batch::UnmerkleizedBatch::new( self.any.new_batch(), self.grafted_snapshot(), - self.status.clone(), + BitmapBatch::Base(Arc::clone(&self.status)), ) } @@ -217,7 +218,7 @@ where ) -> Result, Error> { let storage = self.grafted_storage(); let ops_root = self.any.log.root(); - OperationProof::new(hasher, &self.status, &storage, loc, ops_root).await + OperationProof::new(hasher, self.status.as_ref(), &storage, loc, ops_root).await } /// Returns a proof that the specified range of operations are part of the database, along with @@ -241,7 +242,7 @@ where let ops_root = self.any.log.root(); RangeProof::new_with_ops( hasher, - &self.status, + self.status.as_ref(), &storage, &self.any.log, start_loc, @@ -379,18 +380,6 @@ where Self::pair_absorption_threshold(self.status.pruned_chunks() as u64) } - /// Collapse the accumulated bitmap `Layer` chain into a flat `Base`. - /// - /// Each [`Db::apply_batch`] pushes a new `Layer` on the bitmap. These layers are cheap - /// to create but make subsequent reads walk the full chain. Calling `flatten` collapses - /// the chain into a single `Base`, bounding lookup cost. - /// - /// This is called automatically by [`Db::prune`]. Callers that apply many batches without - /// pruning should call this periodically. - pub fn flatten(&mut self) { - self.status.flatten(); - } - /// Prunes historical operations prior to `prune_loc`. This does not affect the db's root or /// snapshot. /// @@ -409,13 +398,8 @@ where return Err(Error::PruneBeyondMinRequired(prune_loc, sync_boundary)); } - self.flatten(); - // Prune bitmap chunks to the sync boundary (most aggressive safe location). - let BitmapBatch::::Base(base) = &mut self.status else { - unreachable!("flatten() guarantees Base"); - }; - Arc::make_mut(base).prune_to_bit(*sync_boundary); + self.status.write().prune_to_bit(*sync_boundary); // Prune the grafted tree to match the bitmap's pruned chunks. let pruned_chunks = self.status.pruned_chunks() as u64; @@ -481,8 +465,6 @@ where /// A successful rewind is not restart-stable until a subsequent [`Db::commit`] or /// [`Db::sync`]. pub async fn rewind(&mut self, size: Location) -> Result<(), Error> { - self.flatten(); - let rewind_size = *size; let current_size = *self.any.last_commit_loc + 1; if rewind_size == current_size { @@ -541,28 +523,22 @@ where // handle may be internally diverged and must be dropped by the caller. let restored_locs = self.any.rewind(size).await?; - // Patch bitmap: truncate to rewound size, then mark restored locations as active. + // Patch shared bitmap under the write lock: truncate to rewound size, then mark restored + // locations as active. Live batches built pre-rewind will silently return wrong data on + // any chunk read that falls through to the committed bitmap; callers must drop them. { - let BitmapBatch::::Base(base) = &mut self.status else { - unreachable!("flatten() guarantees Base"); - }; - let status: &mut BitMap = Arc::get_mut(base).expect("flatten ensures sole owner"); - status.truncate(rewind_size); + let mut guard = self.status.write(); + guard.truncate(rewind_size); for loc in &restored_locs { - status.set_bit(**loc, true); + guard.set_bit(**loc, true); } - status.set_bit(rewind_size - 1, true); + guard.set_bit(rewind_size - 1, true); } - let BitmapBatch::Base(status) = &self.status else { - unreachable!("flatten() guarantees Base"); - }; - let status = status.as_ref(); - // Rebuild grafted tree and canonical root for the patched bitmap. let hasher = StandardHasher::::new(); let grafted_tree = build_grafted_tree::( &hasher, - status, + self.status.as_ref(), &pinned_nodes, &self.any.log.merkle, self.thread_pool.as_ref(), @@ -574,9 +550,16 @@ where &self.any.log.merkle, hasher.clone(), ); - let partial_chunk = partial_chunk(status); + let partial_chunk = partial_chunk(self.status.as_ref()); let ops_root = self.any.log.root(); - let root = compute_db_root(&hasher, status, &storage, partial_chunk, &ops_root).await?; + let root = compute_db_root( + &hasher, + self.status.as_ref(), + &storage, + partial_chunk, + &ops_root, + ) + .await?; self.grafted_tree = grafted_tree; self.root = root; @@ -589,15 +572,15 @@ where let mut metadata = self.metadata.lock().await; metadata.clear(); + // Snapshot the pruning boundary under the read lock; the guard drops before any await. + let pruned_chunks_u64 = self.status.pruned_chunks() as u64; + // Write the number of pruned chunks. let key = U64::new(PRUNED_CHUNKS_PREFIX, 0); - metadata.put( - key, - (self.status.pruned_chunks() as u64).to_be_bytes().to_vec(), - ); + metadata.put(key, pruned_chunks_u64.to_be_bytes().to_vec()); // Write the pinned nodes of the grafted tree. - let pruned_chunks = Location::::new(self.status.pruned_chunks() as u64); + let pruned_chunks = Location::::new(pruned_chunks_u64); for (i, grafted_pos) in F::nodes_to_pin(pruned_chunks).enumerate() { let digest = self .grafted_tree @@ -675,30 +658,45 @@ where // 1. Apply inner any-layer batch (handles snapshot + journal partial skipping). let range = self.any.apply_batch(Arc::clone(&batch.inner)).await?; - // 2. Apply bitmap overlay. The batch's bitmap is a Layer whose overlay - // contains all dirty chunks. Walk the layer chain to collect and apply - // all uncommitted ancestor overlays + this batch's overlay. - { - let mut overlays = Vec::new(); - let mut current = &batch.bitmap; - while let super::batch::BitmapBatch::Layer(layer) = current { - if layer.overlay.len <= db_size { - break; - } - overlays.push(Arc::clone(&layer.overlay)); - current = &layer.parent; - } - // Apply in chronological order (deepest ancestor first). - for overlay in overlays.into_iter().rev() { - self.status.apply_overlay(overlay); + // 2. Collect bitmap overlays from the batch chain. The `Arc`s we push here + // are independent of the batch's layer chain, so the batch can be dropped before we + // touch the shared bitmap below. + let mut overlays = Vec::new(); + let mut current = &batch.bitmap; + while let super::batch::BitmapBatch::Layer(layer) = current { + if layer.overlay.len <= db_size { + break; } + overlays.push(Arc::clone(&layer.overlay)); + current = &layer.parent; } // 3. Apply grafted tree (merkle layer handles partial ancestor skipping). self.grafted_tree.apply_batch(&batch.grafted)?; - // 4. Canonical root. - self.root = batch.canonical_root; + // 4. Snapshot the canonical root before releasing the batch. + let canonical_root = batch.canonical_root; + + // 5. Release the batch so its chain's refs drop before we mutate the shared bitmap. + drop(batch); + + // 6. Apply overlays in place under the write lock. + { + let mut guard = self.status.write(); + if let Some(newest) = overlays.first() { + guard.extend_to(newest.len); + } + let pruned = guard.pruned_chunks(); + for overlay in overlays.into_iter().rev() { + for (&idx, chunk) in &overlay.chunks { + if idx >= pruned { + guard.set_chunk_by_index(idx, chunk); + } + } + } + } + + self.root = canonical_root; Ok(range) } @@ -935,7 +933,7 @@ pub(super) async fn compute_grafted_leaves( hasher: &StandardHasher, - bitmap: &BitMap, + bitmap: &impl BitmapReadable, pinned_nodes: &[H::Digest], ops_tree: &impl MerkleStorage, pool: Option<&ThreadPool>, @@ -948,7 +946,7 @@ pub(super) async fn build_grafted_tree( hasher, ops_tree, - (pruned_chunks..complete_chunks).map(|chunk_idx| (chunk_idx, *bitmap.get_chunk(chunk_idx))), + (pruned_chunks..complete_chunks).map(|chunk_idx| (chunk_idx, bitmap.get_chunk(chunk_idx))), pool, ) .await?; @@ -1098,38 +1096,35 @@ mod tests { #[test] fn combine_roots_deterministic() { - let h1 = StandardHasher::::new(); - let h2 = StandardHasher::::new(); + let hasher = StandardHasher::::new(); let ops = Sha256::hash(b"ops"); let grafted = Sha256::hash(b"grafted"); - let r1 = combine_roots(&h1, &ops, &grafted, None); - let r2 = combine_roots(&h2, &ops, &grafted, None); + let r1 = combine_roots(&hasher, &ops, &grafted, None); + let r2 = combine_roots(&hasher, &ops, &grafted, None); assert_eq!(r1, r2); } #[test] fn combine_roots_with_partial_differs() { - let h1 = StandardHasher::::new(); - let h2 = StandardHasher::::new(); + let hasher = StandardHasher::::new(); let ops = Sha256::hash(b"ops"); let grafted = Sha256::hash(b"grafted"); let partial_digest = Sha256::hash(b"partial"); - let without = combine_roots(&h1, &ops, &grafted, None); - let with = combine_roots(&h2, &ops, &grafted, Some((5, &partial_digest))); + let without = combine_roots(&hasher, &ops, &grafted, None); + let with = combine_roots(&hasher, &ops, &grafted, Some((5, &partial_digest))); assert_ne!(without, with); } #[test] fn combine_roots_different_ops_root() { - let h1 = StandardHasher::::new(); - let h2 = StandardHasher::::new(); + let hasher = StandardHasher::::new(); let ops_a = Sha256::hash(b"ops_a"); let ops_b = Sha256::hash(b"ops_b"); let grafted = Sha256::hash(b"grafted"); - let r1 = combine_roots(&h1, &ops_a, &grafted, None); - let r2 = combine_roots(&h2, &ops_b, &grafted, None); + let r1 = combine_roots(&hasher, &ops_a, &grafted, None); + let r2 = combine_roots(&hasher, &ops_b, &grafted, None); assert_ne!(r1, r2); } @@ -1181,7 +1176,7 @@ mod tests { let mut next_idx = 0; populate_fixed_db::(&mut db, next_idx, 256).await; next_idx += 256; - while partial_chunk::<_, 32>(&db.status).is_some() { + while partial_chunk::<_, 32>(db.status.as_ref()).is_some() { populate_fixed_db::(&mut db, next_idx, 1).await; next_idx += 1; } diff --git a/storage/src/qmdb/current/mod.rs b/storage/src/qmdb/current/mod.rs index 11a7b85c38b..d270d989ceb 100644 --- a/storage/src/qmdb/current/mod.rs +++ b/storage/src/qmdb/current/mod.rs @@ -6,6 +6,29 @@ //! See [`crate::qmdb::any`] for batch API examples (forking, sequential //! commit, staleness). The Current layer uses the same batch API. //! +//! # Batch validity +//! +//! Current batches are branch-scoped views, not immutable snapshots. +//! +//! A batch remains valid only while its ancestor chain is still the committed prefix of the DB. +//! Once a non-ancestor batch is applied, that batch and all of its descendants are invalid +//! objects: do not read through them, do not build children from them, and do not attempt to +//! apply them. +//! +//! A short rule of thumb: +//! - A batch is only usable while it stays on the winning branch. +//! +//! Valid: +//! - Build `A`, apply `A`, then build `B` from `A` and read or merkleize `B`. +//! - Call [`Db::to_batch`](db::Db::to_batch) and use the returned batch only while no divergent +//! branch has been applied. +//! +//! Invalid: +//! - Build siblings `B1` and `B2`, apply `B1`, then call `B2.get()`, `B2.new_batch()`, or +//! `apply_batch(B2)`. +//! - Hold `snapshot = db.to_batch()`, mutate the DB through another branch, then use `snapshot` +//! again. +//! //! # Motivation //! //! An [crate::qmdb::any] ("Any") database can prove that a key had a particular value at some @@ -162,7 +185,7 @@ //! //! To avoid this, [`Db::prune`](db::Db::prune) defers bitmap pruning for chunks whose //! chunk-pair parent has not yet been born in the ops tree (see -//! `Db::settled_bitmap_prune_loc`). Once the parent is born, every ops peak within +//! `Db::sync_boundary`). Once the parent is born, every ops peak within //! the pruned region is at height `gh+1` or above, and maps to a pinned peak or an //! ancestor of pinned peaks that can be reconstructed by hashing children (see //! `grafting::Storage::reconstruct_grafted_node`). @@ -357,7 +380,7 @@ where Ok(db::Db { any, - status: batch::BitmapBatch::Base(Arc::new(status)), + status: Arc::new(batch::SharedBitmap::new(status)), grafted_tree, metadata: AsyncMutex::new(metadata), thread_pool, @@ -403,7 +426,10 @@ pub mod tests { use commonware_utils::{NZUsize, NZU16, NZU64}; use core::future::Future; use rand::{rngs::StdRng, RngCore, SeedableRng}; - use std::num::{NonZeroU16, NonZeroUsize}; + use std::{ + num::{NonZeroU16, NonZeroUsize}, + sync::Arc, + }; use tracing::warn; type Error = crate::qmdb::Error; @@ -2959,120 +2985,175 @@ pub mod tests { }); } - /// flatten() is a no-op on a freshly initialized DB (no layers to collapse). + /// A live batch (built off the committed state) must remain readable and applicable after + /// [`Db::prune`] advances the shared bitmap's pruning boundary. Pruning only discards + /// chunks for inactive bits (below the inactivity floor); the batch's own chain and + /// overlays operate at or above the floor, so no reads should land in the pruned region. #[test_traced("INFO")] - fn test_flatten_noop_on_fresh_db() { + fn test_current_live_batch_safe_across_prune() { let executor = deterministic::Runner::default(); executor.start(|context| async move { let ctx = context.with_label("db"); - let mut db: UnorderedVariableDb = - UnorderedVariableDb::init(ctx.clone(), variable_config::("fl-noop", &ctx)) - .await - .unwrap(); + let mut db: UnorderedVariableDb = UnorderedVariableDb::init( + ctx.clone(), + variable_config::("prune-live", &ctx), + ) + .await + .unwrap(); - let root_before = db.root(); - db.flatten(); - assert_eq!(db.root(), root_before); + // Seed enough ops to span multiple bitmap chunks. + let mut seed = db.new_batch(); + for i in 0u64..300 { + seed = seed.write(key(i), Some(val(i))); + } + let seed_m = seed.merkleize(&db, None).await.unwrap(); + db.apply_batch(seed_m).await.unwrap(); + db.commit().await.unwrap(); - db.destroy().await.unwrap(); - }); - } + // Overwrite keys 0..250 so the inactivity floor advances past chunk 0. + let mut p = db.new_batch(); + for i in 0u64..250 { + p = p.write(key(i), Some(val(i + 10_000))); + } + let p_m = p.merkleize(&db, None).await.unwrap(); + db.apply_batch(Arc::clone(&p_m)).await.unwrap(); + db.commit().await.unwrap(); - /// flatten() preserves the root and data after multiple apply_batch calls. - #[test_traced("INFO")] - fn test_flatten_preserves_root_after_batches() { - let executor = deterministic::Runner::default(); - executor.start(|context| async move { - let ctx = context.with_label("db"); - let mut db: UnorderedVariableDb = - UnorderedVariableDb::init(ctx.clone(), variable_config::("fl-root", &ctx)) - .await - .unwrap(); + // Build c off p_m; c is live and shares the committed bitmap via its chain. + let c = p_m + .new_batch::() + .write(key(250), Some(val(99_999))) + .merkleize(&db, None) + .await + .unwrap(); - // Apply several batches to accumulate layers. - for i in 0u64..5 { - let m = db - .new_batch() - .write(key(i), Some(val(i))) - .merkleize(&db, None) - .await - .unwrap(); - db.apply_batch(m).await.unwrap(); - } + // Prune with c still alive. This advances pruned_chunks on the shared bitmap. + db.prune(db.sync_boundary()).await.unwrap(); - let root_before = db.root(); - db.flatten(); - assert_eq!(db.root(), root_before); + // Sanity: c's pending write is still readable via the any-layer diff chain. + assert_eq!(c.get(&key(250), &db).await.unwrap(), Some(val(99_999))); - // Data is still readable. - for i in 0u64..5 { - assert_eq!(db.get(&key(i)).await.unwrap(), Some(val(i))); - } + // The actual prune-interaction test: apply c after prune. apply_batch skips overlay + // chunks below the current pruned boundary. + db.apply_batch(c).await.unwrap(); + assert_eq!(db.get(&key(0)).await.unwrap(), Some(val(10_000))); + assert_eq!(db.get(&key(250)).await.unwrap(), Some(val(99_999))); db.destroy().await.unwrap(); }); } - /// flatten() is idempotent: a second call is a no-op. + /// Regression: extending a batch after it has been applied (building a child off the + /// just-applied parent) must produce correct data. + /// + /// With the shared-bitmap `RwLock` design, applying `A` mutates the committed bitmap in + /// place; reads through `A`'s chain after apply fall through to the committed bitmap (which + /// now reflects `A`'s state), and `A`'s own overlays applied on top are consistent with + /// committed. So `A.new_batch()` followed by merkleize + apply is the right-by-construction + /// case, and this test locks it in. #[test_traced("INFO")] - fn test_flatten_idempotent() { + fn test_current_extend_applied_batch() { let executor = deterministic::Runner::default(); executor.start(|context| async move { let ctx = context.with_label("db"); let mut db: UnorderedVariableDb = - UnorderedVariableDb::init(ctx.clone(), variable_config::("fl-idem", &ctx)) + UnorderedVariableDb::init(ctx.clone(), variable_config::("xtend", &ctx)) .await .unwrap(); - let m = db + // Apply A, retaining our Arc so we can extend it post-apply. + let a = db .new_batch() .write(key(0), Some(val(0))) .merkleize(&db, None) .await .unwrap(); - db.apply_batch(m).await.unwrap(); + db.apply_batch(Arc::clone(&a)).await.unwrap(); - db.flatten(); - let root_after_first = db.root(); + // Build B off A after A was applied. B's chain walks through A's layer and falls + // through to the committed bitmap (now post-A). B's merkleize must read consistent + // state from both sources. + let b = a + .new_batch::() + .write(key(1), Some(val(1))) + .merkleize(&db, None) + .await + .unwrap(); + db.apply_batch(b).await.unwrap(); - db.flatten(); - assert_eq!(db.root(), root_after_first); + assert_eq!(db.get(&key(0)).await.unwrap(), Some(val(0))); + assert_eq!(db.get(&key(1)).await.unwrap(), Some(val(1))); + + // Extend once more to lock in multi-generation behavior. + let c = db + .new_batch() + .write(key(2), Some(val(2))) + .merkleize(&db, None) + .await + .unwrap(); + db.apply_batch(c).await.unwrap(); + + assert_eq!(db.get(&key(0)).await.unwrap(), Some(val(0))); + assert_eq!(db.get(&key(1)).await.unwrap(), Some(val(1))); + assert_eq!(db.get(&key(2)).await.unwrap(), Some(val(2))); db.destroy().await.unwrap(); }); } - /// New batches built after flatten() produce correct roots and can be applied. + /// Build a child batch from a still-live parent whose apply was followed by a prune, then + /// merkleize and apply the child. The parent's `BitmapBatch` chain terminates in the shared + /// committed bitmap, and `prune` mutates that bitmap's pruning boundary in place. When the + /// child is constructed via `parent.new_batch()`, the internal `trim_committed` call must + /// observe the advanced boundary and produce a correct child chain; merkleize and apply must + /// then produce correct state for keys at and beyond the advanced floor. #[test_traced("INFO")] - fn test_flatten_then_new_batch() { + fn test_current_live_batch_child_after_prune() { let executor = deterministic::Runner::default(); executor.start(|context| async move { let ctx = context.with_label("db"); - let mut db: UnorderedVariableDb = - UnorderedVariableDb::init(ctx.clone(), variable_config::("fl-then", &ctx)) - .await - .unwrap(); + let mut db: UnorderedVariableDb = UnorderedVariableDb::init( + ctx.clone(), + variable_config::("child-after-prune", &ctx), + ) + .await + .unwrap(); - // Apply a batch, flatten, then apply another. - let m = db - .new_batch() - .write(key(0), Some(val(0))) - .merkleize(&db, None) - .await - .unwrap(); - db.apply_batch(m).await.unwrap(); - db.flatten(); + // Seed enough ops to span multiple bitmap chunks. + let mut seed = db.new_batch(); + for i in 0u64..300 { + seed = seed.write(key(i), Some(val(i))); + } + let seed_m = seed.merkleize(&db, None).await.unwrap(); + db.apply_batch(seed_m).await.unwrap(); + db.commit().await.unwrap(); - let m = db - .new_batch() - .write(key(1), Some(val(1))) + // Overwrite keys 0..250 so the inactivity floor advances past chunk 0. + let mut a_batch = db.new_batch(); + for i in 0u64..250 { + a_batch = a_batch.write(key(i), Some(val(i + 10_000))); + } + let a = a_batch.merkleize(&db, None).await.unwrap(); + db.apply_batch(Arc::clone(&a)).await.unwrap(); + db.commit().await.unwrap(); + + // Prune while `a` is still live. Mutates the shared bitmap's pruning boundary in place. + db.prune(db.sync_boundary()).await.unwrap(); + + // Extend `a` into `b` AFTER the prune. Building `b` off `a` triggers + // `trim_committed` on `a`'s chain, which must correctly see the advanced pruning + // boundary on the shared bitmap. + let b = a + .new_batch::() + .write(key(300), Some(val(300))) .merkleize(&db, None) .await .unwrap(); - db.apply_batch(m).await.unwrap(); - assert_eq!(db.get(&key(0)).await.unwrap(), Some(val(0))); - assert_eq!(db.get(&key(1)).await.unwrap(), Some(val(1))); + db.apply_batch(b).await.unwrap(); + assert_eq!(db.get(&key(0)).await.unwrap(), Some(val(10_000))); + assert_eq!(db.get(&key(249)).await.unwrap(), Some(val(10_249))); + assert_eq!(db.get(&key(300)).await.unwrap(), Some(val(300))); db.destroy().await.unwrap(); }); @@ -3315,7 +3396,6 @@ pub mod tests { db.prune(db.sync_boundary()).await.unwrap(); db.apply_batch(c_m).await.unwrap(); - db.flatten(); db.destroy().await.unwrap(); }); diff --git a/storage/src/qmdb/current/sync/mod.rs b/storage/src/qmdb/current/sync/mod.rs index c368772f106..6b5b34e0398 100644 --- a/storage/src/qmdb/current/sync/mod.rs +++ b/storage/src/qmdb/current/sync/mod.rs @@ -228,7 +228,7 @@ where let current_db = db::Db { any, - status: crate::qmdb::current::batch::BitmapBatch::Base(Arc::new(status)), + status: Arc::new(crate::qmdb::current::batch::SharedBitmap::new(status)), grafted_tree, metadata: AsyncMutex::new(metadata), thread_pool,