Skip to content

Commit c1cd5a3

Browse files
[consensus/marshal] commonware-glue marshal changes (#3764)
Co-authored-by: Patrick O'Grady <me@patrickogrady.xyz>
1 parent 3c08216 commit c1cd5a3

3 files changed

Lines changed: 53 additions & 12 deletions

File tree

consensus/src/marshal/ancestry.rs

Lines changed: 30 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -14,13 +14,10 @@ use std::{
1414
};
1515

1616
/// A stream of blocks used by application propose and verify calls.
17-
pub trait Ancestry<B: Block>: Stream<Item = B> + Send + Unpin + 'static {}
18-
19-
impl<T, B> Ancestry<B> for T
20-
where
21-
T: Stream<Item = B> + Send + Unpin + 'static,
22-
B: Block,
23-
{
17+
pub trait Ancestry<B: Block>: Stream<Item = B> + Send + Unpin + 'static {
18+
/// Peeks at the latest block in the stream without consuming it. Returns [None]
19+
/// if the stream does not yet have a block available or has been exhausted.
20+
fn peek(&self) -> Option<&B>;
2421
}
2522

2623
/// An interface for providing parent blocks.
@@ -115,6 +112,21 @@ impl<M: BlockProvider> AncestorStream<M> {
115112
pending: None.into(),
116113
}
117114
}
115+
116+
/// Peeks at the latest block in the stream without consuming it. Returns [None]
117+
/// if the stream does not yet have a block available or has been exhausted.
118+
pub fn peek(&self) -> Option<&M::Block> {
119+
self.buffered.last()
120+
}
121+
}
122+
123+
impl<M> Ancestry<M::Block> for AncestorStream<M>
124+
where
125+
M: BlockProvider,
126+
{
127+
fn peek(&self) -> Option<&M::Block> {
128+
Self::peek(self)
129+
}
118130
}
119131

120132
impl<M> Stream for AncestorStream<M>
@@ -290,6 +302,17 @@ mod test {
290302
let _ = futures::Stream::poll_next(stream.as_mut(), &mut cx);
291303
}
292304

305+
#[test]
306+
fn test_peek_available_through_ancestry_trait() {
307+
fn peek_height(ancestry: impl Ancestry<Block<Sha256Digest, ()>>) -> Option<Height> {
308+
ancestry.peek().map(Heightable::height)
309+
}
310+
311+
let block = Block::new::<Sha256>((), Sha256Digest::EMPTY, Height::new(1), 1);
312+
let stream = AncestorStream::new(MockProvider::default(), [block.clone()]);
313+
assert_eq!(peek_height(stream), Some(block.height()));
314+
}
315+
293316
#[test_async]
294317
async fn test_yields_genesis_and_stops() {
295318
let genesis = Block::new::<Sha256>((), Sha256Digest::EMPTY, Height::zero(), 0);

consensus/src/marshal/core/actor.rs

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -692,6 +692,9 @@ where
692692
let finalization = self.get_finalization_by_height(height).await;
693693
response.send_lossy(finalization);
694694
}
695+
Message::GetProcessedHeight { response } => {
696+
response.send_lossy(self.floor.processed_height());
697+
}
695698
Message::HintFinalized { height, targets } => {
696699
// Skip if finalization is already available locally.
697700
if self.get_finalization_by_height(height).await.is_some() {

consensus/src/marshal/core/mailbox.rs

Lines changed: 20 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,7 @@
11
use super::Variant;
22
use crate::{
33
marshal::{
4-
ancestry::{AncestorStream, BlockProvider},
4+
ancestry::{AncestorStream, Ancestry, BlockProvider},
55
Identifier,
66
},
77
simplex::types::{Activity, Finalization, Notarization},
@@ -15,7 +15,6 @@ use commonware_actor::{
1515
use commonware_cryptography::{certificate::Scheme, Digestible};
1616
use commonware_p2p::Recipients;
1717
use commonware_utils::{channel::oneshot, vec::NonEmptyVec};
18-
use futures::Stream;
1918
use std::collections::{btree_map::Entry, BTreeMap, VecDeque};
2019

2120
/// Messages sent to the marshal [Actor](super::Actor).
@@ -49,6 +48,11 @@ pub(crate) enum Message<S: Scheme, V: Variant> {
4948
/// A channel to send the retrieved finalization.
5049
response: oneshot::Sender<Option<Finalization<S, V::Commitment>>>,
5150
},
51+
/// A request to retrieve the latest processed height.
52+
GetProcessedHeight {
53+
/// A channel to send the latest processed height.
54+
response: oneshot::Sender<Height>,
55+
},
5256
/// A hint that a finalized block may be available at a given height.
5357
///
5458
/// This triggers a network fetch if the finalization is not available locally.
@@ -253,7 +257,8 @@ impl<S: Scheme, V: Variant> Message<S, V> {
253257
| Self::GetInfo {
254258
identifier: Identifier::Digest(_) | Identifier::Latest,
255259
..
256-
} => false,
260+
}
261+
| Self::GetProcessedHeight { .. } => false,
257262
Self::HintNotarized { .. } => false,
258263
Self::SubscribeByDigest { .. }
259264
| Self::SubscribeByCommitment { .. }
@@ -273,6 +278,7 @@ impl<S: Scheme, V: Variant> Message<S, V> {
273278
response.is_closed()
274279
}
275280
Self::GetFinalization { response, .. } => response.is_closed(),
281+
Self::GetProcessedHeight { response } => response.is_closed(),
276282
Self::SubscribeByDigest { response, .. }
277283
| Self::SubscribeByCommitment { response, .. } => response.is_closed(),
278284
Self::HintNotarized { .. } => false,
@@ -532,7 +538,7 @@ impl<S: Scheme, V: Variant> Mailbox<S, V> {
532538
pub(crate) fn ancestor_stream<I>(
533539
&self,
534540
initial: I,
535-
) -> impl Stream<Item = V::ApplicationBlock> + Send + use<S, V, I>
541+
) -> impl Ancestry<V::ApplicationBlock> + use<S, V, I>
536542
where
537543
Self: BlockProvider<Block = V::ApplicationBlock>,
538544
I: IntoIterator<Item = V::Block>,
@@ -579,6 +585,15 @@ impl<S: Scheme, V: Variant> Mailbox<S, V> {
579585
receiver.await.ok().flatten()
580586
}
581587

588+
/// Retrieve the latest processed height.
589+
pub async fn get_processed_height(&self) -> Option<Height> {
590+
let (response, receiver) = oneshot::channel();
591+
let _ = self
592+
.sender
593+
.enqueue(Message::GetProcessedHeight { response });
594+
receiver.await.ok()
595+
}
596+
582597
/// Hints that a finalized block may be available at the given height.
583598
///
584599
/// This method will request the finalization from the network via the resolver
@@ -679,7 +694,7 @@ impl<S: Scheme, V: Variant> Mailbox<S, V> {
679694
pub async fn ancestry(
680695
&self,
681696
(fallback, start_digest): (DigestFallback, <V::Block as Digestible>::Digest),
682-
) -> Option<impl Stream<Item = V::ApplicationBlock> + Send + use<S, V>>
697+
) -> Option<impl Ancestry<V::ApplicationBlock> + use<S, V>>
683698
where
684699
Self: BlockProvider<Block = V::ApplicationBlock>,
685700
{

0 commit comments

Comments
 (0)