Skip to content
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions consensus/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -272,7 +272,7 @@ stability_scope!(ALPHA, cfg(not(target_arch = "wasm32")) {
fn propose(
&mut self,
context: (E, Self::Context),
ancestry: impl Stream<Item = Self::Block> + Send,
ancestry: impl Stream<Item = Self::Block> + Send + Unpin + 'static,
) -> impl Future<Output = Option<Self::Block>> + Send;

/// Verify a block produced by the application's proposer, relative to its ancestry.
Expand All @@ -287,7 +287,7 @@ stability_scope!(ALPHA, cfg(not(target_arch = "wasm32")) {
fn verify(
&mut self,
context: (E, Self::Context),
ancestry: impl Stream<Item = Self::Block> + Send,
ancestry: impl Stream<Item = Self::Block> + Send + Unpin + 'static,
) -> impl Future<Output = bool> + Send;
}
});
7 changes: 7 additions & 0 deletions consensus/src/marshal/coding/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,7 @@ pub use marshaled::{Marshaled, MarshaledConfig};
mod tests {
use crate::{
marshal::{
ancestry::BlockProvider,
coding::{
marshaled::genesis_coding_commitment,
shards,
Expand Down Expand Up @@ -115,6 +116,12 @@ mod tests {
type TestCodedBlock = CodedBlock<CodingB, ReedSolomon<Sha256>, Sha256>;
type CodingSendRecord = (Round, TestCodedBlock, Recipients<K>);

#[test]
fn mailbox_provides_application_blocks() {
Comment thread
clabby marked this conversation as resolved.
fn assert_provider<P: BlockProvider<Block = CodingB>>() {}
assert_provider::<core::Mailbox<S, TestCodingVariant>>();
}

/// A coding buffer that records subscriptions and never resolves them.
#[derive(Clone, Default)]
struct RecordingCodingBuffer {
Expand Down
23 changes: 21 additions & 2 deletions consensus/src/marshal/coding/variant.rs
Original file line number Diff line number Diff line change
@@ -1,18 +1,19 @@
use crate::{
marshal::{
ancestry::BlockProvider,
coding::{
shards,
types::{CodedBlock, CodedBlockCfg, StoredCodedBlock},
},
core::{Buffer, Variant},
core::{Buffer, DigestFallback, Mailbox, Variant},
},
simplex::types::Context,
types::{coding::Commitment, Round},
CertifiableBlock,
};
use commonware_codec::Read;
use commonware_coding::Scheme as CodingScheme;
use commonware_cryptography::{Committable, Digestible, Hasher, PublicKey};
use commonware_cryptography::{certificate::Scheme, Committable, Digestible, Hasher, PublicKey};
use commonware_p2p::Recipients;
use commonware_utils::channel::oneshot;

Expand Down Expand Up @@ -113,3 +114,21 @@ where
self.proposed(round, block);
}
}

impl<S, B, C, H, P> BlockProvider for Mailbox<S, Coding<B, C, H, P>>
where
S: Scheme,
B: CertifiableBlock<Context = Context<Commitment, P>>,
C: CodingScheme,
H: Hasher,
P: PublicKey,
{
type Block = B;

async fn subscribe(self, digest: B::Digest) -> Option<Self::Block> {
self.subscribe_by_digest(digest, DigestFallback::Wait)
Comment thread
clabby marked this conversation as resolved.
Outdated
.await
.ok()
.map(<Coding<B, C, H, P> as Variant>::into_inner)
}
}
8 changes: 4 additions & 4 deletions consensus/src/marshal/mocks/verifying.rs
Original file line number Diff line number Diff line change
Expand Up @@ -75,15 +75,15 @@ where
async fn propose(
&mut self,
_context: (deterministic::Context, Self::Context),
_ancestry: impl Stream<Item = Self::Block> + Send,
_ancestry: impl Stream<Item = Self::Block> + Send + Unpin + 'static,
) -> Option<Self::Block> {
self.propose_result.clone()
}

async fn verify(
&mut self,
_context: (deterministic::Context, Self::Context),
_ancestry: impl Stream<Item = Self::Block> + Send,
_ancestry: impl Stream<Item = Self::Block> + Send + Unpin + 'static,
) -> bool {
self.verify_result
}
Expand Down Expand Up @@ -136,15 +136,15 @@ where
async fn propose(
&mut self,
_context: (deterministic::Context, Self::Context),
_ancestry: impl Stream<Item = Self::Block> + Send,
_ancestry: impl Stream<Item = Self::Block> + Send + Unpin + 'static,
) -> Option<Self::Block> {
None
}

async fn verify(
&mut self,
_context: (deterministic::Context, Self::Context),
_ancestry: impl Stream<Item = Self::Block> + Send,
_ancestry: impl Stream<Item = Self::Block> + Send + Unpin + 'static,
) -> bool {
if let Some(started) = self.started.lock().take() {
started.send_lossy(());
Expand Down
7 changes: 7 additions & 0 deletions consensus/src/marshal/standard/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@ mod tests {
use super::{Deferred, Inline, Standard};
use crate::{
marshal::{
ancestry::BlockProvider,
config::Config,
core::{cache, Actor, CommitmentFallback, Mailbox},
mocks::{
Expand Down Expand Up @@ -105,6 +106,12 @@ mod tests {
time::Duration,
};

#[test]
fn mailbox_provides_application_blocks() {
fn assert_provider<P: BlockProvider<Block = B>>() {}
assert_provider::<Mailbox<S, Standard<B>>>();
}

fn assert_finalize_deterministic<H: TestHarness>(
seed: u64,
link: commonware_p2p::simulated::Link,
Expand Down
21 changes: 19 additions & 2 deletions consensus/src/marshal/standard/variant.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,13 +4,16 @@
//! receives the full block directly from the proposer or via gossip.

use crate::{
marshal::core::{Buffer, Variant},
marshal::{
ancestry::BlockProvider,
core::{Buffer, DigestFallback, Mailbox, Variant},
},
types::Round,
Block,
};
use commonware_broadcast::{buffered, Broadcaster};
use commonware_codec::Read;
use commonware_cryptography::{Digestible, PublicKey};
use commonware_cryptography::{certificate::Scheme, Digestible, PublicKey};
use commonware_p2p::Recipients;
use commonware_utils::channel::oneshot;

Expand Down Expand Up @@ -89,3 +92,17 @@ where
Broadcaster::broadcast(self, recipients, block);
}
}

impl<S, B> BlockProvider for Mailbox<S, Standard<B>>
where
S: Scheme,
B: Block,
{
type Block = B;

async fn subscribe(self, digest: B::Digest) -> Option<Self::Block> {
self.subscribe_by_digest(digest, DigestFallback::Wait)
.await
.ok()
}
}
Comment thread
clabby marked this conversation as resolved.
5 changes: 2 additions & 3 deletions examples/reshare/src/application/core.rs
Original file line number Diff line number Diff line change
Expand Up @@ -88,10 +88,9 @@ where
async fn propose(
&mut self,
(_, context): (E, Self::Context),
ancestry: impl Stream<Item = Self::Block> + Send,
mut ancestry: impl Stream<Item = Self::Block> + Send + Unpin + 'static,
) -> Option<Self::Block> {
// Fetch the parent block from the ancestry stream.
futures::pin_mut!(ancestry);
Comment thread
cursor[bot] marked this conversation as resolved.
let parent_block = ancestry.next().await?;
let parent_commitment = parent_block.commitment();

Expand All @@ -114,7 +113,7 @@ where
async fn verify(
&mut self,
_: (E, Self::Context),
_: impl Stream<Item = Self::Block> + Send,
_: impl Stream<Item = Self::Block> + Send + Unpin + 'static,
) -> bool {
// We wrap this application with `Marshaled`, which handles ancestry
// verification (parent commitment and height contiguity).
Expand Down
Loading