Skip to content
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
98 changes: 84 additions & 14 deletions consensus/src/marshal/ancestry.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ where
}

/// An interface for providing parent blocks.
pub trait BlockProvider: Clone + Send + 'static {
pub trait BlockProvider: Send + 'static {
/// The block type the provider walks.
type Block: Block;

Expand All @@ -41,9 +41,14 @@ pub trait BlockProvider: Clone + Send + 'static {
///
/// The child block can carry variant-specific context needed to retrieve its parent.
fn subscribe_parent(
self,
block: Self::Block,
) -> impl Future<Output = Option<Self::Block>> + Send;
&self,
block: &Self::Block,
) -> impl Future<Output = Option<Self::Block>> + Send + 'static;
}

struct ExpectedParent<D> {
child_height: Height,
parent_digest: D,
}

/// Yields the ancestors of a block while prefetching parents, including the
Expand All @@ -54,14 +59,15 @@ pub struct AncestorStream<M: BlockProvider> {
marshal: M,
#[pin]
pending: OptionFuture<BoxFuture<'static, Option<M::Block>>>,
pending_expected: Option<ExpectedParent<<M::Block as Digestible>::Digest>>,
}

impl<M: BlockProvider> AncestorStream<M> {
/// Creates a new [AncestorStream] starting from the given ancestry.
///
/// # Panics
///
/// Panics if the initial blocks are not contiguous in height.
/// Panics if the initial blocks are not contiguous.
pub(crate) fn new(marshal: M, initial: impl IntoIterator<Item = M::Block>) -> Self {
let mut buffered = initial.into_iter().collect::<Vec<M::Block>>();
buffered.sort_by_key(Heightable::height);
Expand All @@ -84,14 +90,39 @@ impl<M: BlockProvider> AncestorStream<M> {
marshal,
buffered,
pending: None.into(),
pending_expected: None,
}
}

/// Captures the parent relationship expected for a pending fetch.
fn expected_parent(child: &M::Block) -> ExpectedParent<<M::Block as Digestible>::Digest> {
ExpectedParent {
child_height: child.height(),
parent_digest: child.parent(),
}
}

/// Verifies that a fetched parent is the immediate predecessor of `child`.
fn assert_parent(
expected: ExpectedParent<<M::Block as Digestible>::Digest>,
parent: &M::Block,
) {
assert_eq!(
parent.height().next(),
expected.child_height,
"fetched parent must be contiguous in height"
);
assert_eq!(
parent.digest(),
expected.parent_digest,
"fetched parent must be contiguous in ancestry"
);
}
}

impl<M> Stream for AncestorStream<M>
where
M: BlockProvider,
M::Block: Clone,
{
type Item = M::Block;

Expand All @@ -106,23 +137,31 @@ where
let should_walk_parent = height > END_BOUND;
let end_of_buffered = this.buffered.is_empty();
if should_walk_parent && end_of_buffered {
let future = this.marshal.clone().subscribe_parent(block.clone()).boxed();
let future = this.marshal.subscribe_parent(&block).boxed();
*this.pending.as_mut() = Some(future).into();
*this.pending_expected = Some(Self::expected_parent(&block));

// Explicitly poll the next future to kick off the fetch. If it's already ready,
// buffer it for the next poll.
match this.pending.as_mut().poll(cx) {
Poll::Ready(Some(Some(block))) => {
this.buffered.push(block);
Poll::Ready(Some(Some(parent))) => {
let expected = this
.pending_expected
.take()
.expect("pending parent expectation must exist");
Self::assert_parent(expected, &parent);
this.buffered.push(parent);
}
Poll::Ready(Some(None)) => {
*this.pending.as_mut() = None.into();
*this.pending_expected = None;
}
Poll::Ready(None) | Poll::Pending => {}
}
} else if !should_walk_parent {
// No more parents to fetch; Finish the stream.
*this.pending.as_mut() = None.into();
*this.pending_expected = None;
}

return Poll::Ready(Some(block));
Expand All @@ -132,29 +171,43 @@ where
Poll::Pending => Poll::Pending,
Poll::Ready(None) | Poll::Ready(Some(None)) => {
*this.pending.as_mut() = None.into();
*this.pending_expected = None;
Poll::Ready(None)
}
Poll::Ready(Some(Some(block))) => {
let expected = this
.pending_expected
.take()
.expect("pending parent expectation must exist");
Self::assert_parent(expected, &block);
let height = block.height();
let should_walk_parent = height > END_BOUND;
if should_walk_parent {
let future = this.marshal.clone().subscribe_parent(block.clone()).boxed();
let future = this.marshal.subscribe_parent(&block).boxed();
*this.pending.as_mut() = Some(future).into();
*this.pending_expected = Some(Self::expected_parent(&block));

// Explicitly poll the next future to kick off the fetch. If it's already ready,
// buffer it for the next poll.
match this.pending.as_mut().poll(cx) {
Poll::Ready(Some(Some(block))) => {
this.buffered.push(block);
Poll::Ready(Some(Some(parent))) => {
let expected = this
.pending_expected
.take()
.expect("pending parent expectation must exist");
Self::assert_parent(expected, &parent);
this.buffered.push(parent);
}
Poll::Ready(Some(None)) => {
*this.pending.as_mut() = None.into();
*this.pending_expected = None;
}
Poll::Ready(None) | Poll::Pending => {}
}
} else {
// No more parents to fetch; Finish the stream.
*this.pending.as_mut() = None.into();
*this.pending_expected = None;
}

Poll::Ready(Some(block))
Expand All @@ -176,9 +229,13 @@ mod test {
impl BlockProvider for MockProvider {
type Block = Block<Sha256Digest, ()>;

async fn subscribe_parent(self, block: Self::Block) -> Option<Self::Block> {
fn subscribe_parent(
&self,
block: &Self::Block,
) -> impl Future<Output = Option<Self::Block>> + Send + 'static {
let parent = block.parent;
self.0.into_iter().find(|b| b.digest() == parent)
let blocks = self.0.clone();
async move { blocks.into_iter().find(|b| b.digest() == parent) }
}
}

Expand Down Expand Up @@ -206,6 +263,19 @@ mod test {
);
}

#[test]
#[should_panic = "fetched parent must be contiguous in height"]
fn test_panics_on_non_contiguous_fetched_parent_height() {
let parent = Block::new::<Sha256>((), Sha256Digest::EMPTY, Height::zero(), 0);
let child = Block::new::<Sha256>((), parent.digest(), Height::new(3), 3);
let stream = AncestorStream::new(MockProvider(vec![parent]), [child]);
futures::pin_mut!(stream);

let waker = futures::task::noop_waker_ref();
let mut cx = std::task::Context::from_waker(waker);
let _ = futures::Stream::poll_next(stream.as_mut(), &mut cx);
}

#[test_async]
async fn test_empty_yields_none() {
let mut stream: AncestorStream<MockProvider> =
Expand Down
2 changes: 1 addition & 1 deletion consensus/src/marshal/coding/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -507,7 +507,7 @@ mod tests {
let child = make_coding_block(child_ctx, parent.digest(), Height::new(2), 200);
let subscription = context
.child("subscribe")
.spawn(move |_| BlockProvider::subscribe_parent(marshal, child));
.spawn(move |_| async move { BlockProvider::subscribe_parent(&marshal, &child).await });

context.sleep(Duration::from_millis(100)).await;
assert_eq!(
Expand Down
32 changes: 20 additions & 12 deletions consensus/src/marshal/coding/variant.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ use commonware_coding::Scheme as CodingScheme;
use commonware_cryptography::{certificate::Scheme, Committable, Digestible, Hasher, PublicKey};
use commonware_p2p::Recipients;
use commonware_utils::channel::oneshot;
use std::future::Future;

/// The coding variant of Marshal, which uses erasure coding for block dissemination.
///
Expand Down Expand Up @@ -125,17 +126,24 @@ where
{
type Block = B;

async fn subscribe_parent(self, block: Self::Block) -> Option<Self::Block> {
let parent_height = block.height().previous()?;
let commitment = block.context().parent.1;
self.subscribe_by_commitment(
commitment,
CommitmentFallback::FetchByCommitment {
height: parent_height,
},
)
.await
.ok()
.map(<Coding<B, C, H, P> as Variant>::into_inner)
fn subscribe_parent(
&self,
block: &Self::Block,
) -> impl Future<Output = Option<Self::Block>> + Send + 'static {
let receiver = block.height().previous().map(|parent_height| {
self.subscribe_by_commitment(
block.context().parent.1,
CommitmentFallback::FetchByCommitment {
height: parent_height,
},
)
});
async move {
let receiver = receiver?;
receiver
.await
.ok()
.map(<Coding<B, C, H, P> as Variant>::into_inner)
}
}
}
2 changes: 1 addition & 1 deletion consensus/src/marshal/standard/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -134,7 +134,7 @@ mod tests {
let child = make_raw_block(parent.digest(), Height::new(2), 200);
let subscription = context
.child("subscribe")
.spawn(move |_| BlockProvider::subscribe_parent(mailbox, child));
.spawn(move |_| async move { BlockProvider::subscribe_parent(&mailbox, &child).await });

context.sleep(Duration::from_millis(100)).await;
assert_eq!(
Expand Down
27 changes: 17 additions & 10 deletions consensus/src/marshal/standard/variant.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ use commonware_codec::Read;
use commonware_cryptography::{certificate::Scheme, Digestible, PublicKey};
use commonware_p2p::Recipients;
use commonware_utils::channel::oneshot;
use std::future::Future;

/// The standard variant of Marshal, which broadcasts complete blocks.
///
Expand Down Expand Up @@ -100,15 +101,21 @@ where
{
type Block = B;

async fn subscribe_parent(self, block: Self::Block) -> Option<Self::Block> {
let parent_height = block.height().previous()?;
self.subscribe_by_commitment(
block.parent(),
CommitmentFallback::FetchByCommitment {
height: parent_height,
},
)
.await
.ok()
fn subscribe_parent(
&self,
block: &Self::Block,
) -> impl Future<Output = Option<Self::Block>> + Send + 'static {
let receiver = block.height().previous().map(|parent_height| {
self.subscribe_by_commitment(
block.parent(),
CommitmentFallback::FetchByCommitment {
height: parent_height,
},
)
});
async move {
let receiver = receiver?;
receiver.await.ok()
}
}
}
33 changes: 32 additions & 1 deletion examples/reshare/src/orchestrator/actor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,34 @@ use rand_core::CryptoRngCore;
use std::{collections::BTreeMap, marker::PhantomData, num::NonZeroUsize, time::Duration};
use tracing::{debug, info, warn};

/// Returns the digest Simplex should use as the floor for `epoch`.
async fn floor_digest_for_epoch<S, H, C, V>(
Comment thread
patrick-ogrady marked this conversation as resolved.
Outdated
epoch: Epoch,
marshal: &MarshalMailbox<S, Standard<Block<H, C, V>>>,
) -> H::Digest
where
S: Scheme,
H: Hasher,
C: Signer,
V: Variant,
{
let Some(previous_epoch) = epoch.previous() else {
return genesis::<H, C, V>().digest();
};

let epocher = FixedEpocher::new(BLOCKS_PER_EPOCH);
let boundary_height = epocher
.last(previous_epoch)
.expect("previous epoch should exist");
marshal
.get_block(boundary_height)
.await
.unwrap_or_else(|| {
panic!("missing boundary block for epoch {epoch} at height {boundary_height}")
})
.digest()
}

/// Configuration for the orchestrator.
pub struct Config<B, V, C, H, A, S, L, T>
where
Expand Down Expand Up @@ -305,6 +333,9 @@ where
.context
.child("consensus_engine")
.with_attribute("epoch", epoch);
let floor = simplex::Floor::genesis(
Comment thread
patrick-ogrady marked this conversation as resolved.
Outdated
floor_digest_for_epoch::<S, H, C, V>(epoch, &self.marshal).await,
);
let engine = simplex::Engine::new(
context,
simplex::Config {
Expand All @@ -317,7 +348,7 @@ where
partition: format!("{}_consensus_{}", self.partition_prefix, epoch),
mailbox_size: NZUsize!(1024),
epoch,
floor: simplex::Floor::genesis(genesis::<H, C, V>().digest()),
floor,
replay_buffer: NZUsize!(1024 * 1024),
write_buffer: NZUsize!(1024 * 1024),
leader_timeout: Duration::from_secs(1),
Expand Down
Loading