Skip to content

Commit 353b5ef

Browse files
committed
[consensus] Restore marshal ancestor stream symmetry
1 parent cb94eeb commit 353b5ef

5 files changed

Lines changed: 36 additions & 10 deletions

File tree

consensus/src/lib.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -272,7 +272,7 @@ stability_scope!(ALPHA, cfg(not(target_arch = "wasm32")) {
272272
fn propose(
273273
&mut self,
274274
context: (E, Self::Context),
275-
ancestry: impl Stream<Item = Self::Block> + Send,
275+
ancestry: impl Stream<Item = Self::Block> + Send + Unpin + 'static,
276276
) -> impl Future<Output = Option<Self::Block>> + Send;
277277

278278
/// Verify a block produced by the application's proposer, relative to its ancestry.
@@ -287,7 +287,7 @@ stability_scope!(ALPHA, cfg(not(target_arch = "wasm32")) {
287287
fn verify(
288288
&mut self,
289289
context: (E, Self::Context),
290-
ancestry: impl Stream<Item = Self::Block> + Send,
290+
ancestry: impl Stream<Item = Self::Block> + Send + Unpin + 'static,
291291
) -> impl Future<Output = bool> + Send;
292292
}
293293
});

consensus/src/marshal/coding/mod.rs

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -65,6 +65,7 @@ pub use marshaled::{Marshaled, MarshaledConfig};
6565
mod tests {
6666
use crate::{
6767
marshal::{
68+
ancestry::BlockProvider,
6869
coding::{
6970
marshaled::genesis_coding_commitment,
7071
shards,
@@ -115,6 +116,13 @@ mod tests {
115116
type TestCodedBlock = CodedBlock<CodingB, ReedSolomon<Sha256>, Sha256>;
116117
type CodingSendRecord = (Round, TestCodedBlock, Recipients<K>);
117118

119+
#[test]
120+
fn mailbox_provides_application_blocks() {
121+
fn assert_provider<P: BlockProvider<Block = CodingB>>() {}
122+
123+
assert_provider::<core::Mailbox<S, TestCodingVariant>>();
124+
}
125+
118126
/// A coding buffer that records subscriptions and never resolves them.
119127
#[derive(Clone, Default)]
120128
struct RecordingCodingBuffer {

consensus/src/marshal/coding/variant.rs

Lines changed: 21 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,18 +1,19 @@
11
use crate::{
22
marshal::{
3+
ancestry::BlockProvider,
34
coding::{
45
shards,
56
types::{CodedBlock, CodedBlockCfg, StoredCodedBlock},
67
},
7-
core::{Buffer, Variant},
8+
core::{Buffer, DigestFallback, Mailbox, Variant},
89
},
910
simplex::types::Context,
1011
types::{coding::Commitment, Round},
1112
CertifiableBlock,
1213
};
1314
use commonware_codec::Read;
1415
use commonware_coding::Scheme as CodingScheme;
15-
use commonware_cryptography::{Committable, Digestible, Hasher, PublicKey};
16+
use commonware_cryptography::{certificate::Scheme, Committable, Digestible, Hasher, PublicKey};
1617
use commonware_p2p::Recipients;
1718
use commonware_utils::channel::oneshot;
1819

@@ -113,3 +114,21 @@ where
113114
self.proposed(round, block);
114115
}
115116
}
117+
118+
impl<S, B, C, H, P> BlockProvider for Mailbox<S, Coding<B, C, H, P>>
119+
where
120+
S: Scheme,
121+
B: CertifiableBlock<Context = Context<Commitment, P>>,
122+
C: CodingScheme,
123+
H: Hasher,
124+
P: PublicKey,
125+
{
126+
type Block = B;
127+
128+
async fn subscribe(self, digest: B::Digest) -> Option<Self::Block> {
129+
self.subscribe_by_digest(digest, DigestFallback::Wait)
130+
.await
131+
.ok()
132+
.map(<Coding<B, C, H, P> as Variant>::into_inner)
133+
}
134+
}

consensus/src/marshal/mocks/verifying.rs

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -75,15 +75,15 @@ where
7575
async fn propose(
7676
&mut self,
7777
_context: (deterministic::Context, Self::Context),
78-
_ancestry: impl Stream<Item = Self::Block> + Send,
78+
_ancestry: impl Stream<Item = Self::Block> + Send + Unpin + 'static,
7979
) -> Option<Self::Block> {
8080
self.propose_result.clone()
8181
}
8282

8383
async fn verify(
8484
&mut self,
8585
_context: (deterministic::Context, Self::Context),
86-
_ancestry: impl Stream<Item = Self::Block> + Send,
86+
_ancestry: impl Stream<Item = Self::Block> + Send + Unpin + 'static,
8787
) -> bool {
8888
self.verify_result
8989
}
@@ -136,15 +136,15 @@ where
136136
async fn propose(
137137
&mut self,
138138
_context: (deterministic::Context, Self::Context),
139-
_ancestry: impl Stream<Item = Self::Block> + Send,
139+
_ancestry: impl Stream<Item = Self::Block> + Send + Unpin + 'static,
140140
) -> Option<Self::Block> {
141141
None
142142
}
143143

144144
async fn verify(
145145
&mut self,
146146
_context: (deterministic::Context, Self::Context),
147-
_ancestry: impl Stream<Item = Self::Block> + Send,
147+
_ancestry: impl Stream<Item = Self::Block> + Send + Unpin + 'static,
148148
) -> bool {
149149
if let Some(started) = self.started.lock().take() {
150150
started.send_lossy(());

examples/reshare/src/application/core.rs

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -88,10 +88,9 @@ where
8888
async fn propose(
8989
&mut self,
9090
(_, context): (E, Self::Context),
91-
ancestry: impl Stream<Item = Self::Block> + Send,
91+
mut ancestry: impl Stream<Item = Self::Block> + Send + Unpin + 'static,
9292
) -> Option<Self::Block> {
9393
// Fetch the parent block from the ancestry stream.
94-
futures::pin_mut!(ancestry);
9594
let parent_block = ancestry.next().await?;
9695
let parent_commitment = parent_block.commitment();
9796

0 commit comments

Comments
 (0)