diff --git a/examples/sync/src/net/resolver.rs b/examples/sync/src/net/resolver.rs index ca7d6b11b8..ee429c7382 100644 --- a/examples/sync/src/net/resolver.rs +++ b/examples/sync/src/net/resolver.rs @@ -7,7 +7,7 @@ use commonware_storage::{ mmr::{self, Location}, qmdb::{ any::sync::Target, - current::sync::Target as CurrentTarget, + current::sync::{CurrentResolver, Target as CurrentTarget}, sync::{self, compact}, }, }; @@ -226,6 +226,19 @@ where } } +impl CurrentResolver for Resolver +where + Op: Clone + Read + EncodeShared, + Op::Cfg: IsUnit, + D: Digest, +{ + async fn current_target( + &self, + ) -> Result, Self::Error> { + self.get_current_sync_target().await + } +} + impl compact::Resolver for Resolver where Op: Clone + Read + EncodeShared, diff --git a/storage/src/qmdb/current/sync/mod.rs b/storage/src/qmdb/current/sync/mod.rs index 8708511117..d662506685 100644 --- a/storage/src/qmdb/current/sync/mod.rs +++ b/storage/src/qmdb/current/sync/mod.rs @@ -15,6 +15,68 @@ //! forwarding its ops root to the shared sync engine, then checks the reconstructed database root //! for the target the engine finishes on. //! +//! ## Consensus-aware target matching +//! +//! Live resolvers usually serve their latest full [Target], while consensus (or another trust +//! source) separately reports canonical database roots. Use [CurrentResolver] and [TargetMatcher] +//! to join those streams: a candidate target becomes trusted only once its `root` appears in the +//! trusted-root stream. +//! +//! ```ignore +//! use commonware_storage::qmdb::current::sync::{self as current_sync, CurrentResolver, TargetMatcher}; +//! +//! let mut matcher = TargetMatcher::new(64, 64); +//! let mut last_accepted = None; +//! let (accepted_tx, mut accepted_rx) = commonware_utils::channel::mpsc::channel(16); +//! +//! // `is_forward` is application policy, usually based on consensus metadata +//! // (height, round, view, certificate sequence) associated with each root. +//! // Roots are hashes and do not have a natural ordering on their own. +//! +//! // Consensus task: insert roots that the chain has made canonical. +//! loop { +//! let trusted_root = consensus_client.canonical_root_at_tip().await?; +//! if let Some(target) = matcher.insert_trusted_root(trusted_root) { +//! // Join succeeded: this trusted root already had a resolver target cached. +//! if is_forward(&last_accepted, &target) { +//! last_accepted = Some(target.clone()); +//! accepted_tx.clone().send(target).await?; +//! } +//! } +//! } +//! +//! // Resolver task: insert latest atomic targets from the sync server. +//! loop { +//! let target = resolver.current_target().await?; +//! if let Some(target) = matcher.insert_candidate_target(target) { +//! // Join succeeded: this target's root was already trusted by consensus. +//! if is_forward(&last_accepted, &target) { +//! last_accepted = Some(target.clone()); +//! accepted_tx.clone().send(target).await?; +//! } +//! } +//! } +//! +//! // Sync task: use only targets that matched consensus roots. +//! let initial_target = accepted_rx.recv().await.expect("target stream closed"); +//! let (update_tx, update_rx) = commonware_utils::channel::mpsc::channel(16); +//! +//! // Forward later accepted targets into `update_tx` while sync runs. +//! let db = current_sync::sync(current_sync::Config { +//! context, +//! resolver, +//! target: initial_target, +//! max_outstanding_requests, +//! fetch_batch_size, +//! apply_batch_size, +//! db_config, +//! update_rx: Some(update_rx), +//! finish_rx, +//! reached_target_tx, +//! max_retained_roots, +//! }).await?; +//! ``` +//! //! After all operations are synced, the bitmap and grafted tree are reconstructed deterministically //! from the operations. The database root is then computed from the ops root, the reconstructed //! grafted root, and any pending or partial chunk digests. @@ -81,16 +143,154 @@ use commonware_parallel::Strategy; use commonware_utils::{ bitmap::Prunable as BitMap, channel::{mpsc, oneshot}, + non_empty_range, range::NonEmptyRange, sync::AsyncMutex, Array, }; use futures::future::{select, Either}; -use std::{num::NonZeroU64, sync::Arc}; +use std::{ + collections::{HashMap, HashSet, VecDeque}, + future::Future, + num::NonZeroU64, + sync::Arc, +}; #[cfg(test)] pub(crate) mod tests; +// --- CurrentResolver types --- + +/// Resolver extension for fetching the latest full authenticated `current` target. +/// +/// The returned [`Target`] is an atomic snapshot: `root`, `ops_root`, witness, and range must all +/// describe the same database state. Callers that learn trusted canonical roots from another source +/// can cache these targets and use [`TargetMatcher`] to find a target whose root has been trusted +/// before passing it to [`sync`]. +pub trait CurrentResolver: qmdb_sync::Resolver +where + Self::Family: Graftable, +{ + /// Fetch the latest full authenticated [`Target`]. + #[allow(clippy::type_complexity)] + fn current_target( + &self, + ) -> impl Future, Self::Error>> + Send; +} + +/// Bounded matcher for joining trusted roots with candidate `current` targets. +/// +/// Consensus or another trust source inserts canonical roots with [`Self::insert_trusted_root`]. +/// A resolver inserts latest atomic targets with [`Self::insert_candidate_target`]. The first time +/// a root appears in both caches, the matcher returns the matching target and consumes that root so +/// it is emitted at most once. +pub struct TargetMatcher { + trusted_roots: HashSet, + trusted_order: VecDeque, + candidate_targets: HashMap>, + candidate_order: VecDeque, + emitted_roots: HashSet, + emitted_order: VecDeque, + max_trusted_roots: usize, + max_candidate_targets: usize, +} + +impl TargetMatcher { + /// Create a matcher with bounded root and target caches. + pub fn new(max_trusted_roots: usize, max_candidate_targets: usize) -> Self { + Self { + trusted_roots: HashSet::new(), + trusted_order: VecDeque::new(), + candidate_targets: HashMap::new(), + candidate_order: VecDeque::new(), + emitted_roots: HashSet::new(), + emitted_order: VecDeque::new(), + max_trusted_roots, + max_candidate_targets, + } + } + + /// Insert a trusted canonical root. + /// + /// Returns the matching target if one has already been observed. + pub fn insert_trusted_root(&mut self, root: D) -> Option> { + if self.emitted_roots.contains(&root) { + return None; + } + if let Some(target) = self.candidate_targets.remove(&root) { + self.trusted_roots.remove(&root); + self.record_emitted(root); + return Some(target); + } + if self.max_trusted_roots == 0 { + return None; + } + if self.trusted_roots.insert(root) { + self.trusted_order.push_back(root); + } + self.evict_trusted_roots(); + None + } + + /// Insert a candidate target from a resolver. + /// + /// Returns the target if its root has already been trusted. + pub fn insert_candidate_target(&mut self, target: Target) -> Option> { + let root = target.root; + if self.emitted_roots.contains(&root) { + return None; + } + if self.trusted_roots.remove(&root) { + self.candidate_targets.remove(&root); + self.record_emitted(root); + return Some(target); + } + if self.max_candidate_targets == 0 { + return None; + } + if !self.candidate_targets.contains_key(&root) { + self.candidate_order.push_back(root); + } + self.candidate_targets.insert(root, target); + self.evict_candidate_targets(); + None + } + + fn record_emitted(&mut self, root: D) { + if self.emitted_roots.insert(root) { + self.emitted_order.push_back(root); + } + let max_emitted_roots = self.max_trusted_roots.max(self.max_candidate_targets); + while self.emitted_roots.len() > max_emitted_roots { + if let Some(root) = self.emitted_order.pop_front() { + self.emitted_roots.remove(&root); + } else { + break; + } + } + } + + fn evict_trusted_roots(&mut self) { + while self.trusted_roots.len() > self.max_trusted_roots { + if let Some(root) = self.trusted_order.pop_front() { + self.trusted_roots.remove(&root); + } else { + break; + } + } + } + + fn evict_candidate_targets(&mut self) { + while self.candidate_targets.len() > self.max_candidate_targets { + if let Some(root) = self.candidate_order.pop_front() { + self.candidate_targets.remove(&root); + } else { + break; + } + } + } +} + /// Sync target for `current` databases, anchored by a trusted database root. /// /// The witness authenticates `ops_root` against `root`; the shared sync engine @@ -796,6 +996,118 @@ impl_current_resolver!( OrderedVariableOp: CodecShared, ); +// --- CurrentResolver implementations --- +// +// Each server-side database type returns a full target from one live database snapshot. + +macro_rules! impl_current_current_resolver { + ($db:ident, $val_bound:ident, $key_bound:path $(; $($where_extra:tt)+)?) => { + impl CurrentResolver + for std::sync::Arc<$db> + where + F: Graftable, + E: Context, + K: $key_bound, + V: $val_bound + Send + Sync + 'static, + H: Hasher, + T: Translator + Send + Sync + 'static, + T::Key: Send + Sync, + S: Strategy, + $($($where_extra)+)? + { + async fn current_target(&self) -> Result, Self::Error> { + let hasher = qmdb::hasher::(); + let witness = self.ops_root_witness(&hasher).await?; + let lower = self.sync_boundary(); + let upper = self.bounds().await.end; + Ok(Target::new( + self.root(), + self.ops_root(), + witness, + non_empty_range!(lower, upper), + )) + } + } + + impl CurrentResolver + for std::sync::Arc< + commonware_utils::sync::AsyncRwLock< + $db, + >, + > + where + F: Graftable, + E: Context, + K: $key_bound, + V: $val_bound + Send + Sync + 'static, + H: Hasher, + T: Translator + Send + Sync + 'static, + T::Key: Send + Sync, + S: Strategy, + $($($where_extra)+)? + { + async fn current_target(&self) -> Result, Self::Error> { + let db = self.read().await; + let hasher = qmdb::hasher::(); + let witness = db.ops_root_witness(&hasher).await?; + let lower = db.sync_boundary(); + let upper = db.bounds().await.end; + Ok(Target::new( + db.root(), + db.ops_root(), + witness, + non_empty_range!(lower, upper), + )) + } + } + + impl CurrentResolver + for std::sync::Arc< + commonware_utils::sync::AsyncRwLock< + Option<$db>, + >, + > + where + F: Graftable, + E: Context, + K: $key_bound, + V: $val_bound + Send + Sync + 'static, + H: Hasher, + T: Translator + Send + Sync + 'static, + T::Key: Send + Sync, + S: Strategy, + $($($where_extra)+)? + { + async fn current_target(&self) -> Result, Self::Error> { + let guard = self.read().await; + let db = guard.as_ref() + .ok_or(qmdb::Error::::KeyNotFound)?; + let hasher = qmdb::hasher::(); + let witness = db.ops_root_witness(&hasher).await?; + let lower = db.sync_boundary(); + let upper = db.bounds().await.end; + Ok(Target::new( + db.root(), + db.ops_root(), + witness, + non_empty_range!(lower, upper), + )) + } + } + }; +} + +impl_current_current_resolver!(CurrentUnorderedFixedDb, FixedValue, Array); +impl_current_current_resolver!( + CurrentUnorderedVariableDb, VariableValue, Key; + UnorderedVariableOp: CodecShared, +); +impl_current_current_resolver!(CurrentOrderedFixedDb, FixedValue, Array); +impl_current_current_resolver!( + CurrentOrderedVariableDb, VariableValue, Key; + OrderedVariableOp: CodecShared, +); + #[cfg(feature = "arbitrary")] impl arbitrary::Arbitrary<'_> for Target where diff --git a/storage/src/qmdb/current/sync/tests.rs b/storage/src/qmdb/current/sync/tests.rs index 5728c6b5e3..4a0c53859a 100644 --- a/storage/src/qmdb/current/sync/tests.rs +++ b/storage/src/qmdb/current/sync/tests.rs @@ -835,6 +835,75 @@ current_sync_tests_for_harness!(harnesses::OrderedFixedMmbHarness, ordered_fixed current_sync_tests_for_harness!(harnesses::OrderedVariableMmrHarness, ordered_variable_mmr); current_sync_tests_for_harness!(harnesses::OrderedVariableMmbHarness, ordered_variable_mmb); +mod target_matcher { + use super::*; + use crate::{ + merkle::{mmr, Location}, + qmdb::current::sync::TargetMatcher, + }; + + fn target(byte: u8) -> crate::qmdb::current::sync::Target { + let root = Digest::from([byte; 32]); + dummy_current_target( + root, + root, + non_empty_range!(Location::new(0), Location::new(1)), + ) + } + + #[test] + fn test_trusted_root_before_candidate_target() { + let mut matcher = TargetMatcher::new(4, 4); + let target = target(1); + + assert!(matcher.insert_trusted_root(target.root).is_none()); + assert_eq!( + matcher.insert_candidate_target(target.clone()), + Some(target.clone()) + ); + assert!(matcher.insert_candidate_target(target).is_none()); + } + + #[test] + fn test_candidate_target_before_trusted_root() { + let mut matcher = TargetMatcher::new(4, 4); + let target = target(2); + + assert!(matcher.insert_candidate_target(target.clone()).is_none()); + assert_eq!( + matcher.insert_trusted_root(target.root), + Some(target.clone()) + ); + assert!(matcher.insert_trusted_root(target.root).is_none()); + } + + #[test] + fn test_matching_target_emitted_once() { + let mut matcher = TargetMatcher::new(4, 4); + let target = target(5); + + assert!(matcher.insert_candidate_target(target.clone()).is_none()); + assert_eq!( + matcher.insert_trusted_root(target.root), + Some(target.clone()) + ); + assert!(matcher.insert_candidate_target(target.clone()).is_none()); + assert!(matcher.insert_trusted_root(target.root).is_none()); + } + + #[test] + fn test_unmatched_targets_are_evicted() { + let mut matcher = TargetMatcher::new(4, 1); + let evicted = target(3); + let retained = target(4); + + assert!(matcher.insert_candidate_target(evicted.clone()).is_none()); + assert!(matcher.insert_candidate_target(retained.clone()).is_none()); + assert!(matcher.insert_trusted_root(evicted.root).is_none()); + assert_eq!(matcher.insert_trusted_root(retained.root), Some(retained)); + } +} + mod root_sync { use super::*; use crate::{ @@ -843,7 +912,7 @@ mod root_sync { self, current::{ proof::OpsRootWitness, - sync::{self as current_sync, Target as CurrentTarget}, + sync::{self as current_sync, CurrentResolver, Target as CurrentTarget}, tests::variable_config, }, }, @@ -899,6 +968,80 @@ mod root_sync { ) } + async fn assert_current_target_matches_db(db: &Db, target: CurrentTarget) { + let expected = make_current_target(db).await; + let hasher = qmdb::hasher::(); + assert_eq!(target, expected); + assert!(target.verify(&hasher)); + } + + #[test_traced("INFO")] + fn test_current_resolver_arc_db_returns_atomic_target() { + let executor = deterministic::Runner::default(); + executor.start(|mut context: Context| async move { + let target_db = build_target_db(&mut context).await; + let resolver = std::sync::Arc::new(target_db); + let target = CurrentResolver::current_target(&resolver).await.unwrap(); + + assert_current_target_matches_db(resolver.as_ref(), target).await; + + let target_db = std::sync::Arc::into_inner(resolver).unwrap(); + target_db.destroy().await.unwrap(); + }); + } + + #[test_traced("INFO")] + fn test_current_resolver_locked_db_returns_atomic_target() { + let executor = deterministic::Runner::default(); + executor.start(|mut context: Context| async move { + let target_db = build_target_db(&mut context).await; + let resolver = std::sync::Arc::new(commonware_utils::sync::AsyncRwLock::new(target_db)); + let target = CurrentResolver::current_target(&resolver).await.unwrap(); + + { + let db = resolver.read().await; + assert_current_target_matches_db(&db, target).await; + } + + let target_db = std::sync::Arc::into_inner(resolver).unwrap().into_inner(); + target_db.destroy().await.unwrap(); + }); + } + + #[test_traced("INFO")] + fn test_current_resolver_locked_option_returns_atomic_target() { + let executor = deterministic::Runner::default(); + executor.start(|mut context: Context| async move { + let target_db = build_target_db(&mut context).await; + let resolver = + std::sync::Arc::new(commonware_utils::sync::AsyncRwLock::new(Some(target_db))); + let target = CurrentResolver::current_target(&resolver).await.unwrap(); + + { + let guard = resolver.read().await; + assert_current_target_matches_db(guard.as_ref().unwrap(), target).await; + } + + let target_db = std::sync::Arc::into_inner(resolver) + .unwrap() + .into_inner() + .unwrap(); + target_db.destroy().await.unwrap(); + }); + } + + #[test_traced("INFO")] + fn test_current_resolver_locked_option_empty_errors() { + let executor = deterministic::Runner::default(); + executor.start(|_context: Context| async move { + let resolver: std::sync::Arc>> = + std::sync::Arc::new(commonware_utils::sync::AsyncRwLock::new(None)); + let result = CurrentResolver::current_target(&resolver).await; + + assert!(matches!(result, Err(crate::qmdb::Error::KeyNotFound))); + }); + } + #[test_traced("INFO")] fn test_root_sync_succeeds() { let executor = deterministic::Runner::default();