Skip to content

Commit 84bd84c

Browse files
[storage/qmdb/current] Generalized grafting core to support current QMDB (#3514)
1 parent b9660be commit 84bd84c

34 files changed

Lines changed: 3343 additions & 1370 deletions

examples/sync/src/bin/server.rs

Lines changed: 9 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -8,7 +8,7 @@ use commonware_runtime::{
88
tokio as tokio_runtime, BufferPooler, Clock, Listener, Metrics, Network, Runner, SinkOf,
99
Spawner, Storage, StreamOf,
1010
};
11-
use commonware_storage::qmdb::sync::Target;
11+
use commonware_storage::{mmr, qmdb::sync::Target};
1212
use commonware_stream::utils::codec::{recv_frame, send_frame};
1313
use commonware_sync::{
1414
any, crate_version, current,
@@ -106,7 +106,7 @@ async fn maybe_add_operations<DB, E>(
106106
config: &Config,
107107
) -> Result<(), Box<dyn std::error::Error>>
108108
where
109-
DB: Syncable,
109+
DB: Syncable<Family = mmr::Family>,
110110
E: Storage + Clock + Metrics + RngCore,
111111
{
112112
let now = context.current();
@@ -155,7 +155,7 @@ async fn handle_get_sync_target<DB>(
155155
request: wire::GetSyncTargetRequest,
156156
) -> Result<wire::GetSyncTargetResponse<Key>, Error>
157157
where
158-
DB: Syncable,
158+
DB: Syncable<Family = mmr::Family>,
159159
{
160160
state.request_counter.inc();
161161

@@ -186,7 +186,7 @@ async fn handle_get_operations<DB>(
186186
request: wire::GetOperationsRequest,
187187
) -> Result<wire::GetOperationsResponse<DB::Operation, Key>, Error>
188188
where
189-
DB: Syncable,
189+
DB: Syncable<Family = mmr::Family>,
190190
{
191191
state.request_counter.inc();
192192
request.validate()?;
@@ -263,7 +263,7 @@ async fn handle_message<DB>(
263263
message: wire::Message<DB::Operation, Key>,
264264
) -> wire::Message<DB::Operation, Key>
265265
where
266-
DB: Syncable,
266+
DB: Syncable<Family = mmr::Family>,
267267
{
268268
let request_id = message.request_id();
269269
match message {
@@ -317,7 +317,7 @@ async fn recv_loop<DB, E>(
317317
response_sender: mpsc::Sender<wire::Message<DB::Operation, Key>>,
318318
client_addr: SocketAddr,
319319
) where
320-
DB: Syncable + Send + Sync + 'static,
320+
DB: Syncable<Family = mmr::Family> + Send + Sync + 'static,
321321
DB::Operation: Read + Send,
322322
<DB::Operation as Read>::Cfg: commonware_codec::IsUnit,
323323
E: Metrics + Network + Spawner,
@@ -367,7 +367,7 @@ async fn handle_client<DB, E>(
367367
client_addr: SocketAddr,
368368
) -> Result<(), Box<dyn std::error::Error>>
369369
where
370-
DB: Syncable + Send + Sync + 'static,
370+
DB: Syncable<Family = mmr::Family> + Send + Sync + 'static,
371371
DB::Operation: Read + Send,
372372
<DB::Operation as Read>::Cfg: commonware_codec::IsUnit,
373373
E: Storage + Clock + Metrics + Network + Spawner,
@@ -408,7 +408,7 @@ async fn initialize_database<DB, E>(
408408
context: &mut E,
409409
) -> Result<DB, Box<dyn std::error::Error>>
410410
where
411-
DB: Syncable,
411+
DB: Syncable<Family = mmr::Family>,
412412
E: RngCore,
413413
{
414414
info!("starting {} database", DB::name());
@@ -446,7 +446,7 @@ async fn run_helper<DB, E>(
446446
database: DB,
447447
) -> Result<(), Box<dyn std::error::Error>>
448448
where
449-
DB: Syncable + Send + Sync + 'static,
449+
DB: Syncable<Family = mmr::Family> + Send + Sync + 'static,
450450
DB::Operation: Read + Send,
451451
<DB::Operation as Read>::Cfg: commonware_codec::IsUnit,
452452
E: Storage + Clock + Metrics + Network + Spawner + RngCore + Clone,

examples/sync/src/databases/any.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -54,6 +54,7 @@ impl<E> crate::databases::Syncable for Database<E>
5454
where
5555
E: Storage + Clock + Metrics,
5656
{
57+
type Family = mmr::Family;
5758
type Operation = Operation;
5859

5960
fn create_test_operations(count: usize, seed: u64) -> Vec<Self::Operation> {

examples/sync/src/databases/current.rs

Lines changed: 13 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -1,15 +1,15 @@
11
//! Current database types and helpers for the sync example.
22
//!
33
//! A `current` database extends an `any` database with an activity bitmap that tracks which
4-
//! operations are active (i.e. represent the current state of their key) vs inactive
5-
//! (superseded or deleted). Its canonical root folds the ops root, a grafted MMR root
6-
//! (combining bitmap chunks with ops subtree roots), and an optional partial-chunk digest.
7-
//! See [current] module documentation for more details.
4+
//! operations are active (i.e. represent the current state of their key) vs inactive (superseded or
5+
//! deleted). Its canonical root folds the ops root, a grafted merkle root (combining bitmap chunks
6+
//! with ops subtree roots), and an optional partial-chunk digest. See [current] module
7+
//! documentation for more details.
88
//!
9-
//! For sync, the engine targets the **ops root** (not the canonical root). The operations and
10-
//! proof format are identical to `any` -- the bitmap is reconstructed deterministically from
11-
//! the operations after sync completes. See the
12-
//! [Root structure](commonware_storage::qmdb::current) module documentation for details.
9+
//! For sync, the engine targets the **ops root** (not the canonical root). The operations and proof
10+
//! format are identical to `any` -- the bitmap is reconstructed deterministically from the
11+
//! operations after sync completes. See the [Root structure](commonware_storage::qmdb::current)
12+
//! module documentation for details.
1313
//!
1414
//! This module re-uses the same [`Operation`] type as [`super::any`] since the underlying
1515
//! operations log is the same.
@@ -36,7 +36,8 @@ use tracing::error;
3636
const CHUNK_SIZE: usize = sha256::Digest::SIZE;
3737

3838
/// Database type alias.
39-
pub type Database<E> = current::unordered::fixed::Db<E, Key, Value, Hasher, Translator, CHUNK_SIZE>;
39+
pub type Database<E> =
40+
current::unordered::fixed::Db<mmr::Family, E, Key, Value, Hasher, Translator, CHUNK_SIZE>;
4041

4142
/// Operation type alias. Same as the `any` operation type.
4243
pub type Operation = FixedOperation<mmr::Family, Key, Value>;
@@ -45,7 +46,7 @@ pub type Operation = FixedOperation<mmr::Family, Key, Value>;
4546
pub fn create_config(context: &impl BufferPooler) -> Config<Translator> {
4647
let page_cache = buffer::paged::CacheRef::from_pooler(context, NZU16!(2048), NZUsize!(10));
4748
Config {
48-
mmr_config: MmrConfig {
49+
merkle_config: MmrConfig {
4950
journal_partition: "mmr-journal".into(),
5051
metadata_partition: "mmr-metadata".into(),
5152
items_per_blob: NZU64!(4096),
@@ -59,7 +60,7 @@ pub fn create_config(context: &impl BufferPooler) -> Config<Translator> {
5960
write_buffer: NZUsize!(4096),
6061
page_cache,
6162
},
62-
grafted_mmr_metadata_partition: "grafted-mmr-metadata".into(),
63+
grafted_metadata_partition: "grafted-mmr-metadata".into(),
6364
translator: Translator::default(),
6465
}
6566
}
@@ -68,6 +69,7 @@ impl<E> super::Syncable for Database<E>
6869
where
6970
E: Storage + Clock + Metrics,
7071
{
72+
type Family = mmr::Family;
7173
type Operation = Operation;
7274

7375
fn create_test_operations(count: usize, seed: u64) -> Vec<Self::Operation> {

examples/sync/src/databases/immutable.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -87,6 +87,7 @@ impl<E> super::Syncable for Database<E>
8787
where
8888
E: Storage + Clock + Metrics,
8989
{
90+
type Family = mmr::Family;
9091
type Operation = Operation;
9192

9293
fn create_test_operations(count: usize, seed: u64) -> Vec<Self::Operation> {

examples/sync/src/databases/mod.rs

Lines changed: 19 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,7 @@
33
use crate::Key;
44
use commonware_codec::Encode;
55
use commonware_storage::{
6-
mmr::{self, Location, Proof},
6+
merkle::{self, Location, Proof},
77
qmdb::{self, operation::Operation},
88
};
99
use std::{future::Future, num::NonZeroU64};
@@ -48,8 +48,11 @@ impl DatabaseType {
4848
/// Helper trait for databases that can be synced.
4949
#[allow(clippy::type_complexity)]
5050
pub trait Syncable: Sized {
51+
/// The merkle family used by this database.
52+
type Family: merkle::Family;
53+
5154
/// The type of operations in the database.
52-
type Operation: Operation<mmr::Family> + Encode + Sync + 'static;
55+
type Operation: Operation<Self::Family> + Encode + Sync + 'static;
5356

5457
/// Create test operations with the given count and seed.
5558
/// The returned operations must end with a commit operation.
@@ -60,30 +63,35 @@ pub trait Syncable: Sized {
6063
fn add_operations(
6164
&mut self,
6265
operations: Vec<Self::Operation>,
63-
) -> impl Future<Output = Result<(), qmdb::Error<mmr::Family>>>;
66+
) -> impl Future<Output = Result<(), qmdb::Error<Self::Family>>>;
6467

6568
/// Get the database's root digest.
6669
fn root(&self) -> Key;
6770

6871
/// Get the total number of operations in the database (including pruned operations).
69-
fn size(&self) -> impl Future<Output = Location> + Send;
72+
fn size(&self) -> impl Future<Output = Location<Self::Family>> + Send;
7073

7174
/// Get the inactivity floor, the location below which all operations are inactive.
72-
fn inactivity_floor(&self) -> impl Future<Output = Location> + Send;
75+
fn inactivity_floor(&self) -> impl Future<Output = Location<Self::Family>> + Send;
7376

7477
/// Get historical proof and operations.
7578
fn historical_proof(
7679
&self,
77-
op_count: Location,
78-
start_loc: Location,
80+
op_count: Location<Self::Family>,
81+
start_loc: Location<Self::Family>,
7982
max_ops: NonZeroU64,
80-
) -> impl Future<Output = Result<(Proof<Key>, Vec<Self::Operation>), qmdb::Error<mmr::Family>>> + Send;
83+
) -> impl Future<
84+
Output = Result<
85+
(Proof<Self::Family, Key>, Vec<Self::Operation>),
86+
qmdb::Error<Self::Family>,
87+
>,
88+
> + Send;
8189

82-
/// Get the pinned MMR nodes for a lower operation boundary of `loc`.
90+
/// Get the pinned nodes for a lower operation boundary of `loc`.
8391
fn pinned_nodes_at(
8492
&self,
85-
loc: Location,
86-
) -> impl Future<Output = Result<Vec<Key>, qmdb::Error<mmr::Family>>> + Send;
93+
loc: Location<Self::Family>,
94+
) -> impl Future<Output = Result<Vec<Key>, qmdb::Error<Self::Family>>> + Send;
8795

8896
/// Get the database type name for logging.
8997
fn name() -> &'static str;

storage/fuzz/fuzz_targets/current_crash_recovery.rs

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -15,7 +15,7 @@ use commonware_runtime::{
1515
};
1616
use commonware_storage::{
1717
journal::contiguous::variable::Config as VConfig,
18-
mmr::{journaled::Config as MmrConfig, Location},
18+
mmr::{self, journaled::Config as MmrConfig, Location},
1919
qmdb::current::{unordered::variable::Db as Current, VariableConfig},
2020
translator::TwoCap,
2121
};
@@ -34,7 +34,7 @@ type RawValue = [u8; 32];
3434
/// Maximum write buffer size.
3535
const MAX_WRITE_BUF: usize = 2048;
3636

37-
type Db = Current<deterministic::Context, Key, Value, Sha256, TwoCap, 32>;
37+
type Db = Current<mmr::Family, deterministic::Context, Key, Value, Sha256, TwoCap, 32>;
3838

3939
fn bounded_page_size(u: &mut Unstructured<'_>) -> Result<u16> {
4040
u.int_in_range(1..=256)
@@ -98,7 +98,7 @@ fn make_config(
9898
) -> VariableConfig<TwoCap, ((), ())> {
9999
let page_cache = CacheRef::from_pooler(ctx, page_size, page_cache_size);
100100
VariableConfig {
101-
mmr_config: MmrConfig {
101+
merkle_config: MmrConfig {
102102
journal_partition: format!("crash-mmr-journal-{suffix}"),
103103
metadata_partition: format!("crash-mmr-metadata-{suffix}"),
104104
items_per_blob: NZU64!(mmr_items_per_blob),
@@ -114,7 +114,7 @@ fn make_config(
114114
codec_config: ((), ()),
115115
page_cache,
116116
},
117-
grafted_mmr_metadata_partition: format!("crash-grafted-mmr-metadata-{suffix}"),
117+
grafted_metadata_partition: format!("crash-grafted-mmr-metadata-{suffix}"),
118118
translator: TwoCap,
119119
}
120120
}

storage/fuzz/fuzz_targets/current_ordered_operations.rs

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -5,7 +5,7 @@ use commonware_cryptography::{sha256::Digest, Hasher, Sha256};
55
use commonware_runtime::{buffer::paged::CacheRef, deterministic, Runner};
66
use commonware_storage::{
77
journal::contiguous::fixed::Config as FConfig,
8-
mmr::{journaled::Config as MmrConfig, Location},
8+
mmr::{self, journaled::Config as MmrConfig, Location},
99
qmdb::current::{ordered::fixed::Db as CurrentDb, FixedConfig as Config},
1010
translator::TwoCap,
1111
};
@@ -20,7 +20,7 @@ type Key = FixedBytes<32>;
2020
type Value = FixedBytes<32>;
2121
type RawKey = [u8; 32];
2222
type RawValue = [u8; 32];
23-
type Db = CurrentDb<deterministic::Context, Key, Value, Sha256, TwoCap, 32>;
23+
type Db = CurrentDb<mmr::Family, deterministic::Context, Key, Value, Sha256, TwoCap, 32>;
2424

2525
#[derive(Arbitrary, Debug, Clone)]
2626
enum CurrentOperation {
@@ -115,7 +115,7 @@ fn fuzz(data: FuzzInput) {
115115
NZUsize!(PAGE_CACHE_SIZE),
116116
);
117117
let cfg = Config {
118-
mmr_config: MmrConfig {
118+
merkle_config: MmrConfig {
119119
journal_partition: "fuzz-current-mmr-journal".into(),
120120
metadata_partition: "fuzz-current-mmr-metadata".into(),
121121
items_per_blob: NZU64!(MMR_ITEMS_PER_BLOB),
@@ -129,7 +129,7 @@ fn fuzz(data: FuzzInput) {
129129
write_buffer: NZUsize!(WRITE_BUFFER_SIZE),
130130
page_cache,
131131
},
132-
grafted_mmr_metadata_partition: "fuzz-current-grafted-mmr-metadata".into(),
132+
grafted_metadata_partition: "fuzz-current-grafted-mmr-metadata".into(),
133133
translator: TwoCap,
134134
};
135135

storage/fuzz/fuzz_targets/current_unordered_batch_root.rs

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -5,7 +5,7 @@ use commonware_cryptography::Sha256;
55
use commonware_runtime::{buffer::paged::CacheRef, deterministic, BufferPooler, Runner};
66
use commonware_storage::{
77
journal::contiguous::fixed::Config as FConfig,
8-
mmr::journaled::Config as MmrConfig,
8+
mmr::{self, journaled::Config as MmrConfig},
99
qmdb::current::{unordered::fixed::Db as CurrentDb, FixedConfig as Config},
1010
translator::OneCap,
1111
};
@@ -15,7 +15,7 @@ use std::num::NonZeroU16;
1515

1616
type Key = FixedBytes<32>;
1717
type Value = FixedBytes<32>;
18-
type Db = CurrentDb<deterministic::Context, Key, Value, Sha256, OneCap, 32>;
18+
type Db = CurrentDb<mmr::Family, deterministic::Context, Key, Value, Sha256, OneCap, 32>;
1919

2020
const PAGE_SIZE: NonZeroU16 = NZU16!(137);
2121
const COLLISION_GROUPS: u8 = 4;
@@ -76,7 +76,7 @@ impl<'a> Arbitrary<'a> for FuzzInput {
7676
fn test_config(name: &str, pooler: &impl BufferPooler) -> Config<OneCap> {
7777
let page_cache = CacheRef::from_pooler(pooler, PAGE_SIZE, NZUsize!(2));
7878
Config {
79-
mmr_config: MmrConfig {
79+
merkle_config: MmrConfig {
8080
journal_partition: format!("{name}-mmr"),
8181
metadata_partition: format!("{name}-meta"),
8282
items_per_blob: NZU64!(17),
@@ -90,7 +90,7 @@ fn test_config(name: &str, pooler: &impl BufferPooler) -> Config<OneCap> {
9090
write_buffer: NZUsize!(1024),
9191
page_cache,
9292
},
93-
grafted_mmr_metadata_partition: format!("{name}-grafted"),
93+
grafted_metadata_partition: format!("{name}-grafted"),
9494
translator: OneCap,
9595
}
9696
}

storage/fuzz/fuzz_targets/current_unordered_operations.rs

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -5,7 +5,7 @@ use commonware_cryptography::{sha256::Digest, Hasher, Sha256};
55
use commonware_runtime::{buffer::paged::CacheRef, deterministic, Runner};
66
use commonware_storage::{
77
journal::contiguous::fixed::Config as FConfig,
8-
mmr::{journaled::Config as MmrConfig, Location},
8+
mmr::{self, journaled::Config as MmrConfig, Location},
99
qmdb::current::{unordered::fixed::Db as CurrentDb, FixedConfig as Config},
1010
translator::TwoCap,
1111
};
@@ -20,7 +20,7 @@ type Key = FixedBytes<32>;
2020
type Value = FixedBytes<32>;
2121
type RawKey = [u8; 32];
2222
type RawValue = [u8; 32];
23-
type Db = CurrentDb<deterministic::Context, Key, Value, Sha256, TwoCap, 32>;
23+
type Db = CurrentDb<mmr::Family, deterministic::Context, Key, Value, Sha256, TwoCap, 32>;
2424

2525
#[derive(Arbitrary, Debug, Clone)]
2626
enum CurrentOperation {
@@ -105,7 +105,7 @@ fn fuzz(data: FuzzInput) {
105105
NZUsize!(PAGE_CACHE_SIZE),
106106
);
107107
let cfg = Config {
108-
mmr_config: MmrConfig {
108+
merkle_config: MmrConfig {
109109
journal_partition: "fuzz-current-mmr-journal".into(),
110110
metadata_partition: "fuzz-current-mmr-metadata".into(),
111111
items_per_blob: NZU64!(MMR_ITEMS_PER_BLOB),
@@ -119,7 +119,7 @@ fn fuzz(data: FuzzInput) {
119119
write_buffer: NZUsize!(WRITE_BUFFER_SIZE),
120120
page_cache,
121121
},
122-
grafted_mmr_metadata_partition: "fuzz-current-grafted-mmr-metadata".into(),
122+
grafted_metadata_partition: "fuzz-current-grafted-mmr-metadata".into(),
123123
translator: TwoCap,
124124
};
125125

0 commit comments

Comments
 (0)