Skip to content

Commit d90fe22

Browse files
authored
[Storage] Implement state sync for qmdb::keyless (#3587)
1 parent 2cff773 commit d90fe22

9 files changed

Lines changed: 1448 additions & 14 deletions

File tree

examples/sync/src/bin/client.rs

Lines changed: 68 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -10,8 +10,8 @@ use commonware_runtime::{
1010
};
1111
use commonware_storage::qmdb::sync;
1212
use commonware_sync::{
13-
any, crate_version, current, databases::DatabaseType, immutable, net::Resolver, Digest, Error,
14-
Key,
13+
any, crate_version, current, databases::DatabaseType, immutable, keyless, net::Resolver,
14+
Digest, Error, Key,
1515
};
1616
use commonware_utils::{
1717
channel::mpsc::{self, error::TrySendError},
@@ -299,6 +299,69 @@ where
299299
}
300300
}
301301

302+
/// Repeatedly sync a Keyless database to the server's state.
303+
async fn run_keyless<E>(context: E, config: Config) -> Result<(), Box<dyn std::error::Error>>
304+
where
305+
E: BufferPooler + Storage + Clock + Metrics + Network + Spawner,
306+
{
307+
info!("starting Keyless database sync process");
308+
let mut iteration = 0u32;
309+
loop {
310+
let resolver = Resolver::<keyless::Operation, Key>::connect(
311+
context.with_label("resolver"),
312+
config.server,
313+
)
314+
.await?;
315+
316+
let initial_target = resolver.get_sync_target().await?;
317+
318+
let db_config = keyless::create_config(&context);
319+
let (update_sender, update_receiver) = mpsc::channel(UPDATE_CHANNEL_SIZE);
320+
321+
let target_update_handle = {
322+
let resolver = resolver.clone();
323+
let initial_target_clone = initial_target.clone();
324+
let target_update_interval = config.target_update_interval;
325+
context.with_label("target_update").spawn(move |context| {
326+
target_update_task(
327+
context,
328+
resolver,
329+
update_sender,
330+
target_update_interval,
331+
initial_target_clone,
332+
)
333+
})
334+
};
335+
336+
let sync_config =
337+
sync::engine::Config::<keyless::Database<_>, Resolver<keyless::Operation, Key>> {
338+
context: context.with_label("sync"),
339+
db_config,
340+
fetch_batch_size: config.batch_size,
341+
target: initial_target,
342+
resolver,
343+
apply_batch_size: 1024,
344+
max_outstanding_requests: config.max_outstanding_requests,
345+
update_rx: Some(update_receiver),
346+
finish_rx: None,
347+
reached_target_tx: None,
348+
max_retained_roots: 8,
349+
};
350+
351+
let database: keyless::Database<_> = sync::sync(sync_config).await?;
352+
let got_root = database.root();
353+
info!(
354+
sync_iteration = iteration,
355+
root = %got_root,
356+
sync_interval = ?config.sync_interval,
357+
"✅ Keyless sync completed successfully"
358+
);
359+
target_update_handle.abort();
360+
context.sleep(config.sync_interval).await;
361+
iteration += 1;
362+
}
363+
}
364+
302365
fn parse_config() -> Result<Config, Box<dyn std::error::Error>> {
303366
// Parse command line arguments
304367
let matches = Command::new("Sync Client")
@@ -307,8 +370,8 @@ fn parse_config() -> Result<Config, Box<dyn std::error::Error>> {
307370
.arg(
308371
Arg::new("db")
309372
.long("db")
310-
.value_name("any|current|immutable")
311-
.help("Database type to use. Must be `any`, `current`, or `immutable`.")
373+
.value_name("any|current|immutable|keyless")
374+
.help("Database type to use. Must be `any`, `current`, `immutable`, or `keyless`.")
312375
.default_value("any"),
313376
)
314377
.arg(
@@ -468,6 +531,7 @@ fn main() {
468531
DatabaseType::Any => run_any(context.with_label("sync"), config).await,
469532
DatabaseType::Current => run_current(context.with_label("sync"), config).await,
470533
DatabaseType::Immutable => run_immutable(context.with_label("sync"), config).await,
534+
DatabaseType::Keyless => run_keyless(context.with_label("sync"), config).await,
471535
};
472536

473537
if let Err(err) = result {

examples/sync/src/bin/server.rs

Lines changed: 16 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
//! Server that serves operations and proofs to clients attempting to sync an
2-
//! `any`, `current`, or `immutable` database.
2+
//! `any`, `current`, `immutable`, or `keyless` database.
33
44
use clap::{Arg, Command};
55
use commonware_codec::{DecodeExt, Encode, Read};
@@ -13,7 +13,7 @@ use commonware_stream::utils::codec::{recv_frame, send_frame};
1313
use commonware_sync::{
1414
any, crate_version, current,
1515
databases::{DatabaseType, Syncable},
16-
immutable,
16+
immutable, keyless,
1717
net::{wire, ErrorCode, ErrorResponse, MAX_MESSAGE_SIZE},
1818
Error, Key,
1919
};
@@ -537,6 +537,17 @@ where
537537
run_helper(context, config, database).await
538538
}
539539

540+
/// Run the Keyless database server.
541+
async fn run_keyless<E>(context: E, config: Config) -> Result<(), Box<dyn std::error::Error>>
542+
where
543+
E: BufferPooler + Storage + Clock + Metrics + Network + Spawner + RngCore + Clone,
544+
{
545+
let db_config = keyless::create_config(&context);
546+
let database = keyless::Database::init(context.with_label("database"), db_config).await?;
547+
548+
run_helper(context, config, database).await
549+
}
550+
540551
/// Parse command line arguments and return configuration.
541552
fn parse_config() -> Result<Config, Box<dyn std::error::Error>> {
542553
// Parse command line arguments
@@ -546,8 +557,8 @@ fn parse_config() -> Result<Config, Box<dyn std::error::Error>> {
546557
.arg(
547558
Arg::new("db")
548559
.long("db")
549-
.value_name("any|current|immutable")
550-
.help("Database type to use. Must be `any`, `current`, or `immutable`.")
560+
.value_name("any|current|immutable|keyless")
561+
.help("Database type to use. Must be `any`, `current`, `immutable`, or `keyless`.")
551562
.default_value("any"),
552563
)
553564
.arg(
@@ -680,6 +691,7 @@ fn main() {
680691
DatabaseType::Any => run_any(context, config).await,
681692
DatabaseType::Current => run_current(context, config).await,
682693
DatabaseType::Immutable => run_immutable(context, config).await,
694+
DatabaseType::Keyless => run_keyless(context, config).await,
683695
};
684696

685697
if let Err(err) = result {
Lines changed: 184 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,184 @@
1+
//! Keyless database types and helpers for the sync example.
2+
//!
3+
//! A `keyless` database is append-only: operations are stored by location rather than by key.
4+
//! It supports `Append(value)` and `Commit(metadata)` operations. For sync, the engine targets
5+
//! the Merkle root over all operations, and the client reconstructs the same state by replaying
6+
//! the fetched operations.
7+
8+
use crate::{Hasher, Key, Value};
9+
use commonware_cryptography::{Hasher as CryptoHasher, Sha256};
10+
use commonware_runtime::{buffer, BufferPooler, Clock, Metrics, Storage};
11+
use commonware_storage::{
12+
journal::contiguous::fixed::Config as FConfig,
13+
merkle::{
14+
journaled::Config as MmrConfig,
15+
mmr::{self, Location, Proof},
16+
},
17+
qmdb::{
18+
self,
19+
keyless::{self, fixed},
20+
operation::Committable,
21+
},
22+
};
23+
use commonware_utils::{NZUsize, NZU16, NZU64};
24+
use std::num::NonZeroU64;
25+
use tracing::error;
26+
27+
/// Database type alias.
28+
pub type Database<E> = fixed::Db<mmr::Family, E, Value, Hasher>;
29+
30+
/// Operation type alias.
31+
pub type Operation = fixed::Operation<Value>;
32+
33+
/// Create a database configuration for the keyless variant.
34+
pub fn create_config(context: &(impl BufferPooler + commonware_runtime::Metrics)) -> fixed::Config {
35+
let page_cache = buffer::paged::CacheRef::from_pooler(
36+
&context.with_label("page_cache"),
37+
NZU16!(2048),
38+
NZUsize!(10),
39+
);
40+
keyless::Config {
41+
merkle: MmrConfig {
42+
journal_partition: "mmr-journal".into(),
43+
metadata_partition: "mmr-metadata".into(),
44+
items_per_blob: NZU64!(4096),
45+
write_buffer: NZUsize!(4096),
46+
thread_pool: None,
47+
page_cache: page_cache.clone(),
48+
},
49+
log: FConfig {
50+
partition: "log-journal".into(),
51+
items_per_blob: NZU64!(4096),
52+
write_buffer: NZUsize!(4096),
53+
page_cache,
54+
},
55+
}
56+
}
57+
58+
/// Create deterministic test operations for demonstration purposes.
59+
/// Generates Append operations and periodic Commit operations.
60+
pub fn create_test_operations(count: usize, seed: u64) -> Vec<Operation> {
61+
let mut operations = Vec::new();
62+
let mut hasher = <Hasher as CryptoHasher>::new();
63+
64+
for i in 0..count {
65+
let value = {
66+
hasher.update(&i.to_be_bytes());
67+
hasher.update(&seed.to_be_bytes());
68+
hasher.finalize()
69+
};
70+
71+
operations.push(Operation::Append(value));
72+
73+
if (i + 1) % 10 == 0 {
74+
operations.push(Operation::Commit(None));
75+
}
76+
}
77+
78+
// Always end with a commit
79+
operations.push(Operation::Commit(Some(Sha256::fill(1))));
80+
operations
81+
}
82+
83+
impl<E> super::Syncable for Database<E>
84+
where
85+
E: Storage + Clock + Metrics,
86+
{
87+
type Family = mmr::Family;
88+
type Operation = Operation;
89+
90+
fn create_test_operations(count: usize, seed: u64) -> Vec<Self::Operation> {
91+
create_test_operations(count, seed)
92+
}
93+
94+
async fn add_operations(
95+
&mut self,
96+
operations: Vec<Self::Operation>,
97+
) -> Result<(), qmdb::Error<mmr::Family>> {
98+
if operations.last().is_none() || !operations.last().unwrap().is_commit() {
99+
// Ignore bad inputs rather than return errors.
100+
error!("operations must end with a commit");
101+
return Ok(());
102+
}
103+
104+
let mut batch = self.new_batch();
105+
for operation in operations {
106+
match operation {
107+
Operation::Append(value) => {
108+
batch = batch.append(value);
109+
}
110+
Operation::Commit(metadata) => {
111+
let merkleized = batch.merkleize(self, metadata);
112+
self.apply_batch(merkleized).await?;
113+
self.commit().await?;
114+
batch = self.new_batch();
115+
}
116+
}
117+
}
118+
Ok(())
119+
}
120+
121+
fn root(&self) -> Key {
122+
self.root()
123+
}
124+
125+
async fn size(&self) -> Location {
126+
self.bounds().await.end
127+
}
128+
129+
async fn inactivity_floor(&self) -> Location {
130+
// Keyless databases have no inactivity floor concept.
131+
// Use the pruning boundary, same as immutable.
132+
self.bounds().await.start
133+
}
134+
135+
async fn historical_proof(
136+
&self,
137+
op_count: Location,
138+
start_loc: Location,
139+
max_ops: NonZeroU64,
140+
) -> Result<(Proof<Key>, Vec<Self::Operation>), qmdb::Error<mmr::Family>> {
141+
self.historical_proof(op_count, start_loc, max_ops).await
142+
}
143+
144+
async fn pinned_nodes_at(&self, loc: Location) -> Result<Vec<Key>, qmdb::Error<mmr::Family>> {
145+
self.pinned_nodes_at(loc).await
146+
}
147+
148+
fn name() -> &'static str {
149+
"keyless"
150+
}
151+
}
152+
153+
#[cfg(test)]
154+
mod tests {
155+
use super::*;
156+
use crate::databases::Syncable;
157+
use commonware_runtime::deterministic;
158+
159+
type KeylessDb = Database<deterministic::Context>;
160+
161+
#[test]
162+
fn test_create_test_operations() {
163+
let ops = <KeylessDb as Syncable>::create_test_operations(5, 12345);
164+
assert_eq!(ops.len(), 6); // 5 operations + 1 commit
165+
166+
if let Operation::Commit(Some(_)) = &ops[5] {
167+
// ok
168+
} else {
169+
panic!("last operation should be a commit with metadata");
170+
}
171+
}
172+
173+
#[test]
174+
fn test_deterministic_operations() {
175+
// Operations should be deterministic based on seed
176+
let ops1 = <KeylessDb as Syncable>::create_test_operations(3, 12345);
177+
let ops2 = <KeylessDb as Syncable>::create_test_operations(3, 12345);
178+
assert_eq!(ops1, ops2);
179+
180+
// Different seeds should produce different operations
181+
let ops3 = <KeylessDb as Syncable>::create_test_operations(3, 54321);
182+
assert_ne!(ops1, ops3);
183+
}
184+
}

examples/sync/src/databases/mod.rs

Lines changed: 7 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -4,20 +4,22 @@ use crate::Key;
44
use commonware_codec::Encode;
55
use commonware_storage::{
66
merkle::{self, Location, Proof},
7-
qmdb::{self, operation::Operation},
7+
qmdb,
88
};
99
use std::{future::Future, num::NonZeroU64};
1010

1111
pub mod any;
1212
pub mod current;
1313
pub mod immutable;
14+
pub mod keyless;
1415

1516
/// Database type to sync.
1617
#[derive(Debug, Clone, Copy)]
1718
pub enum DatabaseType {
1819
Any,
1920
Current,
2021
Immutable,
22+
Keyless,
2123
}
2224

2325
impl std::str::FromStr for DatabaseType {
@@ -28,8 +30,9 @@ impl std::str::FromStr for DatabaseType {
2830
"any" => Ok(Self::Any),
2931
"current" => Ok(Self::Current),
3032
"immutable" => Ok(Self::Immutable),
33+
"keyless" => Ok(Self::Keyless),
3134
_ => Err(format!(
32-
"Invalid database type: '{s}'. Must be 'any', 'current', or 'immutable'",
35+
"Invalid database type: '{s}'. Must be 'any', 'current', 'immutable', or 'keyless'",
3336
)),
3437
}
3538
}
@@ -41,6 +44,7 @@ impl DatabaseType {
4144
Self::Any => "any",
4245
Self::Current => "current",
4346
Self::Immutable => "immutable",
47+
Self::Keyless => "keyless",
4448
}
4549
}
4650
}
@@ -52,7 +56,7 @@ pub trait Syncable: Sized {
5256
type Family: merkle::Family;
5357

5458
/// The type of operations in the database.
55-
type Operation: Operation<Self::Family> + Encode + Sync + 'static;
59+
type Operation: Encode + Sync + 'static;
5660

5761
/// Create test operations with the given count and seed.
5862
/// The returned operations must end with a commit operation.

0 commit comments

Comments
 (0)