diff --git a/examples/sync/src/databases/immutable.rs b/examples/sync/src/databases/immutable.rs index 52b67bced0b..b1b55201e7c 100644 --- a/examples/sync/src/databases/immutable.rs +++ b/examples/sync/src/databases/immutable.rs @@ -22,7 +22,7 @@ use tracing::error; pub type Database = immutable::variable::Db; /// Operation type alias. -pub type Operation = immutable::variable::Operation; +pub type Operation = immutable::variable::Operation; /// Create a database configuration with appropriate partitioning for Immutable. pub fn create_config(context: &impl BufferPooler) -> Config> { @@ -74,12 +74,12 @@ pub fn create_test_operations(count: usize, seed: u64) -> Vec { operations.push(Operation::Set(key, value)); if (i + 1) % 10 == 0 { - operations.push(Operation::Commit(None)); + operations.push(Operation::Commit(None, Location::new(0))); } } // Always end with a commit - operations.push(Operation::Commit(Some(Sha256::fill(1)))); + operations.push(Operation::Commit(Some(Sha256::fill(1)), Location::new(0))); operations } @@ -110,8 +110,8 @@ where Operation::Set(key, value) => { batch = batch.set(key, value); } - Operation::Commit(metadata) => { - let merkleized = batch.merkleize(self, metadata); + Operation::Commit(metadata, floor) => { + let merkleized = batch.merkleize(self, metadata, floor); self.apply_batch(merkleized).await?; self.commit().await?; batch = self.new_batch(); @@ -130,7 +130,7 @@ where } async fn sync_boundary(&self) -> Location { - self.sync_boundary().await + self.sync_boundary() } fn historical_proof( diff --git a/storage/conformance.toml b/storage/conformance.toml index 5588241d38e..34c2ac27b28 100644 --- a/storage/conformance.toml +++ b/storage/conformance.toml @@ -208,7 +208,7 @@ hash = "ba4c1ce0c2975f27231e13920129ecd63ae173c10a6e481af388ada2383ea15e" ["commonware_storage::qmdb::conformance::ImmutableMmbVariableConf"] n_cases = 200 -hash = "aee77e198354d5cee1dae5e572cfad11248b0e8635fdc9c19245442e6785fa25" +hash = 
"d148ed1d4ce0fce52bfb24463e40f777bdc5915cbb7710d31e155d5b13237d1f" ["commonware_storage::qmdb::conformance::ImmutableMmrFixedConf"] n_cases = 200 @@ -216,7 +216,7 @@ hash = "6a25e6efa15b15bfd6f39e588fca23c1ec431eec1f148722d8a00491876c4252" ["commonware_storage::qmdb::conformance::ImmutableMmrVariableConf"] n_cases = 200 -hash = "f32fda4f6c0bbde62f8f1959edeb2ac67b28f5347bd108e4118e740aa1fd8fd1" +hash = "cafbb288de6f8d5c73ddb19df64d2e1429b4f2e273124cb1b20be3026c54d288" ["commonware_storage::qmdb::conformance::KeylessMmbFixedConf"] n_cases = 200 @@ -236,15 +236,15 @@ hash = "aa70f866ae6b4104e94c741f46806a430b2ce76e3af5798ba2fb23dff6d1fbd9" ["commonware_storage::qmdb::immutable::operation::fixed::tests::conformance::CodecConformance"] n_cases = 65536 -hash = "4f75cdf8952431729e7a3dfad38a8d5bfbb92bf6b54b1ca180a66745aed618d5" +hash = "1737c49c112fc9dfdc4dde3626d8ab08a9df8353b9a702e488d3a1fd8e9428dc" ["commonware_storage::qmdb::immutable::operation::variable::tests::conformance::CodecConformance"] n_cases = 65536 -hash = "cce5f888e506282f861e0e49e176b26c65f92e457f0c5f5353fc9b7196f07478" +hash = "bdee433c53489ee67a42ad2029081d3f233799bd360d6e5e7f29cbaec87c9064" ["commonware_storage::qmdb::immutable::operation::variable::tests::conformance::CodecConformance"] n_cases = 65536 -hash = "fdca5df62d243b28676ee15034663694cd219d5ef80749079126b0ed73effe0d" +hash = "7952a9bb3a9cec87a95af6dd96a5b88b535106f0241770e0bfdb8aeaf9421056" ["commonware_storage::qmdb::keyless::operation::tests::conformance::CodecConformance>>"] n_cases = 65536 diff --git a/storage/fuzz/fuzz_targets/qmdb_immutable.rs b/storage/fuzz/fuzz_targets/qmdb_immutable.rs index 5d290714064..a85d22755e2 100644 --- a/storage/fuzz/fuzz_targets/qmdb_immutable.rs +++ b/storage/fuzz/fuzz_targets/qmdb_immutable.rs @@ -40,6 +40,7 @@ enum ImmutableOperation { Commit { has_metadata: bool, metadata_size: usize, + advance_floor: bool, }, Prune { loc: u64, @@ -179,6 +180,7 @@ fn fuzz_family(input: &FuzzInput, suffix: &str) { 
ImmutableOperation::Commit { has_metadata, metadata_size, + advance_floor, } => { let metadata = if has_metadata { Some(generate_value(&mut rng, metadata_size)) @@ -186,9 +188,11 @@ fn fuzz_family(input: &FuzzInput, suffix: &str) { None }; + let end = db.bounds().await.end; + let pending_count = pending_sets.len() as u64; assign_pending_locations( &pending_sets, - db.bounds().await.end, + end, &mut keys_set, &mut set_locations, ); @@ -196,7 +200,15 @@ fn fuzz_family(input: &FuzzInput, suffix: &str) { for (k, v) in pending_sets.drain(..) { batch = batch.set(k, v); } - let merkleized = batch.merkleize(&db, metadata); + let floor = if advance_floor { + // Advance floor to the commit location (end of this batch). + // total_size = end + pending_count + 1 (commit op). + // Floor at the commit op is the maximum valid value. + Location::new(*end + pending_count) + } else { + db.inactivity_floor_loc() + }; + let merkleized = batch.merkleize(&db, metadata, floor); db.apply_batch(merkleized).await.unwrap(); db.commit().await.unwrap(); last_commit_loc = Some(db.bounds().await.end - 1); @@ -216,7 +228,10 @@ fn fuzz_family(input: &FuzzInput, suffix: &str) { for (k, v) in pending_sets.drain(..) { batch = batch.set(k, v); } - let merkleized = batch.merkleize(&db, None); + // Set the floor to at least safe_loc so the prune succeeds, + // but never below the current floor (monotonicity). + let floor = safe_loc.max(db.inactivity_floor_loc()); + let merkleized = batch.merkleize(&db, None, floor); db.apply_batch(merkleized).await.unwrap(); db.commit().await.unwrap(); last_commit_loc = Some(db.bounds().await.end - 1); @@ -247,7 +262,8 @@ fn fuzz_family(input: &FuzzInput, suffix: &str) { for (k, v) in pending_sets.drain(..) 
{ batch = batch.set(k, v); } - let merkleized = batch.merkleize(&db, None); + let floor = db.inactivity_floor_loc(); + let merkleized = batch.merkleize(&db, None, floor); db.apply_batch(merkleized).await.unwrap(); db.commit().await.unwrap(); last_commit_loc = Some(db.bounds().await.end - 1); @@ -272,7 +288,8 @@ fn fuzz_family(input: &FuzzInput, suffix: &str) { let safe_max_ops = NonZeroU64::new((max_ops % MAX_PROOF_OPS).max(1)).unwrap(); - let batch = db.new_batch().merkleize(&db, None); + let floor = db.inactivity_floor_loc(); + let batch = db.new_batch().merkleize(&db, None, floor); db.apply_batch(batch).await.unwrap(); db.commit().await.unwrap(); last_commit_loc = Some(db.bounds().await.end - 1); @@ -307,7 +324,8 @@ fn fuzz_family(input: &FuzzInput, suffix: &str) { for (k, v) in pending_sets.drain(..) { batch = batch.set(k, v); } - let merkleized = batch.merkleize(&db, None); + let floor = db.inactivity_floor_loc(); + let merkleized = batch.merkleize(&db, None, floor); db.apply_batch(merkleized).await.unwrap(); db.commit().await.unwrap(); last_commit_loc = Some(db.bounds().await.end - 1); @@ -326,7 +344,8 @@ fn fuzz_family(input: &FuzzInput, suffix: &str) { for (k, v) in pending_sets.drain(..) { batch = batch.set(k, v); } - let merkleized = batch.merkleize(&db, None); + let floor = db.inactivity_floor_loc(); + let merkleized = batch.merkleize(&db, None, floor); db.apply_batch(merkleized).await.unwrap(); db.destroy().await.unwrap(); } diff --git a/storage/src/qmdb/conformance.rs b/storage/src/qmdb/conformance.rs index 0f1325d6094..22b4c12fcfd 100644 --- a/storage/src/qmdb/conformance.rs +++ b/storage/src/qmdb/conformance.rs @@ -237,11 +237,12 @@ async fn apply_writes>( /// Apply a batch of immutable sets to the database. macro_rules! 
apply_sets { ($db:ident, $ops:expr) => {{ + let floor = $db.inactivity_floor_loc(); let mut batch = $db.new_batch(); for (k, v) in $ops { batch = batch.set(k, v); } - let merkleized = batch.merkleize(&$db, None); + let merkleized = batch.merkleize(&$db, None, floor); $db.apply_batch(merkleized).await.unwrap(); }}; } @@ -641,18 +642,20 @@ macro_rules! assert_immutable_order_independent { ops.push((colliding_digest(0xCD, i), to_val(i, 100))); } + let fwd_floor = $fwd.inactivity_floor_loc(); let mut batch = $fwd.new_batch(); for &(k, v) in &ops { batch = batch.set(k, v); } - let merkleized = batch.merkleize(&$fwd, None); + let merkleized = batch.merkleize(&$fwd, None, fwd_floor); $fwd.apply_batch(merkleized).await.unwrap(); + let rev_floor = $rev.inactivity_floor_loc(); let mut batch = $rev.new_batch(); for &(k, v) in ops.iter().rev() { batch = batch.set(k, v); } - let merkleized = batch.merkleize(&$rev, None); + let merkleized = batch.merkleize(&$rev, None, rev_floor); $rev.apply_batch(merkleized).await.unwrap(); assert_eq!( diff --git a/storage/src/qmdb/immutable/batch.rs b/storage/src/qmdb/immutable/batch.rs index 78af34c657f..76116043e55 100644 --- a/storage/src/qmdb/immutable/batch.rs +++ b/storage/src/qmdb/immutable/batch.rs @@ -44,7 +44,7 @@ where H: CHasher, { /// Authenticated journal batch for computing the speculative Merkle root. - journal_batch: authenticated::UnmerkleizedBatch>, + journal_batch: authenticated::UnmerkleizedBatch>, /// Pending mutations. mutations: BTreeMap, @@ -65,7 +65,7 @@ where #[derive(Clone)] pub struct MerkleizedBatch { /// Authenticated journal batch (Merkle state + local items). - pub(super) journal_batch: Arc>>, + pub(super) journal_batch: Arc>>, /// This batch's local key-level changes only (not accumulated from ancestors). /// Sorted by key with no duplicates; queried via `lookup_sorted` (binary search). 
@@ -92,6 +92,9 @@ pub struct MerkleizedBatch { /// 1:1 with `ancestor_diffs`: `ancestor_diff_ends[i]` is the boundary for /// `ancestor_diffs[i]`. A batch is committed when `ancestor_diff_ends[i] <= db_size`. pub(super) ancestor_diff_ends: Vec, + + /// The inactivity floor declared by this batch's commit operation. + pub(super) new_inactivity_floor_loc: Location, } impl UnmerkleizedBatch @@ -100,7 +103,7 @@ where K: Key, V: ValueEncoding, H: CHasher, - Operation: EncodeShared, + Operation: EncodeShared, { /// Create a batch from a committed DB (no parent chain). pub(super) fn new( @@ -109,7 +112,7 @@ where ) -> Self where E: Context, - C: Mutable> + Persistable, + C: Mutable> + Persistable, C::Item: EncodeShared, T: Translator, { @@ -139,7 +142,7 @@ where ) -> Result, Error> where E: Context, - C: Mutable> + Persistable, + C: Mutable> + Persistable, C::Item: EncodeShared, T: Translator, { @@ -164,14 +167,18 @@ where } /// Resolve mutations into operations, merkleize, and return an `Arc`. + /// + /// `inactivity_floor` declares that all operations before this location are inactive. + /// It must be >= the database's current inactivity floor (monotonically non-decreasing). pub fn merkleize( self, db: &Immutable, metadata: Option, + inactivity_floor: Location, ) -> Arc> where E: Context, - C: Mutable> + Persistable, + C: Mutable> + Persistable, C::Item: EncodeShared, T: Translator, { @@ -179,7 +186,7 @@ where // Build operations: one Set per key, then Commit. `self.mutations` is a BTreeMap, so // iteration yields keys in sorted order, which `diff` relies on for binary search. 
- let mut ops: Vec> = Vec::with_capacity(self.mutations.len() + 1); + let mut ops: Vec> = Vec::with_capacity(self.mutations.len() + 1); let mut diff: DiffVec = Vec::with_capacity(self.mutations.len()); for (key, value) in self.mutations { @@ -189,7 +196,7 @@ where } debug_assert!(diff.is_sorted_by(|a, b| a.0 < b.0)); - ops.push(Operation::Commit(metadata)); + ops.push(Operation::Commit(metadata, inactivity_floor)); let total_size = base + ops.len() as u64; @@ -220,13 +227,14 @@ where db_size: self.db_size, ancestor_diffs, ancestor_diff_ends, + new_inactivity_floor_loc: inactivity_floor, }) } } impl MerkleizedBatch where - Operation: EncodeShared, + Operation: EncodeShared, { /// Return the speculative root. pub fn root(&self) -> D { @@ -251,7 +259,7 @@ where ) -> Result, Error> where E: Context, - C: Mutable> + Persistable, + C: Mutable> + Persistable, C::Item: EncodeShared, H: CHasher, T: Translator, @@ -292,7 +300,7 @@ where E: Context, K: Key, V: ValueEncoding, - C: Mutable> + Persistable, + C: Mutable> + Persistable, C::Item: EncodeShared, H: CHasher, T: Translator, @@ -309,6 +317,7 @@ where db_size: journal_size, ancestor_diffs: Vec::new(), ancestor_diff_ends: Vec::new(), + new_inactivity_floor_loc: self.inactivity_floor_loc, }) } } diff --git a/storage/src/qmdb/immutable/fixed.rs b/storage/src/qmdb/immutable/fixed.rs index aaabe9b8655..e4ca76514ed 100644 --- a/storage/src/qmdb/immutable/fixed.rs +++ b/storage/src/qmdb/immutable/fixed.rs @@ -20,13 +20,14 @@ use commonware_runtime::{Clock, Metrics, Storage}; use commonware_utils::Array; /// Type alias for a fixed-size operation. -pub type Operation = BaseOperation>; +pub type Operation = BaseOperation>; /// Type alias for the fixed-size immutable database. pub type Db = - Immutable, fixed::Journal>, H, T>; + Immutable, fixed::Journal>, H, T>; -type Journal = authenticated::Journal>, H>; +type Journal = + authenticated::Journal>, H>; /// Configuration for a fixed-size immutable authenticated db. 
pub type Config = BaseConfig; @@ -47,7 +48,7 @@ impl< context.clone(), cfg.merkle_config, cfg.log, - Operation::::is_commit, + Operation::::is_commit, ) .await?; Self::init_from_journal(journal, context, cfg.translator).await @@ -225,10 +226,10 @@ mod tests { } #[test_traced("INFO")] - fn test_fixed_prune_beyond_commit() { + fn test_fixed_prune_beyond_floor() { let executor = deterministic::Runner::default(); executor.start(|ctx| async move { - test::test_immutable_prune_beyond_commit(ctx, open::).await; + test::test_immutable_prune_beyond_floor(ctx, open::).await; }); } @@ -486,10 +487,10 @@ mod tests { } #[test_traced("INFO")] - fn test_fixed_prune_beyond_commit_mmb() { + fn test_fixed_prune_beyond_floor_mmb() { let executor = deterministic::Runner::default(); executor.start(|ctx| async move { - test::test_immutable_prune_beyond_commit(ctx, open::).await; + test::test_immutable_prune_beyond_floor(ctx, open::).await; }); } @@ -671,4 +672,136 @@ mod tests { .await; }); } + + #[test_traced("INFO")] + fn test_fixed_inactivity_floor_tracking() { + let executor = deterministic::Runner::default(); + executor.start(|ctx| async move { + test::test_immutable_inactivity_floor_tracking(ctx, open::).await; + }); + } + + #[test_traced("INFO")] + fn test_fixed_floor_monotonicity() { + let executor = deterministic::Runner::default(); + executor.start(|ctx| async move { + test::test_immutable_floor_monotonicity(ctx, open::).await; + }); + } + + #[test_traced("INFO")] + fn test_fixed_floor_monotonicity_violation() { + let executor = deterministic::Runner::default(); + executor.start(|ctx| async move { + test::test_immutable_floor_monotonicity_violation(ctx, open::).await; + }); + } + + #[test_traced("INFO")] + fn test_fixed_floor_beyond_size() { + let executor = deterministic::Runner::default(); + executor.start(|ctx| async move { + test::test_immutable_floor_beyond_size(ctx, open::).await; + }); + } + + #[test_traced("INFO")] + fn test_fixed_rewind_restores_floor() { + let 
executor = deterministic::Runner::default(); + executor.start(|ctx| async move { + test::test_immutable_rewind_restores_floor(ctx, open::).await; + }); + } + + #[test_traced("INFO")] + fn test_fixed_inactivity_floor_tracking_mmb() { + let executor = deterministic::Runner::default(); + executor.start(|ctx| async move { + test::test_immutable_inactivity_floor_tracking(ctx, open::).await; + }); + } + + #[test_traced("INFO")] + fn test_fixed_floor_monotonicity_mmb() { + let executor = deterministic::Runner::default(); + executor.start(|ctx| async move { + test::test_immutable_floor_monotonicity(ctx, open::).await; + }); + } + + #[test_traced("INFO")] + fn test_fixed_floor_monotonicity_violation_mmb() { + let executor = deterministic::Runner::default(); + executor.start(|ctx| async move { + test::test_immutable_floor_monotonicity_violation(ctx, open::).await; + }); + } + + #[test_traced("INFO")] + fn test_fixed_floor_beyond_size_mmb() { + let executor = deterministic::Runner::default(); + executor.start(|ctx| async move { + test::test_immutable_floor_beyond_size(ctx, open::).await; + }); + } + + #[test_traced("INFO")] + fn test_fixed_rewind_restores_floor_mmb() { + let executor = deterministic::Runner::default(); + executor.start(|ctx| async move { + test::test_immutable_rewind_restores_floor(ctx, open::).await; + }); + } + + #[test_traced("INFO")] + fn test_fixed_single_commit_live_set() { + let executor = deterministic::Runner::default(); + executor.start(|ctx| async move { + test::test_immutable_single_commit_live_set(ctx, open::).await; + }); + } + + #[test_traced("INFO")] + fn test_fixed_single_commit_live_set_mmb() { + let executor = deterministic::Runner::default(); + executor.start(|ctx| async move { + test::test_immutable_single_commit_live_set(ctx, open::).await; + }); + } + + #[test_traced("INFO")] + fn test_fixed_rewind_after_reopen_with_floor_change() { + let executor = deterministic::Runner::default(); + executor.start(|ctx| async move { + 
test::test_immutable_rewind_after_reopen_with_floor_change(ctx, open::) + .await; + }); + } + + #[test_traced("INFO")] + fn test_fixed_rewind_after_reopen_with_floor_change_mmb() { + let executor = deterministic::Runner::default(); + executor.start(|ctx| async move { + test::test_immutable_rewind_after_reopen_with_floor_change(ctx, open::) + .await; + }); + } + + #[test_traced("INFO")] + fn test_fixed_rewind_after_reopen_partial_floor_gap() { + let executor = deterministic::Runner::default(); + executor.start(|ctx| async move { + test::test_immutable_rewind_after_reopen_partial_floor_gap(ctx, open::) + .await; + }); + } + + #[test_traced("INFO")] + fn test_fixed_rewind_after_reopen_partial_floor_gap_mmb() { + let executor = deterministic::Runner::default(); + executor.start(|ctx| async move { + test::test_immutable_rewind_after_reopen_partial_floor_gap(ctx, open::) + .await; + }); + } } diff --git a/storage/src/qmdb/immutable/mod.rs b/storage/src/qmdb/immutable/mod.rs index c8b00424407..4fe021a128b 100644 --- a/storage/src/qmdb/immutable/mod.rs +++ b/storage/src/qmdb/immutable/mod.rs @@ -5,30 +5,50 @@ //! - [fixed]: For fixed-size values. //! - [variable]: For variable-size values. //! +//! # Inactivity floor +//! +//! Each commit carries an inactivity floor: a location before which the application +//! declares operations are no longer needed. The floor is embedded in the operation +//! log and included in the Merkle root, so all replicas processing the same operations +//! arrive at the same floor. +//! +//! The floor controls two things: +//! - **Pruning**: [`Immutable::prune`] only allows pruning up to the floor. +//! - **Reconstruction**: on restart or sync, the snapshot is rebuilt from the floor +//! onward. Keys set before the floor are not loaded into memory. +//! +//! The floor must be monotonically non-decreasing across commits and must be strictly +//! below the batch's total operation count. Pass `db.inactivity_floor_loc()` to keep the +//! 
floor unchanged, or a higher value to advance it. +//! //! # Examples //! //! ```ignore //! // Simple mode: apply a batch, then durably commit it. +//! // The third argument to merkleize is the inactivity floor -- operations +//! // before this location are declared inactive by the application. +//! let floor = db.inactivity_floor_loc(); //! let merkleized = db.new_batch() //! .set(key, value) -//! .merkleize(&db, None); +//! .merkleize(&db, None, floor); //! db.apply_batch(merkleized).await?; //! db.commit().await?; //! ``` //! //! ```ignore //! // Batches can still fork before you apply them. +//! let floor = db.inactivity_floor_loc(); //! let parent = db.new_batch() //! .set(key_a, value_a) -//! .merkleize(&db, None); +//! .merkleize(&db, None, floor); //! //! let child_a = parent.new_batch::() //! .set(key_b, value_b) -//! .merkleize(&db, None); +//! .merkleize(&db, None, floor); //! //! let child_b = parent.new_batch::() //! .set(key_c, value_c) -//! .merkleize(&db, None); +//! .merkleize(&db, None, floor); //! //! db.apply_batch(child_a).await?; //! db.commit().await?; @@ -37,16 +57,17 @@ //! ```ignore //! // Advanced mode: while the previous batch is being committed, build exactly //! // one child batch from the newly published state. +//! let floor = db.inactivity_floor_loc(); //! let parent = db.new_batch() //! .set(key_a, value_a) -//! .merkleize(&db, None); +//! .merkleize(&db, None, floor); //! db.apply_batch(parent).await?; //! //! let (child, commit_result) = futures::join!( //! async { //! db.new_batch() //! .set(key_b, value_b) -//! .merkleize(&db, None) +//! .merkleize(&db, None, floor) //! }, //! db.commit(), //! 
); @@ -64,7 +85,12 @@ use crate::{ Error as JournalError, }, merkle::{journaled::Config as MmrConfig, Family, Location, Proof}, - qmdb::{any::ValueEncoding, build_snapshot_from_log, delete_known_loc, operation::Key, Error}, + qmdb::{ + any::ValueEncoding, + build_snapshot_from_log, + operation::{Key, Operation as _}, + Error, + }, translator::Translator, Context, Persistable, }; @@ -108,7 +134,7 @@ pub struct Immutable< E: Context, K: Key, V: ValueEncoding, - C: Mutable> + Persistable, + C: Mutable> + Persistable, H: CHasher, T: Translator, > where @@ -126,6 +152,10 @@ pub struct Immutable< /// The location of the last commit operation. pub(crate) last_commit_loc: Location, + + /// The inactivity floor declared by the last committed batch. + /// Operations before this location are considered inactive by the application. + pub(crate) inactivity_floor_loc: Location, } // Shared read-only functionality. @@ -135,7 +165,7 @@ where E: Context, K: Key, V: ValueEncoding, - C: Mutable> + Persistable, + C: Mutable> + Persistable, C::Item: EncodeShared, H: CHasher, T: Translator, @@ -151,37 +181,51 @@ where ) -> Result> { if journal.size().await == 0 { warn!("Authenticated log is empty, initialized new db."); - journal.append(&Operation::Commit(None)).await?; + journal + .append(&Operation::Commit(None, Location::new(0))) + .await?; journal.sync().await?; } let mut snapshot = Index::new(context.with_label("snapshot"), translator); - let last_commit_loc = { - // Get the start of the log. + let (last_commit_loc, inactivity_floor_loc) = { let reader = journal.journal.reader().await; - let start_loc = Location::new(reader.bounds().start); - - // Build snapshot from the log. 
- build_snapshot_from_log::(start_loc, &reader, &mut snapshot, |_, _| {}) - .await?; - - Location::new( - reader - .bounds() - .end - .checked_sub(1) - .expect("commit should exist"), + let bounds = reader.bounds(); + let last_commit_loc = + Location::new(bounds.end.checked_sub(1).expect("commit should exist")); + + // Read the floor from the last commit operation. + let last_op = reader.read(*last_commit_loc).await?; + let inactivity_floor_loc = last_op + .has_floor() + .expect("last operation should be a commit with floor"); + + // Replay the log from the inactivity floor to build the snapshot. + build_snapshot_from_log::( + inactivity_floor_loc, + &reader, + &mut snapshot, + |_, _| {}, ) + .await?; + + (last_commit_loc, inactivity_floor_loc) }; Ok(Self { journal, snapshot, last_commit_loc, + inactivity_floor_loc, }) } + /// Return the inactivity floor location declared by the last committed batch. + pub const fn inactivity_floor_loc(&self) -> Location { + self.inactivity_floor_loc + } + /// Return the Location of the next operation appended to this db. pub async fn size(&self) -> Location { self.bounds().await.end @@ -194,13 +238,11 @@ where Location::new(bounds.start)..Location::new(bounds.end) } - /// Return the most recent location from which this database can safely be synced. - /// - /// Immutable databases have no inactivity concept; this returns the oldest retained - /// operation. Callers constructing a sync [`Target`](crate::qmdb::sync::Target) may use this - /// value or any later location as `range.start`. - pub async fn sync_boundary(&self) -> Location { - self.bounds().await.start + /// Return the most recent location from which this database can safely be synced, and the + /// upper bound on [`Self::prune`]'s `loc`. For immutable databases, this equals the + /// inactivity floor declared by the last committed batch. 
+ pub const fn sync_boundary(&self) -> Location { + self.inactivity_floor_loc } /// Get the value of `key` in the db, or None if it has no value or its corresponding operation @@ -225,7 +267,7 @@ where /// [`crate::qmdb::Error::OperationPruned`] if loc precedes the oldest retained location. The /// location is otherwise assumed valid. async fn get_from_loc( - reader: &impl Reader>, + reader: &impl Reader>, key: &K, loc: Location, ) -> Result, Error> { @@ -247,7 +289,7 @@ where /// Get the metadata associated with the last commit. pub async fn get_metadata(&self) -> Result, Error> { let last_commit_loc = self.last_commit_loc; - let Operation::Commit(metadata) = self + let Operation::Commit(metadata, _floor) = self .journal .journal .reader() @@ -276,7 +318,7 @@ where op_count: Location, start_loc: Location, max_ops: NonZeroU64, - ) -> Result<(Proof, Vec>), Error> { + ) -> Result<(Proof, Vec>), Error> { Ok(self .journal .historical_proof(op_count, start_loc, max_ops) @@ -293,7 +335,7 @@ where &self, start_index: Location, max_ops: NonZeroU64, - ) -> Result<(Proof, Vec>), Error> { + ) -> Result<(Proof, Vec>), Error> { let op_count = self.bounds().await.end; self.historical_proof(op_count, start_index, max_ops).await } @@ -301,13 +343,22 @@ where /// Prune operations prior to `prune_loc`. This does not affect the db's root, but it will /// affect retrieval of any keys that were set prior to `prune_loc`. /// + /// Pruning is irreversible. Callers must ensure any floor-raising batch has been durably + /// committed (via [`Immutable::commit`] or [`Immutable::sync`]) before pruning. The + /// inactivity floor used to gate pruning is updated by [`Immutable::apply_batch`] before + /// the batch is durable. If the batch is lost on crash, recovery replays from the prior + /// durable floor, which may reference data that has already been pruned. + /// /// # Errors /// - /// - Returns [Error::PruneBeyondMinRequired] if `prune_loc` > last commit location. 
+ /// - Returns [Error::PruneBeyondMinRequired] if `prune_loc` > inactivity floor. /// - Returns [crate::merkle::Error::LocationOverflow] if `prune_loc` > [crate::merkle::Family::MAX_LEAVES]. pub async fn prune(&mut self, loc: Location) -> Result<(), Error> { - if loc > self.last_commit_loc { - return Err(Error::PruneBeyondMinRequired(loc, self.last_commit_loc)); + if loc > self.inactivity_floor_loc { + return Err(Error::PruneBeyondMinRequired( + loc, + self.inactivity_floor_loc, + )); } self.journal.prune(loc).await?; @@ -345,7 +396,7 @@ where ))); } - let (rewind_last_loc, rewound_sets) = { + let (rewind_last_loc, rewind_floor, rewound_keys) = { let reader = self.journal.reader().await; let bounds = reader.bounds(); let rewind_last_loc = Location::new(rewind_size - 1); @@ -355,30 +406,54 @@ where ))); } let rewind_last_op = reader.read(*rewind_last_loc).await?; - if !matches!(rewind_last_op, Operation::Commit(_)) { + let Operation::Commit(_, rewind_floor) = &rewind_last_op else { return Err(Error::UnexpectedData(rewind_last_loc)); + }; + let rewind_floor = *rewind_floor; + if *rewind_floor < bounds.start { + return Err(Error::Journal(crate::journal::Error::ItemPruned( + *rewind_floor, + ))); } - // Immutable operations do not have an inactivity floor (`Operation::has_floor()` - // is always `None` here). Rewind validity is determined by retained-location bounds - // and commit-target checks. Duplicate keys are unsupported and undefined behavior. - let mut rewound_sets = Vec::new(); + let mut rewound_keys = Vec::new(); for loc in rewind_size..current_size { if let Operation::Set(key, _) = reader.read(loc).await? { - rewound_sets.push((Location::new(loc), key)); + rewound_keys.push(key); } } - (rewind_last_loc, rewound_sets) + (rewind_last_loc, rewind_floor, rewound_keys) }; + let old_floor = self.inactivity_floor_loc; + // Journal rewind happens before in-memory snapshot updates. 
If a later step fails, this // handle may be internally diverged and must be dropped by the caller. self.journal.rewind(rewind_size).await?; - for (loc, key) in rewound_sets { - delete_known_loc(&mut self.snapshot, &key, loc); + + // Remove suffix keys from the snapshot. After reopen, the snapshot may + // have been rebuilt from a higher floor, so some suffix keys might not + // be present -- use remove() which is tolerant of missing keys. + for key in &rewound_keys { + self.snapshot.remove(key); } + + // If the rewind target has a lower floor than the current snapshot was + // built from, insert keys from the gap [rewind_floor, old_floor) that + // were excluded by the higher-floor reconstruction. + if rewind_floor < old_floor { + let reader = self.journal.journal.reader().await; + let gap_end = core::cmp::min(*old_floor, rewind_size); + for loc in *rewind_floor..gap_end { + if let Operation::Set(key, _) = reader.read(loc).await? { + self.snapshot.insert(&key, Location::new(loc)); + } + } + } + self.last_commit_loc = rewind_last_loc; + self.inactivity_floor_loc = rewind_floor; Ok(()) } @@ -437,6 +512,16 @@ where /// /// Returns the range of locations written. /// + /// # Errors + /// + /// - [`Error::StaleBatch`] if the batch was created from a stale DB state. + /// - [`Error::FloorRegressed`] if the batch's inactivity floor is below the + /// database's current floor. + /// - [`Error::FloorBeyondSize`] if the batch's inactivity floor is at or + /// beyond its total operation count. The maximum valid floor is + /// `total_size - 1` (the commit operation's location); a floor equal to + /// `total_size` would permit pruning the commit itself. + /// /// This publishes the batch to the in-memory database state and appends it to the /// journal, but does not durably commit it. Call [`Immutable::commit`] or /// [`Immutable::sync`] to guarantee durability. 
@@ -455,6 +540,18 @@ where batch_base_size: batch.base_size, }); } + if batch.new_inactivity_floor_loc < self.inactivity_floor_loc { + return Err(Error::FloorRegressed( + batch.new_inactivity_floor_loc, + self.inactivity_floor_loc, + )); + } + if batch.new_inactivity_floor_loc >= Location::new(batch.total_size) { + return Err(Error::FloorBeyondSize( + batch.new_inactivity_floor_loc, + Location::new(batch.total_size), + )); + } let start_loc = Location::new(db_size); // Apply journal. @@ -483,6 +580,7 @@ where // Update state. self.last_commit_loc = Location::new(batch.total_size - 1); + self.inactivity_floor_loc = batch.new_inactivity_floor_loc; Ok(start_loc..Location::new(batch.total_size)) } } @@ -515,13 +613,14 @@ pub(super) mod test { ) -> Pin> + Send>>, ) where V: ValueEncoding, - C: Mutable> + Persistable, + C: Mutable> + Persistable, C::Item: EncodeShared, { let db = open_db(context.with_label("first")).await; let bounds = db.bounds().await; assert_eq!(bounds.end, 1); assert_eq!(bounds.start, Location::new(0)); + assert_eq!(db.inactivity_floor_loc(), Location::new(0)); assert!(db.get_metadata().await.unwrap().is_none()); // Make sure closing/reopening gets us back to the same state, even after adding an uncommitted op. @@ -538,7 +637,7 @@ pub(super) mod test { assert_eq!(db.bounds().await.end, 1); // Test calling commit on an empty db which should make it (durably) non-empty. - db.apply_batch(db.new_batch().merkleize(&db, None)) + db.apply_batch(db.new_batch().merkleize(&db, None, Location::new(0))) .await .unwrap(); db.commit().await.unwrap(); @@ -559,7 +658,7 @@ pub(super) mod test { ) -> Pin> + Send>>, ) where V: ValueEncoding, - C: Mutable> + Persistable, + C: Mutable> + Persistable, C::Item: EncodeShared, { // Build a db with 2 keys. @@ -575,9 +674,13 @@ pub(super) mod test { // Set and commit the first key. 
let metadata = Some(Sha256::fill(99u8)); - db.apply_batch(db.new_batch().set(k1, v1).merkleize(&db, metadata)) - .await - .unwrap(); + db.apply_batch( + db.new_batch() + .set(k1, v1) + .merkleize(&db, metadata, Location::new(0)), + ) + .await + .unwrap(); db.commit().await.unwrap(); assert_eq!(db.get(&k1).await.unwrap().unwrap(), v1); assert!(db.get(&k2).await.unwrap().is_none()); @@ -585,9 +688,13 @@ pub(super) mod test { assert_eq!(db.get_metadata().await.unwrap(), Some(Sha256::fill(99u8))); // Set and commit the second key. - db.apply_batch(db.new_batch().set(k2, v2).merkleize(&db, None)) - .await - .unwrap(); + db.apply_batch( + db.new_batch() + .set(k2, v2) + .merkleize(&db, None, Location::new(0)), + ) + .await + .unwrap(); db.commit().await.unwrap(); assert_eq!(db.get(&k1).await.unwrap().unwrap(), v1); assert_eq!(db.get(&k2).await.unwrap().unwrap(), v2); @@ -626,16 +733,20 @@ pub(super) mod test { ) -> Pin> + Send>>, ) where V: ValueEncoding, - C: Mutable> + Persistable, + C: Mutable> + Persistable, C::Item: EncodeShared, { let mut db = open_db(context.with_label("first")).await; let k1 = Sha256::fill(1u8); let v1 = Sha256::fill(10u8); - db.apply_batch(db.new_batch().set(k1, v1).merkleize(&db, None)) - .await - .unwrap(); + db.apply_batch( + db.new_batch() + .set(k1, v1) + .merkleize(&db, None, Location::new(0)), + ) + .await + .unwrap(); db.commit().await.unwrap(); let (proof, ops) = db.proof(Location::new(0), NZU64!(100)).await.unwrap(); @@ -653,7 +764,7 @@ pub(super) mod test { ) -> Pin> + Send>>, ) where V: ValueEncoding, - C: Mutable> + Persistable, + C: Mutable> + Persistable, C::Item: EncodeShared, { let mut db = open_db(context.with_label("first")).await; @@ -661,7 +772,8 @@ pub(super) mod test { for i in 0..20u8 { let key = Sha256::fill(i); let value = Sha256::fill(i.wrapping_add(100)); - db.apply_batch(db.new_batch().set(key, value).merkleize(&db, None)) + let floor = db.bounds().await.end; + db.apply_batch(db.new_batch().set(key, 
value).merkleize(&db, None, floor)) .await .unwrap(); db.commit().await.unwrap(); @@ -694,7 +806,7 @@ pub(super) mod test { ) -> Pin> + Send>>, ) where V: ValueEncoding, - C: Mutable> + Persistable, + C: Mutable> + Persistable, C::Item: EncodeShared, { let mut db = open_db(context.with_label("first")).await; @@ -706,11 +818,14 @@ pub(super) mod test { let v2 = Sha256::fill(12u8); let v3 = Sha256::fill(13u8); - let parent = db.new_batch().set(k1, v1).merkleize(&db, None); + let parent = db + .new_batch() + .set(k1, v1) + .merkleize(&db, None, Location::new(0)); let child = parent .new_batch::() .set(k2, v2) - .merkleize(&db, None); + .merkleize(&db, None, Location::new(0)); assert_eq!(child.get(&k1, &db).await.unwrap(), Some(v1)); assert_eq!(child.get(&k2, &db).await.unwrap(), Some(v2)); @@ -722,9 +837,13 @@ pub(super) mod test { assert_eq!(db.get(&k1).await.unwrap(), Some(v1)); assert_eq!(db.get(&k2).await.unwrap(), Some(v2)); - db.apply_batch(db.new_batch().set(k3, v3).merkleize(&db, None)) - .await - .unwrap(); + db.apply_batch( + db.new_batch() + .set(k3, v3) + .merkleize(&db, None, Location::new(0)), + ) + .await + .unwrap(); db.commit().await.unwrap(); assert_eq!(db.get(&k3).await.unwrap(), Some(v3)); @@ -738,7 +857,7 @@ pub(super) mod test { ) -> Pin> + Send>>, ) where V: ValueEncoding, - C: Mutable> + Persistable, + C: Mutable> + Persistable, C::Item: EncodeShared, { // Build a db with `ELEMENTS` key/value pairs and prove ranges over them. 
@@ -751,7 +870,7 @@ pub(super) mod test { let v = Sha256::fill(i as u8); batch = batch.set(k, v); } - let merkleized = batch.merkleize(&db, None); + let merkleized = batch.merkleize(&db, None, Location::new(0)); db.apply_batch(merkleized).await.unwrap(); db.commit().await.unwrap(); assert_eq!(db.bounds().await.end, 2_000 + 2); @@ -787,7 +906,7 @@ pub(super) mod test { ) -> Pin> + Send>>, ) where V: ValueEncoding, - C: Mutable> + Persistable, + C: Mutable> + Persistable, C::Item: EncodeShared, { // Insert 1000 keys then sync. @@ -800,21 +919,21 @@ pub(super) mod test { let v = Sha256::fill(i as u8); batch = batch.set(k, v); } - let merkleized = batch.merkleize(&db, None); + let merkleized = batch.merkleize(&db, None, Location::new(0)); db.apply_batch(merkleized).await.unwrap(); db.commit().await.unwrap(); assert_eq!(db.bounds().await.end, ELEMENTS + 2); db.sync().await.unwrap(); let halfway_root = db.root(); - // Insert another 1000 keys then commit. + // Insert another 1000 keys (different from the first batch) then commit. let mut batch = db.new_batch(); - for i in 0u64..ELEMENTS { + for i in ELEMENTS..ELEMENTS * 2 { let k = Sha256::hash(&i.to_be_bytes()); let v = Sha256::fill(i as u8); batch = batch.set(k, v); } - let merkleized = batch.merkleize(&db, None); + let merkleized = batch.merkleize(&db, None, Location::new(0)); db.apply_batch(merkleized).await.unwrap(); db.commit().await.unwrap(); drop(db); // Drop before syncing @@ -842,7 +961,7 @@ pub(super) mod test { ) -> Pin> + Send>>, ) where V: ValueEncoding, - C: Mutable> + Persistable, + C: Mutable> + Persistable, C::Item: EncodeShared, { let mut db = open_db(context.with_label("first")).await; @@ -850,9 +969,13 @@ pub(super) mod test { // Insert a single key and then commit to create a first commit point. 
let k1 = Sha256::fill(1u8); let v1 = Sha256::fill(3u8); - db.apply_batch(db.new_batch().set(k1, v1).merkleize(&db, None)) - .await - .unwrap(); + db.apply_batch( + db.new_batch() + .set(k1, v1) + .merkleize(&db, None, Location::new(0)), + ) + .await + .unwrap(); db.commit().await.unwrap(); let first_commit_root = db.root(); @@ -876,7 +999,7 @@ pub(super) mod test { ) -> Pin> + Send>>, ) where V: ValueEncoding, - C: Mutable> + Persistable, + C: Mutable> + Persistable, C::Item: EncodeShared, { // Build a db with `ELEMENTS` key/value pairs then prune some of them. @@ -899,7 +1022,10 @@ pub(super) mod test { let v = Sha256::fill(i as u8); batch = batch.set(k, v); } - let merkleized = batch.merkleize(&db, None); + // The inactivity floor must cover both prune targets in this test. + // Second prune request is at ELEMENTS / 2 + ITEMS_PER_SECTION * 2 - 1. + let inactivity_floor = Location::new(ELEMENTS / 2 + ITEMS_PER_SECTION * 2 - 1); + let merkleized = batch.merkleize(&db, None, inactivity_floor); db.apply_batch(merkleized).await.unwrap(); assert_eq!(db.bounds().await.end, ELEMENTS + 2); @@ -953,13 +1079,14 @@ pub(super) mod test { Location::new(ELEMENTS / 2 + ITEMS_PER_SECTION) ); - // Try to fetch a pruned key (at location oldest_retained - 3). - let pruned_key = sorted_keys[*oldest_retained_loc as usize - 4]; - assert!(db.get(&pruned_key).await.unwrap().is_none()); + // Try to fetch a key before the inactivity floor (not in snapshot after reopen). + let floor_val = ELEMENTS / 2 + ITEMS_PER_SECTION * 2 - 1; + let inactive_key = sorted_keys[floor_val as usize - 2]; + assert!(db.get(&inactive_key).await.unwrap().is_none()); - // Try to fetch unpruned key (at location oldest_retained). - let unpruned_key = sorted_keys[*oldest_retained_loc as usize - 1]; - assert!(db.get(&unpruned_key).await.unwrap().is_some()); + // Try to fetch a key at the inactivity floor (in snapshot after reopen). 
+ let active_key = sorted_keys[floor_val as usize - 1]; + assert!(db.get(&active_key).await.unwrap().is_some()); // Confirm behavior of trying to create a proof of pruned items is as expected. let pruned_pos = ELEMENTS / 2; @@ -973,23 +1100,23 @@ pub(super) mod test { db.destroy().await.unwrap(); } - pub(crate) async fn test_immutable_prune_beyond_commit( + pub(crate) async fn test_immutable_prune_beyond_floor( context: deterministic::Context, open_db: impl Fn( deterministic::Context, ) -> Pin> + Send>>, ) where V: ValueEncoding, - C: Mutable> + Persistable, + C: Mutable> + Persistable, C::Item: EncodeShared, { let mut db = open_db(context.with_label("test")).await; - // Test pruning empty database (no commits) + // Test pruning empty database (floor=0, so prune(1) fails) let result = db.prune(Location::new(1)).await; assert!( - matches!(result, Err(Error::PruneBeyondMinRequired(prune_loc, commit_loc)) - if prune_loc == Location::new(1) && commit_loc == Location::new(0)) + matches!(result, Err(Error::PruneBeyondMinRequired(prune_loc, floor)) + if prune_loc == Location::new(1) && floor == Location::new(0)) ); // Add key-value pairs and commit @@ -1000,27 +1127,37 @@ pub(super) mod test { let v2 = Sha256::fill(2u8); let v3 = Sha256::fill(3u8); - db.apply_batch(db.new_batch().set(k1, v1).set(k2, v2).merkleize(&db, None)) - .await - .unwrap(); + // First batch with floor=3 (the commit location). + db.apply_batch(db.new_batch().set(k1, v1).set(k2, v2).merkleize( + &db, + None, + Location::new(3), + )) + .await + .unwrap(); // op_count is 4 (initial_commit, k1, k2, commit), last_commit is at location 3 assert_eq!(*db.last_commit_loc, 3); - db.apply_batch(db.new_batch().set(k3, v3).merkleize(&db, None)) - .await - .unwrap(); + // Second batch with floor=5 (the new commit location). 
+ db.apply_batch( + db.new_batch() + .set(k3, v3) + .merkleize(&db, None, Location::new(5)), + ) + .await + .unwrap(); - // Test valid prune (at previous commit location 3) + // Test valid prune (3 <= floor of 5) assert!(db.prune(Location::new(3)).await.is_ok()); - // Test pruning beyond last commit - let new_last_commit = db.last_commit_loc; - let beyond = new_last_commit + 1; + // Test pruning beyond inactivity floor + let floor = db.inactivity_floor_loc(); + let beyond = floor + 1; let result = db.prune(beyond).await; assert!( - matches!(result, Err(Error::PruneBeyondMinRequired(prune_loc, commit_loc)) - if prune_loc == beyond && commit_loc == new_last_commit) + matches!(result, Err(Error::PruneBeyondMinRequired(prune_loc, f)) + if prune_loc == beyond && f == floor) ); db.destroy().await.unwrap(); @@ -1033,14 +1170,31 @@ pub(super) mod test { ) -> Range> where V: ValueEncoding, - C: Mutable> + Persistable, + C: Mutable> + Persistable, + C::Item: EncodeShared, + { + commit_sets_with_floor(db, sets, metadata, Location::new(0)).await + } + + async fn commit_sets_with_floor( + db: &mut TestDb, + sets: impl IntoIterator, + metadata: Option, + floor: Location, + ) -> Range> + where + V: ValueEncoding, + C: Mutable> + Persistable, C::Item: EncodeShared, { let mut batch = db.new_batch(); for (key, value) in sets { batch = batch.set(key, value); } - let range = db.apply_batch(batch.merkleize(db, metadata)).await.unwrap(); + let range = db + .apply_batch(batch.merkleize(db, metadata, floor)) + .await + .unwrap(); db.commit().await.unwrap(); range } @@ -1052,7 +1206,7 @@ pub(super) mod test { ) -> Pin> + Send>>, ) where V: ValueEncoding, - C: Mutable> + Persistable, + C: Mutable> + Persistable, C::Item: EncodeShared, { let mut db = open_db(context.with_label("db")).await; @@ -1117,7 +1271,7 @@ pub(super) mod test { -> Pin> + Send>>, ) where V: ValueEncoding, - C: Mutable> + Persistable, + C: Mutable> + Persistable, C::Item: EncodeShared, { let mut db = 
open_small_sections_db(context.with_label("db")).await; @@ -1137,13 +1291,17 @@ pub(super) mod test { "failed to prune enough history for rewind test" ); - commit_sets( + // Floor must be >= last_commit_loc for prune to succeed. + // With 16 sets, commit is at current end + 16. + let floor = Location::new(*db.bounds().await.end + 16); + commit_sets_with_floor( &mut db, (0u64..16).map(|i| { let seed = round * 100 + i; (Sha256::hash(&seed.to_be_bytes()), Sha256::fill(seed as u8)) }), None, + floor, ) .await; db.prune(db.last_commit_loc).await.unwrap(); @@ -1180,7 +1338,7 @@ pub(super) mod test { ) -> Pin> + Send>>, ) where V: ValueEncoding, - C: Mutable> + Persistable, + C: Mutable> + Persistable, C::Item: EncodeShared, { let mut db = open_db(context.with_label("db")).await; @@ -1188,9 +1346,13 @@ pub(super) mod test { // Pre-populate with key A. let key_a = Sha256::hash(&0u64.to_be_bytes()); let val_a = Sha256::fill(1u8); - db.apply_batch(db.new_batch().set(key_a, val_a).merkleize(&db, None)) - .await - .unwrap(); + db.apply_batch( + db.new_batch() + .set(key_a, val_a) + .merkleize(&db, None, Location::new(0)), + ) + .await + .unwrap(); // batch.get(&A) should return DB value. let mut batch = db.new_batch(); @@ -1217,7 +1379,7 @@ pub(super) mod test { ) -> Pin> + Send>>, ) where V: ValueEncoding, - C: Mutable> + Persistable, + C: Mutable> + Persistable, C::Item: EncodeShared, { let db = open_db(context.with_label("db")).await; @@ -1226,7 +1388,7 @@ pub(super) mod test { let key_a = Sha256::hash(&0u64.to_be_bytes()); let val_a = Sha256::fill(10u8); let parent = db.new_batch().set(key_a, val_a); - let parent_m = parent.merkleize(&db, None); + let parent_m = parent.merkleize(&db, None, Location::new(0)); // Child reads parent's A. 
let mut child = parent_m.new_batch::(); @@ -1253,7 +1415,7 @@ pub(super) mod test { ) -> Pin> + Send>>, ) where V: ValueEncoding, - C: Mutable> + Persistable, + C: Mutable> + Persistable, C::Item: EncodeShared, { let mut db = open_db(context.with_label("db")).await; @@ -1274,14 +1436,14 @@ pub(super) mod test { for (k, v) in &kvs_first { parent = parent.set(*k, *v); } - let parent_m = parent.merkleize(&db, None); + let parent_m = parent.merkleize(&db, None, Location::new(0)); // Child batch: set keys 5..10. let mut child = parent_m.new_batch::(); for (k, v) in &kvs_second { child = child.set(*k, *v); } - let child_m = child.merkleize(&db, None); + let child_m = child.merkleize(&db, None, Location::new(0)); let expected_root = child_m.root(); db.apply_batch(child_m).await.unwrap(); @@ -1303,7 +1465,7 @@ pub(super) mod test { ) -> Pin> + Send>>, ) where V: ValueEncoding, - C: Mutable> + Persistable, + C: Mutable> + Persistable, C::Item: EncodeShared, { let mut db = open_db(context.with_label("db")).await; @@ -1313,7 +1475,7 @@ pub(super) mod test { let k = Sha256::hash(&[i]); batch = batch.set(k, Sha256::fill(i)); } - let merkleized = batch.merkleize(&db, None); + let merkleized = batch.merkleize(&db, None, Location::new(0)); let speculative = merkleized.root(); db.apply_batch(merkleized).await.unwrap(); @@ -1324,7 +1486,7 @@ pub(super) mod test { let mut batch = db.new_batch(); let k = Sha256::hash(&[0xAA]); batch = batch.set(k, Sha256::fill(0xAA)); - let merkleized = batch.merkleize(&db, metadata); + let merkleized = batch.merkleize(&db, metadata, Location::new(0)); let speculative = merkleized.root(); db.apply_batch(merkleized).await.unwrap(); assert_eq!(db.root(), speculative); @@ -1340,7 +1502,7 @@ pub(super) mod test { ) -> Pin> + Send>>, ) where V: ValueEncoding, - C: Mutable> + Persistable, + C: Mutable> + Persistable, C::Item: EncodeShared, { let mut db = open_db(context.with_label("db")).await; @@ -1348,14 +1510,21 @@ pub(super) mod test { // Pre-populate 
base DB. let key_a = Sha256::hash(&0u64.to_be_bytes()); let val_a = Sha256::fill(10u8); - db.apply_batch(db.new_batch().set(key_a, val_a).merkleize(&db, None)) - .await - .unwrap(); + db.apply_batch( + db.new_batch() + .set(key_a, val_a) + .merkleize(&db, None, Location::new(0)), + ) + .await + .unwrap(); // Create a merkleized batch with a new key. let key_b = Sha256::hash(&1u64.to_be_bytes()); let val_b = Sha256::fill(20u8); - let merkleized = db.new_batch().set(key_b, val_b).merkleize(&db, None); + let merkleized = db + .new_batch() + .set(key_b, val_b) + .merkleize(&db, None, Location::new(0)); // Read base DB value through merkleized batch. assert_eq!(merkleized.get(&key_a, &db).await.unwrap(), Some(val_a)); @@ -1378,7 +1547,7 @@ pub(super) mod test { ) -> Pin> + Send>>, ) where V: ValueEncoding, - C: Mutable> + Persistable, + C: Mutable> + Persistable, C::Item: EncodeShared, { let mut db = open_db(context.with_label("db")).await; @@ -1387,7 +1556,10 @@ pub(super) mod test { let val_a = Sha256::fill(1u8); // First batch. - let m = db.new_batch().set(key_a, val_a).merkleize(&db, None); + let m = db + .new_batch() + .set(key_a, val_a) + .merkleize(&db, None, Location::new(0)); let root1 = m.root(); db.apply_batch(m).await.unwrap(); assert_eq!(db.root(), root1); @@ -1396,7 +1568,10 @@ pub(super) mod test { // Second independent batch. 
let key_b = Sha256::hash(&1u64.to_be_bytes()); let val_b = Sha256::fill(2u8); - let m = db.new_batch().set(key_b, val_b).merkleize(&db, None); + let m = db + .new_batch() + .set(key_b, val_b) + .merkleize(&db, None, Location::new(0)); let root2 = m.root(); db.apply_batch(m).await.unwrap(); assert_eq!(db.root(), root2); @@ -1413,7 +1588,7 @@ pub(super) mod test { ) -> Pin> + Send>>, ) where V: ValueEncoding, - C: Mutable> + Persistable, + C: Mutable> + Persistable, C::Item: EncodeShared, { let mut db = open_db(context.with_label("db")).await; @@ -1433,7 +1608,7 @@ pub(super) mod test { batch = batch.set(k, v); all_kvs.push((k, v)); } - let merkleized = batch.merkleize(&db, None); + let merkleized = batch.merkleize(&db, None, Location::new(0)); db.apply_batch(merkleized).await.unwrap(); } @@ -1462,25 +1637,25 @@ pub(super) mod test { ) -> Pin> + Send>>, ) where V: ValueEncoding, - C: Mutable> + Persistable, + C: Mutable> + Persistable, C::Item: EncodeShared, { let mut db = open_db(context.with_label("db")).await; // Apply a non-empty batch first. let k = Sha256::hash(&[1u8]); - db.apply_batch( - db.new_batch() - .set(k, Sha256::fill(1u8)) - .merkleize(&db, None), - ) + db.apply_batch(db.new_batch().set(k, Sha256::fill(1u8)).merkleize( + &db, + None, + Location::new(0), + )) .await .unwrap(); let root_before = db.root(); let size_before = db.bounds().await.end; // Empty batch with no mutations. - let merkleized = db.new_batch().merkleize(&db, None); + let merkleized = db.new_batch().merkleize(&db, None, Location::new(0)); let speculative = merkleized.root(); db.apply_batch(merkleized).await.unwrap(); @@ -1501,7 +1676,7 @@ pub(super) mod test { ) -> Pin> + Send>>, ) where V: ValueEncoding, - C: Mutable> + Persistable, + C: Mutable> + Persistable, C::Item: EncodeShared, { let mut db = open_db(context.with_label("db")).await; @@ -1509,22 +1684,30 @@ pub(super) mod test { // Pre-populate base DB. 
let key_a = Sha256::hash(&0u64.to_be_bytes()); let val_a = Sha256::fill(10u8); - db.apply_batch(db.new_batch().set(key_a, val_a).merkleize(&db, None)) - .await - .unwrap(); + db.apply_batch( + db.new_batch() + .set(key_a, val_a) + .merkleize(&db, None, Location::new(0)), + ) + .await + .unwrap(); // Parent batch sets key B. let key_b = Sha256::hash(&1u64.to_be_bytes()); let val_b = Sha256::fill(1u8); - let parent_m = db.new_batch().set(key_b, val_b).merkleize(&db, None); + let parent_m = db + .new_batch() + .set(key_b, val_b) + .merkleize(&db, None, Location::new(0)); // Child batch sets key C. let key_c = Sha256::hash(&2u64.to_be_bytes()); let val_c = Sha256::fill(2u8); - let child_m = parent_m - .new_batch::() - .set(key_c, val_c) - .merkleize(&db, None); + let child_m = + parent_m + .new_batch::() + .set(key_c, val_c) + .merkleize(&db, None, Location::new(0)); // Child's MerkleizedBatch can read all three layers: // base DB value @@ -1548,7 +1731,7 @@ pub(super) mod test { ) -> Pin> + Send>>, ) where V: ValueEncoding, - C: Mutable> + Persistable, + C: Mutable> + Persistable, C::Item: EncodeShared, { let mut db = open_db(context.with_label("db")).await; @@ -1564,7 +1747,7 @@ pub(super) mod test { batch = batch.set(k, v); kvs.push((k, v)); } - let merkleized = batch.merkleize(&db, None); + let merkleized = batch.merkleize(&db, None, Location::new(0)); db.apply_batch(merkleized).await.unwrap(); // Verify every value. @@ -1591,7 +1774,7 @@ pub(super) mod test { ) -> Pin> + Send>>, ) where V: ValueEncoding, - C: Mutable> + Persistable, + C: Mutable> + Persistable, C::Item: EncodeShared, { let mut db = open_db(context.with_label("db")).await; @@ -1601,7 +1784,10 @@ pub(super) mod test { let val_child = Sha256::fill(2u8); // Parent sets key. - let parent_m = db.new_batch().set(key, val_parent).merkleize(&db, None); + let parent_m = db + .new_batch() + .set(key, val_parent) + .merkleize(&db, None, Location::new(0)); // Child overrides same key. 
let mut child = parent_m.new_batch::(); @@ -1610,7 +1796,7 @@ pub(super) mod test { // Child's pending mutation wins over parent diff. assert_eq!(child.get(&key, &db).await.unwrap(), Some(val_child)); - let child_m = child.merkleize(&db, None); + let child_m = child.merkleize(&db, None, Location::new(0)); // After merkleize, child's diff wins. assert_eq!(child_m.get(&key, &db).await.unwrap(), Some(val_child)); @@ -1636,7 +1822,7 @@ pub(super) mod test { -> Pin> + Send>>, ) where V: ValueEncoding, - C: Mutable> + Persistable, + C: Mutable> + Persistable, C::Item: EncodeShared, { let mut db = open_db_small_sections(context.with_label("db")).await; @@ -1647,16 +1833,25 @@ pub(super) mod test { // First batch sets key. // Layout: 0=initial commit, 1=Set(key,v1), 2=Commit - db.apply_batch(db.new_batch().set(key, v1).merkleize(&db, None)) - .await - .unwrap(); + db.apply_batch( + db.new_batch() + .set(key, v1) + .merkleize(&db, None, Location::new(0)), + ) + .await + .unwrap(); assert_eq!(db.get(&key).await.unwrap(), Some(v1)); // Second batch sets same key to different value. // Layout continues: 3=Set(key,v2), 4=Commit - db.apply_batch(db.new_batch().set(key, v2).merkleize(&db, None)) - .await - .unwrap(); + // Floor=4 so that prune(2) succeeds (2 <= 4). + db.apply_batch( + db.new_batch() + .set(key, v2) + .merkleize(&db, None, Location::new(4)), + ) + .await + .unwrap(); // Immutable DB returns the earliest non-pruned value. assert_eq!(db.get(&key).await.unwrap(), Some(v1)); @@ -1666,9 +1861,6 @@ pub(super) mod test { db.prune(Location::new(2)).await.unwrap(); assert_eq!(db.get(&key).await.unwrap(), Some(v2)); - // Verify persists across reopen. 
- db.sync().await.unwrap(); - db.destroy().await.unwrap(); } @@ -1680,7 +1872,7 @@ pub(super) mod test { ) -> Pin> + Send>>, ) where V: ValueEncoding, - C: Mutable> + Persistable, + C: Mutable> + Persistable, C::Item: EncodeShared, { let mut db = open_db(context.with_label("db")).await; @@ -1688,17 +1880,17 @@ pub(super) mod test { // Batch with metadata. let metadata = Sha256::fill(42u8); let k = Sha256::hash(&[1u8]); - db.apply_batch( - db.new_batch() - .set(k, Sha256::fill(1u8)) - .merkleize(&db, Some(metadata)), - ) + db.apply_batch(db.new_batch().set(k, Sha256::fill(1u8)).merkleize( + &db, + Some(metadata), + Location::new(0), + )) .await .unwrap(); assert_eq!(db.get_metadata().await.unwrap(), Some(metadata)); // Second batch clears metadata. - db.apply_batch(db.new_batch().merkleize(&db, None)) + db.apply_batch(db.new_batch().merkleize(&db, None, Location::new(0))) .await .unwrap(); assert_eq!(db.get_metadata().await.unwrap(), None); @@ -1713,7 +1905,7 @@ pub(super) mod test { ) -> Pin> + Send>>, ) where V: ValueEncoding, - C: Mutable> + Persistable, + C: Mutable> + Persistable, C::Item: EncodeShared, { let mut db = open_db(context.with_label("db")).await; @@ -1724,8 +1916,14 @@ pub(super) mod test { let v2 = Sha256::fill(20u8); // Create two batches from the same DB state. - let batch_a = db.new_batch().set(key1, v1).merkleize(&db, None); - let batch_b = db.new_batch().set(key2, v2).merkleize(&db, None); + let batch_a = db + .new_batch() + .set(key1, v1) + .merkleize(&db, None, Location::new(0)); + let batch_b = db + .new_batch() + .set(key2, v2) + .merkleize(&db, None, Location::new(0)); // Apply the first -- should succeed. 
db.apply_batch(batch_a).await.unwrap(); @@ -1757,7 +1955,7 @@ pub(super) mod test { ) -> Pin> + Send>>, ) where V: ValueEncoding, - C: Mutable> + Persistable, + C: Mutable> + Persistable, C::Item: EncodeShared, { let mut db = open_db(context.with_label("db")).await; @@ -1767,20 +1965,20 @@ pub(super) mod test { let key3 = Sha256::hash(&[3]); // Parent batch. - let parent_m = db - .new_batch() - .set(key1, Sha256::fill(1u8)) - .merkleize(&db, None); + let parent_m = + db.new_batch() + .set(key1, Sha256::fill(1u8)) + .merkleize(&db, None, Location::new(0)); // Fork two children from the same parent. let child_a = parent_m .new_batch::() .set(key2, Sha256::fill(2u8)) - .merkleize(&db, None); + .merkleize(&db, None, Location::new(0)); let child_b = parent_m .new_batch::() .set(key3, Sha256::fill(3u8)) - .merkleize(&db, None); + .merkleize(&db, None, Location::new(0)); // Apply child A. db.apply_batch(child_a).await.unwrap(); @@ -1802,7 +2000,7 @@ pub(super) mod test { ) -> Pin> + Send>>, ) where V: ValueEncoding, - C: Mutable> + Persistable, + C: Mutable> + Persistable, C::Item: EncodeShared, { let mut db = open_db(context.with_label("db")).await; @@ -1815,9 +2013,18 @@ pub(super) mod test { let v3 = Sha256::fill(3u8); // Chain: DB <- A <- B <- C - let a = db.new_batch().set(key1, v1).merkleize(&db, None); - let b = a.new_batch::().set(key2, v2).merkleize(&db, None); - let c = b.new_batch::().set(key3, v3).merkleize(&db, None); + let a = db + .new_batch() + .set(key1, v1) + .merkleize(&db, None, Location::new(0)); + let b = a + .new_batch::() + .set(key2, v2) + .merkleize(&db, None, Location::new(0)); + let c = b + .new_batch::() + .set(key3, v3) + .merkleize(&db, None, Location::new(0)); let expected_root = c.root(); @@ -1840,7 +2047,7 @@ pub(super) mod test { ) -> Pin> + Send>>, ) where V: ValueEncoding, - C: Mutable> + Persistable, + C: Mutable> + Persistable, C::Item: EncodeShared, { let mut db = open_db(context.with_label("db")).await; @@ -1851,13 +2058,17 @@ 
pub(super) mod test { let v2 = Sha256::fill(2u8); // Parent batch. - let parent_m = db.new_batch().set(key1, v1).merkleize(&db, None); + let parent_m = db + .new_batch() + .set(key1, v1) + .merkleize(&db, None, Location::new(0)); // Child batch built on parent. - let child_m = parent_m - .new_batch::() - .set(key2, v2) - .merkleize(&db, None); + let child_m = + parent_m + .new_batch::() + .set(key2, v2) + .merkleize(&db, None, Location::new(0)); // Apply parent first, then child. This is a valid sequential commit. db.apply_batch(parent_m).await.unwrap(); @@ -1877,7 +2088,7 @@ pub(super) mod test { ) -> Pin> + Send>>, ) where V: ValueEncoding, - C: Mutable> + Persistable, + C: Mutable> + Persistable, C::Item: EncodeShared, { let mut db = open_db(context.with_label("db")).await; @@ -1886,24 +2097,24 @@ pub(super) mod test { let key2 = Sha256::hash(&[2]); // Build the child while the parent is still pending. - let parent = db - .new_batch() - .set(key1, Sha256::fill(1u8)) - .merkleize(&db, None); + let parent = + db.new_batch() + .set(key1, Sha256::fill(1u8)) + .merkleize(&db, None, Location::new(0)); let pending_child = parent .new_batch::() .set(key2, Sha256::fill(2u8)) - .merkleize(&db, None); + .merkleize(&db, None, Location::new(0)); // Commit the parent, then rebuild the same logical child from the // committed DB state and compare roots. 
db.apply_batch(parent).await.unwrap(); db.commit().await.unwrap(); - let committed_child = db - .new_batch() - .set(key2, Sha256::fill(2u8)) - .merkleize(&db, None); + let committed_child = + db.new_batch() + .set(key2, Sha256::fill(2u8)) + .merkleize(&db, None, Location::new(0)); assert_eq!(pending_child.root(), committed_child.root()); @@ -1917,7 +2128,7 @@ pub(super) mod test { ) -> Pin> + Send>>, ) where V: ValueEncoding, - C: Mutable> + Persistable, + C: Mutable> + Persistable, C::Item: EncodeShared, { let mut db = open_db(context.with_label("db")).await; @@ -1926,16 +2137,16 @@ pub(super) mod test { let key2 = Sha256::hash(&[2]); // Parent batch. - let parent_m = db - .new_batch() - .set(key1, Sha256::fill(1u8)) - .merkleize(&db, None); + let parent_m = + db.new_batch() + .set(key1, Sha256::fill(1u8)) + .merkleize(&db, None, Location::new(0)); // Child batch. let child_m = parent_m .new_batch::() .set(key2, Sha256::fill(2u8)) - .merkleize(&db, None); + .merkleize(&db, None, Location::new(0)); // Apply child first (it carries all parent ops too). db.apply_batch(child_m).await.unwrap(); @@ -1959,7 +2170,7 @@ pub(super) mod test { ) -> Pin> + Send>>, ) where V: ValueEncoding, - C: Mutable> + Persistable, + C: Mutable> + Persistable, C::Item: EncodeShared, { let mut db = open_db(context.with_label("db")).await; @@ -1967,9 +2178,13 @@ pub(super) mod test { // Populate. let key1 = Sha256::hash(&[1]); let v1 = Sha256::fill(10u8); - db.apply_batch(db.new_batch().set(key1, v1).merkleize(&db, None)) - .await - .unwrap(); + db.apply_batch( + db.new_batch() + .set(key1, v1) + .merkleize(&db, None, Location::new(0)), + ) + .await + .unwrap(); // to_batch root matches committed root. let snapshot = db.to_batch(); @@ -1978,10 +2193,11 @@ pub(super) mod test { // Chain a child from the snapshot, apply it. 
let key2 = Sha256::hash(&[2]); let v2 = Sha256::fill(20u8); - let child = snapshot - .new_batch::() - .set(key2, v2) - .merkleize(&db, None); + let child = + snapshot + .new_batch::() + .set(key2, v2) + .merkleize(&db, None, Location::new(0)); db.apply_batch(child).await.unwrap(); assert_eq!(db.get(&key1).await.unwrap(), Some(v1)); @@ -1999,7 +2215,7 @@ pub(super) mod test { ) -> Pin> + Send>>, ) where V: ValueEncoding, - C: Mutable> + Persistable, + C: Mutable> + Persistable, C::Item: EncodeShared, { let mut db = open_db(context.with_label("db")).await; @@ -2012,9 +2228,18 @@ pub(super) mod test { let v3 = Sha256::fill(3u8); // Chain: DB <- A <- B <- C - let a = db.new_batch().set(key1, v1).merkleize(&db, None); - let b = a.new_batch::().set(key2, v2).merkleize(&db, None); - let c = b.new_batch::().set(key3, v3).merkleize(&db, None); + let a = db + .new_batch() + .set(key1, v1) + .merkleize(&db, None, Location::new(0)); + let b = a + .new_batch::() + .set(key2, v2) + .merkleize(&db, None, Location::new(0)); + let c = b + .new_batch::() + .set(key3, v3) + .merkleize(&db, None, Location::new(0)); // Drop A and B without committing. Their Weak refs in C are now dead. drop(a); @@ -2030,4 +2255,494 @@ pub(super) mod test { db.destroy().await.unwrap(); } + + /// Verify the inactivity floor is zero for a fresh empty database and is + /// correctly set after applying batches with specific floor values. + pub(crate) async fn test_immutable_inactivity_floor_tracking( + context: deterministic::Context, + open_db: impl Fn( + deterministic::Context, + ) -> Pin> + Send>>, + ) where + V: ValueEncoding, + C: Mutable> + Persistable, + C::Item: EncodeShared, + { + let mut db = open_db(context.with_label("test")).await; + + // Empty DB has floor=0. + assert_eq!(db.inactivity_floor_loc(), Location::new(0)); + + // Apply batch with floor=0, floor stays 0. 
+ let k1 = Sha256::fill(1u8); + let v1 = Sha256::fill(2u8); + db.apply_batch( + db.new_batch() + .set(k1, v1) + .merkleize(&db, None, Location::new(0)), + ) + .await + .unwrap(); + assert_eq!(db.inactivity_floor_loc(), Location::new(0)); + + // Apply batch with floor=3, floor advances. + let k2 = Sha256::fill(3u8); + let v2 = Sha256::fill(4u8); + db.apply_batch( + db.new_batch() + .set(k2, v2) + .merkleize(&db, None, Location::new(3)), + ) + .await + .unwrap(); + assert_eq!(db.inactivity_floor_loc(), Location::new(3)); + + // Floor persists across restart. + db.commit().await.unwrap(); + db.sync().await.unwrap(); + drop(db); + let db = open_db(context.with_label("reopen")).await; + assert_eq!(db.inactivity_floor_loc(), Location::new(3)); + + db.destroy().await.unwrap(); + } + + /// Verify that applying a batch with a floor equal to the current floor succeeds, + /// and that a higher floor also succeeds. + pub(crate) async fn test_immutable_floor_monotonicity( + context: deterministic::Context, + open_db: impl Fn( + deterministic::Context, + ) -> Pin> + Send>>, + ) where + V: ValueEncoding, + C: Mutable> + Persistable, + C::Item: EncodeShared, + { + let mut db = open_db(context.with_label("test")).await; + + // DB starts with 1 op (initial commit). + // First batch: 1 set + 1 commit = total_size 3. Use floor=2 (the commit loc). + let k1 = Sha256::fill(1u8); + let v1 = Sha256::fill(2u8); + db.apply_batch( + db.new_batch() + .set(k1, v1) + .merkleize(&db, None, Location::new(2)), + ) + .await + .unwrap(); + assert_eq!(db.inactivity_floor_loc(), Location::new(2)); + + // Same floor is OK. Second batch: 1 set + 1 commit = total_size 5. floor=2 < 5. + let k2 = Sha256::fill(3u8); + let v2 = Sha256::fill(4u8); + db.apply_batch( + db.new_batch() + .set(k2, v2) + .merkleize(&db, None, Location::new(2)), + ) + .await + .unwrap(); + assert_eq!(db.inactivity_floor_loc(), Location::new(2)); + + // Higher floor also succeeds. Third batch: 1 set + 1 commit = total_size 7. 
floor=5 < 7. + let k3 = Sha256::fill(5u8); + let v3 = Sha256::fill(6u8); + db.apply_batch( + db.new_batch() + .set(k3, v3) + .merkleize(&db, None, Location::new(5)), + ) + .await + .unwrap(); + assert_eq!(db.inactivity_floor_loc(), Location::new(5)); + + db.destroy().await.unwrap(); + } + + /// Verify that the inactivity floor is correctly restored after a rewind. + pub(crate) async fn test_immutable_rewind_restores_floor( + context: deterministic::Context, + open_db: impl Fn( + deterministic::Context, + ) -> Pin> + Send>>, + ) where + V: ValueEncoding, + C: Mutable> + Persistable, + C::Item: EncodeShared, + { + let mut db = open_db(context.with_label("test")).await; + + // Apply first batch with floor=2. + let k1 = Sha256::fill(1u8); + let v1 = Sha256::fill(2u8); + db.apply_batch( + db.new_batch() + .set(k1, v1) + .merkleize(&db, None, Location::new(2)), + ) + .await + .unwrap(); + db.commit().await.unwrap(); + let first_size = db.bounds().await.end; + assert_eq!(db.inactivity_floor_loc(), Location::new(2)); + + // Apply second batch with floor=4 (the new commit's location). + let k2 = Sha256::fill(3u8); + let v2 = Sha256::fill(4u8); + db.apply_batch( + db.new_batch() + .set(k2, v2) + .merkleize(&db, None, Location::new(4)), + ) + .await + .unwrap(); + db.commit().await.unwrap(); + assert_eq!(db.inactivity_floor_loc(), Location::new(4)); + + // Rewind to the first batch. + db.rewind(first_size).await.unwrap(); + assert_eq!(db.inactivity_floor_loc(), Location::new(2)); + + db.destroy().await.unwrap(); + } + + /// Verify that applying a batch with a floor lower than the current floor + /// returns an error. + pub(crate) async fn test_immutable_floor_monotonicity_violation( + context: deterministic::Context, + open_db: impl Fn( + deterministic::Context, + ) -> Pin> + Send>>, + ) where + V: ValueEncoding, + C: Mutable> + Persistable, + C::Item: EncodeShared, + { + let mut db = open_db(context.with_label("test")).await; + + // DB starts with 1 op. 
First batch: 1 set + 1 commit = total_size 3. floor=2. + let k1 = Sha256::fill(1u8); + let v1 = Sha256::fill(2u8); + db.apply_batch( + db.new_batch() + .set(k1, v1) + .merkleize(&db, None, Location::new(2)), + ) + .await + .unwrap(); + + // Apply batch with floor=1 (regression). Should return an error. + let k2 = Sha256::fill(3u8); + let v2 = Sha256::fill(4u8); + let result = db + .apply_batch( + db.new_batch() + .set(k2, v2) + .merkleize(&db, None, Location::new(1)), + ) + .await; + assert!(matches!(result, Err(Error::FloorRegressed(new, current)) + if new == Location::new(1) && current == Location::new(2))); + + db.destroy().await.unwrap(); + } + + /// Verify that applying a batch with a floor beyond the total operation + /// count returns an error. + pub(crate) async fn test_immutable_floor_beyond_size( + context: deterministic::Context, + open_db: impl Fn( + deterministic::Context, + ) -> Pin> + Send>>, + ) where + V: ValueEncoding, + C: Mutable> + Persistable, + C::Item: EncodeShared, + { + let mut db = open_db(context.with_label("test")).await; + + // DB has 1 op (initial commit). A batch with 1 set + 1 commit = total_size 3. + // Setting floor=100 exceeds total_size. + let k1 = Sha256::fill(1u8); + let v1 = Sha256::fill(2u8); + let result = db + .apply_batch( + db.new_batch() + .set(k1, v1) + .merkleize(&db, None, Location::new(100)), + ) + .await; + assert!(matches!(result, Err(Error::FloorBeyondSize(floor, total)) + if floor == Location::new(100) && total == Location::new(3))); + + // Boundary: floor == total_size must also be rejected. The commit op is + // at total_size - 1, so a floor equal to total_size would allow a later + // prune to remove the commit and leave the db unrecoverable. 
+ let k2 = Sha256::fill(3u8); + let v2 = Sha256::fill(4u8); + let result = db + .apply_batch( + db.new_batch() + .set(k2, v2) + .merkleize(&db, None, Location::new(3)), + ) + .await; + assert!(matches!(result, Err(Error::FloorBeyondSize(floor, total)) + if floor == Location::new(3) && total == Location::new(3))); + + // Floor == total_size - 1 (the commit location) is the maximum valid. + db.apply_batch( + db.new_batch() + .set(k2, v2) + .merkleize(&db, None, Location::new(2)), + ) + .await + .unwrap(); + + db.destroy().await.unwrap(); + } + + /// Regression test for rewind-after-reopen with floor change. + /// + /// After reopening a database (which rebuilds the snapshot from the latest + /// floor), rewinding to an earlier commit with a lower floor must restore + /// all keys that were live at the rewind target -- not just the ones that + /// happened to be in the rebuilt snapshot. + pub(crate) async fn test_immutable_rewind_after_reopen_with_floor_change( + context: deterministic::Context, + open_db: impl Fn( + deterministic::Context, + ) -> Pin> + Send>>, + ) where + V: ValueEncoding, + C: Mutable> + Persistable, + C::Item: EncodeShared, + { + let mut db = open_db(context.with_label("first")).await; + + let k1 = Sha256::fill(1u8); + let k2 = Sha256::fill(2u8); + let k3 = Sha256::fill(3u8); + let v1 = Sha256::fill(11u8); + let v2 = Sha256::fill(12u8); + let v3 = Sha256::fill(13u8); + + // Commit A: 3 keys with floor=0. + commit_sets(&mut db, [(k1, v1), (k2, v2), (k3, v3)], None).await; + let first_size = db.bounds().await.end; + let first_root = db.root(); + + // Commit B: 3 more keys with floor=first_size (declares batch A inactive). 
+ let k4 = Sha256::fill(4u8); + let k5 = Sha256::fill(5u8); + let k6 = Sha256::fill(6u8); + let v4 = Sha256::fill(14u8); + let v5 = Sha256::fill(15u8); + let v6 = Sha256::fill(16u8); + commit_sets_with_floor(&mut db, [(k4, v4), (k5, v5), (k6, v6)], None, first_size).await; + db.sync().await.unwrap(); + + // Reopen: snapshot rebuilt from floor=first_size, batch A keys excluded. + drop(db); + let mut db = open_db(context.with_label("second")).await; + + // Verify batch A keys are NOT in the reopened snapshot (expected). + assert!(db.get(&k1).await.unwrap().is_none()); + + // Rewind to commit A. + db.rewind(first_size).await.unwrap(); + + // All batch A keys must be accessible after rewind. + assert_eq!(db.get(&k1).await.unwrap(), Some(v1)); + assert_eq!(db.get(&k2).await.unwrap(), Some(v2)); + assert_eq!(db.get(&k3).await.unwrap(), Some(v3)); + assert_eq!(db.root(), first_root); + assert_eq!(db.inactivity_floor_loc(), Location::new(0)); + + // Batch B keys must NOT be accessible. + assert!(db.get(&k4).await.unwrap().is_none()); + + db.destroy().await.unwrap(); + } + + /// Regression test: rewind-after-reopen where the rewind target is NOT the + /// immediate predecessor. This ensures the snapshot gap fill only covers + /// [rewind_floor, old_floor) and does not re-insert keys already present. + pub(crate) async fn test_immutable_rewind_after_reopen_partial_floor_gap( + context: deterministic::Context, + open_db: impl Fn( + deterministic::Context, + ) -> Pin> + Send>>, + ) where + V: ValueEncoding, + C: Mutable> + Persistable, + C::Item: EncodeShared, + { + let mut db = open_db(context.with_label("first")).await; + + let k1 = Sha256::fill(1u8); + let v1 = Sha256::fill(11u8); + + // Commit A: 1 key, floor=0. + commit_sets(&mut db, [(k1, v1)], None).await; + let first_size = db.bounds().await.end; + let first_root = db.root(); + + // Commit B: 1 key, floor=first_size. 
+ let k2 = Sha256::fill(2u8); + let v2 = Sha256::fill(12u8); + commit_sets_with_floor(&mut db, [(k2, v2)], None, first_size).await; + let second_size = db.bounds().await.end; + + // Commit C: 1 key, floor=second_size. This raises the floor + // above commit B's keys, so reopen excludes both A and B keys. + let k3 = Sha256::fill(3u8); + let v3 = Sha256::fill(13u8); + commit_sets_with_floor(&mut db, [(k3, v3)], None, second_size).await; + db.sync().await.unwrap(); + + // Reopen: snapshot rebuilt from floor=second_size. Only k3 is in snapshot. + drop(db); + let mut db = open_db(context.with_label("second")).await; + assert!(db.get(&k1).await.unwrap().is_none()); + assert!(db.get(&k2).await.unwrap().is_none()); + assert_eq!(db.get(&k3).await.unwrap(), Some(v3)); + + // Rewind to commit B (not A). The gap fill should add keys from + // [first_size, second_size) -- which includes k2 but not k1. + // k3 is in the suffix and gets removed. k2 from the gap gets inserted. + db.rewind(second_size).await.unwrap(); + assert!(db.get(&k1).await.unwrap().is_none()); // below B's floor + assert_eq!(db.get(&k2).await.unwrap(), Some(v2)); + assert!(db.get(&k3).await.unwrap().is_none()); // in suffix, removed + + // Now rewind further to commit A. + db.rewind(first_size).await.unwrap(); + assert_eq!(db.get(&k1).await.unwrap(), Some(v1)); + assert!(db.get(&k2).await.unwrap().is_none()); // above first_size, truncated + assert_eq!(db.root(), first_root); + assert_eq!(db.inactivity_floor_loc(), Location::new(0)); + + db.destroy().await.unwrap(); + } + + /// After committing with `floor = commit_loc` and pruning down to it, the live set is + /// exactly one operation — the commit itself. This is the minimum non-empty live set + /// achievable under the per-commit bound. The DB must remain fully usable: + /// + /// - `prune(commit_loc + 1)` is rejected (the floor is a hard ceiling). + /// - `prune` does not affect the root (documented invariant). 
+ /// - Reopen reconstructs `inactivity_floor_loc` from the sole surviving commit op, and the + /// in-memory snapshot is empty (all Sets were below the floor). + /// - A follow-on batch applies cleanly on top from the floor-at-max state. + pub(crate) async fn test_immutable_single_commit_live_set( + context: deterministic::Context, + open_db: impl Fn( + deterministic::Context, + ) -> Pin> + Send>>, + ) where + V: ValueEncoding, + C: Mutable> + Persistable, + C::Item: EncodeShared, + { + let mut db = open_db(context.with_label("test")).await; + + // Initial commit is at loc 0. 3 sets + 1 commit → commit lands at loc 4. + // Declare floor = 4 (= commit_loc), the tight maximum. + let metadata = Sha256::fill(42u8); + let commit_loc = Location::::new(4); + let k1 = Sha256::fill(1u8); + let k2 = Sha256::fill(2u8); + let k3 = Sha256::fill(3u8); + let v1 = Sha256::fill(11u8); + let v2 = Sha256::fill(12u8); + let v3 = Sha256::fill(13u8); + db.apply_batch( + db.new_batch() + .set(k1, v1) + .set(k2, v2) + .set(k3, v3) + .merkleize(&db, Some(metadata), commit_loc), + ) + .await + .unwrap(); + db.commit().await.unwrap(); + assert_eq!(db.last_commit_loc, commit_loc); + assert_eq!(db.inactivity_floor_loc(), commit_loc); + let root_after_commit = db.root(); + + // All three keys are in the in-memory snapshot pre-prune. + assert_eq!(db.get(&k1).await.unwrap(), Some(v1)); + assert_eq!(db.get(&k2).await.unwrap(), Some(v2)); + assert_eq!(db.get(&k3).await.unwrap(), Some(v3)); + + // Prune at the floor — the maximum prune allowed. + // Pruning is blob-aligned, so `bounds.start` may not physically advance all the way + // to `commit_loc`; what matters semantically is that the floor authorizes pruning + // of everything below the commit and that any further prune is rejected. 
+ db.prune(commit_loc).await.unwrap(); + let bounds = db.bounds().await; + assert!( + bounds.start <= commit_loc, + "prune must not advance bounds.start past the floor" + ); + assert_eq!(bounds.end, Location::new(*commit_loc + 1)); + + // Pruning one past the floor must be rejected — the floor is the hard ceiling. + let err = db.prune(Location::new(*commit_loc + 1)).await.unwrap_err(); + assert!(matches!(err, Error::PruneBeyondMinRequired(p, f) + if *p == *commit_loc + 1 && *f == *commit_loc)); + + // State preserved across the prune; root unchanged; commit metadata still readable. + assert_eq!(db.last_commit_loc, commit_loc); + assert_eq!(db.inactivity_floor_loc(), commit_loc); + assert_eq!(db.root(), root_after_commit); + assert_eq!(db.get_metadata().await.unwrap(), Some(metadata)); + + // Persist and reopen. `init_from_journal` rebuilds the snapshot by replaying from + // the floor (= commit_loc). The only op at/above the floor is the commit, which + // contributes no keys — so the rebuilt snapshot is empty. + db.sync().await.unwrap(); + drop(db); + let mut db = open_db(context.with_label("reopened")).await; + assert_eq!(db.last_commit_loc, commit_loc); + assert_eq!(db.inactivity_floor_loc(), commit_loc); + assert_eq!(db.root(), root_after_commit); + // The commit op at `commit_loc` is the anchor that survived pruning — its metadata + // must come back through `get_metadata` after the snapshot rebuild. + assert_eq!(db.get_metadata().await.unwrap(), Some(metadata)); + + // Keys set below the floor are excluded from the rebuilt snapshot. + assert!(db.get(&k1).await.unwrap().is_none()); + assert!(db.get(&k2).await.unwrap().is_none()); + assert!(db.get(&k3).await.unwrap().is_none()); + + // A follow-on batch applies on top. Monotonicity requires the new floor to be at + // least `commit_loc` (= 4); advancing to the new tight max (= 6) exercises the + // floor-at-max → new-batch transition. 
+ let k4 = Sha256::fill(4u8); + let v4 = Sha256::fill(14u8); + let next_commit_loc = Location::::new(6); + db.apply_batch( + db.new_batch() + .set(k4, v4) + .merkleize(&db, None, next_commit_loc), + ) + .await + .unwrap(); + db.commit().await.unwrap(); + assert_eq!(db.last_commit_loc, next_commit_loc); + assert_eq!(db.inactivity_floor_loc(), next_commit_loc); + + // New key readable; keys from the pre-prune batch remain excluded. + assert_eq!(db.get(&k4).await.unwrap(), Some(v4)); + assert!(db.get(&k1).await.unwrap().is_none()); + // Follow-on commit replaced the anchor: its metadata was `None`, so `get_metadata` + // should no longer return the original metadata. + assert_eq!(db.get_metadata().await.unwrap(), None); + + db.destroy().await.unwrap(); + } } diff --git a/storage/src/qmdb/immutable/operation/fixed.rs b/storage/src/qmdb/immutable/operation/fixed.rs index c32f7665527..56b1df827ba 100644 --- a/storage/src/qmdb/immutable/operation/fixed.rs +++ b/storage/src/qmdb/immutable/operation/fixed.rs @@ -1,5 +1,8 @@ use super::{Operation, COMMIT_CONTEXT, SET_CONTEXT}; -use crate::qmdb::any::{value::FixedEncoding, FixedValue}; +use crate::{ + merkle::{Family, Location}, + qmdb::any::{value::FixedEncoding, FixedValue}, +}; use commonware_codec::{ util::{at_least, ensure_zeros}, Error as CodecError, FixedSize, Read, ReadExt as _, Write, @@ -21,18 +24,18 @@ const fn set_op_size() -> usize { } const fn commit_op_size() -> usize { - 1 + 1 + V::SIZE + 1 + 1 + V::SIZE + u64::SIZE } const fn total_op_size() -> usize { const_max(set_op_size::(), commit_op_size::()) } -impl FixedSize for Operation> { +impl FixedSize for Operation> { const SIZE: usize = total_op_size::(); } -impl Write for Operation> { +impl Write for Operation> { fn write(&self, buf: &mut impl BufMut) { let total = total_op_size::(); match &self { @@ -42,7 +45,7 @@ impl Write for Operation> { v.write(buf); buf.put_bytes(0, total - set_op_size::()); } - Self::Commit(v) => { + Self::Commit(v, floor_loc) => { 
COMMIT_CONTEXT.write(buf); if let Some(v) = v { true.write(buf); @@ -50,13 +53,14 @@ impl Write for Operation> { } else { buf.put_bytes(0, 1 + V::SIZE); } + buf.put_slice(&floor_loc.to_be_bytes()); buf.put_bytes(0, total - commit_op_size::()); } } } } -impl Read for Operation> { +impl Read for Operation> { type Cfg = (); fn read_cfg(buf: &mut impl Buf, _: &Self::Cfg) -> Result { @@ -78,8 +82,15 @@ impl Read for Operation> { ensure_zeros(buf, V::SIZE)?; None }; + let floor_loc = Location::new(u64::read(buf)?); + if !floor_loc.is_valid() { + return Err(CodecError::Invalid( + "storage::qmdb::immutable::operation::fixed::Operation", + "commit floor location overflow", + )); + } ensure_zeros(buf, total - commit_op_size::())?; - Ok(Self::Commit(value)) + Ok(Self::Commit(value, floor_loc)) } e => Err(CodecError::InvalidEnum(e)), } @@ -89,24 +100,25 @@ impl Read for Operation> { #[cfg(test)] mod tests { use super::*; + use crate::merkle::mmr; use commonware_codec::{DecodeExt, Encode}; use commonware_utils::sequence::U64; - type FixedOp = Operation>; + type FixedOp = Operation>; #[test] fn test_fixed_size() { // Set: 1 + 8 + 8 = 17 - // Commit: 1 + 1 + 8 = 10 - // Max = 17 - assert_eq!(FixedOp::SIZE, 17); + // Commit: 1 + 1 + 8 + 8 = 18 + // Max = 18 + assert_eq!(FixedOp::SIZE, 18); } #[test] fn test_uniform_encoding_size() { let set_op = FixedOp::Set(U64::new(1), U64::new(2)); - let commit_some = FixedOp::Commit(Some(U64::new(3))); - let commit_none = FixedOp::Commit(None); + let commit_some = FixedOp::Commit(Some(U64::new(3)), Location::new(10)); + let commit_none = FixedOp::Commit(None, Location::new(0)); assert_eq!(set_op.encode().len(), FixedOp::SIZE); assert_eq!(commit_some.encode().len(), FixedOp::SIZE); @@ -117,8 +129,8 @@ mod tests { fn test_roundtrip() { let operations: Vec = vec![ FixedOp::Set(U64::new(1234), U64::new(56789)), - FixedOp::Commit(Some(U64::new(42))), - FixedOp::Commit(None), + FixedOp::Commit(Some(U64::new(42)), Location::new(100)), + 
FixedOp::Commit(None, Location::new(0)), ]; for op in operations { diff --git a/storage/src/qmdb/immutable/operation/mod.rs b/storage/src/qmdb/immutable/operation/mod.rs index 5b6cdbc3952..19203645ad9 100644 --- a/storage/src/qmdb/immutable/operation/mod.rs +++ b/storage/src/qmdb/immutable/operation/mod.rs @@ -29,30 +29,31 @@ pub(crate) const COMMIT_CONTEXT: u8 = 1; /// Unlike mutable database operations, immutable operations only support /// setting new values and committing - no updates or deletions. #[derive(Clone, Eq, PartialEq, Ord, PartialOrd, Hash, Debug)] -pub enum Operation { +pub enum Operation { /// Set a key to a value. The key must not already exist. Set(K, V::Value), - /// Commit with optional metadata. - Commit(Option), + /// Commit with optional metadata and the inactivity floor location. + /// Operations before the floor are declared inactive by the application. + Commit(Option, Location), } -impl Operation { +impl Operation { /// If this is an operation involving a key, returns the key. Otherwise, returns None. pub const fn key(&self) -> Option<&K> { match self { Self::Set(key, _) => Some(key), - Self::Commit(_) => None, + Self::Commit(_, _) => None, } } /// Returns true if this is a commit operation. 
pub const fn is_commit(&self) -> bool { - matches!(self, Self::Commit(_)) + matches!(self, Self::Commit(_, _)) } } -impl OperationTrait for Operation { +impl OperationTrait for Operation { type Key = K; fn key(&self) -> Option<&Self::Key> { @@ -69,12 +70,14 @@ impl OperationTrait for Operation } fn has_floor(&self) -> Option> { - // Immutable databases don't have inactivity floors - None + match self { + Self::Commit(_, loc) => Some(*loc), + _ => None, + } } } -impl Display for Operation +impl Display for Operation where V::Value: Encode, { @@ -83,11 +86,11 @@ where Self::Set(key, value) => { write!(f, "[key:{} value:{}]", hex(key), hex(&value.encode())) } - Self::Commit(value) => { + Self::Commit(value, floor) => { if let Some(value) = value { - write!(f, "[commit {}]", hex(&value.encode())) + write!(f, "[commit {} floor:{}]", hex(&value.encode()), **floor) } else { - write!(f, "[commit]") + write!(f, "[commit floor:{}]", **floor) } } } @@ -95,7 +98,7 @@ where } #[cfg(feature = "arbitrary")] -impl arbitrary::Arbitrary<'_> for Operation +impl arbitrary::Arbitrary<'_> for Operation where K: for<'a> arbitrary::Arbitrary<'a>, V::Value: for<'a> arbitrary::Arbitrary<'a>, @@ -108,7 +111,12 @@ where let value = V::Value::arbitrary(u)?; Ok(Self::Set(key, value)) } - 1 => Ok(Self::Commit(Option::::arbitrary(u)?)), + 1 => { + let metadata = Option::::arbitrary(u)?; + let max_loc = F::MAX_LEAVES; + let floor = u.int_in_range(0..=*max_loc)?; + Ok(Self::Commit(metadata, Location::new(floor))) + } _ => unreachable!(), } } @@ -117,11 +125,11 @@ where #[cfg(test)] mod tests { use super::*; - use crate::qmdb::any::value::VariableEncoding; + use crate::{merkle::mmr, qmdb::any::value::VariableEncoding}; use commonware_codec::Encode; use commonware_utils::sequence::U64; - type VarOp = Operation>; + type VarOp = Operation>; #[test] fn test_operation_key() { @@ -131,10 +139,10 @@ mod tests { let set_op = VarOp::Set(key.clone(), value.clone()); assert_eq!(&key, set_op.key().unwrap()); - 
let commit_op = VarOp::Commit(Some(value)); + let commit_op = VarOp::Commit(Some(value), Location::new(0)); assert_eq!(None, commit_op.key()); - let commit_op_none = VarOp::Commit(None); + let commit_op_none = VarOp::Commit(None, Location::new(0)); assert_eq!(None, commit_op_none.key()); } @@ -146,13 +154,37 @@ mod tests { let set_op = VarOp::Set(key, value.clone()); assert!(!set_op.is_commit()); - let commit_op = VarOp::Commit(Some(value)); + let commit_op = VarOp::Commit(Some(value), Location::new(0)); assert!(commit_op.is_commit()); - let commit_op_none = VarOp::Commit(None); + let commit_op_none = VarOp::Commit(None, Location::new(0)); assert!(commit_op_none.is_commit()); } + #[test] + fn test_operation_has_floor() { + let key = U64::new(1234); + let value = U64::new(56789); + + let set_op = VarOp::Set(key, value.clone()); + assert_eq!( + >::has_floor(&set_op), + None + ); + + let commit_op = VarOp::Commit(Some(value), Location::new(42)); + assert_eq!( + >::has_floor(&commit_op), + Some(Location::new(42)) + ); + + let commit_op_none = VarOp::Commit(None, Location::new(0)); + assert_eq!( + >::has_floor(&commit_op_none), + Some(Location::new(0)) + ); + } + #[test] fn test_operation_display() { let key = U64::new(1234); @@ -164,13 +196,13 @@ mod tests { format!("[key:{} value:{}]", hex(&key), hex(&value.encode())) ); - let commit_op = VarOp::Commit(Some(value.clone())); + let commit_op = VarOp::Commit(Some(value.clone()), Location::new(10)); assert_eq!( format!("{commit_op}"), - format!("[commit {}]", hex(&value.encode())) + format!("[commit {} floor:10]", hex(&value.encode())) ); - let commit_op = VarOp::Commit(None); - assert_eq!(format!("{commit_op}"), "[commit]"); + let commit_op = VarOp::Commit(None, Location::new(0)); + assert_eq!(format!("{commit_op}"), "[commit floor:0]"); } } diff --git a/storage/src/qmdb/immutable/operation/variable.rs b/storage/src/qmdb/immutable/operation/variable.rs index 53cd9568092..1ff6c4ad6d5 100644 --- 
a/storage/src/qmdb/immutable/operation/variable.rs +++ b/storage/src/qmdb/immutable/operation/variable.rs @@ -1,21 +1,24 @@ use super::{Operation, COMMIT_CONTEXT, SET_CONTEXT}; -use crate::qmdb::{ - any::{value::VariableEncoding, VariableValue}, - operation::Key, +use crate::{ + merkle::{Family, Location}, + qmdb::{ + any::{value::VariableEncoding, VariableValue}, + operation::Key, + }, }; -use commonware_codec::{EncodeSize, Error as CodecError, Read, ReadExt as _, Write}; +use commonware_codec::{varint::UInt, EncodeSize, Error as CodecError, Read, ReadExt as _, Write}; use commonware_runtime::{Buf, BufMut}; -impl EncodeSize for Operation> { +impl EncodeSize for Operation> { fn encode_size(&self) -> usize { 1 + match self { Self::Set(k, v) => k.encode_size() + v.encode_size(), - Self::Commit(v) => v.encode_size(), + Self::Commit(v, floor) => v.encode_size() + UInt(**floor).encode_size(), } } } -impl Write for Operation> { +impl Write for Operation> { fn write(&self, buf: &mut impl BufMut) { match &self { Self::Set(k, v) => { @@ -23,15 +26,16 @@ impl Write for Operation> { k.write(buf); v.write(buf); } - Self::Commit(v) => { + Self::Commit(v, floor_loc) => { COMMIT_CONTEXT.write(buf); v.write(buf); + UInt(**floor_loc).write(buf); } } } } -impl Read for Operation> { +impl Read for Operation> { type Cfg = (::Cfg, ::Cfg); fn read_cfg(buf: &mut impl Buf, cfg: &Self::Cfg) -> Result { @@ -41,7 +45,11 @@ impl Read for Operation> { let value = V::read_cfg(buf, &cfg.1)?; Ok(Self::Set(key, value)) } - COMMIT_CONTEXT => Ok(Self::Commit(Option::::read_cfg(buf, &cfg.1)?)), + COMMIT_CONTEXT => { + let metadata = Option::::read_cfg(buf, &cfg.1)?; + let floor_loc = Location::read(buf)?; + Ok(Self::Commit(metadata, floor_loc)) + } e => Err(CodecError::InvalidEnum(e)), } } @@ -50,10 +58,11 @@ impl Read for Operation> { #[cfg(test)] mod tests { use super::*; + use crate::merkle::mmr; use commonware_codec::{DecodeExt, Encode, EncodeSize, FixedSize as _}; use 
commonware_utils::sequence::U64; - type VarOp = Operation>; + type VarOp = Operation>; #[test] fn test_operation_encode_decode() { @@ -67,13 +76,13 @@ mod tests { assert_eq!(set_op, decoded); // Test Commit operation with value - let commit_op = VarOp::Commit(Some(value)); + let commit_op = VarOp::Commit(Some(value), Location::new(100)); let encoded = commit_op.encode(); let decoded = VarOp::decode(encoded).unwrap(); assert_eq!(commit_op, decoded); // Test Commit operation without value - let commit_op = VarOp::Commit(None); + let commit_op = VarOp::Commit(None, Location::new(0)); let encoded = commit_op.encode(); let decoded = VarOp::decode(encoded).unwrap(); assert_eq!(commit_op, decoded); @@ -88,14 +97,18 @@ mod tests { assert_eq!(set_op.encode_size(), 1 + U64::SIZE + value.encode_size()); assert_eq!(set_op.encode().len(), set_op.encode_size()); - let commit_op = VarOp::Commit(Some(value.clone())); - assert_eq!(commit_op.encode_size(), 1 + Some(value).encode_size()); + let floor = Location::new(100); + let commit_op = VarOp::Commit(Some(value.clone()), floor); + assert_eq!( + commit_op.encode_size(), + 1 + Some(value).encode_size() + UInt(*floor).encode_size() + ); assert_eq!(commit_op.encode().len(), commit_op.encode_size()); - let commit_op = VarOp::Commit(None); + let commit_op = VarOp::Commit(None, Location::new(0)); assert_eq!( commit_op.encode_size(), - 1 + Option::::None.encode_size() + 1 + Option::::None.encode_size() + UInt(0u64).encode_size() ); assert_eq!(commit_op.encode().len(), commit_op.encode_size()); } @@ -128,8 +141,8 @@ mod tests { let operations: Vec = vec![ VarOp::Set(key, value.clone()), - VarOp::Commit(Some(value)), - VarOp::Commit(None), + VarOp::Commit(Some(value), Location::new(50)), + VarOp::Commit(None, Location::new(0)), ]; for op in operations { @@ -144,29 +157,28 @@ mod tests { fn test_operation_variable_key_roundtrip() { use commonware_codec::Decode as _; + type VecOp = Operation, VariableEncoding>; + let key = vec![1u8, 2, 3, 4, 
5]; let cfg = ((commonware_codec::RangeCfg::from(0..=100usize), ()), ()); // Test Set with variable-length key - let set_op = Operation::Set(key, U64::new(42)); + let set_op = VecOp::Set(key, U64::new(42)); let encoded = set_op.encode(); assert_eq!(encoded.len(), set_op.encode_size()); - let decoded = - Operation::, VariableEncoding>::decode_cfg(encoded, &cfg).unwrap(); + let decoded = VecOp::decode_cfg(encoded, &cfg).unwrap(); assert_eq!(set_op, decoded); // Test Commit (key-independent, should work the same) - let commit_op = Operation::, VariableEncoding>::Commit(Some(U64::new(42))); + let commit_op = VecOp::Commit(Some(U64::new(42)), Location::new(10)); let encoded = commit_op.encode(); - let decoded = - Operation::, VariableEncoding>::decode_cfg(encoded, &cfg).unwrap(); + let decoded = VecOp::decode_cfg(encoded, &cfg).unwrap(); assert_eq!(commit_op, decoded); // Test empty key - let empty_key_op = Operation::Set(vec![], U64::new(99)); + let empty_key_op = VecOp::Set(vec![], U64::new(99)); let encoded = empty_key_op.encode(); - let decoded = - Operation::, VariableEncoding>::decode_cfg(encoded, &cfg).unwrap(); + let decoded = VecOp::decode_cfg(encoded, &cfg).unwrap(); assert_eq!(empty_key_op, decoded); } @@ -175,7 +187,7 @@ mod tests { use super::*; use commonware_codec::conformance::CodecConformance; - type VarKeyOp = Operation, VariableEncoding>; + type VarKeyOp = Operation, VariableEncoding>; commonware_conformance::conformance_tests! 
{ CodecConformance, diff --git a/storage/src/qmdb/immutable/sync/mod.rs b/storage/src/qmdb/immutable/sync/mod.rs index 2076b5b2223..dbd477e3630 100644 --- a/storage/src/qmdb/immutable/sync/mod.rs +++ b/storage/src/qmdb/immutable/sync/mod.rs @@ -13,7 +13,7 @@ use crate::{ any::ValueEncoding, build_snapshot_from_log, immutable::{self, Operation}, - operation::Key, + operation::{Key, Operation as _}, sync::{self}, Error, }, @@ -32,16 +32,16 @@ where E: Context, K: Key, V: ValueEncoding, - C: Mutable> + C: Mutable> + Persistable - + sync::Journal>, + + sync::Journal>, C::Item: EncodeShared, C::Config: Clone + Send, H: Hasher, T: Translator, { type Family = F; - type Op = Operation; + type Op = Operation; type Journal = C; type Hasher = H; type Config = immutable::Config; @@ -97,23 +97,35 @@ where let mut snapshot: Index> = Index::new(context.with_label("snapshot"), db_config.translator.clone()); - let last_commit_loc = { - // Get the start of the log. + let (last_commit_loc, inactivity_floor_loc) = { let reader = journal.journal.reader().await; let bounds = reader.bounds(); - let start_loc = Location::::new(bounds.start); + let last_commit_loc = + Location::::new(bounds.end.checked_sub(1).expect("commit should exist")); - // Build snapshot from the log - build_snapshot_from_log::(start_loc, &reader, &mut snapshot, |_, _| {}) - .await?; + // Read the floor from the last commit operation. + let last_op = reader.read(*last_commit_loc).await?; + let inactivity_floor_loc = last_op + .has_floor() + .expect("last operation should be a commit with floor"); - Location::new(bounds.end.checked_sub(1).expect("commit should exist")) + // Replay the log from the inactivity floor to build the snapshot. 
+ build_snapshot_from_log::( + inactivity_floor_loc, + &reader, + &mut snapshot, + |_, _| {}, + ) + .await?; + + (last_commit_loc, inactivity_floor_loc) }; let db = Self { journal, snapshot, last_commit_loc, + inactivity_floor_loc, }; db.sync().await?; diff --git a/storage/src/qmdb/immutable/sync/tests.rs b/storage/src/qmdb/immutable/sync/tests.rs index fb4f2fa4b06..36f32bbdd84 100644 --- a/storage/src/qmdb/immutable/sync/tests.rs +++ b/storage/src/qmdb/immutable/sync/tests.rs @@ -76,6 +76,14 @@ pub(crate) trait SyncTestHarness: Sized + 'static { ops: Vec>, metadata: Option, ) -> impl Future + Send; + fn apply_ops_with_floor( + db: Self::Db, + ops: Vec>, + metadata: Option, + floor: Location, + ) -> impl Future + Send; + fn commit(db: &mut Self::Db) -> impl Future + Send; + fn inactivity_floor_loc(db: &Self::Db) -> Location; fn prune(db: &mut Self::Db, loc: Location) -> impl Future + Send; fn bounds( @@ -709,6 +717,84 @@ where }); } +pub(crate) fn test_sync_nonzero_floor() +where + OpOf: Encode + Clone + Send + Sync, + Arc>: Resolver, Digest = sha256::Digest>, +{ + let executor = deterministic::Runner::default(); + executor.start(|mut context| async move { + let target_db = H::init_db(context.with_label("target")).await; + + // First batch with floor=0. + let early_ops = H::create_ops(50); + let mut target_db = H::apply_ops(target_db, early_ops.clone(), None).await; + H::commit(&mut target_db).await; + let first_commit_end = H::bounds(&target_db).await.end; + + // Second batch with floor = first_commit_end, declaring the first batch inactive. 
+ let late_ops = H::create_ops_seeded(50, 1); + let mut target_db = H::apply_ops_with_floor( + target_db, + late_ops.clone(), + Some(H::sample_metadata()), + first_commit_end, + ) + .await; + H::commit(&mut target_db).await; + + assert_eq!(H::inactivity_floor_loc(&target_db), first_commit_end); + + let bounds = H::bounds(&target_db).await; + let target_root = H::db_root(&target_db); + + let target_db = Arc::new(target_db); + let db_config = H::config(&format!("floor_sync_{}", context.next_u64()), &context); + let config = Config { + db_config, + fetch_batch_size: NZU64!(100), + target: Target { + root: target_root, + range: non_empty_range!(bounds.start, bounds.end), + }, + context: context.with_label("client"), + resolver: target_db.clone(), + apply_batch_size: 1024, + max_outstanding_requests: 1, + update_rx: None, + finish_rx: None, + reached_target_tx: None, + max_retained_roots: 8, + }; + let synced_db: DbOf = sync::sync(config).await.unwrap(); + + assert_eq!(H::db_root(&synced_db), target_root); + assert_eq!(H::inactivity_floor_loc(&synced_db), first_commit_end); + + // Keys from the second batch (after the floor) should be findable. + for op in &late_ops { + if let Some((key, value)) = H::op_kv(op) { + assert_eq!(H::lookup(&synced_db, key).await, Some(value.clone())); + } + } + + // Keys from the first batch (before the floor) should NOT be in the snapshot. 
+ for op in &early_ops { + if let Some((key, _)) = H::op_kv(op) { + assert_eq!( + H::lookup(&synced_db, key).await, + None, + "key from before floor should not be in synced snapshot" + ); + } + } + + H::destroy(synced_db).await; + H::destroy(Arc::try_unwrap(target_db).unwrap_or_else(|_| panic!("failed to unwrap Arc"))) + .await; + }); +} + pub(crate) fn test_target_update_on_done_client() where OpOf: Encode + Clone + Send + Sync, @@ -808,10 +894,10 @@ pub(crate) mod harnesses { } } - fn variable_create_ops_seeded( + fn variable_create_ops_seeded( n: usize, seed: u64, - ) -> Vec> { + ) -> Vec> { let mut rng = test_rng_seeded(seed); let mut ops = Vec::new(); for _ in 0..n { @@ -823,9 +909,22 @@ pub(crate) mod harnesses { } async fn variable_apply_ops( + db: VariableDb, + ops: Vec>, + metadata: Option, + ) -> VariableDb + where + VariableDb: qmdb::sync::Database, + { + let floor = db.inactivity_floor_loc(); + variable_apply_ops_with_floor::(db, ops, metadata, floor).await + } + + async fn variable_apply_ops_with_floor( mut db: VariableDb, - ops: Vec>, + ops: Vec>, metadata: Option, + floor: Location, ) -> VariableDb where VariableDb: qmdb::sync::Database, @@ -836,12 +935,12 @@ pub(crate) mod harnesses { Operation::Set(key, value) => { batch = batch.set(key, value); } - Operation::Commit(_) => { + Operation::Commit(_, _) => { panic!("Commit operation not supported in apply_ops"); } } } - let merkleized = batch.merkleize(&db, metadata); + let merkleized = batch.merkleize(&db, metadata, floor); db.apply_batch(merkleized).await.unwrap(); db } @@ -860,11 +959,11 @@ pub(crate) mod harnesses { } fn create_ops(n: usize) -> Vec> { - variable_create_ops_seeded(n, 0) + variable_create_ops_seeded::(n, 0) } fn create_ops_seeded(n: usize, seed: u64) -> Vec> { - variable_create_ops_seeded(n, seed) + variable_create_ops_seeded::(n, seed) } fn sample_metadata() -> Self::Metadata { @@ -900,7 +999,29 @@ pub(crate) mod harnesses { variable_apply_ops::(db, ops, metadata).await } + async 
fn apply_ops_with_floor( + db: Self::Db, + ops: Vec>, + metadata: Option, + floor: Location, + ) -> Self::Db { + variable_apply_ops_with_floor::(db, ops, metadata, floor).await + } + + async fn commit(db: &mut Self::Db) { + db.commit().await.unwrap(); + } + + fn inactivity_floor_loc(db: &Self::Db) -> Location { + db.inactivity_floor_loc() + } + async fn prune(db: &mut Self::Db, loc: Location) { + // Advance the inactivity floor to `loc` via a commit before pruning, + // since prune requires the floor to be at or beyond the prune target. + let merkleized = db.new_batch().merkleize(db, None, loc); + db.apply_batch(merkleized).await.unwrap(); + db.commit().await.unwrap(); db.prune(loc).await.unwrap(); } @@ -919,7 +1040,7 @@ pub(crate) mod harnesses { fn op_kv(op: &OpOf) -> Option<(&Self::Key, &Self::Value)> { match op { Operation::Set(key, value) => Some((key, value)), - Operation::Commit(_) => None, + Operation::Commit(_, _) => None, } } @@ -1007,6 +1128,11 @@ macro_rules! sync_tests_for_harness { fn test_target_update_on_done_client() { super::test_target_update_on_done_client::<$harness>(); } + + #[test_traced("WARN")] + fn test_sync_nonzero_floor() { + super::test_sync_nonzero_floor::<$harness>(); + } } }; } diff --git a/storage/src/qmdb/immutable/variable.rs b/storage/src/qmdb/immutable/variable.rs index 80ded03edd0..893c4d88de4 100644 --- a/storage/src/qmdb/immutable/variable.rs +++ b/storage/src/qmdb/immutable/variable.rs @@ -21,14 +21,14 @@ use commonware_cryptography::Hasher; use commonware_runtime::{Clock, Metrics, Storage}; /// Type alias for a variable-size operation. -pub type Operation = BaseOperation>; +pub type Operation = BaseOperation>; /// Type alias for the variable-size immutable database. pub type Db = - Immutable, variable::Journal>, H, T>; + Immutable, variable::Journal>, H, T>; type Journal = - authenticated::Journal>, H>; + authenticated::Journal>, H>; /// Configuration for a variable-size immutable authenticated db. 
pub type Config = BaseConfig>; @@ -46,13 +46,13 @@ impl< /// discarded and the state of the db will be as of the last committed operation. pub async fn init( context: E, - cfg: Config as Read>::Cfg>, + cfg: Config as Read>::Cfg>, ) -> Result> { let journal: Journal = Journal::new( context.clone(), cfg.merkle_config, cfg.log, - Operation::::is_commit, + Operation::::is_commit, ) .await?; Self::init_from_journal(journal, context, cfg.translator).await @@ -233,10 +233,10 @@ mod tests { } #[test_traced("INFO")] - fn test_variable_prune_beyond_commit() { + fn test_variable_prune_beyond_floor() { let executor = deterministic::Runner::default(); executor.start(|ctx| async move { - test::test_immutable_prune_beyond_commit(ctx, open::).await; + test::test_immutable_prune_beyond_floor(ctx, open::).await; }); } @@ -502,10 +502,10 @@ mod tests { } #[test_traced("INFO")] - fn test_variable_prune_beyond_commit_mmb() { + fn test_variable_prune_beyond_floor_mmb() { let executor = deterministic::Runner::default(); executor.start(|ctx| async move { - test::test_immutable_prune_beyond_commit(ctx, open::).await; + test::test_immutable_prune_beyond_floor(ctx, open::).await; }); } @@ -695,4 +695,136 @@ mod tests { test::test_immutable_apply_after_ancestor_dropped(ctx, open::).await; }); } + + #[test_traced("INFO")] + fn test_variable_inactivity_floor_tracking() { + let executor = deterministic::Runner::default(); + executor.start(|ctx| async move { + test::test_immutable_inactivity_floor_tracking(ctx, open::).await; + }); + } + + #[test_traced("INFO")] + fn test_variable_floor_monotonicity() { + let executor = deterministic::Runner::default(); + executor.start(|ctx| async move { + test::test_immutable_floor_monotonicity(ctx, open::).await; + }); + } + + #[test_traced("INFO")] + fn test_variable_floor_monotonicity_violation() { + let executor = deterministic::Runner::default(); + executor.start(|ctx| async move { + test::test_immutable_floor_monotonicity_violation(ctx, open::).await; + 
}); + } + + #[test_traced("INFO")] + fn test_variable_floor_beyond_size() { + let executor = deterministic::Runner::default(); + executor.start(|ctx| async move { + test::test_immutable_floor_beyond_size(ctx, open::).await; + }); + } + + #[test_traced("INFO")] + fn test_variable_rewind_restores_floor() { + let executor = deterministic::Runner::default(); + executor.start(|ctx| async move { + test::test_immutable_rewind_restores_floor(ctx, open::).await; + }); + } + + #[test_traced("INFO")] + fn test_variable_inactivity_floor_tracking_mmb() { + let executor = deterministic::Runner::default(); + executor.start(|ctx| async move { + test::test_immutable_inactivity_floor_tracking(ctx, open::).await; + }); + } + + #[test_traced("INFO")] + fn test_variable_floor_monotonicity_mmb() { + let executor = deterministic::Runner::default(); + executor.start(|ctx| async move { + test::test_immutable_floor_monotonicity(ctx, open::).await; + }); + } + + #[test_traced("INFO")] + fn test_variable_floor_monotonicity_violation_mmb() { + let executor = deterministic::Runner::default(); + executor.start(|ctx| async move { + test::test_immutable_floor_monotonicity_violation(ctx, open::).await; + }); + } + + #[test_traced("INFO")] + fn test_variable_floor_beyond_size_mmb() { + let executor = deterministic::Runner::default(); + executor.start(|ctx| async move { + test::test_immutable_floor_beyond_size(ctx, open::).await; + }); + } + + #[test_traced("INFO")] + fn test_variable_rewind_restores_floor_mmb() { + let executor = deterministic::Runner::default(); + executor.start(|ctx| async move { + test::test_immutable_rewind_restores_floor(ctx, open::).await; + }); + } + + #[test_traced("INFO")] + fn test_variable_single_commit_live_set() { + let executor = deterministic::Runner::default(); + executor.start(|ctx| async move { + test::test_immutable_single_commit_live_set(ctx, open::).await; + }); + } + + #[test_traced("INFO")] + fn test_variable_single_commit_live_set_mmb() { + let executor = 
deterministic::Runner::default(); + executor.start(|ctx| async move { + test::test_immutable_single_commit_live_set(ctx, open::).await; + }); + } + + #[test_traced("INFO")] + fn test_variable_rewind_after_reopen_with_floor_change() { + let executor = deterministic::Runner::default(); + executor.start(|ctx| async move { + test::test_immutable_rewind_after_reopen_with_floor_change(ctx, open::) + .await; + }); + } + + #[test_traced("INFO")] + fn test_variable_rewind_after_reopen_with_floor_change_mmb() { + let executor = deterministic::Runner::default(); + executor.start(|ctx| async move { + test::test_immutable_rewind_after_reopen_with_floor_change(ctx, open::) + .await; + }); + } + + #[test_traced("INFO")] + fn test_variable_rewind_after_reopen_partial_floor_gap() { + let executor = deterministic::Runner::default(); + executor.start(|ctx| async move { + test::test_immutable_rewind_after_reopen_partial_floor_gap(ctx, open::) + .await; + }); + } + + #[test_traced("INFO")] + fn test_variable_rewind_after_reopen_partial_floor_gap_mmb() { + let executor = deterministic::Runner::default(); + executor.start(|ctx| async move { + test::test_immutable_rewind_after_reopen_partial_floor_gap(ctx, open::) + .await; + }); + } } diff --git a/storage/src/qmdb/mod.rs b/storage/src/qmdb/mod.rs index 5f5d963cea4..b6318834b5c 100644 --- a/storage/src/qmdb/mod.rs +++ b/storage/src/qmdb/mod.rs @@ -115,6 +115,14 @@ pub enum Error { batch_db_size: u64, batch_base_size: u64, }, + + /// The batch's inactivity floor is lower than the database's current floor. + #[error("floor regressed: batch floor {0} < current floor {1}")] + FloorRegressed(Location, Location), + + /// The batch's inactivity floor exceeds its total operation count. 
+ #[error("floor beyond size: floor {0} > total size {1}")] + FloorBeyondSize(Location, Location), } impl From> for Error { diff --git a/storage/src/qmdb/sync/resolver.rs b/storage/src/qmdb/sync/resolver.rs index 4dc9c6e8482..16b6e442ed9 100644 --- a/storage/src/qmdb/sync/resolver.rs +++ b/storage/src/qmdb/sync/resolver.rs @@ -222,8 +222,9 @@ impl_resolver!(OrderedFixedDb, OrderedFixedOperation, FixedValue); // Ordered Variable impl_resolver!(OrderedVariableDb, OrderedVariableOperation, VariableValue); -// Immutable types have a different Operation signature (no F parameter), -// so we use a separate macro. +// Immutable types need a separate macro because the key bound varies +// (Array for fixed, Key for variable) unlike the other DB types which +// always use Array. macro_rules! impl_resolver_immutable { ($db:ident, $op:ident, $val_bound:ident, $key_bound:path) => { impl Resolver for Arc<$db> @@ -238,7 +239,7 @@ macro_rules! impl_resolver_immutable { { type Family = F; type Digest = H::Digest; - type Op = $op; + type Op = $op; type Error = qmdb::Error; async fn get_operations( @@ -277,7 +278,7 @@ macro_rules! impl_resolver_immutable { { type Family = F; type Digest = H::Digest; - type Op = $op; + type Op = $op; type Error = qmdb::Error; async fn get_operations( @@ -316,7 +317,7 @@ macro_rules! impl_resolver_immutable { { type Family = F; type Digest = H::Digest; - type Op = $op; + type Op = $op; type Error = qmdb::Error; async fn get_operations(