From febb61ab78e639fe9ee0d15d713b63a4504034f7 Mon Sep 17 00:00:00 2001 From: Dan Laine Date: Fri, 17 Apr 2026 15:36:32 -0700 Subject: [PATCH 1/6] add inactivity floor to qmdb::keyless Commit operation --- examples/sync/src/databases/keyless.rs | 22 +- storage/conformance.toml | 8 +- storage/fuzz/fuzz_targets/qmdb_keyless.rs | 34 +- storage/src/qmdb/benches/generate.rs | 5 +- storage/src/qmdb/keyless/batch.rs | 39 +- storage/src/qmdb/keyless/fixed.rs | 72 +- storage/src/qmdb/keyless/mod.rs | 672 ++++++++++++------ storage/src/qmdb/keyless/operation/fixed.rs | 79 +- storage/src/qmdb/keyless/operation/mod.rs | 84 ++- .../src/qmdb/keyless/operation/variable.rs | 57 +- storage/src/qmdb/keyless/sync.rs | 33 +- storage/src/qmdb/keyless/variable.rs | 74 +- storage/src/qmdb/mod.rs | 8 + storage/src/qmdb/sync/resolver.rs | 6 +- 14 files changed, 853 insertions(+), 340 deletions(-) diff --git a/examples/sync/src/databases/keyless.rs b/examples/sync/src/databases/keyless.rs index dd67840f07f..e9736da3be2 100644 --- a/examples/sync/src/databases/keyless.rs +++ b/examples/sync/src/databases/keyless.rs @@ -1,9 +1,9 @@ //! Keyless database types and helpers for the sync example. //! //! A `keyless` database is append-only: operations are stored by location rather than by key. -//! It supports `Append(value)` and `Commit(metadata)` operations. For sync, the engine targets -//! the Merkle root over all operations, and the client reconstructs the same state by replaying -//! the fetched operations. +//! It supports `Append(value)` and `Commit(metadata, floor)` operations. For sync, the engine +//! targets the Merkle root over all operations, and the client reconstructs the same state by +//! replaying the fetched operations. use crate::{Hasher, Key, Value}; use commonware_cryptography::{Hasher as CryptoHasher, Sha256}; @@ -28,7 +28,7 @@ use tracing::error; pub type Database = fixed::Db; /// Operation type alias. -pub type Operation = fixed::Operation; +pub type Operation = fixed::Operation; /// Create a database configuration for the keyless variant. pub fn create_config(context: &(impl BufferPooler + commonware_runtime::Metrics)) -> fixed::Config { @@ -71,12 +71,12 @@ pub fn create_test_operations(count: usize, seed: u64) -> Vec { operations.push(Operation::Append(value)); if (i + 1) % 10 == 0 { - operations.push(Operation::Commit(None)); + operations.push(Operation::Commit(None, Location::new(0))); } } // Always end with a commit - operations.push(Operation::Commit(Some(Sha256::fill(1)))); + operations.push(Operation::Commit(Some(Sha256::fill(1)), Location::new(0))); operations } @@ -107,8 +107,8 @@ where Operation::Append(value) => { batch = batch.append(value); } - Operation::Commit(metadata) => { - let merkleized = batch.merkleize(self, metadata); + Operation::Commit(metadata, floor) => { + let merkleized = batch.merkleize(self, metadata, floor); self.apply_batch(merkleized).await?; self.commit().await?; batch = self.new_batch(); @@ -127,9 +127,7 @@ where } async fn inactivity_floor(&self) -> Location { - // Keyless databases have no inactivity floor concept. - // Use the pruning boundary, same as immutable. - self.bounds().await.start + self.inactivity_floor_loc() } async fn historical_proof( @@ -163,7 +161,7 @@ mod tests { let ops = ::create_test_operations(5, 12345); assert_eq!(ops.len(), 6); // 5 operations + 1 commit - if let Operation::Commit(Some(_)) = &ops[5] { + if let Operation::Commit(Some(_), _) = &ops[5] { // ok } else { panic!("last operation should be a commit with metadata"); diff --git a/storage/conformance.toml b/storage/conformance.toml index b898401bf9a..aee748a9f67 100644 --- a/storage/conformance.toml +++ b/storage/conformance.toml @@ -150,13 +150,13 @@ hash = "cce5f888e506282f861e0e49e176b26c65f92e457f0c5f5353fc9b7196f07478" n_cases = 65536 hash = "fdca5df62d243b28676ee15034663694cd219d5ef80749079126b0ed73effe0d" -["commonware_storage::qmdb::keyless::operation::tests::conformance::CodecConformance>>"] +["commonware_storage::qmdb::keyless::operation::tests::conformance::CodecConformance>>"] n_cases = 65536 -hash = "c692a20fa40180844888e7a26401f99a45ce3127faeca5f1228a41b454424623" +hash = "1ad07a5388e328474b353c153617b6a8a3a4713a9895f3f505148f919c49b86d" -["commonware_storage::qmdb::keyless::operation::tests::conformance::CodecConformance>>"] +["commonware_storage::qmdb::keyless::operation::tests::conformance::CodecConformance>>"] n_cases = 65536 -hash = "b41d6c6ec560bde9caf2e206526864c618a0721af367585a1719617ca7ce9291" +hash = "c6702e8b8d2e96438df00786289df9ee4ec92a767974fa1484766a246ebc71b7" ["commonware_storage::qmdb::sync::target::tests::conformance::CodecConformance>"] n_cases = 65536 diff --git a/storage/fuzz/fuzz_targets/qmdb_keyless.rs b/storage/fuzz/fuzz_targets/qmdb_keyless.rs index eb324720307..13f6cd4f862 100644 --- a/storage/fuzz/fuzz_targets/qmdb_keyless.rs +++ b/storage/fuzz/fuzz_targets/qmdb_keyless.rs @@ -25,6 +25,7 @@ enum Operation { }, Commit { metadata_bytes: Option>, + advance_floor: bool, }, Get { loc_offset: u32, @@ -67,7 +68,11 @@ impl<'a> Arbitrary<'a> for Operation { } else { None }; - Ok(Operation::Commit { metadata_bytes }) + let advance_floor: bool = u.arbitrary()?; + Ok(Operation::Commit { + metadata_bytes, + advance_floor, + }) } 2 => { let loc_offset = u.arbitrary()?; @@ -168,12 +173,21 @@ fn fuzz_family(input: &FuzzInput, suffix: &str) { pending_appends.push(value_bytes.clone()); } - Operation::Commit { metadata_bytes } => { + Operation::Commit { metadata_bytes, advance_floor } => { + let pending_count = pending_appends.len() as u64; let mut batch = db.new_batch(); for v in pending_appends.drain(..) { batch = batch.append(v); } - let merkleized = batch.merkleize(&db, metadata_bytes.clone()); + // If the fuzzer opts in, advance the floor to the commit location (the max + // valid value). Otherwise, keep the current floor to preserve monotonicity. + let floor = if *advance_floor { + let end = db.bounds().await.end; + Location::::new(end.as_u64() + pending_count) + } else { + db.inactivity_floor_loc() + }; + let merkleized = batch.merkleize(&db, metadata_bytes.clone(), floor); db.apply_batch(merkleized).await.expect("Commit should not fail"); db.commit().await.expect("Commit should not fail"); } @@ -195,10 +209,10 @@ fn fuzz_family(input: &FuzzInput, suffix: &str) { for v in pending_appends.drain(..) { batch = batch.append(v); } - let merkleized = batch.merkleize(&db, None); + let merkleized = batch.merkleize(&db, None, db.inactivity_floor_loc()); db.apply_batch(merkleized).await.expect("Commit should not fail"); db.commit().await.expect("Commit should not fail"); - db.prune(db.last_commit_loc()) + db.prune(db.inactivity_floor_loc()) .await .expect("Prune should not fail"); } @@ -208,7 +222,7 @@ fn fuzz_family(input: &FuzzInput, suffix: &str) { for v in pending_appends.drain(..) { batch = batch.append(v); } - let merkleized = batch.merkleize(&db, None); + let merkleized = batch.merkleize(&db, None, db.inactivity_floor_loc()); db.apply_batch(merkleized).await.expect("Commit should not fail"); db.sync().await.expect("Sync should not fail"); } @@ -230,7 +244,7 @@ fn fuzz_family(input: &FuzzInput, suffix: &str) { for v in pending_appends.drain(..) { batch = batch.append(v); } - let merkleized = batch.merkleize(&db, None); + let merkleized = batch.merkleize(&db, None, db.inactivity_floor_loc()); db.apply_batch(merkleized).await.expect("Commit should not fail"); db.commit().await.expect("Commit should not fail"); let _ = db.root(); @@ -248,7 +262,7 @@ fn fuzz_family(input: &FuzzInput, suffix: &str) { for v in pending_appends.drain(..) { batch = batch.append(v); } - let merkleized = batch.merkleize(&db, None); + let merkleized = batch.merkleize(&db, None, db.inactivity_floor_loc()); db.apply_batch(merkleized).await.expect("Commit should not fail"); db.commit().await.expect("Commit should not fail"); let start_loc = (*start_offset as u64) % op_count.as_u64(); @@ -276,7 +290,7 @@ fn fuzz_family(input: &FuzzInput, suffix: &str) { for v in pending_appends.drain(..) { batch = batch.append(v); } - let merkleized = batch.merkleize(&db, None); + let merkleized = batch.merkleize(&db, None, db.inactivity_floor_loc()); db.apply_batch(merkleized).await.expect("Commit should not fail"); db.commit().await.expect("Commit should not fail"); // Use post-commit op_count so it's consistent with the root. @@ -317,7 +331,7 @@ fn fuzz_family(input: &FuzzInput, suffix: &str) { for v in pending_appends.drain(..) { batch = batch.append(v); } - let merkleized = batch.merkleize(&db, None); + let merkleized = batch.merkleize(&db, None, db.inactivity_floor_loc()); db.apply_batch(merkleized).await.expect("Commit should not fail"); db.destroy().await.expect("Destroy should not fail"); }); diff --git a/storage/src/qmdb/benches/generate.rs b/storage/src/qmdb/benches/generate.rs index bdef1fd21cd..a0bb0997695 100644 --- a/storage/src/qmdb/benches/generate.rs +++ b/storage/src/qmdb/benches/generate.rs @@ -135,12 +135,13 @@ fn bench_keyless_generate(c: &mut Criterion) { let v = make_var_value(&mut rng); batch = batch.append(v); if rng.next_u32() % KEYLESS_COMMIT_FREQ == 0 { - let merkleized = batch.merkleize(&db, None); + let merkleized = + batch.merkleize(&db, None, db.inactivity_floor_loc()); db.apply_batch(merkleized).await.unwrap(); batch = db.new_batch(); } } - let merkleized = batch.merkleize(&db, None); + let merkleized = batch.merkleize(&db, None, db.inactivity_floor_loc()); db.apply_batch(merkleized).await.unwrap(); db.sync().await.unwrap(); diff --git a/storage/src/qmdb/keyless/batch.rs b/storage/src/qmdb/keyless/batch.rs index 2f8a926a643..59a03dff404 100644 --- a/storage/src/qmdb/keyless/batch.rs +++ b/storage/src/qmdb/keyless/batch.rs @@ -21,10 +21,10 @@ where F: Family, V: ValueEncoding, H: Hasher, - Operation: EncodeShared, + Operation: EncodeShared, { /// Authenticated journal batch for computing the speculative Merkle root. - journal_batch: authenticated::UnmerkleizedBatch>, + journal_batch: authenticated::UnmerkleizedBatch>, /// Pending appends. appends: Vec, @@ -45,10 +45,10 @@ where #[derive(Clone)] pub struct MerkleizedBatch where - Operation: EncodeShared, + Operation: EncodeShared, { /// Authenticated journal batch (Merkle state + local items). - pub(super) journal_batch: Arc>>, + pub(super) journal_batch: Arc>>, /// The parent batch in the chain, if any. pub(super) parent: Option>, @@ -65,11 +65,14 @@ where /// Each ancestor's `total_size` (operation count after that ancestor). /// Used by `apply_batch` to validate partial ancestor commits. pub(super) ancestor_batch_ends: Vec, + + /// The inactivity floor declared by this batch's commit operation. + pub(super) new_inactivity_floor_loc: Location, } impl MerkleizedBatch where - Operation: EncodeShared, + Operation: EncodeShared, { /// Iterate over ancestor batches (parent first, then grandparent, etc.). pub(super) fn ancestors(&self) -> impl Iterator> { @@ -90,9 +93,9 @@ where fn read_chain_op( batch: &MerkleizedBatch, loc: u64, -) -> Option> +) -> Option> where - Operation: EncodeShared, + Operation: EncodeShared, { // Each batch's items span [size - items.len(), size). We compute the range from the // journal (strong Arcs, always intact) rather than from the QMDB-layer Weak parent @@ -117,13 +120,13 @@ where F: Family, V: ValueEncoding, H: Hasher, - Operation: EncodeShared, + Operation: EncodeShared, { /// Create a batch from a committed DB (no parent chain). pub(super) fn new(keyless: &Keyless, journal_size: u64) -> Self where E: Context, - C: Mutable> + Persistable, + C: Mutable> + Persistable, { Self { journal_batch: keyless.journal.new_batch(), @@ -155,7 +158,7 @@ where ) -> Result, Error> where E: Context, - C: Mutable> + Persistable, + C: Mutable> + Persistable, { let loc_val = *loc; @@ -184,23 +187,28 @@ where } /// Resolve appends into operations, merkleize, and return an `Arc`. + /// + /// `inactivity_floor` is the application-declared floor embedded in the commit. It must + /// be monotonically non-decreasing (enforced on `apply_batch`) and must not exceed the + /// batch's total operation count. pub fn merkleize( self, db: &Keyless, metadata: Option, + inactivity_floor: Location, ) -> Arc> where E: Context, - C: Mutable> + Persistable, + C: Mutable> + Persistable, { let base = self.base_size; // Build operations: one Append per value, then Commit. - let mut ops: Vec> = Vec::with_capacity(self.appends.len() + 1); + let mut ops: Vec> = Vec::with_capacity(self.appends.len() + 1); for value in self.appends { ops.push(Operation::Append(value)); } - ops.push(Operation::Commit(metadata)); + ops.push(Operation::Commit(metadata, inactivity_floor)); let total_size = base + ops.len() as u64; @@ -226,13 +234,14 @@ where total_size, db_size: self.db_size, ancestor_batch_ends, + new_inactivity_floor_loc: inactivity_floor, }) } } impl MerkleizedBatch where - Operation: EncodeShared, + Operation: EncodeShared, { /// Return the speculative root. pub fn root(&self) -> D { @@ -248,7 +257,7 @@ where where E: Context, H: Hasher, - C: Mutable> + Persistable, + C: Mutable> + Persistable, { let loc_val = *loc; diff --git a/storage/src/qmdb/keyless/fixed.rs b/storage/src/qmdb/keyless/fixed.rs index d0fb9f2f90d..4ae548406ed 100644 --- a/storage/src/qmdb/keyless/fixed.rs +++ b/storage/src/qmdb/keyless/fixed.rs @@ -19,13 +19,13 @@ use commonware_cryptography::Hasher; use commonware_runtime::{Clock, Metrics, Storage}; /// Keyless operation for fixed-size values. -pub type Operation = BaseOperation>; +pub type Operation = BaseOperation>; /// A keyless authenticated database for fixed-size data. pub type Db = - super::Keyless, fixed::Journal>, H>; + super::Keyless, fixed::Journal>, H>; -type Journal = authenticated::Journal>, H>; +type Journal = authenticated::Journal>, H>; /// Configuration for a fixed-size [keyless](super) authenticated db. pub type Config = super::Config; @@ -35,7 +35,7 @@ impl Db Result> { let journal: Journal = - Journal::new(context, cfg.merkle, cfg.log, Operation::::is_commit).await?; + Journal::new(context, cfg.merkle, cfg.log, Operation::::is_commit).await?; Self::init_from_journal(journal).await } } @@ -330,6 +330,38 @@ mod test { }); } + #[test_traced("INFO")] + fn test_keyless_fixed_floor_tracking() { + deterministic::Runner::default().start(|ctx| async move { + let db = open_db::(ctx.with_label("db1")).await; + tests::test_keyless_db_floor_tracking(ctx, db, reopen::()).await; + }); + } + + #[test_traced("INFO")] + fn test_keyless_fixed_floor_regression_rejected() { + deterministic::Runner::default().start(|ctx| async move { + let db = open_db::(ctx.with_label("db")).await; + tests::test_keyless_db_floor_regression_rejected(db).await; + }); + } + + #[test_traced("INFO")] + fn test_keyless_fixed_floor_beyond_size_rejected() { + deterministic::Runner::default().start(|ctx| async move { + let db = open_db::(ctx.with_label("db")).await; + tests::test_keyless_db_floor_beyond_size_rejected(db).await; + }); + } + + #[test_traced("INFO")] + fn test_keyless_fixed_rewind_restores_floor() { + deterministic::Runner::default().start(|ctx| async move { + let db = open_db::(ctx.with_label("db")).await; + tests::test_keyless_db_rewind_restores_floor(db).await; + }); + } + // mmb::Family variants #[test_traced("INFO")] @@ -572,4 +604,36 @@ mod test { tests::test_keyless_db_rewind_pruned_target_errors(db).await; }); } + + #[test_traced("INFO")] + fn test_keyless_fixed_floor_tracking_mmb() { + deterministic::Runner::default().start(|ctx| async move { + let db = open_db::(ctx.with_label("db1")).await; + tests::test_keyless_db_floor_tracking(ctx, db, reopen::()).await; + }); + } + + #[test_traced("INFO")] + fn test_keyless_fixed_floor_regression_rejected_mmb() { + deterministic::Runner::default().start(|ctx| async move { + let db = open_db::(ctx.with_label("db")).await; + tests::test_keyless_db_floor_regression_rejected(db).await; + }); + } + + #[test_traced("INFO")] + fn test_keyless_fixed_floor_beyond_size_rejected_mmb() { + deterministic::Runner::default().start(|ctx| async move { + let db = open_db::(ctx.with_label("db")).await; + tests::test_keyless_db_floor_beyond_size_rejected(db).await; + }); + } + + #[test_traced("INFO")] + fn test_keyless_fixed_rewind_restores_floor_mmb() { + deterministic::Runner::default().start(|ctx| async move { + let db = open_db::(ctx.with_label("db")).await; + tests::test_keyless_db_rewind_restores_floor(db).await; + }); + } } diff --git a/storage/src/qmdb/keyless/mod.rs b/storage/src/qmdb/keyless/mod.rs index 37895864572..6827fe9e6b4 100644 --- a/storage/src/qmdb/keyless/mod.rs +++ b/storage/src/qmdb/keyless/mod.rs @@ -7,23 +7,24 @@ //! ```ignore //! // Simple mode: apply a batch, then durably commit it. //! let batch = db.new_batch().append(value); -//! let merkleized = batch.merkleize(&db, None); +//! let merkleized = batch.merkleize(&db, None, db.inactivity_floor_loc()); //! db.apply_batch(merkleized).await?; //! db.commit().await?; //! ``` //! //! ```ignore //! // Batches can still fork before you apply them. +//! let floor = db.inactivity_floor_loc(); //! let parent = db.new_batch().append(value_a); -//! let parent = parent.merkleize(&db, None); +//! let parent = parent.merkleize(&db, None, floor); //! //! let child_a = parent.new_batch(); //! let child_a = child_a.append(value_b); -//! let child_a = child_a.merkleize(&db, None); +//! let child_a = child_a.merkleize(&db, None, floor); //! //! let child_b = parent.new_batch(); //! let child_b = child_b.append(value_c); -//! let child_b = child_b.merkleize(&db, None); +//! let child_b = child_b.merkleize(&db, None, floor); //! //! db.apply_batch(child_a).await?; //! db.commit().await?; @@ -31,10 +32,11 @@ //! //! ```ignore //! // Sequential commit: apply parent then child. +//! let floor = db.inactivity_floor_loc(); //! let parent = db.new_batch().append(value_a); -//! let parent_m = parent.merkleize(&db, None); +//! let parent_m = parent.merkleize(&db, None, floor); //! let child = parent_m.new_batch().append(value_b); -//! let child_m = child.merkleize(&db, None); +//! let child_m = child.merkleize(&db, None, floor); //! //! db.apply_batch(parent_m).await?; //! db.apply_batch(child_m).await?; @@ -79,15 +81,19 @@ where F: Family, E: Context, V: ValueEncoding, - C: Contiguous>, + C: Contiguous>, H: Hasher, - Operation: EncodeShared, + Operation: EncodeShared, { /// Authenticated journal of operations. journal: authenticated::Journal, /// The location of the last commit, if any. last_commit_loc: Location, + + /// The inactivity floor declared by the last committed batch. Operations at locations below + /// this value are considered inactive by the application and may be pruned. + inactivity_floor_loc: Location, } impl Keyless @@ -95,16 +101,18 @@ where F: Family, E: Context, V: ValueEncoding, - C: Mutable> + Persistable, + C: Mutable> + Persistable, H: Hasher, - Operation: EncodeShared, + Operation: EncodeShared, { pub(crate) async fn init_from_journal( mut journal: authenticated::Journal, ) -> Result> { if journal.size().await == 0 { warn!("no operations found in log, creating initial commit"); - journal.append(&Operation::Commit(None)).await?; + journal + .append(&Operation::Commit(None, Location::new(0))) + .await?; journal.sync().await?; } @@ -114,9 +122,17 @@ where .checked_sub(1) .expect("at least one commit should exist"); + let inactivity_floor_loc = { + let reader = journal.reader().await; + let op = reader.read(*last_commit_loc).await?; + op.has_floor() + .expect("last operation should be a commit with floor") + }; + Ok(Self { journal, last_commit_loc, + inactivity_floor_loc, }) } @@ -142,6 +158,11 @@ where self.last_commit_loc } + /// Returns the inactivity floor declared by the last committed batch. + pub const fn inactivity_floor_loc(&self) -> Location { + self.inactivity_floor_loc + } + /// Return [start, end) where `start` and `end - 1` are the Locations of the oldest and newest /// retained operations respectively. pub async fn bounds(&self) -> std::ops::Range> { @@ -157,7 +178,7 @@ where .await .read(*self.last_commit_loc) .await?; - let Operation::Commit(metadata) = op else { + let Operation::Commit(metadata, _floor) = op else { return Ok(None); }; @@ -186,7 +207,7 @@ where &self, start_loc: Location, max_ops: NonZeroU64, - ) -> Result<(Proof, Vec>), Error> { + ) -> Result<(Proof, Vec>), Error> { self.historical_proof(self.bounds().await.end, start_loc, max_ops) .await } @@ -205,7 +226,7 @@ where op_count: Location, start_loc: Location, max_ops: NonZeroU64, - ) -> Result<(Proof, Vec>), Error> { + ) -> Result<(Proof, Vec>), Error> { Ok(self .journal .historical_proof(op_count, start_loc, max_ops) @@ -233,10 +254,14 @@ where /// /// # Errors /// - /// - Returns [`Error::PruneBeyondMinRequired`] if `loc` > last commit point. + /// - Returns [`Error::PruneBeyondMinRequired`] if `loc` > the inactivity floor declared by + /// the last committed batch. pub async fn prune(&mut self, loc: Location) -> Result<(), Error> { - if loc > self.last_commit_loc { - return Err(Error::PruneBeyondMinRequired(loc, self.last_commit_loc)); + if loc > self.inactivity_floor_loc { + return Err(Error::PruneBeyondMinRequired( + loc, + self.inactivity_floor_loc, + )); } self.journal.prune(loc).await?; @@ -275,7 +300,7 @@ where } let rewind_last_loc = Location::new(rewind_size - 1); - { + let rewind_floor = { let reader = self.journal.reader().await; let bounds = reader.bounds(); if rewind_size <= bounds.start { @@ -284,15 +309,17 @@ where ))); } let rewind_last_op = reader.read(*rewind_last_loc).await?; - if !matches!(rewind_last_op, Operation::Commit(_)) { + let Operation::Commit(_, floor) = rewind_last_op else { return Err(Error::UnexpectedData(rewind_last_loc)); - } - } + }; + floor + }; // Journal rewind happens before in-memory commit-location updates. If a later step fails, // this handle may be internally diverged and must be dropped by the caller. self.journal.rewind(rewind_size).await?; self.last_commit_loc = rewind_last_loc; + self.inactivity_floor_loc = rewind_floor; Ok(()) } @@ -329,6 +356,7 @@ where total_size: journal_size, db_size: journal_size, ancestor_batch_ends: Vec::new(), + new_inactivity_floor_loc: self.inactivity_floor_loc, }) } @@ -358,11 +386,26 @@ where batch_base_size: batch.base_size, }); } + // Enforce floor monotonicity and bounds before mutating the journal. + if batch.new_inactivity_floor_loc < self.inactivity_floor_loc { + return Err(Error::FloorRegressed( + batch.new_inactivity_floor_loc, + self.inactivity_floor_loc, + )); + } + let batch_end = Location::new(batch.total_size); + if batch.new_inactivity_floor_loc > batch_end { + return Err(Error::FloorBeyondSize( + batch.new_inactivity_floor_loc, + batch_end, + )); + } let start_loc = self.last_commit_loc + 1; self.journal.apply_batch(&batch.journal_batch).await?; self.last_commit_loc = Location::new(batch.total_size - 1); + self.inactivity_floor_loc = batch.new_inactivity_floor_loc; let end_loc = Location::new(batch.total_size); debug!(size = ?end_loc, "applied batch"); Ok(start_loc..end_loc) @@ -409,9 +452,9 @@ pub(crate) mod tests { reopen: Reopen>, ) where V: ValueEncoding, - C: Mutable> + Persistable, + C: Mutable> + Persistable, H: Hasher, - Operation: EncodeShared, + Operation: EncodeShared, { let bounds = db.bounds().await; assert_eq!(bounds.end, 1); // initial commit should exist @@ -434,7 +477,9 @@ pub(crate) mod tests { // Test calling commit on an empty db which should make it (durably) non-empty. let metadata = V::Value::make(99); - let merkleized = db.new_batch().merkleize(&db, Some(metadata.clone())); + let merkleized = + db.new_batch() + .merkleize(&db, Some(metadata.clone()), db.inactivity_floor_loc()); db.apply_batch(merkleized).await.unwrap(); db.commit().await.unwrap(); assert_eq!(db.bounds().await.end, 2); // 2 commit ops @@ -461,9 +506,9 @@ pub(crate) mod tests { reopen: Reopen>, ) where V: ValueEncoding, - C: Mutable> + Persistable, + C: Mutable> + Persistable, H: Hasher, - Operation: EncodeShared, + Operation: EncodeShared, { // Build a db with 2 values and make sure we can get them back. let v1 = V::Value::make(1); @@ -477,7 +522,9 @@ pub(crate) mod tests { let batch = batch.append(v2.clone()); assert_eq!(loc1, Location::new(1)); assert_eq!(loc2, Location::new(2)); - db.apply_batch(batch.merkleize(&db, None)).await.unwrap(); + db.apply_batch(batch.merkleize(&db, None, db.inactivity_floor_loc())) + .await + .unwrap(); } // Make sure closing/reopening gets us back to the same state. @@ -509,9 +556,9 @@ pub(crate) mod tests { reopen: Reopen>, ) where V: ValueEncoding, - C: Mutable> + Persistable, + C: Mutable> + Persistable, H: Hasher, - Operation: EncodeShared, + Operation: EncodeShared, { let root = db.root(); const ELEMENTS: u64 = 100; @@ -535,7 +582,9 @@ pub(crate) mod tests { for i in 0..ELEMENTS { batch = batch.append(V::Value::make(i + 100)); } - db.apply_batch(batch.merkleize(&db, None)).await.unwrap(); + db.apply_batch(batch.merkleize(&db, None, db.inactivity_floor_loc())) + .await + .unwrap(); } db.commit().await.unwrap(); let root = db.root(); @@ -559,7 +608,9 @@ pub(crate) mod tests { for i in 0..ELEMENTS { batch = batch.append(V::Value::make(i + 300)); } - db.apply_batch(batch.merkleize(&db, None)).await.unwrap(); + db.apply_batch(batch.merkleize(&db, None, db.inactivity_floor_loc())) + .await + .unwrap(); } db.commit().await.unwrap(); let root = db.root(); @@ -577,8 +628,8 @@ pub(crate) mod tests { mut db: Keyless, ) where V: ValueEncoding, - C: Mutable> + Persistable, - Operation: EncodeShared + std::fmt::Debug, + C: Mutable> + Persistable, + Operation: EncodeShared + std::fmt::Debug, { let hasher = Standard::::new(); const ELEMENTS: u64 = 50; @@ -588,7 +639,9 @@ pub(crate) mod tests { for i in 0..ELEMENTS { batch = batch.append(V::Value::make(i)); } - db.apply_batch(batch.merkleize(&db, None)).await.unwrap(); + db.apply_batch(batch.merkleize(&db, None, db.inactivity_floor_loc())) + .await + .unwrap(); } let root = db.root(); @@ -613,19 +666,22 @@ pub(crate) mod tests { mut db: Keyless, ) where V: ValueEncoding, - C: Mutable> + Persistable, + C: Mutable> + Persistable, H: Hasher, - Operation: EncodeShared, + Operation: EncodeShared, { let metadata = V::Value::make(99); - let merkleized = db - .new_batch() - .append(V::Value::make(1)) - .merkleize(&db, Some(metadata.clone())); + let merkleized = db.new_batch().append(V::Value::make(1)).merkleize( + &db, + Some(metadata.clone()), + db.inactivity_floor_loc(), + ); db.apply_batch(merkleized).await.unwrap(); assert_eq!(db.get_metadata().await.unwrap(), Some(metadata)); - let merkleized = db.new_batch().merkleize(&db, None); + let merkleized = db + .new_batch() + .merkleize(&db, None, db.inactivity_floor_loc()); db.apply_batch(merkleized).await.unwrap(); assert_eq!(db.get_metadata().await.unwrap(), None); @@ -636,47 +692,49 @@ pub(crate) mod tests { mut db: Keyless, ) where V: ValueEncoding, - C: Mutable> + Persistable, + C: Mutable> + Persistable, H: Hasher, - Operation: EncodeShared, + Operation: EncodeShared, { - // Test pruning empty database (no appends beyond initial commit). + // Initial floor is 0, so pruning past 0 should fail. + assert_eq!(db.inactivity_floor_loc(), Location::new(0)); let result = db.prune(Location::new(1)).await; assert!( - matches!(result, Err(Error::PruneBeyondMinRequired(prune_loc, commit_loc)) - if prune_loc == Location::new(1) && commit_loc == Location::new(0)) + matches!(result, Err(Error::PruneBeyondMinRequired(prune_loc, floor)) + if prune_loc == Location::new(1) && floor == Location::new(0)) ); - // Add values and commit. + // Add values and commit, advancing the floor to the new commit location. + let first_commit_loc = Location::::new(3); let merkleized = db .new_batch() .append(V::Value::make(1)) .append(V::Value::make(2)) - .merkleize(&db, None); + .merkleize(&db, None, first_commit_loc); db.apply_batch(merkleized).await.unwrap(); - - // op_count is 4 (initial_commit, v1, v2, commit), last_commit_loc is 3. - let last_commit = db.last_commit_loc(); - assert_eq!(last_commit, Location::new(3)); - - let merkleized = db - .new_batch() - .append(V::Value::make(3)) - .merkleize(&db, None); + assert_eq!(db.last_commit_loc(), first_commit_loc); + assert_eq!(db.inactivity_floor_loc(), first_commit_loc); + + // Append one more, advancing the floor with it. + let second_commit_loc = Location::::new(5); + let merkleized = + db.new_batch() + .append(V::Value::make(3)) + .merkleize(&db, None, second_commit_loc); db.apply_batch(merkleized).await.unwrap(); - // Test valid prune (at previous commit location 3). + // Valid prune: up to the floor (previous commit location). let root = db.root(); - assert!(db.prune(Location::new(3)).await.is_ok()); + assert!(db.prune(first_commit_loc).await.is_ok()); assert_eq!(db.root(), root); - // Test pruning beyond last commit. - let new_last_commit = db.last_commit_loc(); - let beyond = Location::new(*new_last_commit + 1); + // Pruning beyond the current floor fails. + let new_floor = db.inactivity_floor_loc(); + let beyond = Location::new(*new_floor + 1); let result = db.prune(beyond).await; assert!( - matches!(result, Err(Error::PruneBeyondMinRequired(prune_loc, commit_loc)) - if prune_loc == beyond && commit_loc == new_last_commit) + matches!(result, Err(Error::PruneBeyondMinRequired(prune_loc, floor)) + if prune_loc == beyond && floor == new_floor) ); db.destroy().await.unwrap(); @@ -688,9 +746,9 @@ pub(crate) mod tests { reopen: Reopen>, ) where V: ValueEncoding, - C: Mutable> + Persistable, + C: Mutable> + Persistable, H: Hasher, - Operation: EncodeShared, + Operation: EncodeShared, { let root = db.root(); const ELEMENTS: u64 = 200; @@ -746,7 +804,9 @@ pub(crate) mod tests { for i in 0..ELEMENTS { batch = batch.append(V::Value::make(i + 2000)); } - db.apply_batch(batch.merkleize(&db, None)).await.unwrap(); + db.apply_batch(batch.merkleize(&db, None, db.inactivity_floor_loc())) + .await + .unwrap(); } db.commit().await.unwrap(); let db = reopen(context.with_label("db6")).await; @@ -762,9 +822,9 @@ pub(crate) mod tests { reopen: Reopen>, ) where V: ValueEncoding, - C: Mutable> + Persistable, + C: Mutable> + Persistable, H: Hasher, - Operation: EncodeShared, + Operation: EncodeShared, { // Add some initial operations and commit. { @@ -772,7 +832,9 @@ pub(crate) mod tests { for i in 0..10u64 { batch = batch.append(V::Value::make(i)); } - db.apply_batch(batch.merkleize(&db, None)).await.unwrap(); + db.apply_batch(batch.merkleize(&db, None, db.inactivity_floor_loc())) + .await + .unwrap(); } db.commit().await.unwrap(); let committed_root = db.root(); @@ -809,7 +871,9 @@ pub(crate) mod tests { loc, committed_size, "New append should get the expected location" ); - db.apply_batch(batch.merkleize(&db, None)).await.unwrap(); + db.apply_batch(batch.merkleize(&db, None, db.inactivity_floor_loc())) + .await + .unwrap(); } db.commit().await.unwrap(); @@ -853,8 +917,8 @@ pub(crate) mod tests { mut db: Keyless, ) where V: ValueEncoding, - C: Mutable> + Persistable, - Operation: EncodeShared, + C: Mutable> + Persistable, + Operation: EncodeShared, { let v1 = V::Value::make(10); let v2 = V::Value::make(20); @@ -863,14 +927,14 @@ pub(crate) mod tests { let parent = db.new_batch(); let loc1 = parent.size(); let parent = parent.append(v1.clone()); - let parent_m = parent.merkleize(&db, None); + let parent_m = parent.merkleize(&db, None, db.inactivity_floor_loc()); let child = parent_m.new_batch::(); let loc2 = child.size(); let child = child.append(v2.clone()); let loc3 = child.size(); let child = child.append(v3.clone()); - let child_m = child.merkleize(&db, None); + let child_m = child.merkleize(&db, None, db.inactivity_floor_loc()); let child_root = child_m.root(); db.apply_batch(child_m).await.unwrap(); @@ -888,18 +952,20 @@ pub(crate) mod tests { mut db: Keyless, ) where V: ValueEncoding, - C: Mutable> + Persistable, + C: Mutable> + Persistable, H: Hasher, - Operation: EncodeShared, + Operation: EncodeShared, { - let batch_a = db - .new_batch() - .append(V::Value::make(10)) - .merkleize(&db, None); - let batch_b = db - .new_batch() - .append(V::Value::make(20)) - .merkleize(&db, None); + let batch_a = db.new_batch().append(V::Value::make(10)).merkleize( + &db, + None, + db.inactivity_floor_loc(), + ); + let batch_b = db.new_batch().append(V::Value::make(20)).merkleize( + &db, + None, + db.inactivity_floor_loc(), + ); db.apply_batch(batch_a).await.unwrap(); @@ -913,23 +979,26 @@ pub(crate) mod tests { mut db: Keyless, ) where V: ValueEncoding, - C: Mutable> + Persistable, + C: Mutable> + Persistable, H: Hasher, - Operation: EncodeShared, + Operation: EncodeShared, { // Chain: DB <- A <- B <- C - let a = db - .new_batch() - .append(V::Value::make(10)) - .merkleize(&db, None); - let b = a - .new_batch::() - .append(V::Value::make(20)) - .merkleize(&db, None); - let c = b - .new_batch::() - .append(V::Value::make(30)) - .merkleize(&db, None); + let a = db.new_batch().append(V::Value::make(10)).merkleize( + &db, + None, + db.inactivity_floor_loc(), + ); + let b = a.new_batch::().append(V::Value::make(20)).merkleize( + &db, + None, + db.inactivity_floor_loc(), + ); + let c = b.new_batch::().append(V::Value::make(30)).merkleize( + &db, + None, + db.inactivity_floor_loc(), + ); let expected_root = c.root(); @@ -947,13 +1016,15 @@ pub(crate) mod tests { mut db: Keyless, ) where V: ValueEncoding, - C: Mutable> + Persistable, - Operation: EncodeShared, + C: Mutable> + Persistable, + Operation: EncodeShared, { let batch = db.new_batch(); let loc1 = batch.size(); let batch = batch.append(V::Value::make(10)); - db.apply_batch(batch.merkleize(&db, None)).await.unwrap(); + db.apply_batch(batch.merkleize(&db, None, db.inactivity_floor_loc())) + .await + .unwrap(); let snapshot = db.to_batch(); assert_eq!(snapshot.root(), db.root()); @@ -961,7 +1032,7 @@ pub(crate) mod tests { let child_batch = snapshot.new_batch::(); let loc2 = child_batch.size(); let child_batch = child_batch.append(V::Value::make(20)); - db.apply_batch(child_batch.merkleize(&db, None)) + db.apply_batch(child_batch.merkleize(&db, None, db.inactivity_floor_loc())) .await .unwrap(); @@ -977,18 +1048,22 @@ pub(crate) mod tests { reopen: Reopen>, ) where V: ValueEncoding, - C: Mutable> + Persistable, + C: Mutable> + Persistable, H: Hasher, - Operation: EncodeShared, + Operation: EncodeShared, { - // Append many values then commit. + // Append many values then commit, advancing the floor to the new commit so we can + // later prune up to it. const ELEMENTS: u64 = 200; { let mut batch = db.new_batch(); for i in 0..ELEMENTS { batch = batch.append(V::Value::make(i)); } - db.apply_batch(batch.merkleize(&db, None)).await.unwrap(); + let new_commit = Location::new(*db.last_commit_loc() + 1 + ELEMENTS); + db.apply_batch(batch.merkleize(&db, None, new_commit)) + .await + .unwrap(); } db.commit().await.unwrap(); let root = db.root(); @@ -1044,7 +1119,9 @@ pub(crate) mod tests { for i in 0..ELEMENTS { batch = batch.append(V::Value::make(i + 3000)); } - db.apply_batch(batch.merkleize(&db, None)).await.unwrap(); + db.apply_batch(batch.merkleize(&db, None, db.inactivity_floor_loc())) + .await + .unwrap(); } db.commit().await.unwrap(); let db = reopen(context.with_label("db5")).await; @@ -1060,8 +1137,8 @@ pub(crate) mod tests { mut db: Keyless, ) where V: ValueEncoding, - C: Mutable> + Persistable, - Operation: EncodeShared + std::fmt::Debug, + C: Mutable> + Persistable, + Operation: EncodeShared + std::fmt::Debug, { let hasher = Standard::::new(); @@ -1072,7 +1149,9 @@ pub(crate) mod tests { for i in 0u64..ELEMENTS { batch = batch.append(V::Value::make(i)); } - db.apply_batch(batch.merkleize(&db, None)).await.unwrap(); + db.apply_batch(batch.merkleize(&db, None, db.inactivity_floor_loc())) + .await + .unwrap(); } // Test that historical proof fails with op_count > number of operations. @@ -1134,8 +1213,8 @@ pub(crate) mod tests { reopen: Reopen>, ) where V: ValueEncoding, - C: Mutable> + Persistable, - Operation: EncodeShared + std::fmt::Debug, + C: Mutable> + Persistable, + Operation: EncodeShared + std::fmt::Debug, { let hasher = Standard::::new(); @@ -1145,7 +1224,10 @@ pub(crate) mod tests { for i in 0u64..ELEMENTS { batch = batch.append(V::Value::make(i)); } - db.apply_batch(batch.merkleize(&db, None)).await.unwrap(); + let new_commit = Location::new(*db.last_commit_loc() + 1 + ELEMENTS); + db.apply_batch(batch.merkleize(&db, None, new_commit)) + .await + .unwrap(); } { @@ -1153,7 +1235,10 @@ pub(crate) mod tests { for i in ELEMENTS..ELEMENTS * 2 { batch = batch.append(V::Value::make(i)); } - db.apply_batch(batch.merkleize(&db, None)).await.unwrap(); + let new_commit = Location::new(*db.last_commit_loc() + 1 + ELEMENTS); + db.apply_batch(batch.merkleize(&db, None, new_commit)) + .await + .unwrap(); } let root = db.root(); @@ -1208,9 +1293,9 @@ pub(crate) mod tests { mut db: Keyless, ) where V: ValueEncoding, - C: Mutable> + Persistable, + C: Mutable> + Persistable, H: Hasher, - Operation: EncodeShared, + Operation: EncodeShared, { assert!(db.get(Location::new(0)).await.unwrap().is_none()); @@ -1218,7 +1303,7 @@ pub(crate) mod tests { .new_batch() .append(V::Value::make(1)) .append(V::Value::make(2)) - .merkleize(&db, None); + .merkleize(&db, None, db.inactivity_floor_loc()); db.apply_batch(merkleized).await.unwrap(); assert_eq!( @@ -1239,9 +1324,9 @@ pub(crate) mod tests { mut db: Keyless, ) where V: ValueEncoding, - C: Mutable> + Persistable, + C: Mutable> + Persistable, H: Hasher, - Operation: EncodeShared, + Operation: EncodeShared, { let base_vals: Vec = (0..3).map(|i| V::Value::make(10 + i)).collect(); let mut base_locs = Vec::new(); @@ -1252,7 +1337,9 @@ pub(crate) mod tests { batch = batch.append(v.clone()); base_locs.push(loc); } - db.apply_batch(batch.merkleize(&db, None)).await.unwrap(); + db.apply_batch(batch.merkleize(&db, None, db.inactivity_floor_loc())) + .await + .unwrap(); } let batch = db.new_batch(); @@ -1279,8 +1366,8 @@ pub(crate) mod tests { db: Keyless, ) where V: ValueEncoding, - C: Mutable> + Persistable, - Operation: EncodeShared, + C: Mutable> + Persistable, + Operation: EncodeShared, { let v1 = V::Value::make(1); let v2 = V::Value::make(2); @@ -1288,7 +1375,7 @@ pub(crate) mod tests { let parent = db.new_batch(); let loc1 = parent.size(); let parent = parent.append(v1.clone()); - let parent_m = parent.merkleize(&db, None); + let parent_m = parent.merkleize(&db, None, db.inactivity_floor_loc()); let child = parent_m.new_batch::(); assert_eq!(child.get(loc1, &db).await.unwrap(), Some(v1)); @@ -1305,23 +1392,24 @@ pub(crate) mod tests { mut db: Keyless, ) where V: ValueEncoding, - C: Mutable> + Persistable, + C: Mutable> + Persistable, H: Hasher, - Operation: EncodeShared, + Operation: EncodeShared, { let mut batch = db.new_batch(); for i in 0u64..10 { batch = batch.append(V::Value::make(i)); } - let merkleized = batch.merkleize(&db, None); + let merkleized = batch.merkleize(&db, None, db.inactivity_floor_loc()); let speculative = merkleized.root(); db.apply_batch(merkleized).await.unwrap(); assert_eq!(db.root(), speculative); - let merkleized = db - .new_batch() - .append(V::Value::make(100)) - .merkleize(&db, Some(V::Value::make(55))); + let merkleized = db.new_batch().append(V::Value::make(100)).merkleize( + &db, + Some(V::Value::make(55)), + db.inactivity_floor_loc(), + ); let speculative = merkleized.root(); db.apply_batch(merkleized).await.unwrap(); assert_eq!(db.root(), speculative); @@ -1333,15 +1421,21 @@ pub(crate) mod tests { mut db: Keyless, ) where V: ValueEncoding, - C: Mutable> + Persistable, - Operation: EncodeShared, + C: Mutable> + Persistable, + Operation: EncodeShared, { let base_val = V::Value::make(10); - let merkleized = db.new_batch().append(base_val.clone()).merkleize(&db, None); + let merkleized = + db.new_batch() + .append(base_val.clone()) + .merkleize(&db, None, db.inactivity_floor_loc()); db.apply_batch(merkleized).await.unwrap(); let new_val = V::Value::make(20); - let merkleized = db.new_batch().append(new_val.clone()).merkleize(&db, None); + let merkleized = + db.new_batch() + .append(new_val.clone()) + .merkleize(&db, None, db.inactivity_floor_loc()); assert_eq!( merkleized.get(Location::new(1), &db).await.unwrap(), @@ -1360,9 +1454,9 @@ pub(crate) mod tests { mut db: Keyless, ) where V: ValueEncoding, - C: Mutable> + Persistable, + C: Mutable> + Persistable, H: Hasher, - Operation: EncodeShared, + Operation: EncodeShared, { let v1 = V::Value::make(1); let v2 = V::Value::make(2); @@ -1370,7 +1464,7 @@ pub(crate) mod tests { let parent = db.new_batch(); let loc1 = parent.size(); let parent = parent.append(v1.clone()); - let parent_m = parent.merkleize(&db, None); + let parent_m = parent.merkleize(&db, None, db.inactivity_floor_loc()); let parent_root = parent_m.root(); db.apply_batch(parent_m).await.unwrap(); @@ -1380,7 +1474,7 @@ pub(crate) mod tests { let batch2 = db.new_batch(); let loc2 = batch2.size(); let batch2 = batch2.append(v2.clone()); - let batch2_m = batch2.merkleize(&db, None); + let batch2_m = batch2.merkleize(&db, None, db.inactivity_floor_loc()); let batch2_root = batch2_m.root(); db.apply_batch(batch2_m).await.unwrap(); assert_eq!(db.root(), batch2_root); @@ -1393,8 +1487,8 @@ pub(crate) mod tests { mut db: Keyless, ) where V: ValueEncoding, - C: Mutable> + Persistable, - Operation: EncodeShared + std::fmt::Debug, + C: Mutable> + Persistable, + Operation: EncodeShared + std::fmt::Debug, { let hasher = Standard::::new(); @@ -1412,7 +1506,7 @@ pub(crate) mod tests { all_values.push(v); all_locs.push(loc); } - let merkleized = batch.merkleize(&db, None); + let merkleized = batch.merkleize(&db, None, db.inactivity_floor_loc()); db.apply_batch(merkleized).await.unwrap(); } @@ -1432,19 +1526,22 @@ pub(crate) mod tests { mut db: Keyless, ) where V: ValueEncoding, - C: Mutable> + Persistable, + C: Mutable> + Persistable, H: Hasher, - Operation: EncodeShared, + Operation: EncodeShared, { - let merkleized = db - .new_batch() - .append(V::Value::make(1)) - .merkleize(&db, None); + let merkleized = db.new_batch().append(V::Value::make(1)).merkleize( + &db, + None, + db.inactivity_floor_loc(), + ); db.apply_batch(merkleized).await.unwrap(); let root_before = db.root(); let size_before = db.bounds().await.end; - let merkleized = db.new_batch().merkleize(&db, None); + let merkleized = db + .new_batch() + .merkleize(&db, None, db.inactivity_floor_loc()); let speculative = merkleized.root(); db.apply_batch(merkleized).await.unwrap(); @@ -1459,23 +1556,32 @@ pub(crate) mod tests { mut db: Keyless, ) where V: ValueEncoding, - C: Mutable> + Persistable, - Operation: EncodeShared, + C: Mutable> + Persistable, + Operation: EncodeShared, { let base_val = V::Value::make(10); - db.apply_batch(db.new_batch().append(base_val.clone()).merkleize(&db, None)) - .await - .unwrap(); + let floor = db.inactivity_floor_loc(); + db.apply_batch( + db.new_batch() + .append(base_val.clone()) + .merkleize(&db, None, floor), + ) + .await + .unwrap(); let v1 = V::Value::make(1); let parent = db.new_batch(); let loc1 = parent.size(); - let parent_m = parent.append(v1.clone()).merkleize(&db, None); + let parent_m = parent + .append(v1.clone()) + .merkleize(&db, None, db.inactivity_floor_loc()); let v2 = V::Value::make(2); let child = parent_m.new_batch::(); let loc2 = child.size(); - let child_m = child.append(v2.clone()).merkleize(&db, None); + let child_m = child + .append(v2.clone()) + .merkleize(&db, None, db.inactivity_floor_loc()); assert_eq!( child_m.get(Location::new(1), &db).await.unwrap(), @@ -1491,8 +1597,8 @@ pub(crate) mod tests { mut db: Keyless, ) where V: ValueEncoding, - C: Mutable> + Persistable, - Operation: EncodeShared + std::fmt::Debug, + C: Mutable> + Persistable, + Operation: EncodeShared + std::fmt::Debug, { let hasher = Standard::::new(); const N: u64 = 500; @@ -1506,7 +1612,7 @@ pub(crate) mod tests { batch = batch.append(v.clone()); values.push(v); } - let merkleized = batch.merkleize(&db, None); + let merkleized = batch.merkleize(&db, None, db.inactivity_floor_loc()); db.apply_batch(merkleized).await.unwrap(); for (i, loc) in locs.iter().enumerate() { @@ -1525,21 +1631,22 @@ pub(crate) mod tests { mut db: Keyless, ) where V: ValueEncoding, - C: Mutable> + Persistable, - Operation: EncodeShared, + C: Mutable> + Persistable, + Operation: EncodeShared, { - let parent = db - .new_batch() - .append(V::Value::make(1)) - .merkleize(&db, None); + let parent = db.new_batch().append(V::Value::make(1)).merkleize( + &db, + None, + db.inactivity_floor_loc(), + ); let child_a = parent .new_batch::() .append(V::Value::make(2)) - .merkleize(&db, None); + .merkleize(&db, None, db.inactivity_floor_loc()); let child_b = parent .new_batch::() .append(V::Value::make(3)) - .merkleize(&db, None); + .merkleize(&db, None, db.inactivity_floor_loc()); db.apply_batch(child_a).await.unwrap(); assert!(matches!( @@ -1554,17 +1661,18 @@ pub(crate) mod tests { mut db: Keyless, ) where V: ValueEncoding, - C: Mutable> + Persistable, - Operation: EncodeShared, + C: Mutable> + Persistable, + Operation: EncodeShared, { - let parent = db - .new_batch() - .append(V::Value::make(1)) - .merkleize(&db, None); + let parent = db.new_batch().append(V::Value::make(1)).merkleize( + &db, + None, + db.inactivity_floor_loc(), + ); let child = parent .new_batch::() .append(V::Value::make(2)) - .merkleize(&db, None); + .merkleize(&db, None, db.inactivity_floor_loc()); db.apply_batch(parent).await.unwrap(); db.apply_batch(child).await.unwrap(); @@ -1576,17 +1684,18 @@ pub(crate) mod tests { mut db: Keyless, ) where V: ValueEncoding, - C: Mutable> + Persistable, - Operation: EncodeShared, + C: Mutable> + Persistable, + Operation: EncodeShared, { - let parent = db - .new_batch() - .append(V::Value::make(1)) - .merkleize(&db, None); + let parent = db.new_batch().append(V::Value::make(1)).merkleize( + &db, + None, + db.inactivity_floor_loc(), + ); let child = parent .new_batch::() .append(V::Value::make(2)) - .merkleize(&db, None); + .merkleize(&db, None, db.inactivity_floor_loc()); db.apply_batch(child).await.unwrap(); assert!(matches!( @@ -1601,28 +1710,30 @@ pub(crate) mod tests { mut db: Keyless, ) where V: ValueEncoding, - C: Mutable> + Persistable, - Operation: EncodeShared, + C: Mutable> + Persistable, + Operation: EncodeShared, { // Build the child while the parent is still pending. - let parent = db - .new_batch() - .append(V::Value::make(1)) - .merkleize(&db, None); + let parent = db.new_batch().append(V::Value::make(1)).merkleize( + &db, + None, + db.inactivity_floor_loc(), + ); let pending_child = parent .new_batch::() .append(V::Value::make(2)) - .merkleize(&db, None); + .merkleize(&db, None, db.inactivity_floor_loc()); // Commit the parent, then rebuild the same logical child from the // committed DB state and compare roots. db.apply_batch(parent).await.unwrap(); db.commit().await.unwrap(); - let committed_child = db - .new_batch() - .append(V::Value::make(2)) - .merkleize(&db, None); + let committed_child = db.new_batch().append(V::Value::make(2)).merkleize( + &db, + None, + db.inactivity_floor_loc(), + ); assert_eq!(pending_child.root(), committed_child.root()); @@ -1636,15 +1747,24 @@ pub(crate) mod tests { ) -> core::ops::Range> where V: ValueEncoding, - C: Mutable> + Persistable, + C: Mutable> + Persistable, H: Hasher, - Operation: EncodeShared, + Operation: EncodeShared, { + // Tests that don't specifically exercise floor behavior advance the floor to the new + // commit location, so pruning up to the last commit works analogously to the pre-floor + // semantics. + let base_size = *db.last_commit_loc() + 1; + let appends_iter: Vec<_> = values.into_iter().collect(); + let new_commit_loc = Location::new(base_size + appends_iter.len() as u64); let mut batch = db.new_batch(); - for value in values { + for value in appends_iter { batch = batch.append(value); } - let range = db.apply_batch(batch.merkleize(db, metadata)).await.unwrap(); + let range = db + .apply_batch(batch.merkleize(db, metadata, new_commit_loc)) + .await + .unwrap(); db.commit().await.unwrap(); range } @@ -1655,9 +1775,9 @@ pub(crate) mod tests { reopen: Reopen>, ) where V: ValueEncoding, - C: Mutable> + Persistable, + C: Mutable> + Persistable, H: Hasher, - Operation: EncodeShared, + Operation: EncodeShared, { let initial_root = db.root(); let initial_size = db.bounds().await.end; @@ -1753,9 +1873,9 @@ pub(crate) mod tests { mut db: Keyless, ) where V: ValueEncoding, - C: Mutable> + Persistable, + C: Mutable> + Persistable, H: Hasher, - Operation: EncodeShared, + Operation: EncodeShared, { let first_range = commit_appends(&mut db, (0..16).map(V::Value::make), None).await; @@ -1798,4 +1918,156 @@ pub(crate) mod tests { db.destroy().await.unwrap(); } + + pub(crate) async fn test_keyless_db_floor_tracking( + context: deterministic::Context, + mut db: Keyless, + reopen: Reopen>, + ) where + V: ValueEncoding, + C: Mutable> + Persistable, + H: Hasher, + Operation: EncodeShared, + { + // Freshly created db has floor = 0. + assert_eq!(db.inactivity_floor_loc(), Location::new(0)); + + // Apply a batch with a declared floor; the db's floor should update. + let floor_a = Location::::new(2); + let merkleized = db + .new_batch() + .append(V::Value::make(1)) + .append(V::Value::make(2)) + .merkleize(&db, None, floor_a); + db.apply_batch(merkleized).await.unwrap(); + db.commit().await.unwrap(); + assert_eq!(db.inactivity_floor_loc(), floor_a); + + // Reopen: floor should survive restart (it's part of the last commit operation). + drop(db); + let mut db = reopen(context.with_label("reopen")).await; + assert_eq!(db.inactivity_floor_loc(), floor_a); + + // Floor may stay the same across a commit (monotonic non-decreasing). + let merkleized = db + .new_batch() + .append(V::Value::make(3)) + .merkleize(&db, None, floor_a); + db.apply_batch(merkleized).await.unwrap(); + assert_eq!(db.inactivity_floor_loc(), floor_a); + + // Floor may advance further. + let floor_b = Location::::new(5); + let merkleized = db + .new_batch() + .append(V::Value::make(4)) + .merkleize(&db, None, floor_b); + db.apply_batch(merkleized).await.unwrap(); + assert_eq!(db.inactivity_floor_loc(), floor_b); + + db.destroy().await.unwrap(); + } + + pub(crate) async fn test_keyless_db_floor_regression_rejected( + mut db: Keyless, + ) where + V: ValueEncoding, + C: Mutable> + Persistable, + H: Hasher, + Operation: EncodeShared, + { + // Advance floor to 3. + let merkleized = db + .new_batch() + .append(V::Value::make(1)) + .append(V::Value::make(2)) + .merkleize(&db, None, Location::new(3)); + db.apply_batch(merkleized).await.unwrap(); + assert_eq!(db.inactivity_floor_loc(), Location::new(3)); + + // Try to commit with a lower floor; apply_batch rejects. + let merkleized = + db.new_batch() + .append(V::Value::make(3)) + .merkleize(&db, None, Location::new(1)); + let err = db.apply_batch(merkleized).await.unwrap_err(); + assert!( + matches!(err, Error::FloorRegressed(new, current) if *new == 1 && *current == 3), + "unexpected error: {err:?}" + ); + + // Current floor unchanged after the rejected batch. + assert_eq!(db.inactivity_floor_loc(), Location::new(3)); + + db.destroy().await.unwrap(); + } + + pub(crate) async fn test_keyless_db_floor_beyond_size_rejected( + mut db: Keyless, + ) where + V: ValueEncoding, + C: Mutable> + Persistable, + H: Hasher, + Operation: EncodeShared, + { + // Batch of 2 appends + 1 commit lands at locations [1..4); commit at 3, total_size = 4. + // A floor > 4 is invalid. + let merkleized = db + .new_batch() + .append(V::Value::make(1)) + .append(V::Value::make(2)) + .merkleize(&db, None, Location::new(999)); + let err = db.apply_batch(merkleized).await.unwrap_err(); + assert!( + matches!(err, Error::FloorBeyondSize(floor, size) if *floor == 999 && *size == 4), + "unexpected error: {err:?}" + ); + + db.destroy().await.unwrap(); + } + + pub(crate) async fn test_keyless_db_rewind_restores_floor( + mut db: Keyless, + ) where + V: ValueEncoding, + C: Mutable> + Persistable, + H: Hasher, + Operation: EncodeShared, + { + // First commit: floor advances to 3 (= commit location). + let floor_a = Location::::new(3); + let merkleized = db + .new_batch() + .append(V::Value::make(1)) + .append(V::Value::make(2)) + .merkleize(&db, None, floor_a); + db.apply_batch(merkleized).await.unwrap(); + db.commit().await.unwrap(); + let rewind_target = Location::new(*db.last_commit_loc() + 1); + + // Second commit: floor advances to 6. + let floor_b = Location::::new(6); + let merkleized = db + .new_batch() + .append(V::Value::make(3)) + .append(V::Value::make(4)) + .merkleize(&db, None, floor_b); + db.apply_batch(merkleized).await.unwrap(); + db.commit().await.unwrap(); + assert_eq!(db.inactivity_floor_loc(), floor_b); + + // Rewind to the first commit; floor should restore to floor_a. + db.rewind(rewind_target).await.unwrap(); + assert_eq!(db.inactivity_floor_loc(), floor_a); + + // Prune is now gated at floor_a. Pruning past it fails. + let beyond = Location::new(*floor_a + 1); + let err = db.prune(beyond).await.unwrap_err(); + assert!(matches!(err, Error::PruneBeyondMinRequired(_, _))); + + // Pruning up to the floor works. + db.prune(floor_a).await.unwrap(); + + db.destroy().await.unwrap(); + } } diff --git a/storage/src/qmdb/keyless/operation/fixed.rs b/storage/src/qmdb/keyless/operation/fixed.rs index 73a41defd3d..fe27b23c97c 100644 --- a/storage/src/qmdb/keyless/operation/fixed.rs +++ b/storage/src/qmdb/keyless/operation/fixed.rs @@ -1,6 +1,9 @@ -use crate::qmdb::{ - any::{value::FixedEncoding, FixedValue}, - keyless::operation::{Codec, Operation, APPEND_CONTEXT, COMMIT_CONTEXT}, +use crate::{ + merkle::{Family, Location}, + qmdb::{ + any::{value::FixedEncoding, FixedValue}, + keyless::operation::{Codec, Operation, APPEND_CONTEXT, COMMIT_CONTEXT}, + }, }; use commonware_codec::{ util::{at_least, ensure_zeros}, @@ -8,27 +11,29 @@ use commonware_codec::{ }; use commonware_runtime::{Buf, BufMut}; -/// Total padded operation size: context byte + option-tag byte + value. -/// Append: 1 (context) + V::SIZE + 1 (padding for the option tag). -/// Commit: 1 (context) + 1 (option tag) + V::SIZE. -/// Both are 2 + V::SIZE. +/// Fixed padded operation size: `Commit` is always the larger variant. +/// +/// - Append: 1 (context) + V::SIZE + padding +/// - Commit: 1 (context) + 1 (option tag) + V::SIZE + u64::SIZE (floor) +/// +/// Total = 2 + V::SIZE + u64::SIZE. Append pads to match. const fn op_size() -> usize { - 2 + V::SIZE + 2 + V::SIZE + u64::SIZE } impl Codec for FixedEncoding { type ReadCfg = (); - fn write_operation(op: &Operation, buf: &mut impl BufMut) { + fn write_operation(op: &Operation, buf: &mut impl BufMut) { let total = op_size::(); match op { Operation::Append(value) => { APPEND_CONTEXT.write(buf); value.write(buf); - // Pad to uniform size (1 byte for the option-tag gap). + // Pad to uniform size: 1 byte (option-tag gap) + u64::SIZE (floor gap). buf.put_bytes(0, total - 1 - V::SIZE); } - Operation::Commit(metadata) => { + Operation::Commit(metadata, floor) => { COMMIT_CONTEXT.write(buf); if let Some(metadata) = metadata { true.write(buf); @@ -36,14 +41,15 @@ impl Codec for FixedEncoding { } else { buf.put_bytes(0, 1 + V::SIZE); } + buf.put_slice(&floor.as_u64().to_be_bytes()); } } } - fn read_operation( + fn read_operation( buf: &mut impl Buf, _cfg: &Self::ReadCfg, - ) -> Result, CodecError> { + ) -> Result, CodecError> { let total = op_size::(); at_least(buf, total)?; @@ -61,30 +67,38 @@ impl Codec for FixedEncoding { ensure_zeros(buf, V::SIZE)?; None }; - Ok(Operation::Commit(metadata)) + let floor = Location::::new(u64::read(buf)?); + if !floor.is_valid() { + return Err(CodecError::Invalid( + "storage::qmdb::keyless::operation::fixed::Operation", + "commit floor location overflow", + )); + } + Ok(Operation::Commit(metadata, floor)) } e => Err(CodecError::InvalidEnum(e)), } } } -impl FixedSize for Operation> { +impl FixedSize for Operation> { const SIZE: usize = op_size::(); } #[cfg(test)] mod tests { use super::*; + use crate::merkle::mmr; use commonware_codec::{DecodeExt, Encode, FixedSize}; use commonware_utils::sequence::U64; - type Op = Operation>; + type Op = Operation>; #[test] fn all_variants_have_same_encoded_size() { let append = Op::Append(U64::new(42)); - let commit_some = Op::Commit(Some(U64::new(99))); - let commit_none = Op::Commit(None); + let commit_some = Op::Commit(Some(U64::new(99)), Location::new(5)); + let commit_none = Op::Commit(None, Location::new(0)); let a = append.encode(); let b = commit_some.encode(); @@ -93,7 +107,7 @@ mod tests { assert_eq!(a.len(), Op::SIZE); assert_eq!(b.len(), Op::SIZE); assert_eq!(c.len(), Op::SIZE); - assert_eq!(Op::SIZE, 2 + U64::SIZE); + assert_eq!(Op::SIZE, 2 + U64::SIZE + u64::SIZE); } #[test] @@ -105,14 +119,14 @@ mod tests { #[test] fn commit_some_roundtrip() { - let op = Op::Commit(Some(U64::new(999))); + let op = Op::Commit(Some(U64::new(999)), Location::new(77)); let decoded = Op::decode(op.encode()).unwrap(); assert_eq!(op, decoded); } #[test] fn commit_none_roundtrip() { - let op = Op::Commit(None); + let op = Op::Commit(None, Location::new(42)); let decoded = Op::decode(op.encode()).unwrap(); assert_eq!(op, decoded); } @@ -132,7 +146,7 @@ mod tests { // Encode an Append, then corrupt the padding byte. let op = Op::Append(U64::new(1)); let mut buf: Vec = op.encode().to_vec(); - // Padding is the last byte (option-tag gap). + // Padding is the last byte (part of the floor gap). *buf.last_mut().unwrap() = 0x01; assert!(Op::decode(buf.as_ref()).is_err()); } @@ -147,9 +161,26 @@ mod tests { #[test] fn commit_none_has_zero_value_bytes() { - let op = Op::Commit(None); + let op = Op::Commit(None, Location::new(0)); let buf: Vec = op.encode().to_vec(); - // After context byte (0) and option-tag byte (0), all value bytes should be zero. + // After context byte (0) and option-tag byte (0), all remaining bytes (including the + // all-zero floor) should be zero. assert!(buf[2..].iter().all(|&b| b == 0)); } + + #[test] + fn commit_floor_overflow_rejected() { + // Construct a Commit buffer by hand with a floor beyond MAX_LEAVES. + let mut buf = vec![0u8; Op::SIZE]; + buf[0] = COMMIT_CONTEXT; + // Option tag = false (None metadata); value bytes already zero. + // Last 8 bytes are the floor; write u64::MAX big-endian. + let floor_bytes = u64::MAX.to_be_bytes(); + let floor_offset = Op::SIZE - u64::SIZE; + buf[floor_offset..].copy_from_slice(&floor_bytes); + assert!(matches!( + Op::decode(buf.as_ref()).unwrap_err(), + CodecError::Invalid(_, _) + )); + } } diff --git a/storage/src/qmdb/keyless/operation/mod.rs b/storage/src/qmdb/keyless/operation/mod.rs index 6180661056f..96a4a37ffcf 100644 --- a/storage/src/qmdb/keyless/operation/mod.rs +++ b/storage/src/qmdb/keyless/operation/mod.rs @@ -1,4 +1,7 @@ -use crate::qmdb::{any::value::ValueEncoding, operation::Committable}; +use crate::{ + merkle::{Family, Location}, + qmdb::{any::value::ValueEncoding, operation::Committable}, +}; use commonware_codec::{Encode as _, Error as CodecError, Read, Write}; use commonware_runtime::{Buf, BufMut}; use commonware_utils::hex; @@ -14,52 +17,61 @@ const APPEND_CONTEXT: u8 = 1; /// Delegates Operation-level codec (Write, Read) to the value encoding. /// /// Fixed and variable encodings have different wire formats. Fixed pads to a uniform size, -/// variable does not. A single blanket `impl Write for Operation` dispatches here, while the -/// two impls of this trait (on FixedEncoding and VariableEncoding) live on different Self types -/// and therefore do not overlap. +/// variable does not. A single blanket `impl Write for Operation` dispatches here, while +/// the two impls of this trait (on FixedEncoding and VariableEncoding) live on different Self +/// types and therefore do not overlap. pub trait Codec: ValueEncoding + Sized { type ReadCfg: Clone + Send + Sync + 'static; - fn write_operation(op: &Operation, buf: &mut impl BufMut); - fn read_operation( + fn write_operation(op: &Operation, buf: &mut impl BufMut); + fn read_operation( buf: &mut impl Buf, cfg: &Self::ReadCfg, - ) -> Result, CodecError>; + ) -> Result, CodecError>; } /// Operations for keyless stores. #[derive(Clone, Eq, PartialEq, Ord, PartialOrd, Hash, Debug)] -pub enum Operation { +pub enum Operation { /// Wraps the value appended to the database by this operation. Append(V::Value), - /// Indicates the database has been committed. - Commit(Option), + /// Indicates the database has been committed, carrying optional metadata and the inactivity + /// floor location declared by the application at commit time. + Commit(Option, Location), } -impl Operation { +impl Operation { /// Returns the value (if any) wrapped by this operation. pub fn into_value(self) -> Option { match self { Self::Append(value) => Some(value), - Self::Commit(value) => value, + Self::Commit(value, _) => value, + } + } + + /// Returns the inactivity floor location if this is a commit operation. + pub const fn has_floor(&self) -> Option> { + match self { + Self::Commit(_, loc) => Some(*loc), + Self::Append(_) => None, } } } -impl Write for Operation { +impl Write for Operation { fn write(&self, buf: &mut impl BufMut) { V::write_operation(self, buf) } } -impl Committable for Operation { +impl Committable for Operation { fn is_commit(&self) -> bool { - matches!(self, Self::Commit(_)) + matches!(self, Self::Commit(_, _)) } } -impl Read for Operation { +impl Read for Operation { type Cfg = ::ReadCfg; fn read_cfg(buf: &mut impl Buf, cfg: &Self::Cfg) -> Result { @@ -67,15 +79,15 @@ impl Read for Operation { } } -impl Display for Operation { +impl Display for Operation { fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { match self { Self::Append(value) => write!(f, "[append value:{}]", hex(&value.encode())), - Self::Commit(value) => { + Self::Commit(value, floor) => { if let Some(value) = value { - write!(f, "[commit {}]", hex(&value.encode())) + write!(f, "[commit {} floor:{}]", hex(&value.encode()), **floor) } else { - write!(f, "[commit]") + write!(f, "[commit floor:{}]", **floor) } } } @@ -83,7 +95,7 @@ impl Display for Operation { } #[cfg(feature = "arbitrary")] -impl arbitrary::Arbitrary<'_> for Operation +impl arbitrary::Arbitrary<'_> for Operation where V::Value: for<'a> arbitrary::Arbitrary<'a>, { @@ -91,7 +103,11 @@ where let choice = u.int_in_range(0..=1)?; match choice { 0 => Ok(Self::Append(V::Value::arbitrary(u)?)), - 1 => Ok(Self::Commit(Option::::arbitrary(u)?)), + 1 => { + let metadata = Option::::arbitrary(u)?; + let floor = Location::::arbitrary(u)?; + Ok(Self::Commit(metadata, floor)) + } _ => unreachable!(), } } @@ -100,13 +116,13 @@ where #[cfg(test)] mod tests { use super::*; - use crate::qmdb::any::value::VariableEncoding; + use crate::{merkle::mmr, qmdb::any::value::VariableEncoding}; use commonware_codec::Encode; use commonware_utils::{hex, sequence::U64}; #[test] fn display_append() { - let op = Operation::>::Append(U64::new(12345)); + let op = Operation::>::Append(U64::new(12345)); assert_eq!( format!("{op}"), format!("[append value:{}]", hex(&U64::new(12345).encode())) @@ -115,29 +131,35 @@ mod tests { #[test] fn display_commit_some() { - let op = Operation::>::Commit(Some(U64::new(42))); + let op = Operation::>::Commit( + Some(U64::new(42)), + Location::new(7), + ); assert_eq!( format!("{op}"), - format!("[commit {}]", hex(&U64::new(42).encode())) + format!("[commit {} floor:7]", hex(&U64::new(42).encode())) ); } #[test] fn display_commit_none() { - let op = Operation::>::Commit(None); - assert_eq!(format!("{op}"), "[commit]"); + let op = Operation::>::Commit(None, Location::new(3)); + assert_eq!(format!("{op}"), "[commit floor:3]"); } #[cfg(feature = "arbitrary")] mod conformance { use super::Operation; - use crate::qmdb::any::value::{FixedEncoding, VariableEncoding}; + use crate::{ + merkle::mmr, + qmdb::any::value::{FixedEncoding, VariableEncoding}, + }; use commonware_codec::conformance::CodecConformance; use commonware_utils::sequence::U64; commonware_conformance::conformance_tests! { - CodecConformance>>, - CodecConformance>> + CodecConformance>>, + CodecConformance>> } } } diff --git a/storage/src/qmdb/keyless/operation/variable.rs b/storage/src/qmdb/keyless/operation/variable.rs index 5650359f4f4..351d987587a 100644 --- a/storage/src/qmdb/keyless/operation/variable.rs +++ b/storage/src/qmdb/keyless/operation/variable.rs @@ -1,6 +1,9 @@ -use crate::qmdb::{ - any::{value::VariableEncoding, VariableValue}, - keyless::operation::{Codec, Operation, APPEND_CONTEXT, COMMIT_CONTEXT}, +use crate::{ + merkle::{Family, Location}, + qmdb::{ + any::{value::VariableEncoding, VariableValue}, + keyless::operation::{Codec, Operation, APPEND_CONTEXT, COMMIT_CONTEXT}, + }, }; use commonware_codec::{EncodeSize, Error as CodecError, Read, ReadExt as _, Write}; use commonware_runtime::{Buf, BufMut}; @@ -8,36 +11,41 @@ use commonware_runtime::{Buf, BufMut}; impl Codec for VariableEncoding { type ReadCfg = ::Cfg; - fn write_operation(op: &Operation, buf: &mut impl BufMut) { + fn write_operation(op: &Operation, buf: &mut impl BufMut) { match op { Operation::Append(value) => { APPEND_CONTEXT.write(buf); value.write(buf); } - Operation::Commit(metadata) => { + Operation::Commit(metadata, floor) => { COMMIT_CONTEXT.write(buf); metadata.write(buf); + floor.write(buf); } } } - fn read_operation( + fn read_operation( buf: &mut impl Buf, cfg: &Self::ReadCfg, - ) -> Result, CodecError> { + ) -> Result, CodecError> { match u8::read(buf)? { APPEND_CONTEXT => Ok(Operation::Append(V::read_cfg(buf, cfg)?)), - COMMIT_CONTEXT => Ok(Operation::Commit(Option::::read_cfg(buf, cfg)?)), + COMMIT_CONTEXT => { + let metadata = Option::::read_cfg(buf, cfg)?; + let floor = Location::::read(buf)?; + Ok(Operation::Commit(metadata, floor)) + } e => Err(CodecError::InvalidEnum(e)), } } } -impl EncodeSize for Operation> { +impl EncodeSize for Operation> { fn encode_size(&self) -> usize { 1 + match self { Self::Append(v) => v.encode_size(), - Self::Commit(v) => v.encode_size(), + Self::Commit(v, floor) => v.encode_size() + floor.encode_size(), } } } @@ -45,11 +53,12 @@ impl EncodeSize for Operation> { #[cfg(test)] mod tests { use super::*; + use crate::merkle::mmr; use commonware_codec::{DecodeExt, Encode, EncodeSize}; use commonware_utils::sequence::U64; // Use U64 as the value type: it implements VariableValue and has Cfg = (). - type Op = Operation>; + type Op = Operation>; #[test] fn append_roundtrip() { @@ -60,14 +69,14 @@ mod tests { #[test] fn commit_some_roundtrip() { - let op = Op::Commit(Some(U64::new(999))); + let op = Op::Commit(Some(U64::new(999)), Location::new(77)); let decoded = Op::decode(op.encode()).unwrap(); assert_eq!(op, decoded); } #[test] fn commit_none_roundtrip() { - let op = Op::Commit(None); + let op = Op::Commit(None, Location::new(42)); let decoded = Op::decode(op.encode()).unwrap(); assert_eq!(op, decoded); } @@ -77,8 +86,8 @@ mod tests { let cases: Vec = vec![ Op::Append(U64::new(0)), Op::Append(U64::new(u64::MAX)), - Op::Commit(None), - Op::Commit(Some(U64::new(42))), + Op::Commit(None, Location::new(0)), + Op::Commit(Some(U64::new(42)), Location::new(1234)), ]; for op in cases { assert_eq!(op.encode_size(), op.encode().len(), "mismatch for {op:?}"); @@ -104,15 +113,29 @@ mod tests { #[test] fn append_and_commit_have_different_encodings() { let append = Op::Append(U64::new(1)); - let commit = Op::Commit(Some(U64::new(1))); + let commit = Op::Commit(Some(U64::new(1)), Location::new(0)); assert_ne!(append.encode().as_ref(), commit.encode().as_ref()); } #[test] fn context_byte_is_first() { let append = Op::Append(U64::new(0)); - let commit = Op::Commit(None); + let commit = Op::Commit(None, Location::new(0)); assert_eq!(append.encode()[0], APPEND_CONTEXT); assert_eq!(commit.encode()[0], COMMIT_CONTEXT); } + + #[test] + fn commit_floor_overflow_rejected() { + // Hand-build a Commit with a varint floor of u64::MAX, which exceeds MAX_LEAVES. + use commonware_codec::{varint::UInt, Write}; + let mut buf = Vec::new(); + COMMIT_CONTEXT.write(&mut buf); + Option::::None.write(&mut buf); + UInt(u64::MAX).write(&mut buf); + assert!(matches!( + Op::decode(buf.as_ref()).unwrap_err(), + CodecError::Invalid(_, _) + )); + } } diff --git a/storage/src/qmdb/keyless/sync.rs b/storage/src/qmdb/keyless/sync.rs index 8548be2e1b2..2412763eef8 100644 --- a/storage/src/qmdb/keyless/sync.rs +++ b/storage/src/qmdb/keyless/sync.rs @@ -12,7 +12,6 @@ use crate::{ self, any::value::ValueEncoding, keyless::{operation::Codec, Keyless, Operation}, - operation::Committable as _, sync, }, Context, Persistable, @@ -25,14 +24,14 @@ impl sync::Database for Keyless where E: Context, V: ValueEncoding + Codec, - C: Mutable> + C: Mutable> + Persistable - + sync::Journal>, + + sync::Journal>, C::Config: Clone + Send, H: Hasher, - Operation: EncodeShared, + Operation: EncodeShared, { - type Op = Operation; + type Op = Operation; type Journal = C; type Hasher = H; type Config = super::Config; @@ -83,7 +82,7 @@ where ) .await?; - let last_commit_loc = { + let (last_commit_loc, inactivity_floor_loc) = { let reader = journal.reader().await; let loc = reader .bounds() @@ -91,13 +90,16 @@ where .checked_sub(1) .expect("journal should not be empty"); let op = reader.read(loc).await?; - assert!(op.is_commit(), "last operation should be a commit"); - Location::new(loc) + let floor = op + .has_floor() + .expect("last operation should be a commit with floor"); + (Location::new(loc), floor) }; let db = Self { journal, last_commit_loc, + inactivity_floor_loc, }; db.sync().await?; @@ -142,7 +144,7 @@ mod tests { /// Type alias for sync tests with variable-length values. type KeylessSyncTest = variable::Db, Sha256>; - type VariableOp = Operation>>; + type VariableOp = Operation>>; // Used by both `create_sync_config` and `test_sync_fixed`. const PAGE_SIZE: NonZeroU16 = NZU16!(77); @@ -204,20 +206,24 @@ mod tests { ops } - /// Applies the given operations and commits the database. + /// Applies the given operations and commits the database, advancing the inactivity floor to + /// the new commit location so sync tests that exercise pruning can do so freely. async fn apply_ops(db: &mut KeylessSyncTest, ops: Vec, metadata: Option>) { + let mut appends = 0u64; let mut batch = db.new_batch(); for op in ops { match op { Operation::Append(value) => { batch = batch.append(value); + appends += 1; } - Operation::Commit(_) => { + Operation::Commit(_, _) => { panic!("Commit operation not supported in apply_ops"); } } } - let merkleized = batch.merkleize(db, metadata); + let new_commit = Location::new(db.last_commit_loc().as_u64() + 1 + appends); + let merkleized = batch.merkleize(db, metadata, new_commit); db.apply_batch(merkleized).await.unwrap(); } @@ -957,7 +963,8 @@ mod tests { for i in 0..20u64 { batch = batch.append(U64::new(i * 10 + 1)); } - let merkleized = batch.merkleize(&target_db, None); + let floor = target_db.inactivity_floor_loc(); + let merkleized = batch.merkleize(&target_db, None, floor); target_db.apply_batch(merkleized).await.unwrap(); let target_root = target_db.root(); diff --git a/storage/src/qmdb/keyless/variable.rs b/storage/src/qmdb/keyless/variable.rs index cd484cfbbf8..b6551808c88 100644 --- a/storage/src/qmdb/keyless/variable.rs +++ b/storage/src/qmdb/keyless/variable.rs @@ -20,13 +20,13 @@ use commonware_cryptography::Hasher; use commonware_runtime::{Clock, Metrics, Storage}; /// Keyless operation for variable-length values. -pub type Operation = BaseOperation>; +pub type Operation = BaseOperation>; /// A keyless authenticated database for variable-length data. pub type Db = - super::Keyless, variable::Journal>, H>; + super::Keyless, variable::Journal>, H>; -type Journal = authenticated::Journal>, H>; +type Journal = authenticated::Journal>, H>; /// Configuration for a variable-size [keyless](super) authenticated db. pub type Config = super::Config>; @@ -36,10 +36,10 @@ impl Db as Read>::Cfg>, + cfg: Config< as Read>::Cfg>, ) -> Result> { let journal: Journal = - Journal::new(context, cfg.merkle, cfg.log, Operation::::is_commit).await?; + Journal::new(context, cfg.merkle, cfg.log, Operation::::is_commit).await?; Self::init_from_journal(journal).await } } @@ -592,6 +592,70 @@ mod test { }); } + #[test_traced("INFO")] + fn test_keyless_variable_floor_tracking_mmb() { + deterministic::Runner::default().start(|ctx| async move { + let db = open_db::(ctx.with_label("db1")).await; + tests::test_keyless_db_floor_tracking(ctx, db, reopen::()).await; + }); + } + + #[test_traced("INFO")] + fn test_keyless_variable_floor_regression_rejected_mmb() { + deterministic::Runner::default().start(|ctx| async move { + let db = open_db::(ctx.with_label("db")).await; + tests::test_keyless_db_floor_regression_rejected(db).await; + }); + } + + #[test_traced("INFO")] + fn test_keyless_variable_floor_beyond_size_rejected_mmb() { + deterministic::Runner::default().start(|ctx| async move { + let db = open_db::(ctx.with_label("db")).await; + tests::test_keyless_db_floor_beyond_size_rejected(db).await; + }); + } + + #[test_traced("INFO")] + fn test_keyless_variable_rewind_restores_floor_mmb() { + deterministic::Runner::default().start(|ctx| async move { + let db = open_db::(ctx.with_label("db")).await; + tests::test_keyless_db_rewind_restores_floor(db).await; + }); + } + + #[test_traced("INFO")] + fn test_keyless_variable_floor_tracking() { + deterministic::Runner::default().start(|ctx| async move { + let db = open_db::(ctx.with_label("db1")).await; + tests::test_keyless_db_floor_tracking(ctx, db, reopen::()).await; + }); + } + + #[test_traced("INFO")] + fn test_keyless_variable_floor_regression_rejected() { + deterministic::Runner::default().start(|ctx| async move { + let db = open_db::(ctx.with_label("db")).await; + tests::test_keyless_db_floor_regression_rejected(db).await; + }); + } + + #[test_traced("INFO")] + fn test_keyless_variable_floor_beyond_size_rejected() { + deterministic::Runner::default().start(|ctx| async move { + let db = open_db::(ctx.with_label("db")).await; + tests::test_keyless_db_floor_beyond_size_rejected(db).await; + }); + } + + #[test_traced("INFO")] + fn test_keyless_variable_rewind_restores_floor() { + deterministic::Runner::default().start(|ctx| async move { + let db = open_db::(ctx.with_label("db")).await; + tests::test_keyless_db_rewind_restores_floor(db).await; + }); + } + fn is_send(_: T) {} #[allow(dead_code)] diff --git a/storage/src/qmdb/mod.rs b/storage/src/qmdb/mod.rs index ceb0d831870..7059719a394 100644 --- a/storage/src/qmdb/mod.rs +++ b/storage/src/qmdb/mod.rs @@ -113,6 +113,14 @@ pub enum Error { batch_db_size: u64, batch_base_size: u64, }, + + /// The batch's inactivity floor is lower than the database's current floor. + #[error("floor regressed: batch floor {0} < current floor {1}")] + FloorRegressed(Location, Location), + + /// The batch's inactivity floor exceeds its total operation count. + #[error("floor beyond size: floor {0} > total size {1}")] + FloorBeyondSize(Location, Location), } impl From> for Error { diff --git a/storage/src/qmdb/sync/resolver.rs b/storage/src/qmdb/sync/resolver.rs index 9b503aaa6b7..43c6b55f24a 100644 --- a/storage/src/qmdb/sync/resolver.rs +++ b/storage/src/qmdb/sync/resolver.rs @@ -348,7 +348,7 @@ macro_rules! impl_resolver_keyless { H: Hasher, { type Digest = H::Digest; - type Op = $op; + type Op = $op; type Error = qmdb::Error; async fn get_operations( @@ -382,7 +382,7 @@ macro_rules! impl_resolver_keyless { H: Hasher, { type Digest = H::Digest; - type Op = $op; + type Op = $op; type Error = qmdb::Error; async fn get_operations( @@ -416,7 +416,7 @@ macro_rules! impl_resolver_keyless { H: Hasher, { type Digest = H::Digest; - type Op = $op; + type Op = $op; type Error = qmdb::Error; async fn get_operations( From 35d2138543983e3157338e0d2f2935a7d9d2e39c Mon Sep 17 00:00:00 2001 From: Dan Laine Date: Fri, 17 Apr 2026 16:06:28 -0700 Subject: [PATCH 2/6] add inactivity floor to qmdb::keyless Commit operation --- storage/fuzz/fuzz_targets/qmdb_keyless.rs | 8 +- storage/src/qmdb/keyless/fixed.rs | 61 +++++++++- storage/src/qmdb/keyless/mod.rs | 131 +++++++++++++++++++++- storage/src/qmdb/keyless/variable.rs | 61 +++++++++- 4 files changed, 256 insertions(+), 5 deletions(-) diff --git a/storage/fuzz/fuzz_targets/qmdb_keyless.rs b/storage/fuzz/fuzz_targets/qmdb_keyless.rs index 13f6cd4f862..198723d24e0 100644 --- a/storage/fuzz/fuzz_targets/qmdb_keyless.rs +++ b/storage/fuzz/fuzz_targets/qmdb_keyless.rs @@ -205,11 +205,17 @@ fn fuzz_family(input: &FuzzInput, suffix: &str) { } Operation::Prune => { + let pending_count = pending_appends.len() as u64; let mut batch = db.new_batch(); for v in pending_appends.drain(..) { batch = batch.append(v); } - let merkleized = batch.merkleize(&db, None, db.inactivity_floor_loc()); + // Advance the floor to the new commit location so the subsequent prune + // actually removes data. This exercises more of the code path than pruning + // at a stale floor would. + let end = db.bounds().await.end; + let floor = Location::::new(end.as_u64() + pending_count); + let merkleized = batch.merkleize(&db, None, floor); db.apply_batch(merkleized).await.expect("Commit should not fail"); db.commit().await.expect("Commit should not fail"); db.prune(db.inactivity_floor_loc()) diff --git a/storage/src/qmdb/keyless/fixed.rs b/storage/src/qmdb/keyless/fixed.rs index 4ae548406ed..f53beec8417 100644 --- a/storage/src/qmdb/keyless/fixed.rs +++ b/storage/src/qmdb/keyless/fixed.rs @@ -81,7 +81,14 @@ mod test { type TestDb = Db; async fn open_db(context: deterministic::Context) -> TestDb { - let cfg = db_config("partition", &context); + open_db_with_suffix("partition", context).await + } + + async fn open_db_with_suffix( + suffix: &str, + context: deterministic::Context, + ) -> TestDb { + let cfg = db_config(suffix, &context); TestDb::init(context, cfg).await.unwrap() } @@ -362,6 +369,32 @@ mod test { }); } + #[test_traced("INFO")] + fn test_keyless_fixed_floor_changes_root() { + deterministic::Runner::default().start(|ctx| async move { + let db_a = open_db_with_suffix::("root-a", ctx.with_label("a")).await; + let db_b = open_db_with_suffix::("root-b", ctx.with_label("b")).await; + tests::test_keyless_db_floor_changes_root(db_a, db_b).await; + }); + } + + #[test_traced("INFO")] + fn test_keyless_fixed_floor_at_total_size_accepted() { + deterministic::Runner::default().start(|ctx| async move { + let db = open_db::(ctx.with_label("db")).await; + tests::test_keyless_db_floor_at_total_size_accepted(db).await; + }); + } + + #[test_traced("INFO")] + fn test_keyless_fixed_rewind_after_reopen_with_floor() { + deterministic::Runner::default().start(|ctx| async move { + let db = open_db::(ctx.with_label("db1")).await; + tests::test_keyless_db_rewind_after_reopen_with_floor(ctx, db, reopen::()) + .await; + }); + } + // mmb::Family variants #[test_traced("INFO")] @@ -636,4 +669,30 @@ mod test { tests::test_keyless_db_rewind_restores_floor(db).await; }); } + + #[test_traced("INFO")] + fn test_keyless_fixed_floor_changes_root_mmb() { + deterministic::Runner::default().start(|ctx| async move { + let db_a = open_db_with_suffix::("root-a", ctx.with_label("a")).await; + let db_b = open_db_with_suffix::("root-b", ctx.with_label("b")).await; + tests::test_keyless_db_floor_changes_root(db_a, db_b).await; + }); + } + + #[test_traced("INFO")] + fn test_keyless_fixed_floor_at_total_size_accepted_mmb() { + deterministic::Runner::default().start(|ctx| async move { + let db = open_db::(ctx.with_label("db")).await; + tests::test_keyless_db_floor_at_total_size_accepted(db).await; + }); + } + + #[test_traced("INFO")] + fn test_keyless_fixed_rewind_after_reopen_with_floor_mmb() { + deterministic::Runner::default().start(|ctx| async move { + let db = open_db::(ctx.with_label("db1")).await; + tests::test_keyless_db_rewind_after_reopen_with_floor(ctx, db, reopen::()) + .await; + }); + } } diff --git a/storage/src/qmdb/keyless/mod.rs b/storage/src/qmdb/keyless/mod.rs index 6827fe9e6b4..3be664e8c3e 100644 --- a/storage/src/qmdb/keyless/mod.rs +++ b/storage/src/qmdb/keyless/mod.rs @@ -271,7 +271,8 @@ where /// Rewind the database to `size` operations, where `size` is the location of the next append. /// /// This rewinds both the operations journal and its Merkle structure to the historical state - /// at `size`. + /// at `size`. The inactivity floor is restored from the rewind target commit operation, so + /// the post-rewind floor matches the floor that was in effect at that commit. /// /// # Errors /// @@ -366,6 +367,11 @@ where /// ancestor chain was created is an ancestor of this batch. Applying a batch from a /// different fork returns [`Error::StaleBatch`]. /// + /// The batch's declared inactivity floor must be monotonically non-decreasing with respect + /// to the current floor and must not exceed the batch's total operation count. Violations + /// return [`Error::FloorRegressed`] or [`Error::FloorBeyondSize`]. Floor validation happens + /// before any journal mutation, so the database is untouched on floor errors. + /// /// Returns the range of locations written. /// /// This publishes the batch to the in-memory database state and appends it to the @@ -1984,6 +1990,8 @@ pub(crate) mod tests { .merkleize(&db, None, Location::new(3)); db.apply_batch(merkleized).await.unwrap(); assert_eq!(db.inactivity_floor_loc(), Location::new(3)); + let root_before = db.root(); + let last_commit_before = db.last_commit_loc(); // Try to commit with a lower floor; apply_batch rejects. let merkleized = @@ -1996,8 +2004,10 @@ pub(crate) mod tests { "unexpected error: {err:?}" ); - // Current floor unchanged after the rejected batch. + // DB state must be untouched: floor, last_commit_loc, and root unchanged. assert_eq!(db.inactivity_floor_loc(), Location::new(3)); + assert_eq!(db.last_commit_loc(), last_commit_before); + assert_eq!(db.root(), root_before); db.destroy().await.unwrap(); } @@ -2070,4 +2080,121 @@ pub(crate) mod tests { db.destroy().await.unwrap(); } + + /// Floor is embedded in the Commit operation and therefore in the Merkle root: two databases + /// with identical appends but different floors must produce different roots. + pub(crate) async fn test_keyless_db_floor_changes_root( + mut db_a: Keyless, + mut db_b: Keyless, + ) where + V: ValueEncoding, + C: Mutable> + Persistable, + H: Hasher, + Operation: EncodeShared, + { + let appends = [V::Value::make(1), V::Value::make(2)]; + + // db_a commits with floor=0. + let mut batch_a = db_a.new_batch(); + for v in appends.iter() { + batch_a = batch_a.append(v.clone()); + } + db_a.apply_batch(batch_a.merkleize(&db_a, None, Location::new(0))) + .await + .unwrap(); + + // db_b commits the same appends but with floor=3 (= commit location). + let mut batch_b = db_b.new_batch(); + for v in appends.iter() { + batch_b = batch_b.append(v.clone()); + } + db_b.apply_batch(batch_b.merkleize(&db_b, None, Location::new(3))) + .await + .unwrap(); + + assert_ne!(db_a.root(), db_b.root()); + + db_a.destroy().await.unwrap(); + db_b.destroy().await.unwrap(); + } + + /// A floor equal to the batch's total operation count is on the boundary of acceptance. + pub(crate) async fn test_keyless_db_floor_at_total_size_accepted( + mut db: Keyless, + ) where + V: ValueEncoding, + C: Mutable> + Persistable, + H: Hasher, + Operation: EncodeShared, + { + // 2 appends + 1 commit on top of the initial commit: total_size = 4. + // Setting floor = 4 is on the boundary (inclusive). + let total_size = Location::::new(4); + db.apply_batch( + db.new_batch() + .append(V::Value::make(1)) + .append(V::Value::make(2)) + .merkleize(&db, None, total_size), + ) + .await + .unwrap(); + assert_eq!(db.inactivity_floor_loc(), total_size); + + db.destroy().await.unwrap(); + } + + /// End-to-end: commit → drop → reopen → rewind → verify floor restored after a crash. + pub(crate) async fn test_keyless_db_rewind_after_reopen_with_floor( + context: deterministic::Context, + mut db: Keyless, + reopen: Reopen>, + ) where + V: ValueEncoding, + C: Mutable> + Persistable, + H: Hasher, + Operation: EncodeShared, + { + // First commit: 2 appends + commit, floor advances to 3. + let floor_a = Location::::new(3); + db.apply_batch( + db.new_batch() + .append(V::Value::make(1)) + .append(V::Value::make(2)) + .merkleize(&db, None, floor_a), + ) + .await + .unwrap(); + db.commit().await.unwrap(); + let rewind_target = Location::new(*db.last_commit_loc() + 1); + + // Second commit: 2 appends + commit, floor advances to 6. + let floor_b = Location::::new(6); + db.apply_batch( + db.new_batch() + .append(V::Value::make(3)) + .append(V::Value::make(4)) + .merkleize(&db, None, floor_b), + ) + .await + .unwrap(); + db.commit().await.unwrap(); + + // Drop & reopen to simulate a crash after both commits were durable. + drop(db); + let mut db = reopen(context.with_label("reopen")).await; + assert_eq!(db.inactivity_floor_loc(), floor_b); + + // Rewind to the first commit; floor should restore to floor_a. + db.rewind(rewind_target).await.unwrap(); + assert_eq!(db.inactivity_floor_loc(), floor_a); + assert_eq!(db.last_commit_loc(), Location::new(3)); + + // Commit the rewind so it's durable, then reopen and confirm the floor again. + db.commit().await.unwrap(); + drop(db); + let db = reopen(context.with_label("reopen2")).await; + assert_eq!(db.inactivity_floor_loc(), floor_a); + + db.destroy().await.unwrap(); + } } diff --git a/storage/src/qmdb/keyless/variable.rs b/storage/src/qmdb/keyless/variable.rs index b6551808c88..f46094687d4 100644 --- a/storage/src/qmdb/keyless/variable.rs +++ b/storage/src/qmdb/keyless/variable.rs @@ -92,7 +92,14 @@ mod test { /// Return a [Db] database initialized with a fixed config. async fn open_db(context: deterministic::Context) -> TestDb { - let cfg = db_config("partition", &context); + open_db_with_suffix("partition", context).await + } + + async fn open_db_with_suffix( + suffix: &str, + context: deterministic::Context, + ) -> TestDb { + let cfg = db_config(suffix, &context); TestDb::init(context, cfg).await.unwrap() } @@ -624,6 +631,32 @@ mod test { }); } + #[test_traced("INFO")] + fn test_keyless_variable_floor_changes_root_mmb() { + deterministic::Runner::default().start(|ctx| async move { + let db_a = open_db_with_suffix::("root-a", ctx.with_label("a")).await; + let db_b = open_db_with_suffix::("root-b", ctx.with_label("b")).await; + tests::test_keyless_db_floor_changes_root(db_a, db_b).await; + }); + } + + #[test_traced("INFO")] + fn test_keyless_variable_floor_at_total_size_accepted_mmb() { + deterministic::Runner::default().start(|ctx| async move { + let db = open_db::(ctx.with_label("db")).await; + tests::test_keyless_db_floor_at_total_size_accepted(db).await; + }); + } + + #[test_traced("INFO")] + fn test_keyless_variable_rewind_after_reopen_with_floor_mmb() { + deterministic::Runner::default().start(|ctx| async move { + let db = open_db::(ctx.with_label("db1")).await; + tests::test_keyless_db_rewind_after_reopen_with_floor(ctx, db, reopen::()) + .await; + }); + } + #[test_traced("INFO")] fn test_keyless_variable_floor_tracking() { deterministic::Runner::default().start(|ctx| async move { @@ -656,6 +689,32 @@ mod test { }); } + #[test_traced("INFO")] + fn test_keyless_variable_floor_changes_root() { + deterministic::Runner::default().start(|ctx| async move { + let db_a = open_db_with_suffix::("root-a", ctx.with_label("a")).await; + let db_b = open_db_with_suffix::("root-b", ctx.with_label("b")).await; + tests::test_keyless_db_floor_changes_root(db_a, db_b).await; + }); + } + + #[test_traced("INFO")] + fn test_keyless_variable_floor_at_total_size_accepted() { + deterministic::Runner::default().start(|ctx| async move { + let db = open_db::(ctx.with_label("db")).await; + tests::test_keyless_db_floor_at_total_size_accepted(db).await; + }); + } + + #[test_traced("INFO")] + fn test_keyless_variable_rewind_after_reopen_with_floor() { + deterministic::Runner::default().start(|ctx| async move { + let db = open_db::(ctx.with_label("db1")).await; + tests::test_keyless_db_rewind_after_reopen_with_floor(ctx, db, reopen::()) + .await; + }); + } + fn is_send(_: T) {} #[allow(dead_code)] From 8bd2d70a81b7b783eaef0be0d2aac7f58308085b Mon Sep 17 00:00:00 2001 From: Dan Laine Date: Mon, 20 Apr 2026 13:02:52 -0400 Subject: [PATCH 3/6] add inactivity floor to qmdb::keyless Commit operation --- examples/sync/src/databases/keyless.rs | 26 ++- storage/fuzz/fuzz_targets/qmdb_keyless.rs | 174 +++++++++++++++++-- storage/src/qmdb/keyless/batch.rs | 14 +- storage/src/qmdb/keyless/fixed.rs | 64 ++++++- storage/src/qmdb/keyless/mod.rs | 198 +++++++++++++++++++--- storage/src/qmdb/keyless/sync.rs | 7 + storage/src/qmdb/keyless/variable.rs | 64 ++++++- storage/src/qmdb/mod.rs | 6 +- 8 files changed, 493 insertions(+), 60 deletions(-) diff --git a/examples/sync/src/databases/keyless.rs b/examples/sync/src/databases/keyless.rs index e9736da3be2..cc185abd324 100644 --- a/examples/sync/src/databases/keyless.rs +++ b/examples/sync/src/databases/keyless.rs @@ -56,11 +56,23 @@ pub fn create_config(context: &(impl BufferPooler + commonware_runtime::Metrics) } /// Create deterministic test operations for demonstration purposes. -/// Generates Append operations and periodic Commit operations. +/// +/// Generates Append operations and periodic Commit operations, advancing the inactivity +/// floor at each commit to the *previous* commit's location. This models a realistic +/// application that declares older commits inactive over time (enabling pruning) while +/// always keeping the most recent commit readable. pub fn create_test_operations(count: usize, seed: u64) -> Vec { let mut operations = Vec::new(); let mut hasher = ::new(); + // The DB's initial commit lands at location 0 before any of these ops are applied. + // `op_count` tracks the total ops that will exist on disk (including this initial commit + // and everything we push below). `prev_commit_loc` tracks the last commit's location + // and is used as the next commit's floor — always <= the next commit's own location, so + // the per-commit floor bound is satisfied. + let mut op_count: u64 = 1; + let mut prev_commit_loc: u64 = 0; + for i in 0..count { let value = { hasher.update(&i.to_be_bytes()); @@ -69,14 +81,20 @@ pub fn create_test_operations(count: usize, seed: u64) -> Vec { }; operations.push(Operation::Append(value)); + op_count += 1; if (i + 1) % 10 == 0 { - operations.push(Operation::Commit(None, Location::new(0))); + operations.push(Operation::Commit(None, Location::new(prev_commit_loc))); + prev_commit_loc = op_count; + op_count += 1; } } - // Always end with a commit - operations.push(Operation::Commit(Some(Sha256::fill(1)), Location::new(0))); + // Always end with a commit, floor set to the previous commit's location. + operations.push(Operation::Commit( + Some(Sha256::fill(1)), + Location::new(prev_commit_loc), + )); operations } diff --git a/storage/fuzz/fuzz_targets/qmdb_keyless.rs b/storage/fuzz/fuzz_targets/qmdb_keyless.rs index 198723d24e0..034f21daab5 100644 --- a/storage/fuzz/fuzz_targets/qmdb_keyless.rs +++ b/storage/fuzz/fuzz_targets/qmdb_keyless.rs @@ -8,7 +8,7 @@ use commonware_storage::{ merkle::{hasher::Standard, journaled::Config as MerkleConfig, mmb, mmr, Family, Location}, qmdb::{ keyless::variable::{Config, Db as Keyless}, - verify_proof, + verify_proof, Error, }, }; use commonware_utils::{NZUsize, NZU16, NZU64}; @@ -18,6 +18,49 @@ use std::num::NonZeroU16; const MAX_OPERATIONS: usize = 50; const MAX_PROOF_OPS: u64 = 100; +/// Which error variant a bad-floor commit should produce. +#[derive(Debug, Clone, Copy)] +enum BadFloorExpect { + Regression, + BeyondSize, +} + +fn assert_bad_floor_error(err: &Error, kind: BadFloorExpect) { + match (err, kind) { + (Error::FloorRegressed(_, _), BadFloorExpect::Regression) => {} + (Error::FloorBeyondSize(_, _), BadFloorExpect::BeyondSize) => {} + _ => panic!("unexpected error for {kind:?}: {err:?}"), + } +} + +/// What floor value a fuzz-generated commit should carry. The `Bad*` variants intentionally +/// produce floors that must be rejected; the handler asserts the expected error variant and +/// that the DB state is untouched. +#[derive(Debug, Clone, Copy)] +enum FloorKind { + /// Keep the current floor (monotonicity trivially preserved). + Current, + /// Advance to the commit location (the tight upper bound). + AdvanceToCommit, + /// Floor one below the current floor — must be rejected as `FloorRegressed`. + BadRegression, + /// Floor one past the commit location — must be rejected as `FloorBeyondSize`. + BadBeyondCommit, +} + +impl<'a> Arbitrary<'a> for FloorKind { + fn arbitrary(u: &mut arbitrary::Unstructured<'a>) -> arbitrary::Result { + let choice: u8 = u.arbitrary()?; + Ok(match choice % 4 { + 0 => FloorKind::Current, + 1 => FloorKind::AdvanceToCommit, + 2 => FloorKind::BadRegression, + 3 => FloorKind::BadBeyondCommit, + _ => unreachable!(), + }) + } +} + #[derive(Debug)] enum Operation { Append { @@ -25,7 +68,13 @@ enum Operation { }, Commit { metadata_bytes: Option>, - advance_floor: bool, + floor_kind: FloorKind, + }, + /// Build a two-level batch chain (parent → child) and apply the child directly. The + /// parent's floor is intentionally invalid (regressed or beyond its own commit location); + /// this exercises the per-ancestor validation path in `apply_batch`. + BadChainedCommit { + ancestor_kind: FloorKind, }, Get { loc_offset: u32, @@ -52,7 +101,7 @@ enum Operation { impl<'a> Arbitrary<'a> for Operation { fn arbitrary(u: &mut arbitrary::Unstructured<'a>) -> arbitrary::Result { let choice: u8 = u.arbitrary()?; - match choice % 13 { + match choice % 14 { 0 => { let value_len: u16 = u.arbitrary()?; let actual_len = ((value_len as usize) % 10000) + 1; @@ -68,10 +117,10 @@ impl<'a> Arbitrary<'a> for Operation { } else { None }; - let advance_floor: bool = u.arbitrary()?; + let floor_kind = FloorKind::arbitrary(u)?; Ok(Operation::Commit { metadata_bytes, - advance_floor, + floor_kind, }) } 2 => { @@ -104,6 +153,14 @@ impl<'a> Arbitrary<'a> for Operation { }) } 12 => Ok(Operation::SimulateFailure {}), + 13 => { + // Only Bad* kinds make sense here — the ancestor is guaranteed unapplied. + let ancestor_kind = match u.arbitrary::()? { + false => FloorKind::BadRegression, + true => FloorKind::BadBeyondCommit, + }; + Ok(Operation::BadChainedCommit { ancestor_kind }) + } _ => unreachable!(), } } @@ -173,23 +230,108 @@ fn fuzz_family(input: &FuzzInput, suffix: &str) { pending_appends.push(value_bytes.clone()); } - Operation::Commit { metadata_bytes, advance_floor } => { + Operation::Commit { metadata_bytes, floor_kind } => { let pending_count = pending_appends.len() as u64; + let end = db.bounds().await.end; + let commit_loc = end.as_u64() + pending_count; + let current_floor = db.inactivity_floor_loc(); + + // Pick the floor for this commit. `Bad*` kinds are guaranteed to trigger + // the expected error; Valid kinds (Current/AdvanceToCommit) apply cleanly. + let (floor, expect_err) = match floor_kind { + FloorKind::Current => (current_floor, None), + FloorKind::AdvanceToCommit => (Location::::new(commit_loc), None), + FloorKind::BadRegression => { + // Only meaningful when current floor > 0; otherwise fall back to Current. + if current_floor.as_u64() == 0 { + (current_floor, None) + } else { + let bad = Location::::new(current_floor.as_u64() - 1); + (bad, Some(BadFloorExpect::Regression)) + } + } + FloorKind::BadBeyondCommit => { + let bad = Location::::new(commit_loc + 1); + (bad, Some(BadFloorExpect::BeyondSize)) + } + }; + let mut batch = db.new_batch(); for v in pending_appends.drain(..) { batch = batch.append(v); } - // If the fuzzer opts in, advance the floor to the commit location (the max - // valid value). Otherwise, keep the current floor to preserve monotonicity. - let floor = if *advance_floor { - let end = db.bounds().await.end; - Location::::new(end.as_u64() + pending_count) - } else { - db.inactivity_floor_loc() - }; let merkleized = batch.merkleize(&db, metadata_bytes.clone(), floor); - db.apply_batch(merkleized).await.expect("Commit should not fail"); - db.commit().await.expect("Commit should not fail"); + + match expect_err { + None => { + db.apply_batch(merkleized).await.expect("Commit should not fail"); + db.commit().await.expect("Commit should not fail"); + } + Some(kind) => { + // Snapshot state; the reject must not mutate. + let before_last_commit = db.last_commit_loc(); + let before_floor = db.inactivity_floor_loc(); + let before_root = db.root(); + let err = db + .apply_batch(merkleized) + .await + .expect_err("bad floor must be rejected"); + assert_bad_floor_error(&err, kind); + assert_eq!(db.last_commit_loc(), before_last_commit); + assert_eq!(db.inactivity_floor_loc(), before_floor); + assert_eq!(db.root(), before_root); + } + } + } + + Operation::BadChainedCommit { ancestor_kind } => { + let end = db.bounds().await.end; + let current_floor = db.inactivity_floor_loc(); + + // Parent batch: base = end, 1 append lands at `end`, commit lands at `end + 1`. + // So parent's total_size = end + 2 and parent_commit_loc = end + 1. + let parent_commit_loc = end.as_u64() + 1; + let (parent_floor, kind) = match ancestor_kind { + FloorKind::BadRegression => { + if current_floor.as_u64() == 0 { + // No regression possible; skip this op (no-op). + continue; + } + ( + Location::::new(current_floor.as_u64() - 1), + BadFloorExpect::Regression, + ) + } + FloorKind::BadBeyondCommit => ( + Location::::new(parent_commit_loc + 1), + BadFloorExpect::BeyondSize, + ), + _ => continue, // only bad kinds are meaningful here + }; + + // Don't drain pending_appends — keep them for future ops. Build from scratch. + let parent = db + .new_batch() + .append(vec![0u8; 1]) + .merkleize(&db, None, parent_floor); + // child: valid on its own; only the ancestor should trip the check. + let child_floor = parent_floor; // stay ≥ parent_floor even if parent is bad + let child = parent + .new_batch::() + .append(vec![1u8; 1]) + .merkleize(&db, None, child_floor); + + let before_last_commit = db.last_commit_loc(); + let before_floor = db.inactivity_floor_loc(); + let before_root = db.root(); + let err = db + .apply_batch(child) + .await + .expect_err("bad ancestor floor must be rejected"); + assert_bad_floor_error(&err, kind); + assert_eq!(db.last_commit_loc(), before_last_commit); + assert_eq!(db.inactivity_floor_loc(), before_floor); + assert_eq!(db.root(), before_root); } Operation::Get { loc_offset } => { diff --git a/storage/src/qmdb/keyless/batch.rs b/storage/src/qmdb/keyless/batch.rs index 59a03dff404..63a9ad2a90d 100644 --- a/storage/src/qmdb/keyless/batch.rs +++ b/storage/src/qmdb/keyless/batch.rs @@ -66,6 +66,11 @@ where /// Used by `apply_batch` to validate partial ancestor commits. pub(super) ancestor_batch_ends: Vec, + /// Each ancestor's `new_inactivity_floor_loc`, stored in parallel with + /// `ancestor_batch_ends` (same order, newest-first: parent, grandparent, ...). + /// Used by `apply_batch` to enforce per-commit floor monotonicity across the chain. + pub(super) ancestor_new_inactivity_floor_locs: Vec>, + /// The inactivity floor declared by this batch's commit operation. pub(super) new_inactivity_floor_loc: Location, } @@ -189,8 +194,9 @@ where /// Resolve appends into operations, merkleize, and return an `Arc`. /// /// `inactivity_floor` is the application-declared floor embedded in the commit. It must - /// be monotonically non-decreasing (enforced on `apply_batch`) and must not exceed the - /// batch's total operation count. + /// be monotonically non-decreasing across the chain (enforced on `apply_batch`) and must + /// be at most this batch's own commit location (`total_size - 1`). A floor past the commit + /// would let a later `prune(floor)` remove the last readable commit. pub fn merkleize( self, db: &Keyless, @@ -220,10 +226,13 @@ where let journal = db.journal.with_mem(|mem| journal_batch.merkleize(mem)); let mut ancestor_batch_ends = Vec::new(); + let mut ancestor_new_inactivity_floor_locs = Vec::new(); if let Some(parent) = &self.parent { ancestor_batch_ends.push(parent.total_size); + ancestor_new_inactivity_floor_locs.push(parent.new_inactivity_floor_loc); for batch in parent.ancestors() { ancestor_batch_ends.push(batch.total_size); + ancestor_new_inactivity_floor_locs.push(batch.new_inactivity_floor_loc); } } @@ -234,6 +243,7 @@ where total_size, db_size: self.db_size, ancestor_batch_ends, + ancestor_new_inactivity_floor_locs, new_inactivity_floor_loc: inactivity_floor, }) } diff --git a/storage/src/qmdb/keyless/fixed.rs b/storage/src/qmdb/keyless/fixed.rs index f53beec8417..13d92acfa39 100644 --- a/storage/src/qmdb/keyless/fixed.rs +++ b/storage/src/qmdb/keyless/fixed.rs @@ -354,10 +354,10 @@ mod test { } #[test_traced("INFO")] - fn test_keyless_fixed_floor_beyond_size_rejected() { + fn test_keyless_fixed_floor_beyond_commit_loc_rejected() { deterministic::Runner::default().start(|ctx| async move { let db = open_db::(ctx.with_label("db")).await; - tests::test_keyless_db_floor_beyond_size_rejected(db).await; + tests::test_keyless_db_floor_beyond_commit_loc_rejected(db).await; }); } @@ -379,10 +379,10 @@ mod test { } #[test_traced("INFO")] - fn test_keyless_fixed_floor_at_total_size_accepted() { + fn test_keyless_fixed_floor_at_commit_loc_accepted() { deterministic::Runner::default().start(|ctx| async move { let db = open_db::(ctx.with_label("db")).await; - tests::test_keyless_db_floor_at_total_size_accepted(db).await; + tests::test_keyless_db_floor_at_commit_loc_accepted(db).await; }); } @@ -395,6 +395,30 @@ mod test { }); } + #[test_traced("INFO")] + fn test_keyless_fixed_ancestor_floor_regression_rejected() { + deterministic::Runner::default().start(|ctx| async move { + let db = open_db::(ctx.with_label("db")).await; + tests::test_keyless_db_ancestor_floor_regression_rejected(db).await; + }); + } + + #[test_traced("INFO")] + fn test_keyless_fixed_ancestor_floor_beyond_commit_loc_rejected() { + deterministic::Runner::default().start(|ctx| async move { + let db = open_db::(ctx.with_label("db")).await; + tests::test_keyless_db_ancestor_floor_beyond_commit_loc_rejected(db).await; + }); + } + + #[test_traced("INFO")] + fn test_keyless_fixed_chained_apply_with_valid_floors_succeeds() { + deterministic::Runner::default().start(|ctx| async move { + let db = open_db::(ctx.with_label("db")).await; + tests::test_keyless_db_chained_apply_with_valid_floors_succeeds(db).await; + }); + } + // mmb::Family variants #[test_traced("INFO")] @@ -655,10 +679,10 @@ mod test { } #[test_traced("INFO")] - fn test_keyless_fixed_floor_beyond_size_rejected_mmb() { + fn test_keyless_fixed_floor_beyond_commit_loc_rejected_mmb() { deterministic::Runner::default().start(|ctx| async move { let db = open_db::(ctx.with_label("db")).await; - tests::test_keyless_db_floor_beyond_size_rejected(db).await; + tests::test_keyless_db_floor_beyond_commit_loc_rejected(db).await; }); } @@ -680,10 +704,10 @@ mod test { } #[test_traced("INFO")] - fn test_keyless_fixed_floor_at_total_size_accepted_mmb() { + fn test_keyless_fixed_floor_at_commit_loc_accepted_mmb() { deterministic::Runner::default().start(|ctx| async move { let db = open_db::(ctx.with_label("db")).await; - tests::test_keyless_db_floor_at_total_size_accepted(db).await; + tests::test_keyless_db_floor_at_commit_loc_accepted(db).await; }); } @@ -695,4 +719,28 @@ mod test { .await; }); } + + #[test_traced("INFO")] + fn test_keyless_fixed_ancestor_floor_regression_rejected_mmb() { + deterministic::Runner::default().start(|ctx| async move { + let db = open_db::(ctx.with_label("db")).await; + tests::test_keyless_db_ancestor_floor_regression_rejected(db).await; + }); + } + + #[test_traced("INFO")] + fn test_keyless_fixed_ancestor_floor_beyond_commit_loc_rejected_mmb() { + deterministic::Runner::default().start(|ctx| async move { + let db = open_db::(ctx.with_label("db")).await; + tests::test_keyless_db_ancestor_floor_beyond_commit_loc_rejected(db).await; + }); + } + + #[test_traced("INFO")] + fn test_keyless_fixed_chained_apply_with_valid_floors_succeeds_mmb() { + deterministic::Runner::default().start(|ctx| async move { + let db = open_db::(ctx.with_label("db")).await; + tests::test_keyless_db_chained_apply_with_valid_floors_succeeds(db).await; + }); + } } diff --git a/storage/src/qmdb/keyless/mod.rs b/storage/src/qmdb/keyless/mod.rs index 3be664e8c3e..4b98d98ad8f 100644 --- a/storage/src/qmdb/keyless/mod.rs +++ b/storage/src/qmdb/keyless/mod.rs @@ -357,6 +357,7 @@ where total_size: journal_size, db_size: journal_size, ancestor_batch_ends: Vec::new(), + ancestor_new_inactivity_floor_locs: Vec::new(), new_inactivity_floor_loc: self.inactivity_floor_loc, }) } @@ -367,10 +368,19 @@ where /// ancestor chain was created is an ancestor of this batch. Applying a batch from a /// different fork returns [`Error::StaleBatch`]. /// - /// The batch's declared inactivity floor must be monotonically non-decreasing with respect - /// to the current floor and must not exceed the batch's total operation count. Violations - /// return [`Error::FloorRegressed`] or [`Error::FloorBeyondSize`]. Floor validation happens - /// before any journal mutation, so the database is untouched on floor errors. + /// Every commit operation in the batch chain (each unapplied ancestor's commit plus the + /// tip's) must satisfy two per-commit invariants: + /// + /// 1. The floor is monotonically non-decreasing across the chain, starting from the + /// database's current inactivity floor. + /// 2. The floor is at most the commit operation's own location (`total_size - 1` at that + /// point). A floor past the commit would let a later `prune(floor)` remove the last + /// readable commit from the journal. + /// + /// Violations return [`Error::FloorRegressed`] or [`Error::FloorBeyondSize`] identifying + /// the offending floor and the bound it crossed (the prior validated floor, or the commit + /// location, respectively). Floor validation happens before any journal mutation, so the + /// database is untouched on floor errors. /// /// Returns the range of locations written. /// @@ -392,18 +402,40 @@ where batch_base_size: batch.base_size, }); } - // Enforce floor monotonicity and bounds before mutating the journal. - if batch.new_inactivity_floor_loc < self.inactivity_floor_loc { + // Validate every unapplied commit's floor (each ancestor in the chain, then the tip) + // before mutating the journal. The invariant is per-commit: + // - floors are monotonically non-decreasing across the chain, and + // - each floor is at most its own commit location (= total_size - 1 at that point). + // Ancestors are stored newest-first, so walk in reverse to get oldest-first. + let mut prev_floor = self.inactivity_floor_loc; + for i in (0..batch.ancestor_batch_ends.len()).rev() { + let ancestor_end = batch.ancestor_batch_ends[i]; + if ancestor_end <= db_size { + // Already on disk — its floor was validated when it was first applied. + continue; + } + let ancestor_floor = batch.ancestor_new_inactivity_floor_locs[i]; + let ancestor_commit_loc = Location::new(ancestor_end - 1); + if ancestor_floor < prev_floor { + return Err(Error::FloorRegressed(ancestor_floor, prev_floor)); + } + if ancestor_floor > ancestor_commit_loc { + return Err(Error::FloorBeyondSize(ancestor_floor, ancestor_commit_loc)); + } + prev_floor = ancestor_floor; + } + // Tip checks chain off the last validated ancestor floor. + if batch.new_inactivity_floor_loc < prev_floor { return Err(Error::FloorRegressed( batch.new_inactivity_floor_loc, - self.inactivity_floor_loc, + prev_floor, )); } - let batch_end = Location::new(batch.total_size); - if batch.new_inactivity_floor_loc > batch_end { + let tip_commit_loc = Location::new(batch.total_size - 1); + if batch.new_inactivity_floor_loc > tip_commit_loc { return Err(Error::FloorBeyondSize( batch.new_inactivity_floor_loc, - batch_end, + tip_commit_loc, )); } let start_loc = self.last_commit_loc + 1; @@ -2012,7 +2044,7 @@ pub(crate) mod tests { db.destroy().await.unwrap(); } - pub(crate) async fn test_keyless_db_floor_beyond_size_rejected( + pub(crate) async fn test_keyless_db_floor_beyond_commit_loc_rejected( mut db: Keyless, ) where V: ValueEncoding, @@ -2021,7 +2053,8 @@ pub(crate) mod tests { Operation: EncodeShared, { // Batch of 2 appends + 1 commit lands at locations [1..4); commit at 3, total_size = 4. - // A floor > 4 is invalid. + // A floor > 3 (the commit location) is invalid — even floor == 4 (one past the commit) + // is rejected so a subsequent prune cannot remove the last readable commit. let merkleized = db .new_batch() .append(V::Value::make(1)) @@ -2029,7 +2062,19 @@ pub(crate) mod tests { .merkleize(&db, None, Location::new(999)); let err = db.apply_batch(merkleized).await.unwrap_err(); assert!( - matches!(err, Error::FloorBeyondSize(floor, size) if *floor == 999 && *size == 4), + matches!(err, Error::FloorBeyondSize(floor, commit) if *floor == 999 && *commit == 3), + "unexpected error: {err:?}" + ); + + // Boundary: floor == total_size (= commit_loc + 1) is also rejected. + let merkleized = db + .new_batch() + .append(V::Value::make(3)) + .append(V::Value::make(4)) + .merkleize(&db, None, Location::new(4)); + let err = db.apply_batch(merkleized).await.unwrap_err(); + assert!( + matches!(err, Error::FloorBeyondSize(floor, commit) if *floor == 4 && *commit == 3), "unexpected error: {err:?}" ); @@ -2118,8 +2163,8 @@ pub(crate) mod tests { db_b.destroy().await.unwrap(); } - /// A floor equal to the batch's total operation count is on the boundary of acceptance. - pub(crate) async fn test_keyless_db_floor_at_total_size_accepted( + /// A floor equal to the commit operation's location is on the tight boundary of acceptance. + pub(crate) async fn test_keyless_db_floor_at_commit_loc_accepted( mut db: Keyless, ) where V: ValueEncoding, @@ -2127,18 +2172,18 @@ pub(crate) mod tests { H: Hasher, Operation: EncodeShared, { - // 2 appends + 1 commit on top of the initial commit: total_size = 4. - // Setting floor = 4 is on the boundary (inclusive). - let total_size = Location::::new(4); + // 2 appends + 1 commit on top of the initial commit: commit lands at location 3. + // floor == 3 (= commit_loc) is the maximum accepted value under the per-commit bound. + let commit_loc = Location::::new(3); db.apply_batch( db.new_batch() .append(V::Value::make(1)) .append(V::Value::make(2)) - .merkleize(&db, None, total_size), + .merkleize(&db, None, commit_loc), ) .await .unwrap(); - assert_eq!(db.inactivity_floor_loc(), total_size); + assert_eq!(db.inactivity_floor_loc(), commit_loc); db.destroy().await.unwrap(); } @@ -2197,4 +2242,117 @@ pub(crate) mod tests { db.destroy().await.unwrap(); } + + /// A chained batch that applies a tip with a floor *lower than* its parent's floor must + /// be rejected — the parent's `Commit` is written to the journal by the same + /// `journal.apply_batch` call, so its floor participates in the per-commit monotonicity + /// invariant. + pub(crate) async fn test_keyless_db_ancestor_floor_regression_rejected( + mut db: Keyless, + ) where + F: Family, + V: ValueEncoding, + C: Mutable> + Persistable, + H: Hasher, + Operation: EncodeShared, + { + // parent: 1 append + commit at loc 2 with floor=2 (the parent's commit_loc). + let parent = + db.new_batch() + .append(V::Value::make(1)) + .merkleize(&db, None, Location::new(2)); + // child: 1 append + commit at loc 4 with floor=1 (regressed from parent's floor=2). + let child = parent.new_batch::().append(V::Value::make(2)).merkleize( + &db, + None, + Location::new(1), + ); + + let root_before = db.root(); + let last_commit_before = db.last_commit_loc(); + let floor_before = db.inactivity_floor_loc(); + + let err = db.apply_batch(child).await.unwrap_err(); + assert!( + matches!(err, Error::FloorRegressed(new, prev) if *new == 1 && *prev == 2), + "unexpected error: {err:?}" + ); + + // DB state untouched by the rejected chain. + assert_eq!(db.root(), root_before); + assert_eq!(db.last_commit_loc(), last_commit_before); + assert_eq!(db.inactivity_floor_loc(), floor_before); + + db.destroy().await.unwrap(); + } + + /// A chained batch where an *ancestor's* floor exceeds its own commit location must be + /// rejected — identifying the ancestor's bound, not the tip's. + pub(crate) async fn test_keyless_db_ancestor_floor_beyond_commit_loc_rejected( + mut db: Keyless, + ) where + F: Family, + V: ValueEncoding, + C: Mutable> + Persistable, + H: Hasher, + Operation: EncodeShared, + { + // parent: 1 append + commit at loc 2. Declare floor = 3 (one past the commit). + let parent = + db.new_batch() + .append(V::Value::make(1)) + .merkleize(&db, None, Location::new(3)); + // child: valid on its own (floor = 0 ≤ child's commit_loc), but parent's floor is bad. + let child = parent.new_batch::().append(V::Value::make(2)).merkleize( + &db, + None, + Location::new(0), + ); + + let err = db.apply_batch(child).await.unwrap_err(); + // Error should identify the ancestor's commit_loc (2), not the tip's. + assert!( + matches!(err, Error::FloorBeyondSize(floor, commit) if *floor == 3 && *commit == 2), + "unexpected error: {err:?}" + ); + + db.destroy().await.unwrap(); + } + + /// A multi-level chain with strictly-monotonic, within-bounds floors applies cleanly. + pub(crate) async fn test_keyless_db_chained_apply_with_valid_floors_succeeds( + mut db: Keyless, + ) where + F: Family, + V: ValueEncoding, + C: Mutable> + Persistable, + H: Hasher, + Operation: EncodeShared, + { + // parent: 1 append, commit at loc 2, floor = 2. + // child: 1 append, commit at loc 4, floor = 3. + // grandchild: 1 append, commit at loc 6, floor = 5. + let parent = + db.new_batch() + .append(V::Value::make(1)) + .merkleize(&db, None, Location::new(2)); + let child = parent.new_batch::().append(V::Value::make(2)).merkleize( + &db, + None, + Location::new(3), + ); + let grandchild = + child + .new_batch::() + .append(V::Value::make(3)) + .merkleize(&db, None, Location::new(5)); + + db.apply_batch(grandchild).await.unwrap(); + + // Grandchild's commit is the last op; tip's floor is the live floor. + assert_eq!(db.last_commit_loc(), Location::new(6)); + assert_eq!(db.inactivity_floor_loc(), Location::new(5)); + + db.destroy().await.unwrap(); + } } diff --git a/storage/src/qmdb/keyless/sync.rs b/storage/src/qmdb/keyless/sync.rs index 2412763eef8..2a3b4b5ef4b 100644 --- a/storage/src/qmdb/keyless/sync.rs +++ b/storage/src/qmdb/keyless/sync.rs @@ -275,6 +275,7 @@ mod tests { let target_op_count = bounds.end; let target_oldest_retained_loc = bounds.start; let target_root = target_db.root(); + let target_floor = target_db.inactivity_floor_loc(); let db_config = create_sync_config(&format!("sync_client_{}", context.next_u64()), &context); @@ -303,6 +304,8 @@ mod tests { assert_eq!(bounds.end, target_op_count); assert_eq!(bounds.start, target_oldest_retained_loc); assert_eq!(got_db.root(), target_root); + // Explicit: sync must reproduce the inactivity floor, not just the root. + assert_eq!(got_db.inactivity_floor_loc(), target_floor); // Verify values match for (i, op) in target_ops.iter().enumerate() { @@ -595,6 +598,7 @@ mod tests { let bounds = target_db.bounds().await; let lower_bound = bounds.start; let upper_bound = bounds.end; + let target_floor = target_db.inactivity_floor_loc(); let target_db = Arc::new(target_db); let config = Config { @@ -617,6 +621,7 @@ mod tests { assert_eq!(sync_db.bounds().await.end, upper_bound); assert_eq!(sync_db.root(), root); + assert_eq!(sync_db.inactivity_floor_loc(), target_floor); sync_db.destroy().await.unwrap(); let target_db = @@ -649,6 +654,7 @@ mod tests { let bounds = target_db.bounds().await; let lower_bound = bounds.start; let upper_bound = bounds.end; + let target_floor = target_db.inactivity_floor_loc(); let resolver = Arc::new(target_db); let config = Config { @@ -671,6 +677,7 @@ mod tests { assert_eq!(sync_db.bounds().await.end, upper_bound); assert_eq!(sync_db.root(), root); + assert_eq!(sync_db.inactivity_floor_loc(), target_floor); sync_db.destroy().await.unwrap(); let target_db = diff --git a/storage/src/qmdb/keyless/variable.rs b/storage/src/qmdb/keyless/variable.rs index f46094687d4..1a73b2eb5d9 100644 --- a/storage/src/qmdb/keyless/variable.rs +++ b/storage/src/qmdb/keyless/variable.rs @@ -616,10 +616,10 @@ mod test { } #[test_traced("INFO")] - fn test_keyless_variable_floor_beyond_size_rejected_mmb() { + fn test_keyless_variable_floor_beyond_commit_loc_rejected_mmb() { deterministic::Runner::default().start(|ctx| async move { let db = open_db::(ctx.with_label("db")).await; - tests::test_keyless_db_floor_beyond_size_rejected(db).await; + tests::test_keyless_db_floor_beyond_commit_loc_rejected(db).await; }); } @@ -641,10 +641,10 @@ mod test { } #[test_traced("INFO")] - fn test_keyless_variable_floor_at_total_size_accepted_mmb() { + fn test_keyless_variable_floor_at_commit_loc_accepted_mmb() { deterministic::Runner::default().start(|ctx| async move { let db = open_db::(ctx.with_label("db")).await; - tests::test_keyless_db_floor_at_total_size_accepted(db).await; + tests::test_keyless_db_floor_at_commit_loc_accepted(db).await; }); } @@ -657,6 +657,30 @@ mod test { }); } + #[test_traced("INFO")] + fn test_keyless_variable_ancestor_floor_regression_rejected_mmb() { + deterministic::Runner::default().start(|ctx| async move { + let db = open_db::(ctx.with_label("db")).await; + tests::test_keyless_db_ancestor_floor_regression_rejected(db).await; + }); + } + + #[test_traced("INFO")] + fn test_keyless_variable_ancestor_floor_beyond_commit_loc_rejected_mmb() { + deterministic::Runner::default().start(|ctx| async move { + let db = open_db::(ctx.with_label("db")).await; + tests::test_keyless_db_ancestor_floor_beyond_commit_loc_rejected(db).await; + }); + } + + #[test_traced("INFO")] + fn test_keyless_variable_chained_apply_with_valid_floors_succeeds_mmb() { + deterministic::Runner::default().start(|ctx| async move { + let db = open_db::(ctx.with_label("db")).await; + tests::test_keyless_db_chained_apply_with_valid_floors_succeeds(db).await; + }); + } + #[test_traced("INFO")] fn test_keyless_variable_floor_tracking() { deterministic::Runner::default().start(|ctx| async move { @@ -674,10 +698,10 @@ mod test { } #[test_traced("INFO")] - fn test_keyless_variable_floor_beyond_size_rejected() { + fn test_keyless_variable_floor_beyond_commit_loc_rejected() { deterministic::Runner::default().start(|ctx| async move { let db = open_db::(ctx.with_label("db")).await; - tests::test_keyless_db_floor_beyond_size_rejected(db).await; + tests::test_keyless_db_floor_beyond_commit_loc_rejected(db).await; }); } @@ -699,10 +723,10 @@ mod test { } #[test_traced("INFO")] - fn test_keyless_variable_floor_at_total_size_accepted() { + fn test_keyless_variable_floor_at_commit_loc_accepted() { deterministic::Runner::default().start(|ctx| async move { let db = open_db::(ctx.with_label("db")).await; - tests::test_keyless_db_floor_at_total_size_accepted(db).await; + tests::test_keyless_db_floor_at_commit_loc_accepted(db).await; }); } @@ -715,6 +739,30 @@ mod test { }); } + #[test_traced("INFO")] + fn test_keyless_variable_ancestor_floor_regression_rejected() { + deterministic::Runner::default().start(|ctx| async move { + let db = open_db::(ctx.with_label("db")).await; + tests::test_keyless_db_ancestor_floor_regression_rejected(db).await; + }); + } + + #[test_traced("INFO")] + fn test_keyless_variable_ancestor_floor_beyond_commit_loc_rejected() { + deterministic::Runner::default().start(|ctx| async move { + let db = open_db::(ctx.with_label("db")).await; + tests::test_keyless_db_ancestor_floor_beyond_commit_loc_rejected(db).await; + }); + } + + #[test_traced("INFO")] + fn test_keyless_variable_chained_apply_with_valid_floors_succeeds() { + deterministic::Runner::default().start(|ctx| async move { + let db = open_db::(ctx.with_label("db")).await; + tests::test_keyless_db_chained_apply_with_valid_floors_succeeds(db).await; + }); + } + fn is_send(_: T) {} #[allow(dead_code)] diff --git a/storage/src/qmdb/mod.rs b/storage/src/qmdb/mod.rs index 7059719a394..27ccfba9a03 100644 --- a/storage/src/qmdb/mod.rs +++ b/storage/src/qmdb/mod.rs @@ -118,8 +118,10 @@ pub enum Error { #[error("floor regressed: batch floor {0} < current floor {1}")] FloorRegressed(Location, Location), - /// The batch's inactivity floor exceeds its total operation count. - #[error("floor beyond size: floor {0} > total size {1}")] + /// The batch's inactivity floor exceeds its own commit operation's location. The floor + /// must not sit past the commit, since a subsequent `prune(floor)` would then remove the + /// last readable commit from the journal. + #[error("floor beyond commit location: floor {0} > commit loc {1}")] FloorBeyondSize(Location, Location), } From e3c225274d6d0e607200c059b00f85b18f5ae0ad Mon Sep 17 00:00:00 2001 From: Dan Laine Date: Mon, 20 Apr 2026 16:22:53 -0400 Subject: [PATCH 4/6] add empty live set tests --- storage/src/qmdb/keyless/fixed.rs | 16 +++++ storage/src/qmdb/keyless/mod.rs | 98 ++++++++++++++++++++++++++++ storage/src/qmdb/keyless/variable.rs | 16 +++++ 3 files changed, 130 insertions(+) diff --git a/storage/src/qmdb/keyless/fixed.rs b/storage/src/qmdb/keyless/fixed.rs index 13d92acfa39..b82935c1b82 100644 --- a/storage/src/qmdb/keyless/fixed.rs +++ b/storage/src/qmdb/keyless/fixed.rs @@ -419,6 +419,14 @@ mod test { }); } + #[test_traced("INFO")] + fn test_keyless_fixed_single_commit_live_set() { + deterministic::Runner::default().start(|ctx| async move { + let db = open_db::(ctx.with_label("db1")).await; + tests::test_keyless_db_single_commit_live_set(ctx, db, reopen::()).await; + }); + } + // mmb::Family variants #[test_traced("INFO")] @@ -743,4 +751,12 @@ mod test { tests::test_keyless_db_chained_apply_with_valid_floors_succeeds(db).await; }); } + + #[test_traced("INFO")] + fn test_keyless_fixed_single_commit_live_set_mmb() { + deterministic::Runner::default().start(|ctx| async move { + let db = open_db::(ctx.with_label("db1")).await; + tests::test_keyless_db_single_commit_live_set(ctx, db, reopen::()).await; + }); + } } diff --git a/storage/src/qmdb/keyless/mod.rs b/storage/src/qmdb/keyless/mod.rs index 4b98d98ad8f..6edfffe2799 100644 --- a/storage/src/qmdb/keyless/mod.rs +++ b/storage/src/qmdb/keyless/mod.rs @@ -2319,6 +2319,104 @@ pub(crate) mod tests { db.destroy().await.unwrap(); } + /// After committing with `floor = commit_loc` and pruning down to it, the live set is + /// exactly one operation — the commit itself. This is the minimum non-empty live set + /// achievable under the per-commit bound. The DB must remain fully usable: the commit is + /// readable, the root is preserved, reopen recovers `inactivity_floor_loc` from the sole + /// remaining op, and a follow-on batch applies cleanly on top. + pub(crate) async fn test_keyless_db_single_commit_live_set( + context: deterministic::Context, + mut db: Keyless, + reopen: Reopen>, + ) where + F: Family, + V: ValueEncoding, + C: Mutable> + Persistable, + H: Hasher, + Operation: EncodeShared, + { + // Initial commit is at loc 0. 3 appends + commit → commit lands at loc 4. + // Declare floor = 4 (= commit_loc), the tight maximum. + let metadata = V::Value::make(42); + let commit_loc = Location::::new(4); + db.apply_batch( + db.new_batch() + .append(V::Value::make(1)) + .append(V::Value::make(2)) + .append(V::Value::make(3)) + .merkleize(&db, Some(metadata.clone()), commit_loc), + ) + .await + .unwrap(); + db.commit().await.unwrap(); + assert_eq!(db.last_commit_loc(), commit_loc); + assert_eq!(db.inactivity_floor_loc(), commit_loc); + let root_after_commit = db.root(); + + // Prune at the floor — the maximum prune allowed under the invariant. + // Pruning is blob-aligned, so `bounds.start` may not physically advance all the way + // to `commit_loc`; what matters semantically is that the floor has authorized pruning + // of everything below the commit and that any further prune is rejected. + db.prune(commit_loc).await.unwrap(); + let bounds = db.bounds().await; + assert!( + bounds.start <= commit_loc, + "prune must not advance bounds.start past the floor" + ); + assert_eq!(bounds.end, Location::new(*commit_loc + 1)); + + // Pruning one past the floor must be rejected — the floor is the hard ceiling. + let err = db.prune(Location::new(*commit_loc + 1)).await.unwrap_err(); + assert!(matches!(err, Error::PruneBeyondMinRequired(p, f) + if *p == *commit_loc + 1 && *f == *commit_loc)); + + // The commit op remains readable; its metadata is intact. + assert_eq!(db.get(commit_loc).await.unwrap(), Some(metadata.clone())); + assert_eq!(db.get_metadata().await.unwrap(), Some(metadata.clone())); + assert_eq!(db.last_commit_loc(), commit_loc); + assert_eq!(db.inactivity_floor_loc(), commit_loc); + // Prune does not affect the root (documented invariant on `prune`). + assert_eq!(db.root(), root_after_commit); + + // Persist the prune, then reopen: `init_from_journal` must recover the floor from + // the last commit op. + db.sync().await.unwrap(); + drop(db); + let mut db = reopen(context.with_label("reopened")).await; + let reopened_bounds = db.bounds().await; + assert_eq!(reopened_bounds.end, Location::new(*commit_loc + 1)); + assert_eq!(db.last_commit_loc(), commit_loc); + assert_eq!(db.inactivity_floor_loc(), commit_loc); + assert_eq!(db.root(), root_after_commit); + assert_eq!(db.get_metadata().await.unwrap(), Some(metadata.clone())); + + // A follow-on batch applies on top. Monotonicity requires the new floor to be at + // least `commit_loc` (= 4); advancing to the new tight max (= 7) exercises the + // ancestor-to-tip floor transition from a minimum-live-set starting point. + let next_commit_loc = Location::::new(7); + let v5 = V::Value::make(5); + let v6 = V::Value::make(6); + db.apply_batch( + db.new_batch() + .append(v5.clone()) + .append(v6.clone()) + .merkleize(&db, None, next_commit_loc), + ) + .await + .unwrap(); + db.commit().await.unwrap(); + assert_eq!(db.last_commit_loc(), next_commit_loc); + assert_eq!(db.inactivity_floor_loc(), next_commit_loc); + + // New appends readable; the original commit op is also still in the live set (not + // re-pruned), so reading it still returns its metadata. + assert_eq!(db.get(Location::new(5)).await.unwrap(), Some(v5)); + assert_eq!(db.get(Location::new(6)).await.unwrap(), Some(v6)); + assert_eq!(db.get(commit_loc).await.unwrap(), Some(metadata)); + + db.destroy().await.unwrap(); + } + /// A multi-level chain with strictly-monotonic, within-bounds floors applies cleanly. pub(crate) async fn test_keyless_db_chained_apply_with_valid_floors_succeeds( mut db: Keyless, diff --git a/storage/src/qmdb/keyless/variable.rs b/storage/src/qmdb/keyless/variable.rs index 1a73b2eb5d9..7e1d41a60f1 100644 --- a/storage/src/qmdb/keyless/variable.rs +++ b/storage/src/qmdb/keyless/variable.rs @@ -681,6 +681,14 @@ mod test { }); } + #[test_traced("INFO")] + fn test_keyless_variable_single_commit_live_set_mmb() { + deterministic::Runner::default().start(|ctx| async move { + let db = open_db::(ctx.with_label("db1")).await; + tests::test_keyless_db_single_commit_live_set(ctx, db, reopen::()).await; + }); + } + #[test_traced("INFO")] fn test_keyless_variable_floor_tracking() { deterministic::Runner::default().start(|ctx| async move { @@ -763,6 +771,14 @@ mod test { }); } + #[test_traced("INFO")] + fn test_keyless_variable_single_commit_live_set() { + deterministic::Runner::default().start(|ctx| async move { + let db = open_db::(ctx.with_label("db1")).await; + tests::test_keyless_db_single_commit_live_set(ctx, db, reopen::()).await; + }); + } + fn is_send(_: T) {} #[allow(dead_code)] From 39ceb11723d4e6160a5f6e7c0828e6839e233c1a Mon Sep 17 00:00:00 2001 From: Dan Laine Date: Tue, 21 Apr 2026 13:59:47 -0400 Subject: [PATCH 5/6] fix merge --- storage/src/qmdb/conformance.rs | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/storage/src/qmdb/conformance.rs b/storage/src/qmdb/conformance.rs index 0f1325d6094..bbf6b5aeb4a 100644 --- a/storage/src/qmdb/conformance.rs +++ b/storage/src/qmdb/conformance.rs @@ -249,11 +249,12 @@ macro_rules! apply_sets { /// Apply a batch of keyless appends to the database. macro_rules! apply_appends { ($db:ident, $vals:expr) => {{ + let floor = $db.inactivity_floor_loc(); let mut batch = $db.new_batch(); for v in $vals { batch = batch.append(v); } - let merkleized = batch.merkleize(&$db, None); + let merkleized = batch.merkleize(&$db, None, floor); $db.apply_batch(merkleized).await.unwrap(); }}; } From 667153bae90d5ebf6ade4686a4998d7e3dd25999 Mon Sep 17 00:00:00 2001 From: Dan Laine Date: Tue, 21 Apr 2026 14:31:56 -0400 Subject: [PATCH 6/6] update conformance test --- storage/conformance.toml | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/storage/conformance.toml b/storage/conformance.toml index b859c09fe16..9d370d9430a 100644 --- a/storage/conformance.toml +++ b/storage/conformance.toml @@ -220,19 +220,19 @@ hash = "f32fda4f6c0bbde62f8f1959edeb2ac67b28f5347bd108e4118e740aa1fd8fd1" ["commonware_storage::qmdb::conformance::KeylessMmbFixedConf"] n_cases = 200 -hash = "41f832011239e8d76ded66ea5e2559a6eaeb6ef65875b9d50db1296a2f26e5f8" +hash = "d9ad4deb528e8bc514d028efa34e9b31b6642584c8a0c0e98c7a6d9b36f355df" ["commonware_storage::qmdb::conformance::KeylessMmbVariableConf"] n_cases = 200 -hash = "ce612a2c4c3c43db5e797cdeff7df04b872660529ccdfb825feb0efcaaf322a3" +hash = "c1f535aaa1a4ff8d8594b70d4d121a2ee48c342a68c8549c4b411d299cfa43dc" ["commonware_storage::qmdb::conformance::KeylessMmrFixedConf"] n_cases = 200 -hash = "79ac92e88026d3d73d1cf37edf85fcd82a5fd6824566b05fe04d0ad1fd7ec9e0" +hash = "49c54ba4ea198f1e5811abfb3a1a077925d1075b006c620cb3e4aa3769c997cb" ["commonware_storage::qmdb::conformance::KeylessMmrVariableConf"] n_cases = 200 -hash = "aa70f866ae6b4104e94c741f46806a430b2ce76e3af5798ba2fb23dff6d1fbd9" +hash = "aee37bfef687a6855e53f2e4a2ef7190cf9d33cdb67b5e177a7f56f7b44e4b9b" ["commonware_storage::qmdb::immutable::operation::fixed::tests::conformance::CodecConformance"] n_cases = 65536