Skip to content

Commit 94bba2b

Browse files
[storage/qmdb/sync] Avoid requiring a trusted ops root in current qmdb sync (#3778)
Co-authored-by: Patrick O'Grady <me@patrickogrady.xyz>
1 parent 468b7a9 commit 94bba2b

32 files changed

Lines changed: 3890 additions & 2913 deletions

File tree

examples/sync/src/bin/client.rs

Lines changed: 112 additions & 54 deletions
Original file line numberDiff line numberDiff line change
@@ -11,7 +11,11 @@ use commonware_runtime::{
1111
};
1212
use commonware_storage::{
1313
mmr,
14-
qmdb::sync::{self, compact},
14+
qmdb::{
15+
any::sync::Target,
16+
current as current_qmdb,
17+
sync::{self, compact},
18+
},
1519
};
1620
use commonware_sync::{
1721
any, crate_version, current,
@@ -70,9 +74,9 @@ struct Config {
7074
async fn target_update_task<E, Op, D>(
7175
context: E,
7276
resolver: Resolver<Op, D>,
73-
update_tx: mpsc::Sender<sync::Target<mmr::Family, D>>,
77+
update_tx: mpsc::Sender<Target<mmr::Family, D>>,
7478
interval_duration: Duration,
75-
initial_target: sync::Target<mmr::Family, D>,
79+
initial_target: Target<mmr::Family, D>,
7680
) -> Result<(), Error>
7781
where
7882
E: Clock,
@@ -132,8 +136,8 @@ where
132136
E,
133137
Config,
134138
Resolver<Op, Key>,
135-
sync::Target<mmr::Family, Key>,
136-
mpsc::Receiver<sync::Target<mmr::Family, Key>>,
139+
Target<mmr::Family, Key>,
140+
mpsc::Receiver<Target<mmr::Family, Key>>,
137141
u32,
138142
) -> SyncFut,
139143
SyncFut: Future<Output = Result<DB, Box<dyn std::error::Error>>>,
@@ -188,7 +192,7 @@ where
188192
|context, config, resolver, initial_target, update_receiver, iteration| async move {
189193
let db_config = any::create_config(&context);
190194
let sync_config =
191-
sync::engine::Config::<any::Database<_>, Resolver<any::Operation, Key>> {
195+
sync::engine::Config::<any::Database<_>, Resolver<any::Operation, Key>, _> {
192196
context,
193197
db_config,
194198
fetch_batch_size: config.batch_size,
@@ -217,44 +221,94 @@ where
217221

218222
/// Repeatedly sync a Current database to the server's state.
219223
///
220-
/// The sync engine targets the ops root (not the canonical root). After sync completes,
221-
/// the bitmap and grafted MMR are reconstructed from the synced operations.
224+
/// Uses the `current::sync::sync` wrapper. The wrapper verifies each target's `OpsRootWitness`
225+
/// before forwarding its ops root to the shared sync engine, then checks the database root for the
226+
/// target the engine finishes on.
222227
async fn run_current<E>(context: E, config: Config) -> Result<(), Box<dyn std::error::Error>>
223228
where
224229
E: BufferPooler + Storage + Clock + Metrics + Network + Spawner,
225230
{
226-
run_full_sync::<current::Database<_>, current::Operation, _, _, _>(
227-
context,
228-
config,
229-
|context, config, resolver, initial_target, update_receiver, iteration| async move {
230-
let db_config = current::create_config(&context);
231-
let sync_config =
232-
sync::engine::Config::<current::Database<_>, Resolver<current::Operation, Key>> {
233-
context,
234-
db_config,
235-
fetch_batch_size: config.batch_size,
236-
target: initial_target,
237-
resolver,
238-
apply_batch_size: 1024,
239-
max_outstanding_requests: config.max_outstanding_requests,
240-
update_rx: Some(update_receiver),
241-
finish_rx: None,
242-
reached_target_tx: None,
243-
max_retained_roots: 8,
244-
};
245-
let database: current::Database<_> = sync::sync(sync_config).await?;
246-
info!(
247-
sync_iteration = iteration,
248-
canonical_root = %database.root(),
249-
ops_root = %database.ops_root(),
250-
sync_interval = ?config.sync_interval,
251-
"Current sync completed successfully"
252-
);
253-
Ok(database)
254-
},
255-
"Current database",
256-
)
257-
.await
231+
info!("starting Current database sync process");
232+
let mut iteration = 0u32;
233+
loop {
234+
let resolver =
235+
Resolver::<current::Operation, Key>::connect(context.child("resolver"), config.server)
236+
.await?;
237+
238+
let initial_target = resolver.get_current_sync_target().await?;
239+
info!(
240+
root = %initial_target.root,
241+
ops_root = %initial_target.ops_root,
242+
range = ?initial_target.range,
243+
"received current sync target"
244+
);
245+
246+
let (update_sender, update_receiver) = mpsc::channel(UPDATE_CHANNEL_SIZE);
247+
248+
let target_update_handle = {
249+
let resolver = resolver.clone();
250+
let mut current_target_root = initial_target.root;
251+
let target_update_interval = config.target_update_interval;
252+
context
253+
.child("target_update")
254+
.spawn(move |context| async move {
255+
loop {
256+
context.sleep(target_update_interval).await;
257+
match resolver.get_current_sync_target().await {
258+
Ok(new_target) => {
259+
if current_target_root != new_target.root {
260+
let new_root = new_target.root;
261+
match update_sender.clone().try_send(new_target) {
262+
Ok(()) => {
263+
info!("target updated");
264+
current_target_root = new_root;
265+
}
266+
Err(mpsc::error::TrySendError::Closed(_)) => return Ok(()),
267+
Err(err) => {
268+
warn!(?err, "failed to send target update");
269+
return Err(Error::TargetUpdateChannel {
270+
reason: err.to_string(),
271+
});
272+
}
273+
}
274+
}
275+
}
276+
Err(err) => {
277+
warn!(?err, "failed to get sync target from server");
278+
}
279+
}
280+
}
281+
})
282+
};
283+
284+
let db_config = current::create_config(&context);
285+
let database: current::Database<_> = current_qmdb::sync::sync(current_qmdb::sync::Config {
286+
context: context.child("sync"),
287+
resolver,
288+
target: initial_target,
289+
max_outstanding_requests: config.max_outstanding_requests,
290+
fetch_batch_size: config.batch_size,
291+
apply_batch_size: 1024,
292+
db_config,
293+
update_rx: Some(update_receiver),
294+
finish_rx: None,
295+
reached_target_tx: None,
296+
max_retained_roots: 8,
297+
})
298+
.await?;
299+
300+
target_update_handle.abort();
301+
info!(
302+
sync_iteration = iteration,
303+
root = %database.root(),
304+
ops_root = %database.ops_root(),
305+
sync_interval = ?config.sync_interval,
306+
"Current sync completed successfully"
307+
);
308+
database.destroy().await?;
309+
context.sleep(config.sync_interval).await;
310+
iteration += 1;
311+
}
258312
}
259313

260314
/// Repeatedly sync an Immutable database to the server's state.
@@ -270,6 +324,7 @@ where
270324
let sync_config = sync::engine::Config::<
271325
immutable::Database<_>,
272326
Resolver<immutable::Operation, Key>,
327+
_,
273328
> {
274329
context,
275330
db_config,
@@ -307,20 +362,23 @@ where
307362
config,
308363
|context, config, resolver, initial_target, update_receiver, iteration| async move {
309364
let db_config = keyless::create_config(&context);
310-
let sync_config =
311-
sync::engine::Config::<keyless::Database<_>, Resolver<keyless::Operation, Key>> {
312-
context,
313-
db_config,
314-
fetch_batch_size: config.batch_size,
315-
target: initial_target,
316-
resolver,
317-
apply_batch_size: 1024,
318-
max_outstanding_requests: config.max_outstanding_requests,
319-
update_rx: Some(update_receiver),
320-
finish_rx: None,
321-
reached_target_tx: None,
322-
max_retained_roots: 8,
323-
};
365+
let sync_config = sync::engine::Config::<
366+
keyless::Database<_>,
367+
Resolver<keyless::Operation, Key>,
368+
_,
369+
> {
370+
context,
371+
db_config,
372+
fetch_batch_size: config.batch_size,
373+
target: initial_target,
374+
resolver,
375+
apply_batch_size: 1024,
376+
max_outstanding_requests: config.max_outstanding_requests,
377+
update_rx: Some(update_receiver),
378+
finish_rx: None,
379+
reached_target_tx: None,
380+
max_retained_roots: 8,
381+
};
324382
let database: keyless::Database<_> = sync::sync(sync_config).await?;
325383
info!(
326384
sync_iteration = iteration,

examples/sync/src/bin/server.rs

Lines changed: 52 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -14,7 +14,7 @@ use commonware_runtime::{
1414
};
1515
use commonware_storage::{
1616
mmr,
17-
qmdb::sync::{compact, Target},
17+
qmdb::{any::sync::Target, sync::compact},
1818
};
1919
use commonware_stream::utils::codec::{recv_frame, send_frame};
2020
use commonware_sync::{
@@ -220,7 +220,7 @@ where
220220
state.request_counter.inc();
221221

222222
// Get the current database state
223-
let (root, sync_boundary, size) = {
223+
let (ops_root, sync_boundary, size) = {
224224
let database = state.database.read().await;
225225
(
226226
database.root(),
@@ -230,10 +230,7 @@ where
230230
};
231231
let response = wire::GetSyncTargetResponse::<Key> {
232232
request_id: request.request_id,
233-
target: Target {
234-
root,
235-
range: non_empty_range!(sync_boundary, size),
236-
},
233+
target: Target::new(ops_root, non_empty_range!(sync_boundary, size)),
237234
};
238235

239236
debug!(?response, "serving target update");
@@ -407,6 +404,49 @@ where
407404
}
408405
}
409406

407+
struct CurrentFullMode;
408+
409+
impl<E> ServeMode<current::Database<E>> for CurrentFullMode
410+
where
411+
E: Storage + Clock + Metrics + Send + Sync + 'static,
412+
{
413+
const LISTENING_MESSAGE: &'static str =
414+
"current server listening and continuously adding operations";
415+
const SHUTDOWN_MESSAGE: &'static str = "context shutdown, stopping current server";
416+
417+
async fn handle_message(
418+
state: &State<current::Database<E>>,
419+
message: wire::Message<current::Operation, Key>,
420+
) -> wire::Message<current::Operation, Key> {
421+
let request_id = message.request_id();
422+
match message {
423+
wire::Message::GetOperationsRequest(request) => dispatch_message!(
424+
state,
425+
request_id,
426+
wire::Message::GetOperationsResponse,
427+
handle_get_operations::<current::Database<E>>(state, request)
428+
),
429+
wire::Message::GetSyncTargetRequest(request) => {
430+
state.request_counter.inc();
431+
let database = state.database.read().await;
432+
match current::current_sync_target(&*database).await {
433+
Ok(target) => {
434+
debug!(?target, "serving current sync target");
435+
wire::Message::GetCurrentSyncTargetResponse(
436+
wire::GetCurrentSyncTargetResponse {
437+
request_id: request.request_id,
438+
target,
439+
},
440+
)
441+
}
442+
Err(err) => error_message(state, request_id, err.into()),
443+
}
444+
}
445+
_ => unexpected_message(state, request_id),
446+
}
447+
}
448+
}
449+
410450
impl<DB> ServeMode<DB> for CompactMode
411451
where
412452
DB: CompactSyncable<Family = mmr::Family> + Send + Sync + 'static,
@@ -710,14 +750,18 @@ where
710750
}
711751

712752
/// Run the Current database server.
713-
async fn run_current<E>(context: E, config: Config) -> Result<(), Box<dyn std::error::Error>>
753+
///
754+
/// Uses `CurrentFullMode` to serve database-root-aware sync targets with an
755+
/// [OpsRootWitness](commonware_storage::qmdb::current::proof::OpsRootWitness).
756+
async fn run_current<E>(mut context: E, config: Config) -> Result<(), Box<dyn std::error::Error>>
714757
where
715758
E: BufferPooler + Storage + Clock + Metrics + Network + Spawner + RngCore + Send,
716759
{
717760
let db_config = current::create_config(&context);
718761
let database = current::Database::init(context.child("database"), db_config).await?;
762+
let database = initialize_database(database, &config, &mut context).await?;
719763

720-
run_helper(context, config, database).await
764+
run_server::<current::Database<_>, E, CurrentFullMode>(context, config, database).await
721765
}
722766

723767
/// Run the Immutable database server.

examples/sync/src/databases/current.rs

Lines changed: 25 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -2,14 +2,16 @@
22
//!
33
//! A `current` database extends an `any` database with an activity bitmap that tracks which
44
//! operations are active (i.e. represent the current state of their key) vs inactive (superseded or
5-
//! deleted). Its canonical root folds the ops root, a grafted merkle root (combining bitmap chunks
5+
//! deleted). Its database root folds the ops root, a grafted merkle root (combining bitmap chunks
66
//! with ops subtree roots), and an optional partial-chunk digest. See [current] module
77
//! documentation for more details.
88
//!
9-
//! For sync, the engine targets the **ops root** (not the canonical root). The operations and proof
10-
//! format are identical to `any`; direct proof verifiers should use `qmdb::hasher`. The bitmap is
11-
//! reconstructed deterministically from the operations after sync completes. See the
12-
//! [Root structure](commonware_storage::qmdb::current) module documentation for details.
9+
//! For sync, the engine internally targets the **ops root** (not the database root). The
10+
//! operations and proof format are identical to `any`; direct proof verifiers should use
11+
//! `qmdb::hasher`. The bitmap is reconstructed deterministically from the operations after sync
12+
//! completes. The `current::sync` wrapper verifies each target's `OpsRootWitness` against its
13+
//! trusted database root, then checks the reconstructed database root for the target the
14+
//! engine finishes on.
1315
//!
1416
//! This module re-uses the same [`Operation`] type as [`super::any`] since the underlying
1517
//! operations log is the same.
@@ -25,11 +27,11 @@ use commonware_storage::{
2527
qmdb::{
2628
self,
2729
any::unordered::{fixed::Operation as FixedOperation, Update},
28-
current::{self, FixedConfig as Config},
30+
current::{self, sync::Target as CurrentTarget, FixedConfig as Config},
2931
operation::Committable,
3032
},
3133
};
32-
use commonware_utils::{NZUsize, NZU16, NZU64};
34+
use commonware_utils::{non_empty_range, NZUsize, NZU16, NZU64};
3335
use std::{future::Future, num::NonZeroU64};
3436
use tracing::error;
3537

@@ -145,9 +147,7 @@ where
145147
}
146148

147149
fn root(&self) -> Key {
148-
// Return the ops root (not the canonical root) because this is what the
149-
// sync engine verifies against.
150-
self.ops_root()
150+
self.root()
151151
}
152152

153153
fn name() -> &'static str {
@@ -186,6 +186,21 @@ where
186186
}
187187
}
188188

189+
pub async fn current_sync_target<E: Storage + Clock + Metrics>(
190+
db: &Database<E>,
191+
) -> Result<CurrentTarget<mmr::Family, Key>, qmdb::Error<mmr::Family>> {
192+
let hasher = qmdb::hasher::<Hasher>();
193+
let witness = db.ops_root_witness(&hasher).await?;
194+
let sync_boundary = db.sync_boundary();
195+
let size = db.bounds().await.end;
196+
Ok(CurrentTarget::new(
197+
db.root(),
198+
db.ops_root(),
199+
witness,
200+
non_empty_range!(sync_boundary, size),
201+
))
202+
}
203+
189204
#[cfg(test)]
190205
mod tests {
191206
use super::*;

0 commit comments

Comments
 (0)