Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 3 additions & 3 deletions consensus/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -236,8 +236,8 @@ stability_scope!(ALPHA {
pub mod ordered_broadcast;
});
stability_scope!(ALPHA, cfg(not(target_arch = "wasm32")) {
use crate::marshal::ancestry::AncestryStream;
use commonware_cryptography::certificate::Scheme;
use futures::Stream;
use commonware_runtime::{Clock, Metrics, Spawner};
use rand::Rng;

Expand Down Expand Up @@ -272,7 +272,7 @@ stability_scope!(ALPHA, cfg(not(target_arch = "wasm32")) {
fn propose(
&mut self,
context: (E, Self::Context),
ancestry: impl Stream<Item = Self::Block> + Send,
ancestry: impl AncestryStream<Block = Self::Block>,
) -> impl Future<Output = Option<Self::Block>> + Send;

/// Verify a block produced by the application's proposer, relative to its ancestry.
Expand All @@ -287,7 +287,7 @@ stability_scope!(ALPHA, cfg(not(target_arch = "wasm32")) {
fn verify(
&mut self,
context: (E, Self::Context),
ancestry: impl Stream<Item = Self::Block> + Send,
ancestry: impl AncestryStream<Block = Self::Block>,
) -> impl Future<Output = bool> + Send;
}
});
26 changes: 23 additions & 3 deletions consensus/src/marshal/ancestry.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ use crate::{types::Height, Block, Heightable};
use commonware_cryptography::Digestible;
use futures::{
future::{BoxFuture, OptionFuture},
FutureExt, Stream,
FutureExt, Stream, StreamExt,
};
use pin_project::pin_project;
use std::{
Expand Down Expand Up @@ -54,6 +54,27 @@ pub trait BlockProvider: Clone + Send + 'static {
}
}

/// A stream over a block's ancestry.
pub trait AncestryStream: Send + 'static {
/// The block type yielded by the stream.
type Block: Block;

/// Returns the next block in the ancestry.
fn next(&mut self) -> impl Future<Output = Option<Self::Block>> + Send;
}

impl<S, B> AncestryStream for S
where
S: Stream<Item = B> + Send + Unpin + 'static,
B: Block,
{
type Block = B;

async fn next(&mut self) -> Option<Self::Block> {
StreamExt::next(self).await
}
}

/// Yields the ancestors of a block while prefetching parents, _not_ including the genesis block.
///
// TODO(<https://github.com/commonwarexyz/monorepo/issues/2982>): Once marshal can also yield the genesis block,
Expand Down Expand Up @@ -180,7 +201,6 @@ mod test {
use crate::marshal::mocks::block::Block;
use commonware_cryptography::{sha256::Digest as Sha256Digest, Digest, Sha256};
use commonware_macros::test_async;
use futures::StreamExt;

#[derive(Default, Clone)]
struct MockProvider(Vec<Block<Sha256Digest, ()>>);
Expand Down Expand Up @@ -220,7 +240,7 @@ mod test {
async fn test_empty_yields_none() {
let mut stream: AncestorStream<MockProvider> =
AncestorStream::new(MockProvider::default(), vec![]);
assert_eq!(stream.next().await, None);
assert_eq!(AncestryStream::next(&mut stream).await, None);
}

#[test_async]
Expand Down
81 changes: 81 additions & 0 deletions consensus/src/marshal/coding/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,7 @@ pub use marshaled::{Marshaled, MarshaledConfig};
mod tests {
use crate::{
marshal::{
ancestry::AncestryStream,
coding::{
marshaled::genesis_coding_commitment,
shards,
Expand Down Expand Up @@ -809,6 +810,86 @@ mod tests {
harness::ancestry_stream::<CodingHarness>();
}

#[test_traced("WARN")]
fn test_coding_ancestry_stream_fetches_parent_by_commitment() {
let runner = deterministic::Runner::timed(Duration::from_secs(30));
runner.start(|mut context| async move {
let Fixture {
participants,
schemes,
..
} = bls12381_threshold_vrf::fixture::<V, _>(&mut context, NAMESPACE, NUM_VALIDATORS);
let provider = ConstantProvider::new(schemes[0].clone());
let buffer = RecordingCodingBuffer::default();
let (marshal, resolver, _actor_handle) = start_coding_actor_with_recording(
context.child("actor_stack"),
"coding-ancestry-parent-by-commitment",
provider,
buffer.clone(),
)
.await;

let coding_config = coding_config_for_participants(NUM_VALIDATORS as u16);
let genesis = genesis_block();
let genesis_parent_commitment = genesis_coding_commitment::<Sha256, _>(&genesis);

let parent_round = Round::new(Epoch::zero(), View::new(1));
let parent_ctx = CodingCtx {
round: parent_round,
leader: participants[0].clone(),
parent: (View::zero(), genesis_parent_commitment),
};
let parent = make_coding_block(parent_ctx, genesis.digest(), Height::new(1), 100);
let parent_coded: TestCodedBlock =
CodedBlock::new(parent.clone(), coding_config, &Sequential);
let parent_commitment = parent_coded.commitment();

let child_ctx = CodingCtx {
round: Round::new(Epoch::zero(), View::new(2)),
leader: participants[0].clone(),
parent: (View::new(1), parent_commitment),
};
let child = make_coding_block(child_ctx, parent.digest(), Height::new(2), 200);
let child_coded: TestCodedBlock =
CodedBlock::new(child.clone(), coding_config, &Sequential);

resolver.respond_to_next_fetch(parent_coded.encode());
let mut ancestry = marshal.ancestor_stream([child_coded]);
let first = AncestryStream::next(&mut ancestry)
.await
.expect("child should be yielded");
assert_eq!(first.digest(), child.digest());

let second = select! {
block = AncestryStream::next(&mut ancestry) => {
block.expect("parent should be fetched by commitment")
},
_ = context.sleep(Duration::from_secs(5)) => {
panic!("parent fetch did not resolve")
},
};
assert_eq!(second.digest(), parent.digest());
assert!(
resolver.wait_for_delivery_response().await,
"certified parent delivery should validate"
);
assert!(
resolver.fetches().iter().any(|fetch| matches!(
(&fetch.key, &fetch.subscriber),
(
handler::Key::Block(commitment),
handler::Annotation::Certified { height },
) if commitment == &parent_commitment && *height == Height::new(1)
)),
"ancestry should fetch the parent by the coded parent commitment"
);
assert!(
buffer.subscription_count() > 0,
"missing parent should register a local buffer subscription"
);
});
}

#[test_traced("WARN")]
fn test_coding_finalize_same_height_different_views() {
harness::finalize_same_height_different_views::<CodingHarness>();
Expand Down
35 changes: 30 additions & 5 deletions consensus/src/marshal/core/mailbox.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
use super::Variant;
use crate::{
marshal::{
ancestry::{AncestorStream, BlockProvider},
ancestry::{AncestorStream, AncestryStream, BlockProvider},
Identifier,
},
simplex::types::{Activity, Finalization, Notarization},
Expand Down Expand Up @@ -538,7 +538,9 @@ impl<S: Scheme, V: Variant> Mailbox<S, V> {
pub(crate) fn ancestor_stream<I>(
&self,
initial: I,
) -> impl Stream<Item = V::ApplicationBlock> + Send + use<S, V, I>
) -> impl AncestryStream<Block = V::ApplicationBlock>
+ Stream<Item = V::ApplicationBlock>
+ use<S, V, I>
where
I: IntoIterator<Item = V::Block>,
{
Expand Down Expand Up @@ -690,7 +692,11 @@ impl<S: Scheme, V: Variant> Mailbox<S, V> {
pub async fn ancestry(
&self,
(fallback, start_digest): (DigestFallback, <V::Block as Digestible>::Digest),
) -> Option<impl Stream<Item = V::ApplicationBlock> + Send + use<S, V>> {
) -> Option<
impl AncestryStream<Block = V::ApplicationBlock>
+ Stream<Item = V::ApplicationBlock>
+ use<S, V>,
> {
let receiver = self.subscribe_by_digest(start_digest, fallback);
receiver
.await
Expand Down Expand Up @@ -828,15 +834,22 @@ impl<S: Scheme, V: Variant> Reporter for Mailbox<S, V> {
mod tests {
use super::*;
use crate::{
marshal::{mocks::harness, standard::Standard},
marshal::{
coding::{types::CodedBlock, Coding},
mocks::harness,
standard::Standard,
},
types::{Epoch, View},
Heightable,
};
use commonware_cryptography::{ed25519::PrivateKey, Digest as _, Signer as _};
use commonware_coding::ReedSolomon;
use commonware_cryptography::{ed25519::PrivateKey, sha256::Sha256, Digest as _, Signer as _};
use commonware_utils::channel::oneshot::error::TryRecvError;

type TestMessage = Message<harness::S, Standard<harness::B>>;
type TestPending = Pending<harness::S, Standard<harness::B>>;
type TestCodingVariant = Coding<harness::CodingB, ReedSolomon<Sha256>, Sha256, harness::K>;
type TestCodedBlock = CodedBlock<harness::CodingB, ReedSolomon<Sha256>, Sha256>;

fn public_key(seed: u64) -> harness::K {
PrivateKey::from_seed(seed).public_key()
Expand All @@ -854,6 +867,18 @@ mod tests {
<Standard<harness::B> as Variant>::commitment(&block(height))
}

#[test]
fn ancestry_provider_walks_standard_blocks() {
fn assert_provider<P: BlockProvider<Block = harness::B>>() {}
assert_provider::<AncestryProvider<harness::S, Standard<harness::B>>>();
}

#[test]
fn ancestry_provider_walks_coded_blocks() {
fn assert_provider<P: BlockProvider<Block = TestCodedBlock>>() {}
assert_provider::<AncestryProvider<harness::S, TestCodingVariant>>();
}

fn get_info(height: u64) -> (TestMessage, oneshot::Receiver<Option<(Height, harness::D)>>) {
let (response, receiver) = oneshot::channel();
(
Expand Down
11 changes: 5 additions & 6 deletions consensus/src/marshal/mocks/verifying.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,13 +4,12 @@
//! `Application` trait, suitable for testing the `Marshaled` wrapper in
//! both standard and coding variants.

use crate::{CertifiableBlock, Epochable};
use crate::{marshal::ancestry::AncestryStream, CertifiableBlock, Epochable};
use commonware_runtime::deterministic;
use commonware_utils::{
channel::{fallible::OneshotExt, oneshot},
sync::Mutex,
};
use futures::Stream;
use std::{marker::PhantomData, sync::Arc};

/// A mock application that implements `Application` for testing.
Expand Down Expand Up @@ -75,15 +74,15 @@ where
async fn propose(
&mut self,
_context: (deterministic::Context, Self::Context),
_ancestry: impl Stream<Item = Self::Block> + Send,
_ancestry: impl AncestryStream<Block = Self::Block>,
) -> Option<Self::Block> {
self.propose_result.clone()
}

async fn verify(
&mut self,
_context: (deterministic::Context, Self::Context),
_ancestry: impl Stream<Item = Self::Block> + Send,
_ancestry: impl AncestryStream<Block = Self::Block>,
) -> bool {
self.verify_result
}
Expand Down Expand Up @@ -136,15 +135,15 @@ where
async fn propose(
&mut self,
_context: (deterministic::Context, Self::Context),
_ancestry: impl Stream<Item = Self::Block> + Send,
_ancestry: impl AncestryStream<Block = Self::Block>,
) -> Option<Self::Block> {
None
}

async fn verify(
&mut self,
_context: (deterministic::Context, Self::Context),
_ancestry: impl Stream<Item = Self::Block> + Send,
_ancestry: impl AncestryStream<Block = Self::Block>,
) -> bool {
if let Some(started) = self.started.lock().take() {
started.send_lossy(());
Expand Down
59 changes: 59 additions & 0 deletions consensus/src/marshal/standard/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@ mod tests {
use super::{Deferred, Inline, Standard};
use crate::{
marshal::{
ancestry::AncestryStream,
config::Config,
core::{cache, Actor, CommitmentFallback, Mailbox},
mocks::{
Expand Down Expand Up @@ -1190,6 +1191,64 @@ mod tests {
harness::ancestry_stream::<DeferredHarness>();
}

#[test_traced("WARN")]
fn test_standard_ancestry_stream_fetches_parent_by_commitment() {
let runner = deterministic::Runner::timed(Duration::from_secs(30));
runner.start(|mut context| async move {
let Fixture { schemes, .. } =
bls12381_threshold_vrf::fixture::<V, _>(&mut context, NAMESPACE, NUM_VALIDATORS);
let buffer = RecordingBuffer::default();
let (marshal, buffer, resolver, _actor_handle) = start_standard_actor(
context.child("validator"),
"standard-ancestry-parent-by-commitment",
ConstantProvider::new(schemes[0].clone()),
Application::<B>::manual_ack(),
buffer,
)
.await;

let genesis = make_raw_block(Sha256::hash(b""), Height::zero(), 0);
let parent = make_raw_block(genesis.digest(), Height::new(1), 100);
let child = make_raw_block(parent.digest(), Height::new(2), 200);
let parent_commitment = parent.digest();

resolver.respond_to_next_fetch(parent.encode());
let mut ancestry = marshal.ancestor_stream([child.clone()]);
let first = AncestryStream::next(&mut ancestry)
.await
.expect("child should be yielded");
assert_eq!(first.digest(), child.digest());

let second = select! {
block = AncestryStream::next(&mut ancestry) => {
block.expect("parent should be fetched by commitment")
},
_ = context.sleep(Duration::from_secs(5)) => {
panic!("parent fetch did not resolve")
},
};
assert_eq!(second.digest(), parent_commitment);
assert!(
resolver.wait_for_delivery_response().await,
"certified parent delivery should validate"
);
assert!(
resolver.fetches().iter().any(|fetch| matches!(
(&fetch.key, &fetch.subscriber),
(
handler::Key::Block(commitment),
handler::Annotation::Certified { height },
) if commitment == &parent_commitment && *height == Height::new(1)
)),
"ancestry should fetch the parent by the standard parent commitment"
);
assert!(
buffer.subscription_count() > 0,
"missing parent should register a local buffer subscription"
);
});
}

#[test_traced("WARN")]
fn test_standard_finalize_same_height_different_views() {
harness::finalize_same_height_different_views::<InlineHarness>();
Expand Down
Loading
Loading