diff --git a/consensus/src/lib.rs b/consensus/src/lib.rs index 8b0dc91b66..7cab38d178 100644 --- a/consensus/src/lib.rs +++ b/consensus/src/lib.rs @@ -237,7 +237,7 @@ stability_scope!(ALPHA { }); stability_scope!(ALPHA, cfg(not(target_arch = "wasm32")) { use commonware_cryptography::certificate::Scheme; - use futures::Stream; + use crate::marshal::ancestry::Ancestry; use commonware_runtime::{Clock, Metrics, Spawner}; use rand::Rng; @@ -272,7 +272,7 @@ stability_scope!(ALPHA, cfg(not(target_arch = "wasm32")) { fn propose( &mut self, context: (E, Self::Context), - ancestry: impl Stream + Send, + ancestry: impl Ancestry, ) -> impl Future> + Send; /// Verify a block produced by the application's proposer, relative to its ancestry. @@ -287,7 +287,7 @@ stability_scope!(ALPHA, cfg(not(target_arch = "wasm32")) { fn verify( &mut self, context: (E, Self::Context), - ancestry: impl Stream + Send, + ancestry: impl Ancestry, ) -> impl Future + Send; } }); diff --git a/consensus/src/marshal/ancestry.rs b/consensus/src/marshal/ancestry.rs index 6da2056fad..067ab4407f 100644 --- a/consensus/src/marshal/ancestry.rs +++ b/consensus/src/marshal/ancestry.rs @@ -13,45 +13,37 @@ use std::{ task::{Context, Poll}, }; -/// An interface for providing blocks. +/// A stream of blocks used by application propose and verify calls. +pub trait Ancestry: Stream + Send + Unpin + 'static {} + +impl Ancestry for T +where + T: Stream + Send + Unpin + 'static, + B: Block, +{ +} + +/// An interface for providing parent blocks. pub trait BlockProvider: Clone + Send + 'static { /// The block type the provider walks. type Block: Block; - /// Subscribe to a block by its digest without requesting it from the network. + /// Subscribe to the parent of a known block. /// - /// If the block is found available locally, the block will be returned immediately. + /// If the parent is found available locally, the parent will be returned immediately. /// - /// If the block is not available locally, the subscription will be registered and the caller - /// will be notified when the block is available. If the block is not finalized, it's possible + /// If the parent is not available locally, the subscription will be registered and the caller + /// will be notified when the parent is available. If the parent is not finalized, it's possible /// that it may never become available. /// /// Returns `None` when the subscription is canceled or the provider can no longer deliver - /// the block. + /// the parent. /// - /// This is intentionally narrower than [`Self::subscribe_parent`]. A digest is enough to - /// identify a block in local storage, but it may not be enough to form the network request - /// used by a marshal variant. Variants whose consensus commitment contains extra context - /// should keep that logic in [`Self::subscribe_parent`], where the known child block is - /// still available. - fn subscribe( - self, - digest: ::Digest, - ) -> impl Future> + Send; - - /// Subscribe to the parent of a known block. - /// - /// This is a separate hook from [`Self::subscribe`] because the child block can carry - /// variant-specific context needed to retrieve its parent. The default implementation - /// follows the digest link and waits locally, but providers may override this to derive a - /// full parent commitment and issue a fetching subscription. + /// The child block can carry variant-specific context needed to retrieve its parent. fn subscribe_parent( self, block: Self::Block, - ) -> impl Future> + Send { - let digest = block.parent(); - self.subscribe(digest) - } + ) -> impl Future> + Send; } /// Yields the ancestors of a block while prefetching parents, _not_ including the genesis block. @@ -114,9 +106,9 @@ where // If a result has been buffered, return it and queue the parent fetch if needed. if let Some(block) = this.buffered.pop() { let height = block.height(); - let should_subscribe_parent = height > END_BOUND; + let should_walk_parent = height > END_BOUND; let end_of_buffered = this.buffered.is_empty(); - if should_subscribe_parent && end_of_buffered { + if should_walk_parent && end_of_buffered { let future = this.marshal.clone().subscribe_parent(block.clone()).boxed(); *this.pending.as_mut() = Some(future).into(); @@ -131,7 +123,7 @@ where } Poll::Ready(None) | Poll::Pending => {} } - } else if !should_subscribe_parent { + } else if !should_walk_parent { // No more parents to fetch; Finish the stream. *this.pending.as_mut() = None.into(); } @@ -147,8 +139,8 @@ where } Poll::Ready(Some(Some(block))) => { let height = block.height(); - let should_subscribe_parent = height > END_BOUND; - if should_subscribe_parent { + let should_walk_parent = height > END_BOUND; + if should_walk_parent { let future = this.marshal.clone().subscribe_parent(block.clone()).boxed(); *this.pending.as_mut() = Some(future).into(); @@ -187,8 +179,9 @@ mod test { impl BlockProvider for MockProvider { type Block = Block; - async fn subscribe(self, digest: Sha256Digest) -> Option { - self.0.into_iter().find(|b| b.digest() == digest) + async fn subscribe_parent(self, block: Self::Block) -> Option { + let parent = block.parent; + self.0.into_iter().find(|b| b.digest() == parent) } } diff --git a/consensus/src/marshal/coding/mod.rs b/consensus/src/marshal/coding/mod.rs index 17382d9cdb..65363ad9b9 100644 --- a/consensus/src/marshal/coding/mod.rs +++ b/consensus/src/marshal/coding/mod.rs @@ -65,6 +65,7 @@ pub use marshaled::{Marshaled, MarshaledConfig}; mod tests { use crate::{ marshal::{ + ancestry::BlockProvider, coding::{ marshaled::genesis_coding_commitment, shards, @@ -103,7 +104,7 @@ mod tests { use commonware_parallel::Sequential; use commonware_resolver::{Delivery, Fetch, Resolver}; use commonware_runtime::{ - buffer::paged::CacheRef, deterministic, Clock, Metrics, Runner, Supervisor as _, + buffer::paged::CacheRef, deterministic, Clock, Metrics, Runner, Spawner, Supervisor as _, }; use commonware_storage::archive::immutable; use commonware_utils::{ @@ -115,16 +116,27 @@ mod tests { type TestCodedBlock = CodedBlock, Sha256>; type CodingSendRecord = (Round, TestCodedBlock, Recipients); + #[test] + fn mailbox_provides_application_blocks() { + fn assert_provider>() {} + assert_provider::>(); + } + /// A coding buffer that records subscriptions and never resolves them. #[derive(Clone, Default)] struct RecordingCodingBuffer { - subscriptions: Arc>>>, + digest_subscriptions: Arc>>>, + commitment_subscriptions: Arc>>>, sends: Arc>>, } impl RecordingCodingBuffer { fn subscription_count(&self) -> usize { - self.subscriptions.lock().len() + self.digest_subscriptions.lock().len() + self.commitment_subscriptions.lock().len() + } + + fn commitment_subscription_count(&self) -> usize { + self.commitment_subscriptions.lock().len() } } @@ -141,7 +153,7 @@ mod tests { fn subscribe_by_digest(&self, _digest: D) -> oneshot::Receiver { let (sender, receiver) = oneshot::channel(); - self.subscriptions.lock().push(sender); + self.digest_subscriptions.lock().push(sender); receiver } @@ -150,7 +162,7 @@ mod tests { _commitment: Commitment, ) -> oneshot::Receiver { let (sender, receiver) = oneshot::channel(); - self.subscriptions.lock().push(sender); + self.commitment_subscriptions.lock().push(sender); receiver } @@ -452,6 +464,46 @@ mod tests { (candidate_ctx, coded_candidate) } + #[test_traced("WARN")] + fn test_coding_block_provider_parent_fetches_by_commitment() { + let runner = deterministic::Runner::timed(Duration::from_secs(30)); + runner.start(|mut context| async move { + let Fixture { + participants, + schemes, + .. + } = bls12381_threshold_vrf::fixture::(&mut context, NAMESPACE, NUM_VALIDATORS); + let provider = ConstantProvider::new(schemes[0].clone()); + let buffer = RecordingCodingBuffer::default(); + let (marshal, _resolver, _actor_handle) = start_coding_actor_with_recording( + context.child("actor_stack"), + "coding-provider-parent-commitment", + provider, + buffer.clone(), + ) + .await; + + let (parent_ctx, parent) = missing_candidate(participants[0].clone()); + let child_ctx = CodingCtx { + round: Round::new(Epoch::zero(), View::new(2)), + leader: participants[0].clone(), + parent: (parent_ctx.round.view(), parent.commitment()), + }; + let child = make_coding_block(child_ctx, parent.digest(), Height::new(2), 200); + let subscription = context + .child("subscribe") + .spawn(move |_| BlockProvider::subscribe_parent(marshal, child)); + + context.sleep(Duration::from_millis(100)).await; + assert_eq!( + buffer.commitment_subscription_count(), + 1, + "parent walkback should use the coding parent commitment" + ); + drop(subscription); + }); + } + #[test_traced("WARN")] fn test_coding_verify_missing_candidate_waits_without_fetching() { let runner = deterministic::Runner::timed(Duration::from_secs(30)); diff --git a/consensus/src/marshal/coding/variant.rs b/consensus/src/marshal/coding/variant.rs index 9673dd8ac6..bf0e792b05 100644 --- a/consensus/src/marshal/coding/variant.rs +++ b/consensus/src/marshal/coding/variant.rs @@ -1,10 +1,11 @@ use crate::{ marshal::{ + ancestry::BlockProvider, coding::{ shards, types::{CodedBlock, CodedBlockCfg, StoredCodedBlock}, }, - core::{Buffer, Variant}, + core::{Buffer, CommitmentFallback, Mailbox, Variant}, }, simplex::types::Context, types::{coding::Commitment, Round}, @@ -12,7 +13,7 @@ use crate::{ }; use commonware_codec::Read; use commonware_coding::Scheme as CodingScheme; -use commonware_cryptography::{Committable, Digestible, Hasher, PublicKey}; +use commonware_cryptography::{certificate::Scheme, Committable, Digestible, Hasher, PublicKey}; use commonware_p2p::Recipients; use commonware_utils::channel::oneshot; @@ -113,3 +114,28 @@ where self.proposed(round, block); } } + +impl BlockProvider for Mailbox> +where + S: Scheme, + B: CertifiableBlock>, + C: CodingScheme, + H: Hasher, + P: PublicKey, +{ + type Block = B; + + async fn subscribe_parent(self, block: Self::Block) -> Option { + let parent_height = block.height().previous()?; + let commitment = block.context().parent.1; + self.subscribe_by_commitment( + commitment, + CommitmentFallback::FetchByCommitment { + height: parent_height, + }, + ) + .await + .ok() + .map( as Variant>::into_inner) + } +} diff --git a/consensus/src/marshal/core/mailbox.rs b/consensus/src/marshal/core/mailbox.rs index b4cb84d06f..8f0fd665b4 100644 --- a/consensus/src/marshal/core/mailbox.rs +++ b/consensus/src/marshal/core/mailbox.rs @@ -6,7 +6,7 @@ use crate::{ }, simplex::types::{Activity, Finalization, Notarization}, types::{Height, Round}, - Heightable, Reporter, + Reporter, }; use commonware_actor::{ mailbox::{Overflow, Policy, Sender}, @@ -15,7 +15,7 @@ use commonware_actor::{ use commonware_cryptography::{certificate::Scheme, Digestible}; use commonware_p2p::Recipients; use commonware_utils::{channel::oneshot, vec::NonEmptyVec}; -use futures::{Stream, StreamExt}; +use futures::Stream; use std::collections::{btree_map::Entry, BTreeMap, VecDeque}; /// Messages sent to the marshal [Actor](super::Actor). @@ -514,12 +514,6 @@ pub struct Mailbox { sender: Sender>, } -/// Provider used by [`Mailbox::ancestor_stream`] to fetch missing certified parents. -#[derive(Clone)] -pub(crate) struct AncestryProvider { - mailbox: Mailbox, -} - impl Mailbox { /// Creates a new mailbox. pub(crate) const fn new(sender: Sender>) -> Self { @@ -540,15 +534,10 @@ impl Mailbox { initial: I, ) -> impl Stream + Send + use where + Self: BlockProvider, I: IntoIterator, { - AncestorStream::new( - AncestryProvider { - mailbox: self.clone(), - }, - initial, - ) - .map(V::into_inner) + AncestorStream::new(self.clone(), initial.into_iter().map(V::into_inner)) } /// A request to retrieve the information about the highest finalized block. @@ -690,7 +679,10 @@ impl Mailbox { pub async fn ancestry( &self, (fallback, start_digest): (DigestFallback, ::Digest), - ) -> Option + Send + use> { + ) -> Option + Send + use> + where + Self: BlockProvider, + { let receiver = self.subscribe_by_digest(start_digest, fallback); receiver .await @@ -786,31 +778,6 @@ impl Mailbox { } } -impl BlockProvider for AncestryProvider { - type Block = V::Block; - - async fn subscribe(self, digest: ::Digest) -> Option { - let subscription = self - .mailbox - .subscribe_by_digest(digest, DigestFallback::Wait); - subscription.await.ok() - } - - async fn subscribe_parent(self, block: Self::Block) -> Option { - // Ancestry walking does not carry the certified parent round. By this - // point the stream is walking accepted ancestry, so this height should - // be correct; it remains a local pruning bound rather than a peer - // response validity condition. - let parent_height = block.height().previous()?; - let commitment = V::parent_commitment(&block); - let fallback = CommitmentFallback::FetchByCommitment { - height: parent_height, - }; - let subscription = self.mailbox.subscribe_by_commitment(commitment, fallback); - subscription.await.ok() - } -} - impl Reporter for Mailbox { type Activity = Activity; diff --git a/consensus/src/marshal/mocks/harness.rs b/consensus/src/marshal/mocks/harness.rs index 2507bc7c92..248966aa32 100644 --- a/consensus/src/marshal/mocks/harness.rs +++ b/consensus/src/marshal/mocks/harness.rs @@ -5,6 +5,7 @@ use crate::{ marshal::{ + ancestry::BlockProvider, coding::{ shards, types::{coding_config_for_participants, CodedBlock}, @@ -4661,7 +4662,10 @@ pub fn hint_finalized_triggers_fetch() { } /// Test ancestry stream. -pub fn ancestry_stream() { +pub fn ancestry_stream() +where + Mailbox: BlockProvider, +{ let runner = deterministic::Runner::timed(Duration::from_secs(60)); runner.start(|mut context| async move { let Fixture { diff --git a/consensus/src/marshal/mocks/verifying.rs b/consensus/src/marshal/mocks/verifying.rs index 94bace825c..f6a927adb5 100644 --- a/consensus/src/marshal/mocks/verifying.rs +++ b/consensus/src/marshal/mocks/verifying.rs @@ -4,13 +4,12 @@ //! `Application` trait, suitable for testing the `Marshaled` wrapper in //! both standard and coding variants. -use crate::{CertifiableBlock, Epochable}; +use crate::{marshal::ancestry::Ancestry, CertifiableBlock, Epochable}; use commonware_runtime::deterministic; use commonware_utils::{ channel::{fallible::OneshotExt, oneshot}, sync::Mutex, }; -use futures::Stream; use std::{marker::PhantomData, sync::Arc}; /// A mock application that implements `Application` for testing. @@ -75,7 +74,7 @@ where async fn propose( &mut self, _context: (deterministic::Context, Self::Context), - _ancestry: impl Stream + Send, + _ancestry: impl Ancestry, ) -> Option { self.propose_result.clone() } @@ -83,7 +82,7 @@ where async fn verify( &mut self, _context: (deterministic::Context, Self::Context), - _ancestry: impl Stream + Send, + _ancestry: impl Ancestry, ) -> bool { self.verify_result } @@ -136,7 +135,7 @@ where async fn propose( &mut self, _context: (deterministic::Context, Self::Context), - _ancestry: impl Stream + Send, + _ancestry: impl Ancestry, ) -> Option { None } @@ -144,7 +143,7 @@ where async fn verify( &mut self, _context: (deterministic::Context, Self::Context), - _ancestry: impl Stream + Send, + _ancestry: impl Ancestry, ) -> bool { if let Some(started) = self.started.lock().take() { started.send_lossy(()); diff --git a/consensus/src/marshal/standard/mod.rs b/consensus/src/marshal/standard/mod.rs index 639d3a391b..482081b9fc 100644 --- a/consensus/src/marshal/standard/mod.rs +++ b/consensus/src/marshal/standard/mod.rs @@ -43,6 +43,7 @@ mod tests { use super::{Deferred, Inline, Standard}; use crate::{ marshal::{ + ancestry::BlockProvider, config::Config, core::{cache, Actor, CommitmentFallback, Mailbox}, mocks::{ @@ -84,7 +85,8 @@ mod tests { use commonware_parallel::Sequential; use commonware_resolver::{Consumer, Delivery, Fetch, Resolver}; use commonware_runtime::{ - buffer::paged::CacheRef, deterministic, Clock, Metrics, Quota, Runner, Supervisor as _, + buffer::paged::CacheRef, deterministic, Clock, Metrics, Quota, Runner, Spawner, + Supervisor as _, }; use commonware_storage::{ archive::{immutable, prunable, Archive as _}, @@ -105,6 +107,44 @@ mod tests { time::Duration, }; + #[test] + fn mailbox_provides_application_blocks() { + fn assert_provider>() {} + assert_provider::>>(); + } + + #[test_traced("WARN")] + fn test_standard_block_provider_parent_fetches_by_commitment() { + let runner = deterministic::Runner::timed(Duration::from_secs(30)); + runner.start(|mut context| async move { + let Fixture { schemes, .. } = + bls12381_threshold_vrf::fixture::(&mut context, NAMESPACE, NUM_VALIDATORS); + let buffer = RecordingBuffer::default(); + let (mailbox, buffer, _resolver, _actor_handle) = start_standard_actor( + context.child("validator"), + "standard-provider-parent-commitment", + ConstantProvider::new(schemes[0].clone()), + Application::::manual_ack(), + buffer, + ) + .await; + + let parent = make_raw_block(Sha256::hash(b""), Height::new(1), 100); + let child = make_raw_block(parent.digest(), Height::new(2), 200); + let subscription = context + .child("subscribe") + .spawn(move |_| BlockProvider::subscribe_parent(mailbox, child)); + + context.sleep(Duration::from_millis(100)).await; + assert_eq!( + buffer.commitment_subscription_count(), + 1, + "parent walkback should use the standard parent commitment" + ); + subscription.abort(); + }); + } + fn assert_finalize_deterministic( seed: u64, link: commonware_p2p::simulated::Link, @@ -2430,7 +2470,8 @@ mod tests { /// other methods are no-ops. #[derive(Clone, Default)] struct RecordingBuffer { - subscriptions: Arc>>>, + digest_subscriptions: Arc>>>, + commitment_subscriptions: Arc>>>, sends: Arc>>, } @@ -2440,7 +2481,11 @@ mod tests { } fn subscription_count(&self) -> usize { - self.subscriptions.lock().len() + self.digest_subscriptions.lock().len() + self.commitment_subscriptions.lock().len() + } + + fn commitment_subscription_count(&self) -> usize { + self.commitment_subscriptions.lock().len() } } @@ -2457,13 +2502,13 @@ mod tests { fn subscribe_by_digest(&self, _digest: D) -> oneshot::Receiver { let (sender, receiver) = oneshot::channel(); - self.subscriptions.lock().push(sender); + self.digest_subscriptions.lock().push(sender); receiver } fn subscribe_by_commitment(&self, _commitment: D) -> oneshot::Receiver { let (sender, receiver) = oneshot::channel(); - self.subscriptions.lock().push(sender); + self.commitment_subscriptions.lock().push(sender); receiver } diff --git a/consensus/src/marshal/standard/variant.rs b/consensus/src/marshal/standard/variant.rs index 87acb6efde..a6e6a7dd17 100644 --- a/consensus/src/marshal/standard/variant.rs +++ b/consensus/src/marshal/standard/variant.rs @@ -4,13 +4,16 @@ //! receives the full block directly from the proposer or via gossip. use crate::{ - marshal::core::{Buffer, Variant}, + marshal::{ + ancestry::BlockProvider, + core::{Buffer, CommitmentFallback, Mailbox, Variant}, + }, types::Round, Block, }; use commonware_broadcast::{buffered, Broadcaster}; use commonware_codec::Read; -use commonware_cryptography::{Digestible, PublicKey}; +use commonware_cryptography::{certificate::Scheme, Digestible, PublicKey}; use commonware_p2p::Recipients; use commonware_utils::channel::oneshot; @@ -89,3 +92,23 @@ where Broadcaster::broadcast(self, recipients, block); } } + +impl BlockProvider for Mailbox> +where + S: Scheme, + B: Block, +{ + type Block = B; + + async fn subscribe_parent(self, block: Self::Block) -> Option { + let parent_height = block.height().previous()?; + self.subscribe_by_commitment( + block.parent(), + CommitmentFallback::FetchByCommitment { + height: parent_height, + }, + ) + .await + .ok() + } +} diff --git a/examples/reshare/src/application/core.rs b/examples/reshare/src/application/core.rs index 7482a48b80..b4b40f0426 100644 --- a/examples/reshare/src/application/core.rs +++ b/examples/reshare/src/application/core.rs @@ -5,6 +5,7 @@ use crate::{ dkg, }; use commonware_consensus::{ + marshal::ancestry::Ancestry, simplex::types::Context, types::{Epoch, Round, View}, Heightable, @@ -14,7 +15,7 @@ use commonware_cryptography::{ Signer, }; use commonware_runtime::{Clock, Metrics, Spawner}; -use futures::{Stream, StreamExt}; +use futures::StreamExt; use rand::Rng; use std::marker::PhantomData; @@ -88,10 +89,9 @@ where async fn propose( &mut self, (_, context): (E, Self::Context), - ancestry: impl Stream + Send, + mut ancestry: impl Ancestry, ) -> Option { // Fetch the parent block from the ancestry stream. - futures::pin_mut!(ancestry); let parent_block = ancestry.next().await?; let parent_commitment = parent_block.commitment(); @@ -111,11 +111,7 @@ where )) } - async fn verify( - &mut self, - _: (E, Self::Context), - _: impl Stream + Send, - ) -> bool { + async fn verify(&mut self, _: (E, Self::Context), _: impl Ancestry) -> bool { // We wrap this application with `Marshaled`, which handles ancestry // verification (parent commitment and height contiguity). //