Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion examples/sync/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -117,7 +117,7 @@ curl http://localhost:9090/metrics
4. **Sync Iteration Loop**: For each sync iteration:
- **Database Initialization**: Client opens a new database (or reopens existing one)
- **Connection**: Client establishes connection to server
- **Initial Sync Target**: Client requests server metadata to determine sync target (inactivity floor, size, and root digest)
- **Initial Sync Target**: Client requests server metadata to determine sync target (sync boundary, size, and root digest)
- **Dynamic Target Updates**: Client periodically requests target updates during sync to handle new operations added by the server
- **Sync Completion**: Client continues until all operations are applied and state matches server's target
- **Database Closure**: Client closes the database to prepare for next iteration
Expand Down
6 changes: 3 additions & 3 deletions examples/sync/src/bin/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ use commonware_codec::{EncodeShared, Read};
use commonware_runtime::{
tokio as tokio_runtime, BufferPooler, Clock, Metrics, Network, Runner, Spawner, Storage,
};
use commonware_storage::qmdb::sync;
use commonware_storage::{mmr, qmdb::sync};
use commonware_sync::{
any, crate_version, current, databases::DatabaseType, immutable, keyless, net::Resolver,
Digest, Error, Key,
Expand Down Expand Up @@ -60,9 +60,9 @@ struct Config {
async fn target_update_task<E, Op, D>(
context: E,
resolver: Resolver<Op, D>,
update_tx: mpsc::Sender<sync::Target<D>>,
update_tx: mpsc::Sender<sync::Target<mmr::Family, D>>,
interval_duration: Duration,
initial_target: sync::Target<D>,
initial_target: sync::Target<mmr::Family, D>,
) -> Result<(), Error>
where
E: Clock,
Expand Down
8 changes: 4 additions & 4 deletions examples/sync/src/bin/server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -160,19 +160,19 @@ where
state.request_counter.inc();

// Get the current database state
let (root, inactivity_floor, size) = {
let (root, sync_boundary, size) = {
let database = state.database.read().await;
(
database.root(),
database.inactivity_floor().await,
database.sync_boundary().await,
database.size().await,
)
};
let response = wire::GetSyncTargetResponse::<Key> {
request_id: request.request_id,
target: Target {
root,
range: non_empty_range!(inactivity_floor, size),
range: non_empty_range!(sync_boundary, size),
},
};

Expand Down Expand Up @@ -430,7 +430,7 @@ where
.collect::<String>();
info!(
size = ?database.size().await,
inactivity_floor = ?database.inactivity_floor().await,
sync_boundary = ?database.sync_boundary().await,
root = %root_hex,
"{} database ready",
DB::name()
Expand Down
4 changes: 2 additions & 2 deletions examples/sync/src/databases/any.rs
Original file line number Diff line number Diff line change
Expand Up @@ -123,8 +123,8 @@ where
self.bounds().await.end
}

async fn inactivity_floor(&self) -> Location {
self.inactivity_floor_loc()
async fn sync_boundary(&self) -> Location {
self.sync_boundary()
}

fn historical_proof(
Expand Down
4 changes: 2 additions & 2 deletions examples/sync/src/databases/current.rs
Original file line number Diff line number Diff line change
Expand Up @@ -139,8 +139,8 @@ where
self.bounds().await.end
}

async fn inactivity_floor(&self) -> Location {
self.inactivity_floor_loc()
async fn sync_boundary(&self) -> Location {
self.sync_boundary()
}

fn historical_proof(
Expand Down
6 changes: 2 additions & 4 deletions examples/sync/src/databases/immutable.rs
Original file line number Diff line number Diff line change
Expand Up @@ -129,10 +129,8 @@ where
self.bounds().await.end
}

async fn inactivity_floor(&self) -> Location {
// For Immutable databases, all retained operations are active,
// so the inactivity floor equals the pruning boundary.
self.bounds().await.start
async fn sync_boundary(&self) -> Location {
self.sync_boundary().await
}

fn historical_proof(
Expand Down
6 changes: 2 additions & 4 deletions examples/sync/src/databases/keyless.rs
Original file line number Diff line number Diff line change
Expand Up @@ -126,10 +126,8 @@ where
self.bounds().await.end
}

async fn inactivity_floor(&self) -> Location {
// Keyless databases have no inactivity floor concept.
// Use the pruning boundary, same as immutable.
self.bounds().await.start
async fn sync_boundary(&self) -> Location {
self.sync_boundary().await
}

async fn historical_proof(
Expand Down
7 changes: 5 additions & 2 deletions examples/sync/src/databases/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -75,8 +75,11 @@ pub trait Syncable: Sized {
/// Get the total number of operations in the database (including pruned operations).
fn size(&self) -> impl Future<Output = Location<Self::Family>> + Send;

/// Get the inactivity floor, the location below which all operations are inactive.
fn inactivity_floor(&self) -> impl Future<Output = Location<Self::Family>> + Send;
/// Get the most recent location from which this database can safely be synced.
///
/// Callers constructing a sync target should use this value (or any earlier retained
/// location) as the `range.start`.
fn sync_boundary(&self) -> impl Future<Output = Location<Self::Family>> + Send;

/// Get historical proof and operations.
fn historical_proof(
Expand Down
11 changes: 8 additions & 3 deletions examples/sync/src/net/resolver.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,10 @@ use crate::net::request_id;
use commonware_codec::{EncodeShared, IsUnit, Read};
use commonware_cryptography::Digest;
use commonware_runtime::{Network, Spawner};
use commonware_storage::{mmr::Location, qmdb::sync};
use commonware_storage::{
mmr::{self, Location},
qmdb::sync,
};
use commonware_utils::channel::{mpsc, oneshot};
use std::num::NonZeroU64;

Expand Down Expand Up @@ -42,7 +45,7 @@ where
}

/// Returns the current sync target from the server.
pub async fn get_sync_target(&self) -> Result<sync::Target<D>, crate::Error> {
pub async fn get_sync_target(&self) -> Result<sync::Target<mmr::Family, D>, crate::Error> {
let request_id = self.request_id_generator.next();
let request =
wire::Message::GetSyncTargetRequest(wire::GetSyncTargetRequest { request_id });
Expand Down Expand Up @@ -75,6 +78,7 @@ where
Op::Cfg: IsUnit,
D: Digest,
{
type Family = mmr::Family;
type Digest = D;
type Op = Op;
type Error = crate::Error;
Expand All @@ -86,7 +90,8 @@ where
max_ops: NonZeroU64,
include_pinned_nodes: bool,
_cancel_rx: oneshot::Receiver<()>,
) -> Result<sync::resolver::FetchResult<Self::Op, Self::Digest>, Self::Error> {
) -> Result<sync::resolver::FetchResult<Self::Family, Self::Op, Self::Digest>, Self::Error>
{
let request_id = self.request_id_generator.next();
let request = wire::Message::GetOperationsRequest(wire::GetOperationsRequest {
request_id,
Expand Down
6 changes: 3 additions & 3 deletions examples/sync/src/net/wire.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ use commonware_codec::{
use commonware_cryptography::Digest;
use commonware_runtime::{Buf, BufMut};
use commonware_storage::{
mmr::{Location, Proof},
mmr::{self, Location, Proof},
qmdb::sync::Target,
};
use std::num::NonZeroU64;
Expand Down Expand Up @@ -51,7 +51,7 @@ where
D: Digest,
{
pub request_id: RequestId,
pub target: Target<D>,
pub target: Target<mmr::Family, D>,
}

/// Messages that can be sent over the wire.
Expand Down Expand Up @@ -334,7 +334,7 @@ where
type Cfg = ();
fn read_cfg(buf: &mut impl Buf, _: &()) -> Result<Self, CodecError> {
let request_id = RequestId::read_cfg(buf, &())?;
let target = Target::<D>::read_cfg(buf, &())?;
let target = Target::<mmr::Family, D>::read_cfg(buf, &())?;
Ok(Self { request_id, target })
}
}
2 changes: 1 addition & 1 deletion storage/conformance.toml
Original file line number Diff line number Diff line change
Expand Up @@ -254,7 +254,7 @@ hash = "c692a20fa40180844888e7a26401f99a45ce3127faeca5f1228a41b454424623"
n_cases = 65536
hash = "b41d6c6ec560bde9caf2e206526864c618a0721af367585a1719617ca7ce9291"

["commonware_storage::qmdb::sync::target::tests::conformance::CodecConformance<Target<sha256::Digest>>"]
["commonware_storage::qmdb::sync::target::tests::conformance::CodecConformance<Target<MmrFamily,sha256::Digest>>"]
n_cases = 65536
hash = "f742d92a0af0af78a9519bf637bc52ea869965a85a84101a4c53f53eb39325ca"

Expand Down
5 changes: 2 additions & 3 deletions storage/fuzz/fuzz_targets/current_crash_recovery.rs
Original file line number Diff line number Diff line change
Expand Up @@ -265,8 +265,7 @@ fn fuzz(input: FuzzInput) {
{
break;
}
let floor = db.inactivity_floor_loc();
if db.prune(floor).await.is_err() {
if db.prune(db.sync_boundary()).await.is_err() {
break;
}
}
Expand Down Expand Up @@ -325,7 +324,7 @@ fn fuzz(input: FuzzInput) {
}

// Verify range proofs over the recovered DB.
let floor = *db.inactivity_floor_loc();
let floor = *db.sync_boundary();
let size = *db.bounds().await.end;
for i in floor..size {
let loc = Location::new(i);
Expand Down
2 changes: 1 addition & 1 deletion storage/fuzz/fuzz_targets/current_mmb_prune_grow.rs
Original file line number Diff line number Diff line change
Expand Up @@ -215,7 +215,7 @@ async fn commit_pending(
}

async fn prune_to_floor(db: &mut Db, reference_db: &Db, context: &str) {
db.prune(db.inactivity_floor_loc())
db.prune(db.sync_boundary())
.await
.expect("prune should not fail");
assert_matches_reference(db, reference_db, context).await;
Expand Down
4 changes: 2 additions & 2 deletions storage/fuzz/fuzz_targets/current_ordered_operations.rs
Original file line number Diff line number Diff line change
Expand Up @@ -215,7 +215,7 @@ fn fuzz(data: FuzzInput) {
&mut pending_inserts, &mut pending_deletes,
).await;
committed_op_count = db.bounds().await.end;
db.prune(db.inactivity_floor_loc()).await.expect("Prune should not fail");
db.prune(db.sync_boundary()).await.expect("Prune should not fail");
}

CurrentOperation::Root => {
Expand Down Expand Up @@ -243,7 +243,7 @@ fn fuzz(data: FuzzInput) {
let current_op_count = db.bounds().await.end;
let start_loc = Location::new(start_loc % *current_op_count);

let oldest_loc = db.inactivity_floor_loc();
let oldest_loc = db.sync_boundary();
if start_loc >= oldest_loc {
let (proof, ops, chunks) = db
.range_proof(&mut hasher, start_loc, *max_ops)
Expand Down
4 changes: 2 additions & 2 deletions storage/fuzz/fuzz_targets/current_unordered_operations.rs
Original file line number Diff line number Diff line change
Expand Up @@ -197,7 +197,7 @@ fn fuzz(data: FuzzInput) {
CurrentOperation::Prune => {
commit_pending(&mut db, &mut pending_writes, &mut committed_state, &mut pending_expected).await;
committed_op_count = db.bounds().await.end;
db.prune(db.inactivity_floor_loc()).await.expect("Prune should not fail");
db.prune(db.sync_boundary()).await.expect("Prune should not fail");
}

CurrentOperation::Root => {
Expand All @@ -218,7 +218,7 @@ fn fuzz(data: FuzzInput) {

let current_op_count = db.bounds().await.end;
let start_loc = Location::new(start_loc % *current_op_count);
let oldest_loc = db.inactivity_floor_loc();
let oldest_loc = db.sync_boundary();
if start_loc >= oldest_loc {
let (proof, ops, chunks) = db
.range_proof(&mut hasher, start_loc, *max_ops)
Expand Down
4 changes: 2 additions & 2 deletions storage/fuzz/fuzz_targets/merkle_journaled.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ use commonware_storage::merkle::{
hasher::Standard, journaled::Config, mem::Mem, mmb, mmr, Error, Family as MerkleFamily,
Location, LocationRangeExt as _, Position,
};
use commonware_utils::{NZUsize, NZU16, NZU64};
use commonware_utils::{non_empty_range, NZUsize, NZU16, NZU64};
use libfuzzer_sys::fuzz_target;
use std::num::NonZeroU16;

Expand Down Expand Up @@ -356,7 +356,7 @@ fn fuzz_family<F: MerkleFamily>(input: &FuzzInput, suffix: &str) {
let sync_suffix = format!("{suffix}-sync");
let sync_config = SyncConfig::<F, _> {
config: test_config(&sync_suffix, &context),
range: lower_bound_loc..upper_bound_loc,
range: non_empty_range!(lower_bound_loc, upper_bound_loc),
pinned_nodes: None,
};

Expand Down
7 changes: 4 additions & 3 deletions storage/fuzz/fuzz_targets/qmdb_any_fixed_sync.rs
Original file line number Diff line number Diff line change
Expand Up @@ -114,13 +114,14 @@ fn test_config(test_name: &str, pooler: &impl BufferPooler) -> Config<TwoCap> {

async fn test_sync<
R: sync::resolver::Resolver<
Family = Family,
Digest = commonware_cryptography::sha256::Digest,
Op = FixedOperation<Family, Key, Value>,
>,
>(
context: deterministic::Context,
resolver: R,
target: sync::Target<commonware_cryptography::sha256::Digest>,
target: sync::Target<Family, commonware_cryptography::sha256::Digest>,
fetch_batch_size: u64,
test_name: &str,
sync_id: usize,
Expand Down Expand Up @@ -209,7 +210,7 @@ fn fuzz(mut input: FuzzInput) {
}

Operation::Prune => {
db.prune(db.inactivity_floor_loc())
db.prune(db.sync_boundary())
.await
.expect("Prune should not fail");
}
Expand All @@ -235,7 +236,7 @@ fn fuzz(mut input: FuzzInput) {
db.commit().await.expect("Commit should not fail");
let target = sync::Target {
root: db.root(),
range: non_empty_range!(db.inactivity_floor_loc(), db.bounds().await.end),
range: non_empty_range!(db.sync_boundary(), db.bounds().await.end),
};

let wrapped_src = Arc::new(db);
Expand Down
6 changes: 3 additions & 3 deletions storage/fuzz/fuzz_targets/qmdb_any_variable_sync.rs
Original file line number Diff line number Diff line change
Expand Up @@ -204,7 +204,7 @@ fn fuzz(input: FuzzInput) {
}

Operation::Prune => {
db.prune(db.inactivity_floor_loc())
db.prune(db.sync_boundary())
.await
.expect("Prune should not fail");
}
Expand All @@ -230,7 +230,7 @@ fn fuzz(input: FuzzInput) {
db.commit().await.expect("Commit should not fail");
historical_roots.insert(db.bounds().await.end, db.root());
let op_count = db.bounds().await.end;
let oldest_retained_loc = db.inactivity_floor_loc();
let oldest_retained_loc = db.sync_boundary();
if *start_loc >= oldest_retained_loc && *start_loc < *op_count {
if let Ok((proof, log)) = db.proof(*start_loc, *max_ops).await {
let root = db.root();
Expand Down Expand Up @@ -291,7 +291,7 @@ fn fuzz(input: FuzzInput) {
}

Operation::InactivityFloorLoc => {
let _ = db.inactivity_floor_loc();
let _ = db.sync_boundary();
}

Operation::OpCount => {
Expand Down
Loading
Loading