Skip to content
Merged
Show file tree
Hide file tree
Changes from 6 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 3 additions & 3 deletions consensus/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -237,7 +237,7 @@ stability_scope!(ALPHA {
});
stability_scope!(ALPHA, cfg(not(target_arch = "wasm32")) {
use commonware_cryptography::certificate::Scheme;
use futures::Stream;
use crate::marshal::ancestry::Ancestry;
use commonware_runtime::{Clock, Metrics, Spawner};
use rand::Rng;

Expand Down Expand Up @@ -272,7 +272,7 @@ stability_scope!(ALPHA, cfg(not(target_arch = "wasm32")) {
fn propose(
&mut self,
context: (E, Self::Context),
ancestry: impl Stream<Item = Self::Block> + Send,
ancestry: impl Ancestry<Self::Block>,
) -> impl Future<Output = Option<Self::Block>> + Send;

/// Verify a block produced by the application's proposer, relative to its ancestry.
Expand All @@ -287,7 +287,7 @@ stability_scope!(ALPHA, cfg(not(target_arch = "wasm32")) {
fn verify(
&mut self,
context: (E, Self::Context),
ancestry: impl Stream<Item = Self::Block> + Send,
ancestry: impl Ancestry<Self::Block>,
) -> impl Future<Output = bool> + Send;
}
});
66 changes: 28 additions & 38 deletions consensus/src/marshal/ancestry.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,45 +13,34 @@ use std::{
task::{Context, Poll},
};

/// An interface for providing blocks.
/// A stream of blocks used by application propose and verify calls.
pub trait Ancestry<B: Block>: Stream<Item = B> + Send + Unpin + 'static {}

impl<T, B> Ancestry<B> for T
where
T: Stream<Item = B> + Send + Unpin + 'static,
B: Block,
{
}

/// An interface for providing parent blocks.
pub trait BlockProvider: Clone + Send + 'static {
/// The block type the provider walks.
type Block: Block;

/// Subscribe to a block by its digest without requesting it from the network.
/// Subscribe to the parent of a known block.
///
/// If the block is found available locally, the block will be returned immediately.
/// If the parent is found available locally, the parent will be returned immediately.
///
/// If the block is not available locally, the subscription will be registered and the caller
/// will be notified when the block is available. If the block is not finalized, it's possible
/// If the parent is not available locally, the subscription will be registered and the caller
/// will be notified when the parent is available. If the parent is not finalized, it's possible
/// that it may never become available.
///
/// Returns `None` when the subscription is canceled or the provider can no longer deliver
/// the block.
/// the parent.
///
/// This is intentionally narrower than [`Self::subscribe_parent`]. A digest is enough to
/// identify a block in local storage, but it may not be enough to form the network request
/// used by a marshal variant. Variants whose consensus commitment contains extra context
/// should keep that logic in [`Self::subscribe_parent`], where the known child block is
/// still available.
fn subscribe(
self,
digest: <Self::Block as Digestible>::Digest,
) -> impl Future<Output = Option<Self::Block>> + Send;

/// Subscribe to the parent of a known block.
///
/// This is a separate hook from [`Self::subscribe`] because the child block can carry
/// variant-specific context needed to retrieve its parent. The default implementation
/// follows the digest link and waits locally, but providers may override this to derive a
/// full parent commitment and issue a fetching subscription.
fn subscribe_parent(
self,
block: Self::Block,
) -> impl Future<Output = Option<Self::Block>> + Send {
let digest = block.parent();
self.subscribe(digest)
}
/// The child block can carry variant-specific context needed to retrieve its parent.
fn subscribe(self, block: Self::Block) -> impl Future<Output = Option<Self::Block>> + Send;
}

/// Yields the ancestors of a block while prefetching parents, _not_ including the genesis block.
Expand Down Expand Up @@ -114,10 +103,10 @@ where
// If a result has been buffered, return it and queue the parent fetch if needed.
if let Some(block) = this.buffered.pop() {
let height = block.height();
let should_subscribe_parent = height > END_BOUND;
let should_walk_parent = height > END_BOUND;
let end_of_buffered = this.buffered.is_empty();
if should_subscribe_parent && end_of_buffered {
let future = this.marshal.clone().subscribe_parent(block.clone()).boxed();
if should_walk_parent && end_of_buffered {
let future = this.marshal.clone().subscribe(block.clone()).boxed();
*this.pending.as_mut() = Some(future).into();

// Explicitly poll the next future to kick off the fetch. If it's already ready,
Expand All @@ -131,7 +120,7 @@ where
}
Poll::Ready(None) | Poll::Pending => {}
}
} else if !should_subscribe_parent {
} else if !should_walk_parent {
// No more parents to fetch; Finish the stream.
*this.pending.as_mut() = None.into();
}
Expand All @@ -147,9 +136,9 @@ where
}
Poll::Ready(Some(Some(block))) => {
let height = block.height();
let should_subscribe_parent = height > END_BOUND;
if should_subscribe_parent {
let future = this.marshal.clone().subscribe_parent(block.clone()).boxed();
let should_walk_parent = height > END_BOUND;
if should_walk_parent {
let future = this.marshal.clone().subscribe(block.clone()).boxed();
*this.pending.as_mut() = Some(future).into();

// Explicitly poll the next future to kick off the fetch. If it's already ready,
Expand Down Expand Up @@ -187,8 +176,9 @@ mod test {
impl BlockProvider for MockProvider {
type Block = Block<Sha256Digest, ()>;

async fn subscribe(self, digest: Sha256Digest) -> Option<Self::Block> {
self.0.into_iter().find(|b| b.digest() == digest)
async fn subscribe(self, block: Self::Block) -> Option<Self::Block> {
Comment thread
clabby marked this conversation as resolved.
Outdated
let parent = block.parent;
self.0.into_iter().find(|b| b.digest() == parent)
}
}

Expand Down
62 changes: 57 additions & 5 deletions consensus/src/marshal/coding/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,7 @@ pub use marshaled::{Marshaled, MarshaledConfig};
mod tests {
use crate::{
marshal::{
ancestry::BlockProvider,
coding::{
marshaled::genesis_coding_commitment,
shards,
Expand Down Expand Up @@ -103,7 +104,7 @@ mod tests {
use commonware_parallel::Sequential;
use commonware_resolver::{Delivery, Fetch, Resolver};
use commonware_runtime::{
buffer::paged::CacheRef, deterministic, Clock, Metrics, Runner, Supervisor as _,
buffer::paged::CacheRef, deterministic, Clock, Metrics, Runner, Spawner, Supervisor as _,
};
use commonware_storage::archive::immutable;
use commonware_utils::{
Expand All @@ -115,16 +116,27 @@ mod tests {
type TestCodedBlock = CodedBlock<CodingB, ReedSolomon<Sha256>, Sha256>;
type CodingSendRecord = (Round, TestCodedBlock, Recipients<K>);

#[test]
fn mailbox_provides_application_blocks() {
Comment thread
clabby marked this conversation as resolved.
fn assert_provider<P: BlockProvider<Block = CodingB>>() {}
assert_provider::<core::Mailbox<S, TestCodingVariant>>();
}

/// A coding buffer that records subscriptions and never resolves them.
#[derive(Clone, Default)]
struct RecordingCodingBuffer {
subscriptions: Arc<Mutex<Vec<oneshot::Sender<TestCodedBlock>>>>,
digest_subscriptions: Arc<Mutex<Vec<oneshot::Sender<TestCodedBlock>>>>,
commitment_subscriptions: Arc<Mutex<Vec<oneshot::Sender<TestCodedBlock>>>>,
sends: Arc<Mutex<Vec<CodingSendRecord>>>,
}

impl RecordingCodingBuffer {
fn subscription_count(&self) -> usize {
self.subscriptions.lock().len()
self.digest_subscriptions.lock().len() + self.commitment_subscriptions.lock().len()
}

fn commitment_subscription_count(&self) -> usize {
self.commitment_subscriptions.lock().len()
}
}

Expand All @@ -141,7 +153,7 @@ mod tests {

fn subscribe_by_digest(&self, _digest: D) -> oneshot::Receiver<TestCodedBlock> {
let (sender, receiver) = oneshot::channel();
self.subscriptions.lock().push(sender);
self.digest_subscriptions.lock().push(sender);
receiver
}

Expand All @@ -150,7 +162,7 @@ mod tests {
_commitment: Commitment,
) -> oneshot::Receiver<TestCodedBlock> {
let (sender, receiver) = oneshot::channel();
self.subscriptions.lock().push(sender);
self.commitment_subscriptions.lock().push(sender);
receiver
}

Expand Down Expand Up @@ -452,6 +464,46 @@ mod tests {
(candidate_ctx, coded_candidate)
}

#[test_traced("WARN")]
fn test_coding_block_provider_parent_fetches_by_commitment() {
let runner = deterministic::Runner::timed(Duration::from_secs(30));
runner.start(|mut context| async move {
let Fixture {
participants,
schemes,
..
} = bls12381_threshold_vrf::fixture::<V, _>(&mut context, NAMESPACE, NUM_VALIDATORS);
let provider = ConstantProvider::new(schemes[0].clone());
let buffer = RecordingCodingBuffer::default();
let (marshal, _resolver, _actor_handle) = start_coding_actor_with_recording(
context.child("actor_stack"),
"coding-provider-parent-commitment",
provider,
buffer.clone(),
)
.await;

let (parent_ctx, parent) = missing_candidate(participants[0].clone());
let child_ctx = CodingCtx {
round: Round::new(Epoch::zero(), View::new(2)),
leader: participants[0].clone(),
parent: (parent_ctx.round.view(), parent.commitment()),
};
let child = make_coding_block(child_ctx, parent.digest(), Height::new(2), 200);
let subscription = context
.child("subscribe")
.spawn(move |_| BlockProvider::subscribe(marshal, child));

context.sleep(Duration::from_millis(100)).await;
assert_eq!(
buffer.commitment_subscription_count(),
1,
"parent walkback should use the coding parent commitment"
);
drop(subscription);
});
}

#[test_traced("WARN")]
fn test_coding_verify_missing_candidate_waits_without_fetching() {
let runner = deterministic::Runner::timed(Duration::from_secs(30));
Expand Down
30 changes: 28 additions & 2 deletions consensus/src/marshal/coding/variant.rs
Original file line number Diff line number Diff line change
@@ -1,18 +1,19 @@
use crate::{
marshal::{
ancestry::BlockProvider,
coding::{
shards,
types::{CodedBlock, CodedBlockCfg, StoredCodedBlock},
},
core::{Buffer, Variant},
core::{Buffer, CommitmentFallback, Mailbox, Variant},
},
simplex::types::Context,
types::{coding::Commitment, Round},
CertifiableBlock,
};
use commonware_codec::Read;
use commonware_coding::Scheme as CodingScheme;
use commonware_cryptography::{Committable, Digestible, Hasher, PublicKey};
use commonware_cryptography::{certificate::Scheme, Committable, Digestible, Hasher, PublicKey};
use commonware_p2p::Recipients;
use commonware_utils::channel::oneshot;

Expand Down Expand Up @@ -113,3 +114,28 @@ where
self.proposed(round, block);
}
}

impl<S, B, C, H, P> BlockProvider for Mailbox<S, Coding<B, C, H, P>>
where
S: Scheme,
B: CertifiableBlock<Context = Context<Commitment, P>>,
C: CodingScheme,
H: Hasher,
P: PublicKey,
{
type Block = B;

async fn subscribe(self, block: Self::Block) -> Option<Self::Block> {
let parent_height = block.height().previous()?;
let commitment = block.context().parent.1;
self.subscribe_by_commitment(
commitment,
CommitmentFallback::FetchByCommitment {
height: parent_height,
},
)
.await
.ok()
.map(<Coding<B, C, H, P> as Variant>::into_inner)
}
}
49 changes: 8 additions & 41 deletions consensus/src/marshal/core/mailbox.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ use crate::{
},
simplex::types::{Activity, Finalization, Notarization},
types::{Height, Round},
Heightable, Reporter,
Reporter,
};
use commonware_actor::{
mailbox::{Overflow, Policy, Sender},
Expand All @@ -15,7 +15,7 @@ use commonware_actor::{
use commonware_cryptography::{certificate::Scheme, Digestible};
use commonware_p2p::Recipients;
use commonware_utils::{channel::oneshot, vec::NonEmptyVec};
use futures::{Stream, StreamExt};
use futures::Stream;
use std::collections::{btree_map::Entry, BTreeMap, VecDeque};

/// Messages sent to the marshal [Actor](super::Actor).
Expand Down Expand Up @@ -514,12 +514,6 @@ pub struct Mailbox<S: Scheme, V: Variant> {
sender: Sender<Message<S, V>>,
}

/// Provider used by [`Mailbox::ancestor_stream`] to fetch missing certified parents.
#[derive(Clone)]
pub(crate) struct AncestryProvider<S: Scheme, V: Variant> {
mailbox: Mailbox<S, V>,
}

impl<S: Scheme, V: Variant> Mailbox<S, V> {
/// Creates a new mailbox.
pub(crate) const fn new(sender: Sender<Message<S, V>>) -> Self {
Expand All @@ -540,15 +534,10 @@ impl<S: Scheme, V: Variant> Mailbox<S, V> {
initial: I,
) -> impl Stream<Item = V::ApplicationBlock> + Send + use<S, V, I>
where
Self: BlockProvider<Block = V::ApplicationBlock>,
I: IntoIterator<Item = V::Block>,
{
AncestorStream::new(
AncestryProvider {
mailbox: self.clone(),
},
initial,
)
.map(V::into_inner)
AncestorStream::new(self.clone(), initial.into_iter().map(V::into_inner))
}

/// A request to retrieve the information about the highest finalized block.
Expand Down Expand Up @@ -690,7 +679,10 @@ impl<S: Scheme, V: Variant> Mailbox<S, V> {
pub async fn ancestry(
&self,
(fallback, start_digest): (DigestFallback, <V::Block as Digestible>::Digest),
) -> Option<impl Stream<Item = V::ApplicationBlock> + Send + use<S, V>> {
) -> Option<impl Stream<Item = V::ApplicationBlock> + Send + use<S, V>>
where
Self: BlockProvider<Block = V::ApplicationBlock>,
{
let receiver = self.subscribe_by_digest(start_digest, fallback);
receiver
.await
Expand Down Expand Up @@ -786,31 +778,6 @@ impl<S: Scheme, V: Variant> Mailbox<S, V> {
}
}

impl<S: Scheme, V: Variant> BlockProvider for AncestryProvider<S, V> {
type Block = V::Block;

async fn subscribe(self, digest: <V::Block as Digestible>::Digest) -> Option<Self::Block> {
let subscription = self
.mailbox
.subscribe_by_digest(digest, DigestFallback::Wait);
subscription.await.ok()
}

async fn subscribe_parent(self, block: Self::Block) -> Option<Self::Block> {
// Ancestry walking does not carry the certified parent round. By this
// point the stream is walking accepted ancestry, so this height should
// be correct; it remains a local pruning bound rather than a peer
// response validity condition.
let parent_height = block.height().previous()?;
let commitment = V::parent_commitment(&block);
let fallback = CommitmentFallback::FetchByCommitment {
height: parent_height,
};
let subscription = self.mailbox.subscribe_by_commitment(commitment, fallback);
subscription.await.ok()
}
}

impl<S: Scheme, V: Variant> Reporter for Mailbox<S, V> {
type Activity = Activity<S, V::Commitment>;

Expand Down
Loading
Loading