Skip to content
14 changes: 6 additions & 8 deletions examples/sync/src/databases/immutable.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ use tracing::error;
pub type Database<E> = immutable::variable::Db<mmr::Family, E, Key, Value, Hasher, Translator>;

/// Operation type alias.
pub type Operation = immutable::variable::Operation<Key, Value>;
pub type Operation = immutable::variable::Operation<mmr::Family, Key, Value>;

/// Create a database configuration with appropriate partitioning for Immutable.
pub fn create_config(context: &impl BufferPooler) -> Config<Translator, VConfig<((), ())>> {
Expand Down Expand Up @@ -74,12 +74,12 @@ pub fn create_test_operations(count: usize, seed: u64) -> Vec<Operation> {
operations.push(Operation::Set(key, value));

if (i + 1) % 10 == 0 {
operations.push(Operation::Commit(None));
operations.push(Operation::Commit(None, Location::new(0)));
}
}

// Always end with a commit
operations.push(Operation::Commit(Some(Sha256::fill(1))));
operations.push(Operation::Commit(Some(Sha256::fill(1)), Location::new(0)));
operations
}

Expand Down Expand Up @@ -110,8 +110,8 @@ where
Operation::Set(key, value) => {
batch = batch.set(key, value);
}
Operation::Commit(metadata) => {
let merkleized = batch.merkleize(self, metadata);
Operation::Commit(metadata, floor) => {
let merkleized = batch.merkleize(self, metadata, floor);
self.apply_batch(merkleized).await?;
self.commit().await?;
batch = self.new_batch();
Expand All @@ -130,9 +130,7 @@ where
}

async fn inactivity_floor(&self) -> Location {
// For Immutable databases, all retained operations are active,
// so the inactivity floor equals the pruning boundary.
self.bounds().await.start
self.inactivity_floor_loc()
}

fn historical_proof(
Expand Down
6 changes: 3 additions & 3 deletions storage/conformance.toml
Original file line number Diff line number Diff line change
Expand Up @@ -236,15 +236,15 @@ hash = "aa70f866ae6b4104e94c741f46806a430b2ce76e3af5798ba2fb23dff6d1fbd9"

["commonware_storage::qmdb::immutable::operation::fixed::tests::conformance::CodecConformance<FixedOp>"]
n_cases = 65536
hash = "4f75cdf8952431729e7a3dfad38a8d5bfbb92bf6b54b1ca180a66745aed618d5"
hash = "1737c49c112fc9dfdc4dde3626d8ab08a9df8353b9a702e488d3a1fd8e9428dc"

["commonware_storage::qmdb::immutable::operation::variable::tests::conformance::CodecConformance<VarKeyOp>"]
n_cases = 65536
hash = "cce5f888e506282f861e0e49e176b26c65f92e457f0c5f5353fc9b7196f07478"
hash = "bdee433c53489ee67a42ad2029081d3f233799bd360d6e5e7f29cbaec87c9064"

["commonware_storage::qmdb::immutable::operation::variable::tests::conformance::CodecConformance<VarOp>"]
n_cases = 65536
hash = "fdca5df62d243b28676ee15034663694cd219d5ef80749079126b0ed73effe0d"
hash = "7952a9bb3a9cec87a95af6dd96a5b88b535106f0241770e0bfdb8aeaf9421056"

["commonware_storage::qmdb::keyless::operation::tests::conformance::CodecConformance<Operation<FixedEncoding<U64>>>"]
n_cases = 65536
Expand Down
33 changes: 26 additions & 7 deletions storage/fuzz/fuzz_targets/qmdb_immutable.rs
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@ enum ImmutableOperation {
Commit {
has_metadata: bool,
metadata_size: usize,
advance_floor: bool,
},
Prune {
loc: u64,
Expand Down Expand Up @@ -179,24 +180,35 @@ fn fuzz_family<F: MerkleFamily>(input: &FuzzInput, suffix: &str) {
ImmutableOperation::Commit {
has_metadata,
metadata_size,
advance_floor,
} => {
let metadata = if has_metadata {
Some(generate_value(&mut rng, metadata_size))
} else {
None
};

let end = db.bounds().await.end;
let pending_count = pending_sets.len() as u64;
assign_pending_locations(
&pending_sets,
db.bounds().await.end,
end,
&mut keys_set,
&mut set_locations,
);
let mut batch = db.new_batch();
for (k, v) in pending_sets.drain(..) {
batch = batch.set(k, v);
}
let merkleized = batch.merkleize(&db, metadata);
let floor = if advance_floor {
// Advance floor to the commit location (end of this batch).
// total_size = end + pending_count + 1 (commit op).
// Floor at the commit op is the maximum valid value.
Location::new(*end + pending_count)
} else {
db.inactivity_floor_loc()
};
let merkleized = batch.merkleize(&db, metadata, floor);
db.apply_batch(merkleized).await.unwrap();
db.commit().await.unwrap();
last_commit_loc = Some(db.bounds().await.end - 1);
Expand All @@ -216,7 +228,10 @@ fn fuzz_family<F: MerkleFamily>(input: &FuzzInput, suffix: &str) {
for (k, v) in pending_sets.drain(..) {
batch = batch.set(k, v);
}
let merkleized = batch.merkleize(&db, None);
// Set the floor to at least safe_loc so the prune succeeds,
// but never below the current floor (monotonicity).
let floor = safe_loc.max(db.inactivity_floor_loc());
let merkleized = batch.merkleize(&db, None, floor);
db.apply_batch(merkleized).await.unwrap();
db.commit().await.unwrap();
last_commit_loc = Some(db.bounds().await.end - 1);
Expand Down Expand Up @@ -247,7 +262,8 @@ fn fuzz_family<F: MerkleFamily>(input: &FuzzInput, suffix: &str) {
for (k, v) in pending_sets.drain(..) {
batch = batch.set(k, v);
}
let merkleized = batch.merkleize(&db, None);
let floor = db.inactivity_floor_loc();
let merkleized = batch.merkleize(&db, None, floor);
db.apply_batch(merkleized).await.unwrap();
db.commit().await.unwrap();
last_commit_loc = Some(db.bounds().await.end - 1);
Expand All @@ -272,7 +288,8 @@ fn fuzz_family<F: MerkleFamily>(input: &FuzzInput, suffix: &str) {
let safe_max_ops =
NonZeroU64::new((max_ops % MAX_PROOF_OPS).max(1)).unwrap();

let batch = db.new_batch().merkleize(&db, None);
let floor = db.inactivity_floor_loc();
let batch = db.new_batch().merkleize(&db, None, floor);
db.apply_batch(batch).await.unwrap();
db.commit().await.unwrap();
last_commit_loc = Some(db.bounds().await.end - 1);
Expand Down Expand Up @@ -307,7 +324,8 @@ fn fuzz_family<F: MerkleFamily>(input: &FuzzInput, suffix: &str) {
for (k, v) in pending_sets.drain(..) {
batch = batch.set(k, v);
}
let merkleized = batch.merkleize(&db, None);
let floor = db.inactivity_floor_loc();
let merkleized = batch.merkleize(&db, None, floor);
db.apply_batch(merkleized).await.unwrap();
db.commit().await.unwrap();
last_commit_loc = Some(db.bounds().await.end - 1);
Expand All @@ -326,7 +344,8 @@ fn fuzz_family<F: MerkleFamily>(input: &FuzzInput, suffix: &str) {
for (k, v) in pending_sets.drain(..) {
batch = batch.set(k, v);
}
let merkleized = batch.merkleize(&db, None);
let floor = db.inactivity_floor_loc();
let merkleized = batch.merkleize(&db, None, floor);
db.apply_batch(merkleized).await.unwrap();
db.destroy().await.unwrap();
}
Expand Down
9 changes: 6 additions & 3 deletions storage/src/qmdb/conformance.rs
Original file line number Diff line number Diff line change
Expand Up @@ -237,11 +237,12 @@ async fn apply_writes<F: Family, D: DbAny<F, Key = Digest, Value = Digest>>(
/// Apply a batch of immutable sets to the database.
macro_rules! apply_sets {
($db:ident, $ops:expr) => {{
let floor = $db.inactivity_floor_loc();
let mut batch = $db.new_batch();
for (k, v) in $ops {
batch = batch.set(k, v);
}
let merkleized = batch.merkleize(&$db, None);
let merkleized = batch.merkleize(&$db, None, floor);
$db.apply_batch(merkleized).await.unwrap();
}};
}
Expand Down Expand Up @@ -641,18 +642,20 @@ macro_rules! assert_immutable_order_independent {
ops.push((colliding_digest(0xCD, i), to_val(i, 100)));
}

let fwd_floor = $fwd.inactivity_floor_loc();
let mut batch = $fwd.new_batch();
for &(k, v) in &ops {
batch = batch.set(k, v);
}
let merkleized = batch.merkleize(&$fwd, None);
let merkleized = batch.merkleize(&$fwd, None, fwd_floor);
$fwd.apply_batch(merkleized).await.unwrap();

let rev_floor = $rev.inactivity_floor_loc();
let mut batch = $rev.new_batch();
for &(k, v) in ops.iter().rev() {
batch = batch.set(k, v);
}
let merkleized = batch.merkleize(&$rev, None);
let merkleized = batch.merkleize(&$rev, None, rev_floor);
$rev.apply_batch(merkleized).await.unwrap();

assert_eq!(
Expand Down
31 changes: 20 additions & 11 deletions storage/src/qmdb/immutable/batch.rs
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@ where
H: CHasher,
{
/// Authenticated journal batch for computing the speculative Merkle root.
journal_batch: authenticated::UnmerkleizedBatch<F, H, Operation<K, V>>,
journal_batch: authenticated::UnmerkleizedBatch<F, H, Operation<F, K, V>>,

/// Pending mutations.
mutations: BTreeMap<K, V::Value>,
Expand All @@ -65,7 +65,7 @@ where
#[derive(Clone)]
pub struct MerkleizedBatch<F: Family, D: Digest, K: Key, V: ValueEncoding> {
/// Authenticated journal batch (Merkle state + local items).
pub(super) journal_batch: Arc<authenticated::MerkleizedBatch<F, D, Operation<K, V>>>,
pub(super) journal_batch: Arc<authenticated::MerkleizedBatch<F, D, Operation<F, K, V>>>,

/// This batch's local key-level changes only (not accumulated from ancestors).
/// Sorted by key with no duplicates; queried via `lookup_sorted` (binary search).
Expand All @@ -92,6 +92,9 @@ pub struct MerkleizedBatch<F: Family, D: Digest, K: Key, V: ValueEncoding> {
/// 1:1 with `ancestor_diffs`: `ancestor_diff_ends[i]` is the boundary for
/// `ancestor_diffs[i]`. A batch is committed when `ancestor_diff_ends[i] <= db_size`.
pub(super) ancestor_diff_ends: Vec<u64>,

/// The inactivity floor declared by this batch's commit operation.
pub(super) new_inactivity_floor_loc: Location<F>,
}

impl<F, H, K, V> UnmerkleizedBatch<F, H, K, V>
Expand All @@ -100,7 +103,7 @@ where
K: Key,
V: ValueEncoding,
H: CHasher,
Operation<K, V>: EncodeShared,
Operation<F, K, V>: EncodeShared,
{
/// Create a batch from a committed DB (no parent chain).
pub(super) fn new<E, C, T>(
Expand All @@ -109,7 +112,7 @@ where
) -> Self
where
E: Context,
C: Mutable<Item = Operation<K, V>> + Persistable<Error = JournalError>,
C: Mutable<Item = Operation<F, K, V>> + Persistable<Error = JournalError>,
C::Item: EncodeShared,
T: Translator,
{
Expand Down Expand Up @@ -139,7 +142,7 @@ where
) -> Result<Option<V::Value>, Error<F>>
where
E: Context,
C: Mutable<Item = Operation<K, V>> + Persistable<Error = JournalError>,
C: Mutable<Item = Operation<F, K, V>> + Persistable<Error = JournalError>,
C::Item: EncodeShared,
T: Translator,
{
Expand All @@ -164,22 +167,26 @@ where
}

/// Resolve mutations into operations, merkleize, and return an `Arc<MerkleizedBatch>`.
///
/// `inactivity_floor` declares that all operations before this location are inactive.
/// It must be >= the database's current inactivity floor (monotonically non-decreasing).
pub fn merkleize<E, C, T>(
self,
db: &Immutable<F, E, K, V, C, H, T>,
metadata: Option<V::Value>,
inactivity_floor: Location<F>,
) -> Arc<MerkleizedBatch<F, H::Digest, K, V>>
where
E: Context,
C: Mutable<Item = Operation<K, V>> + Persistable<Error = JournalError>,
C: Mutable<Item = Operation<F, K, V>> + Persistable<Error = JournalError>,
C::Item: EncodeShared,
T: Translator,
{
let base = self.base_size;

// Build operations: one Set per key, then Commit. `self.mutations` is a BTreeMap, so
// iteration yields keys in sorted order, which `diff` relies on for binary search.
let mut ops: Vec<Operation<K, V>> = Vec::with_capacity(self.mutations.len() + 1);
let mut ops: Vec<Operation<F, K, V>> = Vec::with_capacity(self.mutations.len() + 1);
let mut diff: DiffVec<K, F, V::Value> = Vec::with_capacity(self.mutations.len());

for (key, value) in self.mutations {
Expand All @@ -189,7 +196,7 @@ where
}
debug_assert!(diff.is_sorted_by(|a, b| a.0 < b.0));

ops.push(Operation::Commit(metadata));
ops.push(Operation::Commit(metadata, inactivity_floor));

let total_size = base + ops.len() as u64;

Expand Down Expand Up @@ -220,13 +227,14 @@ where
db_size: self.db_size,
ancestor_diffs,
ancestor_diff_ends,
new_inactivity_floor_loc: inactivity_floor,
})
}
}

impl<F: Family, D: Digest, K: Key, V: ValueEncoding> MerkleizedBatch<F, D, K, V>
where
Operation<K, V>: EncodeShared,
Operation<F, K, V>: EncodeShared,
{
/// Return the speculative root.
pub fn root(&self) -> D {
Expand All @@ -251,7 +259,7 @@ where
) -> Result<Option<V::Value>, Error<F>>
where
E: Context,
C: Mutable<Item = Operation<K, V>> + Persistable<Error = JournalError>,
C: Mutable<Item = Operation<F, K, V>> + Persistable<Error = JournalError>,
C::Item: EncodeShared,
H: CHasher<Digest = D>,
T: Translator,
Expand Down Expand Up @@ -292,7 +300,7 @@ where
E: Context,
K: Key,
V: ValueEncoding,
C: Mutable<Item = Operation<K, V>> + Persistable<Error = JournalError>,
C: Mutable<Item = Operation<F, K, V>> + Persistable<Error = JournalError>,
C::Item: EncodeShared,
H: CHasher,
T: Translator,
Expand All @@ -309,6 +317,7 @@ where
db_size: journal_size,
ancestor_diffs: Vec::new(),
ancestor_diff_ends: Vec::new(),
new_inactivity_floor_loc: self.inactivity_floor_loc,
})
}
}
Loading
Loading