Skip to content

Commit 9642b4d

Browse files
generalize sync protocol on merkle family
1 parent 0726cac commit 9642b4d

41 files changed

Lines changed: 1744 additions & 584 deletions

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

examples/sync/src/bin/client.rs

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -8,7 +8,7 @@ use commonware_codec::{EncodeShared, Read};
88
use commonware_runtime::{
99
tokio as tokio_runtime, BufferPooler, Clock, Metrics, Network, Runner, Spawner, Storage,
1010
};
11-
use commonware_storage::qmdb::sync;
11+
use commonware_storage::{mmr, qmdb::sync};
1212
use commonware_sync::{
1313
any, crate_version, current, databases::DatabaseType, immutable, keyless, net::Resolver,
1414
Digest, Error, Key,
@@ -60,9 +60,9 @@ struct Config {
6060
async fn target_update_task<E, Op, D>(
6161
context: E,
6262
resolver: Resolver<Op, D>,
63-
update_tx: mpsc::Sender<sync::Target<D>>,
63+
update_tx: mpsc::Sender<sync::Target<mmr::Family, D>>,
6464
interval_duration: Duration,
65-
initial_target: sync::Target<D>,
65+
initial_target: sync::Target<mmr::Family, D>,
6666
) -> Result<(), Error>
6767
where
6868
E: Clock,

examples/sync/src/bin/server.rs

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -160,19 +160,19 @@ where
160160
state.request_counter.inc();
161161

162162
// Get the current database state
163-
let (root, inactivity_floor, size) = {
163+
let (root, sync_boundary, size) = {
164164
let database = state.database.read().await;
165165
(
166166
database.root(),
167-
database.inactivity_floor().await,
167+
database.sync_boundary().await,
168168
database.size().await,
169169
)
170170
};
171171
let response = wire::GetSyncTargetResponse::<Key> {
172172
request_id: request.request_id,
173173
target: Target {
174174
root,
175-
range: non_empty_range!(inactivity_floor, size),
175+
range: non_empty_range!(sync_boundary, size),
176176
},
177177
};
178178

@@ -430,7 +430,7 @@ where
430430
.collect::<String>();
431431
info!(
432432
size = ?database.size().await,
433-
inactivity_floor = ?database.inactivity_floor().await,
433+
sync_boundary = ?database.sync_boundary().await,
434434
root = %root_hex,
435435
"{} database ready",
436436
DB::name()

examples/sync/src/databases/any.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -123,8 +123,8 @@ where
123123
self.bounds().await.end
124124
}
125125

126-
async fn inactivity_floor(&self) -> Location {
127-
self.inactivity_floor_loc()
126+
async fn sync_boundary(&self) -> Location {
127+
self.sync_boundary()
128128
}
129129

130130
fn historical_proof(

examples/sync/src/databases/current.rs

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -139,8 +139,9 @@ where
139139
self.bounds().await.end
140140
}
141141

142-
async fn inactivity_floor(&self) -> Location {
143-
self.inactivity_floor_loc()
142+
async fn sync_boundary(&self) -> Location {
143+
self.sync_boundary()
144+
.expect("sync_boundary should not overflow")
144145
}
145146

146147
fn historical_proof(

examples/sync/src/databases/immutable.rs

Lines changed: 2 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -129,10 +129,8 @@ where
129129
self.bounds().await.end
130130
}
131131

132-
async fn inactivity_floor(&self) -> Location {
133-
// For Immutable databases, all retained operations are active,
134-
// so the inactivity floor equals the pruning boundary.
135-
self.bounds().await.start
132+
async fn sync_boundary(&self) -> Location {
133+
self.sync_boundary().await
136134
}
137135

138136
fn historical_proof(

examples/sync/src/databases/keyless.rs

Lines changed: 2 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -126,10 +126,8 @@ where
126126
self.bounds().await.end
127127
}
128128

129-
async fn inactivity_floor(&self) -> Location {
130-
// Keyless databases have no inactivity floor concept.
131-
// Use the pruning boundary, same as immutable.
132-
self.bounds().await.start
129+
async fn sync_boundary(&self) -> Location {
130+
self.sync_boundary().await
133131
}
134132

135133
async fn historical_proof(

examples/sync/src/databases/mod.rs

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -75,8 +75,11 @@ pub trait Syncable: Sized {
7575
/// Get the total number of operations in the database (including pruned operations).
7676
fn size(&self) -> impl Future<Output = Location<Self::Family>> + Send;
7777

78-
/// Get the inactivity floor, the location below which all operations are inactive.
79-
fn inactivity_floor(&self) -> impl Future<Output = Location<Self::Family>> + Send;
78+
/// Get the most recent location from which this database can safely be synced.
79+
///
80+
/// Callers constructing a sync target should use this value (or any earlier retained
81+
/// location) as the `range.start`.
82+
fn sync_boundary(&self) -> impl Future<Output = Location<Self::Family>> + Send;
8083

8184
/// Get historical proof and operations.
8285
fn historical_proof(

examples/sync/src/net/resolver.rs

Lines changed: 8 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,10 @@ use crate::net::request_id;
33
use commonware_codec::{EncodeShared, IsUnit, Read};
44
use commonware_cryptography::Digest;
55
use commonware_runtime::{Network, Spawner};
6-
use commonware_storage::{mmr::Location, qmdb::sync};
6+
use commonware_storage::{
7+
mmr::{self, Location},
8+
qmdb::sync,
9+
};
710
use commonware_utils::channel::{mpsc, oneshot};
811
use std::num::NonZeroU64;
912

@@ -42,7 +45,7 @@ where
4245
}
4346

4447
/// Returns the current sync target from the server.
45-
pub async fn get_sync_target(&self) -> Result<sync::Target<D>, crate::Error> {
48+
pub async fn get_sync_target(&self) -> Result<sync::Target<mmr::Family, D>, crate::Error> {
4649
let request_id = self.request_id_generator.next();
4750
let request =
4851
wire::Message::GetSyncTargetRequest(wire::GetSyncTargetRequest { request_id });
@@ -75,6 +78,7 @@ where
7578
Op::Cfg: IsUnit,
7679
D: Digest,
7780
{
81+
type Family = mmr::Family;
7882
type Digest = D;
7983
type Op = Op;
8084
type Error = crate::Error;
@@ -86,7 +90,8 @@ where
8690
max_ops: NonZeroU64,
8791
include_pinned_nodes: bool,
8892
_cancel_rx: oneshot::Receiver<()>,
89-
) -> Result<sync::resolver::FetchResult<Self::Op, Self::Digest>, Self::Error> {
93+
) -> Result<sync::resolver::FetchResult<Self::Family, Self::Op, Self::Digest>, Self::Error>
94+
{
9095
let request_id = self.request_id_generator.next();
9196
let request = wire::Message::GetOperationsRequest(wire::GetOperationsRequest {
9297
request_id,

examples/sync/src/net/wire.rs

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -5,7 +5,7 @@ use commonware_codec::{
55
use commonware_cryptography::Digest;
66
use commonware_runtime::{Buf, BufMut};
77
use commonware_storage::{
8-
mmr::{Location, Proof},
8+
mmr::{self, Location, Proof},
99
qmdb::sync::Target,
1010
};
1111
use std::num::NonZeroU64;
@@ -51,7 +51,7 @@ where
5151
D: Digest,
5252
{
5353
pub request_id: RequestId,
54-
pub target: Target<D>,
54+
pub target: Target<mmr::Family, D>,
5555
}
5656

5757
/// Messages that can be sent over the wire.
@@ -334,7 +334,7 @@ where
334334
type Cfg = ();
335335
fn read_cfg(buf: &mut impl Buf, _: &()) -> Result<Self, CodecError> {
336336
let request_id = RequestId::read_cfg(buf, &())?;
337-
let target = Target::<D>::read_cfg(buf, &())?;
337+
let target = Target::<mmr::Family, D>::read_cfg(buf, &())?;
338338
Ok(Self { request_id, target })
339339
}
340340
}

storage/conformance.toml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -158,7 +158,7 @@ hash = "c692a20fa40180844888e7a26401f99a45ce3127faeca5f1228a41b454424623"
158158
n_cases = 65536
159159
hash = "b41d6c6ec560bde9caf2e206526864c618a0721af367585a1719617ca7ce9291"
160160

161-
["commonware_storage::qmdb::sync::target::tests::conformance::CodecConformance<Target<sha256::Digest>>"]
161+
["commonware_storage::qmdb::sync::target::tests::conformance::CodecConformance<Target<MmrFamily,sha256::Digest>>"]
162162
n_cases = 65536
163163
hash = "f742d92a0af0af78a9519bf637bc52ea869965a85a84101a4c53f53eb39325ca"
164164

0 commit comments

Comments
 (0)