Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 3 additions & 3 deletions consensus/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -237,7 +237,7 @@ stability_scope!(ALPHA {
});
stability_scope!(ALPHA, cfg(not(target_arch = "wasm32")) {
use commonware_cryptography::certificate::Scheme;
use futures::Stream;
use crate::marshal::ancestry::Ancestry;
use commonware_runtime::{Clock, Metrics, Spawner};
use rand::Rng;

Expand Down Expand Up @@ -272,7 +272,7 @@ stability_scope!(ALPHA, cfg(not(target_arch = "wasm32")) {
fn propose(
&mut self,
context: (E, Self::Context),
ancestry: impl Stream<Item = Self::Block> + Send,
ancestry: impl Ancestry<Self::Block>,
) -> impl Future<Output = Option<Self::Block>> + Send;

/// Verify a block produced by the application's proposer, relative to its ancestry.
Expand All @@ -287,7 +287,7 @@ stability_scope!(ALPHA, cfg(not(target_arch = "wasm32")) {
fn verify(
&mut self,
context: (E, Self::Context),
ancestry: impl Stream<Item = Self::Block> + Send,
ancestry: impl Ancestry<Self::Block>,
) -> impl Future<Output = bool> + Send;
}
});
59 changes: 26 additions & 33 deletions consensus/src/marshal/ancestry.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,45 +13,37 @@ use std::{
task::{Context, Poll},
};

/// An interface for providing blocks.
/// A stream of blocks used by application propose and verify calls.
pub trait Ancestry<B: Block>: Stream<Item = B> + Send + Unpin + 'static {}

impl<T, B> Ancestry<B> for T
where
T: Stream<Item = B> + Send + Unpin + 'static,
B: Block,
{
}

/// An interface for providing parent blocks.
pub trait BlockProvider: Clone + Send + 'static {
/// The block type the provider walks.
type Block: Block;

/// Subscribe to a block by its digest without requesting it from the network.
/// Subscribe to the parent of a known block.
///
/// If the block is found available locally, the block will be returned immediately.
/// If the parent is found available locally, the parent will be returned immediately.
///
/// If the block is not available locally, the subscription will be registered and the caller
/// will be notified when the block is available. If the block is not finalized, it's possible
/// If the parent is not available locally, the subscription will be registered and the caller
/// will be notified when the parent is available. If the parent is not finalized, it's possible
/// that it may never become available.
///
/// Returns `None` when the subscription is canceled or the provider can no longer deliver
/// the block.
/// the parent.
///
/// This is intentionally narrower than [`Self::subscribe_parent`]. A digest is enough to
/// identify a block in local storage, but it may not be enough to form the network request
/// used by a marshal variant. Variants whose consensus commitment contains extra context
/// should keep that logic in [`Self::subscribe_parent`], where the known child block is
/// still available.
fn subscribe(
self,
digest: <Self::Block as Digestible>::Digest,
) -> impl Future<Output = Option<Self::Block>> + Send;

/// Subscribe to the parent of a known block.
///
/// This is a separate hook from [`Self::subscribe`] because the child block can carry
/// variant-specific context needed to retrieve its parent. The default implementation
/// follows the digest link and waits locally, but providers may override this to derive a
/// full parent commitment and issue a fetching subscription.
/// The child block can carry variant-specific context needed to retrieve its parent.
fn subscribe_parent(
self,
block: Self::Block,
) -> impl Future<Output = Option<Self::Block>> + Send {
let digest = block.parent();
self.subscribe(digest)
}
) -> impl Future<Output = Option<Self::Block>> + Send;
}

/// Yields the ancestors of a block while prefetching parents, _not_ including the genesis block.
Expand Down Expand Up @@ -114,9 +106,9 @@ where
// If a result has been buffered, return it and queue the parent fetch if needed.
if let Some(block) = this.buffered.pop() {
let height = block.height();
let should_subscribe_parent = height > END_BOUND;
let should_walk_parent = height > END_BOUND;
let end_of_buffered = this.buffered.is_empty();
if should_subscribe_parent && end_of_buffered {
if should_walk_parent && end_of_buffered {
let future = this.marshal.clone().subscribe_parent(block.clone()).boxed();
*this.pending.as_mut() = Some(future).into();

Expand All @@ -131,7 +123,7 @@ where
}
Poll::Ready(None) | Poll::Pending => {}
}
} else if !should_subscribe_parent {
} else if !should_walk_parent {
// No more parents to fetch; Finish the stream.
*this.pending.as_mut() = None.into();
}
Expand All @@ -147,8 +139,8 @@ where
}
Poll::Ready(Some(Some(block))) => {
let height = block.height();
let should_subscribe_parent = height > END_BOUND;
if should_subscribe_parent {
let should_walk_parent = height > END_BOUND;
if should_walk_parent {
let future = this.marshal.clone().subscribe_parent(block.clone()).boxed();
*this.pending.as_mut() = Some(future).into();

Expand Down Expand Up @@ -187,8 +179,9 @@ mod test {
impl BlockProvider for MockProvider {
type Block = Block<Sha256Digest, ()>;

async fn subscribe(self, digest: Sha256Digest) -> Option<Self::Block> {
self.0.into_iter().find(|b| b.digest() == digest)
async fn subscribe_parent(self, block: Self::Block) -> Option<Self::Block> {
let parent = block.parent;
self.0.into_iter().find(|b| b.digest() == parent)
}
}

Expand Down
62 changes: 57 additions & 5 deletions consensus/src/marshal/coding/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,7 @@ pub use marshaled::{Marshaled, MarshaledConfig};
mod tests {
use crate::{
marshal::{
ancestry::BlockProvider,
coding::{
marshaled::genesis_coding_commitment,
shards,
Expand Down Expand Up @@ -103,7 +104,7 @@ mod tests {
use commonware_parallel::Sequential;
use commonware_resolver::{Delivery, Fetch, Resolver};
use commonware_runtime::{
buffer::paged::CacheRef, deterministic, Clock, Metrics, Runner, Supervisor as _,
buffer::paged::CacheRef, deterministic, Clock, Metrics, Runner, Spawner, Supervisor as _,
};
use commonware_storage::archive::immutable;
use commonware_utils::{
Expand All @@ -115,16 +116,27 @@ mod tests {
type TestCodedBlock = CodedBlock<CodingB, ReedSolomon<Sha256>, Sha256>;
type CodingSendRecord = (Round, TestCodedBlock, Recipients<K>);

#[test]
fn mailbox_provides_application_blocks() {
Comment thread
clabby marked this conversation as resolved.
fn assert_provider<P: BlockProvider<Block = CodingB>>() {}
assert_provider::<core::Mailbox<S, TestCodingVariant>>();
}

/// A coding buffer that records subscriptions and never resolves them.
#[derive(Clone, Default)]
struct RecordingCodingBuffer {
subscriptions: Arc<Mutex<Vec<oneshot::Sender<TestCodedBlock>>>>,
digest_subscriptions: Arc<Mutex<Vec<oneshot::Sender<TestCodedBlock>>>>,
commitment_subscriptions: Arc<Mutex<Vec<oneshot::Sender<TestCodedBlock>>>>,
sends: Arc<Mutex<Vec<CodingSendRecord>>>,
}

impl RecordingCodingBuffer {
fn subscription_count(&self) -> usize {
self.subscriptions.lock().len()
self.digest_subscriptions.lock().len() + self.commitment_subscriptions.lock().len()
}

fn commitment_subscription_count(&self) -> usize {
self.commitment_subscriptions.lock().len()
}
}

Expand All @@ -141,7 +153,7 @@ mod tests {

fn subscribe_by_digest(&self, _digest: D) -> oneshot::Receiver<TestCodedBlock> {
let (sender, receiver) = oneshot::channel();
self.subscriptions.lock().push(sender);
self.digest_subscriptions.lock().push(sender);
receiver
}

Expand All @@ -150,7 +162,7 @@ mod tests {
_commitment: Commitment,
) -> oneshot::Receiver<TestCodedBlock> {
let (sender, receiver) = oneshot::channel();
self.subscriptions.lock().push(sender);
self.commitment_subscriptions.lock().push(sender);
receiver
}

Expand Down Expand Up @@ -452,6 +464,46 @@ mod tests {
(candidate_ctx, coded_candidate)
}

#[test_traced("WARN")]
fn test_coding_block_provider_parent_fetches_by_commitment() {
let runner = deterministic::Runner::timed(Duration::from_secs(30));
runner.start(|mut context| async move {
let Fixture {
participants,
schemes,
..
} = bls12381_threshold_vrf::fixture::<V, _>(&mut context, NAMESPACE, NUM_VALIDATORS);
let provider = ConstantProvider::new(schemes[0].clone());
let buffer = RecordingCodingBuffer::default();
let (marshal, _resolver, _actor_handle) = start_coding_actor_with_recording(
context.child("actor_stack"),
"coding-provider-parent-commitment",
provider,
buffer.clone(),
)
.await;

let (parent_ctx, parent) = missing_candidate(participants[0].clone());
let child_ctx = CodingCtx {
round: Round::new(Epoch::zero(), View::new(2)),
leader: participants[0].clone(),
parent: (parent_ctx.round.view(), parent.commitment()),
};
let child = make_coding_block(child_ctx, parent.digest(), Height::new(2), 200);
let subscription = context
.child("subscribe")
.spawn(move |_| BlockProvider::subscribe_parent(marshal, child));

context.sleep(Duration::from_millis(100)).await;
assert_eq!(
buffer.commitment_subscription_count(),
1,
"parent walkback should use the coding parent commitment"
);
drop(subscription);
});
}

#[test_traced("WARN")]
fn test_coding_verify_missing_candidate_waits_without_fetching() {
let runner = deterministic::Runner::timed(Duration::from_secs(30));
Expand Down
30 changes: 28 additions & 2 deletions consensus/src/marshal/coding/variant.rs
Original file line number Diff line number Diff line change
@@ -1,18 +1,19 @@
use crate::{
marshal::{
ancestry::BlockProvider,
coding::{
shards,
types::{CodedBlock, CodedBlockCfg, StoredCodedBlock},
},
core::{Buffer, Variant},
core::{Buffer, CommitmentFallback, Mailbox, Variant},
},
simplex::types::Context,
types::{coding::Commitment, Round},
CertifiableBlock,
};
use commonware_codec::Read;
use commonware_coding::Scheme as CodingScheme;
use commonware_cryptography::{Committable, Digestible, Hasher, PublicKey};
use commonware_cryptography::{certificate::Scheme, Committable, Digestible, Hasher, PublicKey};
use commonware_p2p::Recipients;
use commonware_utils::channel::oneshot;

Expand Down Expand Up @@ -113,3 +114,28 @@ where
self.proposed(round, block);
}
}

impl<S, B, C, H, P> BlockProvider for Mailbox<S, Coding<B, C, H, P>>
where
S: Scheme,
B: CertifiableBlock<Context = Context<Commitment, P>>,
C: CodingScheme,
H: Hasher,
P: PublicKey,
{
type Block = B;

async fn subscribe_parent(self, block: Self::Block) -> Option<Self::Block> {
let parent_height = block.height().previous()?;
let commitment = block.context().parent.1;
self.subscribe_by_commitment(
commitment,
CommitmentFallback::FetchByCommitment {
height: parent_height,
},
)
.await
.ok()
.map(<Coding<B, C, H, P> as Variant>::into_inner)
}
}
49 changes: 8 additions & 41 deletions consensus/src/marshal/core/mailbox.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ use crate::{
},
simplex::types::{Activity, Finalization, Notarization},
types::{Height, Round},
Heightable, Reporter,
Reporter,
};
use commonware_actor::{
mailbox::{Overflow, Policy, Sender},
Expand All @@ -15,7 +15,7 @@ use commonware_actor::{
use commonware_cryptography::{certificate::Scheme, Digestible};
use commonware_p2p::Recipients;
use commonware_utils::{channel::oneshot, vec::NonEmptyVec};
use futures::{Stream, StreamExt};
use futures::Stream;
use std::collections::{btree_map::Entry, BTreeMap, VecDeque};

/// Messages sent to the marshal [Actor](super::Actor).
Expand Down Expand Up @@ -514,12 +514,6 @@ pub struct Mailbox<S: Scheme, V: Variant> {
sender: Sender<Message<S, V>>,
}

/// Provider used by [`Mailbox::ancestor_stream`] to fetch missing certified parents.
#[derive(Clone)]
pub(crate) struct AncestryProvider<S: Scheme, V: Variant> {
mailbox: Mailbox<S, V>,
}

impl<S: Scheme, V: Variant> Mailbox<S, V> {
/// Creates a new mailbox.
pub(crate) const fn new(sender: Sender<Message<S, V>>) -> Self {
Expand All @@ -540,15 +534,10 @@ impl<S: Scheme, V: Variant> Mailbox<S, V> {
initial: I,
) -> impl Stream<Item = V::ApplicationBlock> + Send + use<S, V, I>
where
Self: BlockProvider<Block = V::ApplicationBlock>,
I: IntoIterator<Item = V::Block>,
{
AncestorStream::new(
AncestryProvider {
mailbox: self.clone(),
},
initial,
)
.map(V::into_inner)
AncestorStream::new(self.clone(), initial.into_iter().map(V::into_inner))
}

/// A request to retrieve the information about the highest finalized block.
Expand Down Expand Up @@ -690,7 +679,10 @@ impl<S: Scheme, V: Variant> Mailbox<S, V> {
pub async fn ancestry(
&self,
(fallback, start_digest): (DigestFallback, <V::Block as Digestible>::Digest),
) -> Option<impl Stream<Item = V::ApplicationBlock> + Send + use<S, V>> {
) -> Option<impl Stream<Item = V::ApplicationBlock> + Send + use<S, V>>
where
Self: BlockProvider<Block = V::ApplicationBlock>,
{
let receiver = self.subscribe_by_digest(start_digest, fallback);
receiver
.await
Expand Down Expand Up @@ -786,31 +778,6 @@ impl<S: Scheme, V: Variant> Mailbox<S, V> {
}
}

impl<S: Scheme, V: Variant> BlockProvider for AncestryProvider<S, V> {
type Block = V::Block;

async fn subscribe(self, digest: <V::Block as Digestible>::Digest) -> Option<Self::Block> {
let subscription = self
.mailbox
.subscribe_by_digest(digest, DigestFallback::Wait);
subscription.await.ok()
}

async fn subscribe_parent(self, block: Self::Block) -> Option<Self::Block> {
// Ancestry walking does not carry the certified parent round. By this
// point the stream is walking accepted ancestry, so this height should
// be correct; it remains a local pruning bound rather than a peer
// response validity condition.
let parent_height = block.height().previous()?;
let commitment = V::parent_commitment(&block);
let fallback = CommitmentFallback::FetchByCommitment {
height: parent_height,
};
let subscription = self.mailbox.subscribe_by_commitment(commitment, fallback);
subscription.await.ok()
}
}

impl<S: Scheme, V: Variant> Reporter for Mailbox<S, V> {
type Activity = Activity<S, V::Commitment>;

Expand Down
Loading
Loading