Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
15 changes: 14 additions & 1 deletion examples/sync/src/net/resolver.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ use commonware_storage::{
mmr::{self, Location},
qmdb::{
any::sync::Target,
current::sync::Target as CurrentTarget,
current::sync::{CurrentResolver, Target as CurrentTarget},
sync::{self, compact},
},
};
Expand Down Expand Up @@ -226,6 +226,19 @@ where
}
}

impl<Op, D> CurrentResolver for Resolver<Op, D>
where
Op: Clone + Read + EncodeShared,
Op::Cfg: IsUnit,
D: Digest,
{
async fn current_target(
&self,
) -> Result<CurrentTarget<Self::Family, Self::Digest>, Self::Error> {
self.get_current_sync_target().await
}
}

impl<Op, D> compact::Resolver for Resolver<Op, D>
where
Op: Clone + Read + EncodeShared,
Expand Down
314 changes: 313 additions & 1 deletion storage/src/qmdb/current/sync/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,68 @@
//! forwarding its ops root to the shared sync engine, then checks the reconstructed database root
//! for the target the engine finishes on.
//!
//! ## Consensus-aware target matching
//!
//! Live resolvers usually serve their latest full [Target], while consensus (or another trust
//! source) separately reports canonical database roots. Use [CurrentResolver] and [TargetMatcher]
//! to join those streams: a candidate target becomes trusted only once its `root` appears in the
//! trusted-root stream.
//!
//! ```ignore
//! use commonware_storage::qmdb::current::sync::{self as current_sync, CurrentResolver, TargetMatcher};
//!
//! let mut matcher = TargetMatcher::new(64, 64);
//! let mut last_accepted = None;
//! let (accepted_tx, mut accepted_rx) = commonware_utils::channel::mpsc::channel(16);
//!
//! // `is_forward` is application policy, usually based on consensus metadata
//! // (height, round, view, certificate sequence) associated with each root.
//! // Roots are hashes and do not have a natural ordering on their own.
//!
//! // Consensus task: insert roots that the chain has made canonical.
//! loop {
//! let trusted_root = consensus_client.canonical_root_at_tip().await?;
//! if let Some(target) = matcher.insert_trusted_root(trusted_root) {
//! // Join succeeded: this trusted root already had a resolver target cached.
//! if is_forward(&last_accepted, &target) {
//! last_accepted = Some(target.clone());
//! accepted_tx.clone().send(target).await?;
//! }
//! }
//! }
//!
//! // Resolver task: insert latest atomic targets from the sync server.
//! loop {
//! let target = resolver.current_target().await?;
//! if let Some(target) = matcher.insert_candidate_target(target) {
//! // Join succeeded: this target's root was already trusted by consensus.
//! if is_forward(&last_accepted, &target) {
//! last_accepted = Some(target.clone());
//! accepted_tx.clone().send(target).await?;
//! }
//! }
//! }
//!
//! // Sync task: use only targets that matched consensus roots.
//! let initial_target = accepted_rx.recv().await.expect("target stream closed");
//! let (update_tx, update_rx) = commonware_utils::channel::mpsc::channel(16);
//!
//! // Forward later accepted targets into `update_tx` while sync runs.
//! let db = current_sync::sync(current_sync::Config {
//! context,
//! resolver,
//! target: initial_target,
//! max_outstanding_requests,
//! fetch_batch_size,
//! apply_batch_size,
//! db_config,
//! update_rx: Some(update_rx),
//! finish_rx,
//! reached_target_tx,
//! max_retained_roots,
//! }).await?;
//! ```
//!
//! After all operations are synced, the bitmap and grafted tree are reconstructed deterministically
//! from the operations. The database root is then computed from the ops root, the reconstructed
//! grafted root, and any pending or partial chunk digests.
Expand Down Expand Up @@ -81,16 +143,154 @@ use commonware_parallel::Strategy;
use commonware_utils::{
bitmap::Prunable as BitMap,
channel::{mpsc, oneshot},
non_empty_range,
range::NonEmptyRange,
sync::AsyncMutex,
Array,
};
use futures::future::{select, Either};
use std::{num::NonZeroU64, sync::Arc};
use std::{
collections::{HashMap, HashSet, VecDeque},
future::Future,
num::NonZeroU64,
sync::Arc,
};

#[cfg(test)]
pub(crate) mod tests;

// --- CurrentResolver types ---

/// Resolver extension for fetching the latest full authenticated `current` target.
///
/// The returned [`Target`] is an atomic snapshot: `root`, `ops_root`, witness, and range must all
/// describe the same database state. Callers that learn trusted canonical roots from another source
/// can cache these targets and use [`TargetMatcher`] to find a target whose root has been trusted
/// before passing it to [`sync`].
pub trait CurrentResolver: qmdb_sync::Resolver
where
Self::Family: Graftable,
{
/// Fetch the latest full authenticated [`Target`].
#[allow(clippy::type_complexity)]
fn current_target(
&self,
) -> impl Future<Output = Result<Target<Self::Family, Self::Digest>, Self::Error>> + Send;
}

/// Bounded matcher for joining trusted roots with candidate `current` targets.
///
/// Consensus or another trust source inserts canonical roots with [`Self::insert_trusted_root`].
/// A resolver inserts latest atomic targets with [`Self::insert_candidate_target`]. The first time
/// a root appears in both caches, the matcher returns the matching target and consumes that root so
/// it is emitted at most once.
Comment on lines +184 to +186
pub struct TargetMatcher<F: Graftable, D: Digest> {
trusted_roots: HashSet<D>,
trusted_order: VecDeque<D>,
candidate_targets: HashMap<D, Target<F, D>>,
candidate_order: VecDeque<D>,
emitted_roots: HashSet<D>,
emitted_order: VecDeque<D>,
max_trusted_roots: usize,
max_candidate_targets: usize,
}

impl<F: Graftable, D: Digest> TargetMatcher<F, D> {
/// Create a matcher with bounded root and target caches.
pub fn new(max_trusted_roots: usize, max_candidate_targets: usize) -> Self {
Self {
trusted_roots: HashSet::new(),
trusted_order: VecDeque::new(),
candidate_targets: HashMap::new(),
candidate_order: VecDeque::new(),
emitted_roots: HashSet::new(),
emitted_order: VecDeque::new(),
max_trusted_roots,
max_candidate_targets,
}
}

/// Insert a trusted canonical root.
///
/// Returns the matching target if one has already been observed.
pub fn insert_trusted_root(&mut self, root: D) -> Option<Target<F, D>> {
if self.emitted_roots.contains(&root) {
return None;
}
if let Some(target) = self.candidate_targets.remove(&root) {
self.trusted_roots.remove(&root);
self.record_emitted(root);
return Some(target);
}
if self.max_trusted_roots == 0 {
Comment on lines +216 to +225
return None;
}
if self.trusted_roots.insert(root) {
self.trusted_order.push_back(root);
}
self.evict_trusted_roots();
None
}

/// Insert a candidate target from a resolver.
///
/// Returns the target if its root has already been trusted.
pub fn insert_candidate_target(&mut self, target: Target<F, D>) -> Option<Target<F, D>> {
let root = target.root;
if self.emitted_roots.contains(&root) {
return None;
}
if self.trusted_roots.remove(&root) {
self.candidate_targets.remove(&root);
self.record_emitted(root);
return Some(target);
}
if self.max_candidate_targets == 0 {
return None;
}
if !self.candidate_targets.contains_key(&root) {
self.candidate_order.push_back(root);
}
self.candidate_targets.insert(root, target);
self.evict_candidate_targets();
None
}

fn record_emitted(&mut self, root: D) {
if self.emitted_roots.insert(root) {
self.emitted_order.push_back(root);
}
let max_emitted_roots = self.max_trusted_roots.max(self.max_candidate_targets);
while self.emitted_roots.len() > max_emitted_roots {
if let Some(root) = self.emitted_order.pop_front() {
self.emitted_roots.remove(&root);
} else {
break;
}
}
}

fn evict_trusted_roots(&mut self) {
while self.trusted_roots.len() > self.max_trusted_roots {
if let Some(root) = self.trusted_order.pop_front() {
self.trusted_roots.remove(&root);
} else {
break;
}
}
}

fn evict_candidate_targets(&mut self) {
while self.candidate_targets.len() > self.max_candidate_targets {
if let Some(root) = self.candidate_order.pop_front() {
self.candidate_targets.remove(&root);
} else {
break;
}
}
}
}

/// Sync target for `current` databases, anchored by a trusted database root.
///
/// The witness authenticates `ops_root` against `root`; the shared sync engine
Expand Down Expand Up @@ -796,6 +996,118 @@ impl_current_resolver!(
OrderedVariableOp<F, K, V>: CodecShared,
);

// --- CurrentResolver implementations ---
//
// Each server-side database type returns a full target from one live database snapshot.

macro_rules! impl_current_current_resolver {
($db:ident, $val_bound:ident, $key_bound:path $(; $($where_extra:tt)+)?) => {
impl<F, E, K, V, H, T, const N: usize, S> CurrentResolver
for std::sync::Arc<$db<F, E, K, V, H, T, N, S>>
where
F: Graftable,
E: Context,
K: $key_bound,
V: $val_bound + Send + Sync + 'static,
H: Hasher,
T: Translator + Send + Sync + 'static,
T::Key: Send + Sync,
S: Strategy,
$($($where_extra)+)?
{
async fn current_target(&self) -> Result<Target<F, H::Digest>, Self::Error> {
let hasher = qmdb::hasher::<H>();
let witness = self.ops_root_witness(&hasher).await?;
let lower = self.sync_boundary();
let upper = self.bounds().await.end;
Ok(Target::new(
self.root(),
self.ops_root(),
witness,
non_empty_range!(lower, upper),
))
}
}

impl<F, E, K, V, H, T, const N: usize, S> CurrentResolver
for std::sync::Arc<
commonware_utils::sync::AsyncRwLock<
$db<F, E, K, V, H, T, N, S>,
>,
>
where
F: Graftable,
E: Context,
K: $key_bound,
V: $val_bound + Send + Sync + 'static,
H: Hasher,
T: Translator + Send + Sync + 'static,
T::Key: Send + Sync,
S: Strategy,
$($($where_extra)+)?
{
async fn current_target(&self) -> Result<Target<F, H::Digest>, Self::Error> {
let db = self.read().await;
let hasher = qmdb::hasher::<H>();
let witness = db.ops_root_witness(&hasher).await?;
let lower = db.sync_boundary();
let upper = db.bounds().await.end;
Ok(Target::new(
db.root(),
db.ops_root(),
witness,
non_empty_range!(lower, upper),
))
}
}

impl<F, E, K, V, H, T, const N: usize, S> CurrentResolver
for std::sync::Arc<
commonware_utils::sync::AsyncRwLock<
Option<$db<F, E, K, V, H, T, N, S>>,
>,
>
where
F: Graftable,
E: Context,
K: $key_bound,
V: $val_bound + Send + Sync + 'static,
H: Hasher,
T: Translator + Send + Sync + 'static,
T::Key: Send + Sync,
S: Strategy,
$($($where_extra)+)?
{
async fn current_target(&self) -> Result<Target<F, H::Digest>, Self::Error> {
let guard = self.read().await;
let db = guard.as_ref()
.ok_or(qmdb::Error::<F>::KeyNotFound)?;
let hasher = qmdb::hasher::<H>();
let witness = db.ops_root_witness(&hasher).await?;
let lower = db.sync_boundary();
let upper = db.bounds().await.end;
Ok(Target::new(
db.root(),
db.ops_root(),
witness,
non_empty_range!(lower, upper),
))
}
}
};
}

impl_current_current_resolver!(CurrentUnorderedFixedDb, FixedValue, Array);
impl_current_current_resolver!(
CurrentUnorderedVariableDb, VariableValue, Key;
UnorderedVariableOp<F, K, V>: CodecShared,
);
impl_current_current_resolver!(CurrentOrderedFixedDb, FixedValue, Array);
impl_current_current_resolver!(
CurrentOrderedVariableDb, VariableValue, Key;
OrderedVariableOp<F, K, V>: CodecShared,
);

#[cfg(feature = "arbitrary")]
impl<F: Graftable, D: Digest> arbitrary::Arbitrary<'_> for Target<F, D>
where
Expand Down
Loading
Loading