Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
37 changes: 30 additions & 7 deletions consensus/src/marshal/ancestry.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,13 +14,10 @@ use std::{
};

/// A stream of blocks used by application propose and verify calls.
pub trait Ancestry<B: Block>: Stream<Item = B> + Send + Unpin + 'static {}

impl<T, B> Ancestry<B> for T
where
T: Stream<Item = B> + Send + Unpin + 'static,
B: Block,
{
pub trait Ancestry<B: Block>: Stream<Item = B> + Send + Unpin + 'static {
/// Peeks at the latest block in the stream without consuming it. Returns [None]
/// if the stream does not yet have a block available or has been exhausted.
fn peek(&self) -> Option<&B>;
}

/// An interface for providing parent blocks.
Expand Down Expand Up @@ -115,6 +112,21 @@ impl<M: BlockProvider> AncestorStream<M> {
pending: None.into(),
}
}

/// Peeks at the latest block in the stream without consuming it. Returns [None]
/// if the stream does not yet have a block available or has been exhausted.
pub fn peek(&self) -> Option<&M::Block> {
self.buffered.last()
}
}

impl<M> Ancestry<M::Block> for AncestorStream<M>
where
M: BlockProvider,
{
fn peek(&self) -> Option<&M::Block> {
Self::peek(self)
}
}

impl<M> Stream for AncestorStream<M>
Expand Down Expand Up @@ -290,6 +302,17 @@ mod test {
let _ = futures::Stream::poll_next(stream.as_mut(), &mut cx);
}

#[test]
fn test_peek_available_through_ancestry_trait() {
fn peek_height(ancestry: impl Ancestry<Block<Sha256Digest, ()>>) -> Option<Height> {
ancestry.peek().map(Heightable::height)
}

let block = Block::new::<Sha256>((), Sha256Digest::EMPTY, Height::new(1), 1);
let stream = AncestorStream::new(MockProvider::default(), [block.clone()]);
assert_eq!(peek_height(stream), Some(block.height()));
}

#[test_async]
async fn test_yields_genesis_and_stops() {
let genesis = Block::new::<Sha256>((), Sha256Digest::EMPTY, Height::zero(), 0);
Expand Down
3 changes: 3 additions & 0 deletions consensus/src/marshal/core/actor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -692,6 +692,9 @@ where
let finalization = self.get_finalization_by_height(height).await;
response.send_lossy(finalization);
}
Message::GetProcessedHeight { response } => {
response.send_lossy(self.floor.processed_height());
}
Message::HintFinalized { height, targets } => {
// Skip if finalization is already available locally.
if self.get_finalization_by_height(height).await.is_some() {
Expand Down
25 changes: 20 additions & 5 deletions consensus/src/marshal/core/mailbox.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
use super::Variant;
use crate::{
marshal::{
ancestry::{AncestorStream, BlockProvider},
ancestry::{AncestorStream, Ancestry, BlockProvider},
Identifier,
},
simplex::types::{Activity, Finalization, Notarization},
Expand All @@ -15,7 +15,6 @@ use commonware_actor::{
use commonware_cryptography::{certificate::Scheme, Digestible};
use commonware_p2p::Recipients;
use commonware_utils::{channel::oneshot, vec::NonEmptyVec};
use futures::Stream;
use std::collections::{btree_map::Entry, BTreeMap, VecDeque};

/// Messages sent to the marshal [Actor](super::Actor).
Expand Down Expand Up @@ -49,6 +48,11 @@ pub(crate) enum Message<S: Scheme, V: Variant> {
/// A channel to send the retrieved finalization.
response: oneshot::Sender<Option<Finalization<S, V::Commitment>>>,
},
/// A request to retrieve the latest processed height.
GetProcessedHeight {
/// A channel to send the latest processed height.
response: oneshot::Sender<Height>,
},
/// A hint that a finalized block may be available at a given height.
///
/// This triggers a network fetch if the finalization is not available locally.
Expand Down Expand Up @@ -253,7 +257,8 @@ impl<S: Scheme, V: Variant> Message<S, V> {
| Self::GetInfo {
identifier: Identifier::Digest(_) | Identifier::Latest,
..
} => false,
}
| Self::GetProcessedHeight { .. } => false,
Self::HintNotarized { .. } => false,
Self::SubscribeByDigest { .. }
Comment thread
cursor[bot] marked this conversation as resolved.
| Self::SubscribeByCommitment { .. }
Expand All @@ -273,6 +278,7 @@ impl<S: Scheme, V: Variant> Message<S, V> {
response.is_closed()
}
Self::GetFinalization { response, .. } => response.is_closed(),
Self::GetProcessedHeight { response } => response.is_closed(),
Self::SubscribeByDigest { response, .. }
| Self::SubscribeByCommitment { response, .. } => response.is_closed(),
Self::HintNotarized { .. } => false,
Expand Down Expand Up @@ -533,7 +539,7 @@ impl<S: Scheme, V: Variant> Mailbox<S, V> {
pub(crate) fn ancestor_stream<I>(
&self,
initial: I,
) -> impl Stream<Item = V::ApplicationBlock> + Send + use<S, V, I>
) -> impl Ancestry<V::ApplicationBlock> + use<S, V, I>
where
Self: BlockProvider<Block = V::ApplicationBlock>,
I: IntoIterator<Item = V::Block>,
Expand Down Expand Up @@ -580,6 +586,15 @@ impl<S: Scheme, V: Variant> Mailbox<S, V> {
receiver.await.ok().flatten()
}

/// Retrieve the latest processed height.
pub async fn get_processed_height(&self) -> Option<Height> {
let (response, receiver) = oneshot::channel();
let _ = self
.sender
.enqueue(Message::GetProcessedHeight { response });
receiver.await.ok()
}

/// Hints that a finalized block may be available at the given height.
///
/// This method will request the finalization from the network via the resolver
Expand Down Expand Up @@ -680,7 +695,7 @@ impl<S: Scheme, V: Variant> Mailbox<S, V> {
pub async fn ancestry(
&self,
(fallback, start_digest): (DigestFallback, <V::Block as Digestible>::Digest),
) -> Option<impl Stream<Item = V::ApplicationBlock> + Send + use<S, V>>
) -> Option<impl Ancestry<V::ApplicationBlock> + use<S, V>>
where
Self: BlockProvider<Block = V::ApplicationBlock>,
{
Expand Down
Loading