diff --git a/consensus/src/marshal/ancestry.rs b/consensus/src/marshal/ancestry.rs index 053758b605..9521f1a6ca 100644 --- a/consensus/src/marshal/ancestry.rs +++ b/consensus/src/marshal/ancestry.rs @@ -9,6 +9,7 @@ use futures::{ }; use pin_project::pin_project; use std::{ + collections::VecDeque, future::Future, pin::Pin, sync::Arc, @@ -22,6 +23,39 @@ pub trait Ancestry: Stream + Send + Unpin + 'static { fn peek(&self) -> Option<&B>; } +/// Creates an ancestry stream from a fixed sequence of blocks. +/// +/// Blocks are yielded in iterator order and no parent fetching is performed. This is useful when +/// the caller wants to bound the ancestry available to the application. +pub fn from_iter(blocks: impl IntoIterator) -> impl Ancestry +where + B: Block, +{ + BoundedAncestry { + blocks: blocks.into_iter().collect(), + } +} + +struct BoundedAncestry { + blocks: VecDeque, +} + +impl Unpin for BoundedAncestry {} + +impl Ancestry for BoundedAncestry { + fn peek(&self) -> Option<&B> { + self.blocks.front() + } +} + +impl Stream for BoundedAncestry { + type Item = B; + + fn poll_next(mut self: Pin<&mut Self>, _: &mut Context<'_>) -> Poll> { + Poll::Ready(self.blocks.pop_front()) + } +} + /// An interface for providing parent blocks. pub trait BlockProvider: Send + 'static { /// The block type the provider walks. @@ -384,6 +418,34 @@ mod test { }); } + #[test] + fn test_from_iter_available_through_ancestry_trait() { + fn peek_height(ancestry: impl Ancestry>) -> Option { + ancestry.peek().map(Heightable::height) + } + + let block = Block::new::((), Sha256Digest::EMPTY, Height::new(1), 1); + let ancestry = from_iter([block.clone()]); + + assert_eq!(peek_height(ancestry), Some(block.height())); + } + + #[test] + fn test_from_iter_yields_blocks_in_order_and_peeks_next() { + deterministic::Runner::default().start(|_| async move { + let parent = Block::new::((), Sha256Digest::EMPTY, Height::new(1), 1); + let child = Block::new::((), parent.digest(), Height::new(2), 2); + let mut ancestry = from_iter([child.clone(), parent.clone()]); + + assert_eq!(ancestry.peek(), Some(&child)); + assert_eq!(ancestry.next().await, Some(child)); + assert_eq!(ancestry.peek(), Some(&parent)); + assert_eq!(ancestry.next().await, Some(parent)); + assert_eq!(ancestry.peek(), None); + assert_eq!(ancestry.next().await, None); + }); + } + #[test] fn test_yields_genesis_and_stops() { deterministic::Runner::default().start(|context| async move { diff --git a/consensus/src/marshal/coding/mod.rs b/consensus/src/marshal/coding/mod.rs index 06d461a45c..9f65b54f0a 100644 --- a/consensus/src/marshal/coding/mod.rs +++ b/consensus/src/marshal/coding/mod.rs @@ -156,19 +156,19 @@ mod tests { None } - fn subscribe_by_digest(&self, _digest: D) -> oneshot::Receiver { + fn subscribe_by_digest(&self, _digest: D) -> Option> { let (sender, receiver) = oneshot::channel(); self.digest_subscriptions.lock().push(sender); - receiver + Some(receiver) } fn subscribe_by_commitment( &self, _commitment: Commitment, - ) -> oneshot::Receiver { + ) -> Option> { let (sender, receiver) = oneshot::channel(); self.commitment_subscriptions.lock().push(sender); - receiver + Some(receiver) } fn finalized(&self, _commitment: Commitment) {} @@ -408,7 +408,7 @@ mod tests { let (resolver_rx, resolver) = RecordingResolver::holding(context.child("resolver")); let actor_handle = actor.start( Application::::default(), - Some(buffer), + buffer, (resolver_rx, resolver.clone()), ); (mailbox, resolver, actor_handle) diff --git a/consensus/src/marshal/coding/variant.rs b/consensus/src/marshal/coding/variant.rs index ea5b2bddfa..67f604cc2a 100644 --- a/consensus/src/marshal/coding/variant.rs +++ b/consensus/src/marshal/coding/variant.rs @@ -95,15 +95,15 @@ where fn subscribe_by_digest( &self, digest: as Digestible>::Digest, - ) -> oneshot::Receiver> { - self.subscribe_by_digest(digest) + ) -> Option>> { + Some(self.subscribe_by_digest(digest)) } fn subscribe_by_commitment( &self, commitment: Commitment, - ) -> oneshot::Receiver> { - self.subscribe(commitment) + ) -> Option>> { + Some(self.subscribe(commitment)) } fn finalized(&self, commitment: Commitment) { diff --git a/consensus/src/marshal/core/actor.rs b/consensus/src/marshal/core/actor.rs index de96a5b2a0..7763db1248 100644 --- a/consensus/src/marshal/core/actor.rs +++ b/consensus/src/marshal/core/actor.rs @@ -6,7 +6,7 @@ use super::{ mailbox::{CommitmentFallback, Mailbox, Message}, stream::Stream, subscriptions::{Key as SubscriptionKey, KeyFor as SubscriptionKeyFor, Subscriptions}, - variant::OptionalBuffer, + variant::NoBuffer, Buffer, Variant, }; use crate::{ @@ -296,7 +296,7 @@ where pub fn start( mut self, application: impl Reporter>, - buffer: Option, + buffer: Buf, resolver: (handler::Receiver, R), ) -> Handle<()> where @@ -307,15 +307,34 @@ where >, Buf: Buffer::PublicKey>, { - let buffer = OptionalBuffer::new(buffer); spawn_cell!(self.context, self.run(application, buffer, resolver)) } + /// Start the actor without a broadcast buffer. + pub fn start_unbuffered( + self, + application: impl Reporter>, + resolver: (handler::Receiver, R), + ) -> Handle<()> + where + R: Resolver< + Key = ResolverRequestFor, + Subscriber = Annotation, + PublicKey = ::PublicKey, + >, + { + self.start( + application, + NoBuffer::<::PublicKey>::new(), + resolver, + ) + } + /// Run the application actor. async fn run( mut self, mut application: impl Reporter>, - mut buffer: OptionalBuffer, + mut buffer: Buf, (mut resolver_rx, mut resolver): (handler::Receiver, R), ) where R: Resolver< @@ -436,7 +455,7 @@ where &mut self, result: ::Output, application: &mut impl Reporter>, - buffer: &mut OptionalBuffer, + buffer: &mut Buf, resolver: &mut R, ) where Buf: Buffer, @@ -491,7 +510,7 @@ where message: Message, resolver: &mut R, waiters: &mut AbortablePool>>, - buffer: &mut OptionalBuffer, + buffer: &mut Buf, application: &mut impl Reporter>, ) where Buf: Buffer::PublicKey>, @@ -764,7 +783,7 @@ where message: handler::Message, resolver_rx: &mut handler::Receiver, resolver: &mut R, - buffer: &mut OptionalBuffer, + buffer: &mut Buf, application: &mut impl Reporter>, ) where Buf: Buffer::PublicKey>, @@ -850,7 +869,7 @@ where &self, key: ResolverRequestFor, response: oneshot::Sender, - buffer: &OptionalBuffer, + buffer: &Buf, ) { match key { Key::Block(commitment) => { @@ -898,7 +917,7 @@ where PublicKey = ::PublicKey, >, waiters: &mut AbortablePool>>, - buffer: &mut OptionalBuffer, + buffer: &mut Buf, ) { let digest = match key { SubscriptionKey::Digest(digest) => digest, @@ -985,7 +1004,7 @@ where finalization: Finalization, skip_if_superseded: bool, resolver: &mut R, - buffer: &mut OptionalBuffer, + buffer: &mut Buf, application: &mut impl Reporter>, ) where Buf: Buffer::PublicKey>, @@ -1052,7 +1071,7 @@ where async fn apply_floor_anchor>( &mut self, block: &V::Block, - buffer: &mut OptionalBuffer, + buffer: &mut Buf, application: &mut impl Reporter>, resolver: &mut impl Resolver< Key = ResolverRequestFor, @@ -1171,7 +1190,7 @@ where &mut self, message: ResolverDelivery, delivers: &mut Vec>, - buffer: &mut OptionalBuffer, + buffer: &mut Buf, application: &mut impl Reporter>, resolver: &mut impl Resolver< Key = ResolverRequestFor, @@ -1349,7 +1368,7 @@ where async fn verify_delivered>( &mut self, mut delivers: Vec>, - buffer: &mut OptionalBuffer, + buffer: &mut Buf, application: &mut impl Reporter>, resolver: &mut impl Resolver< Key = ResolverRequestFor, @@ -1848,7 +1867,7 @@ where /// parent links). async fn find_block_by_digest>( &self, - buffer: &OptionalBuffer, + buffer: &Buf, digest: ::Digest, ) -> Option { if let Some(block) = buffer.find_by_digest(digest).await { @@ -1863,7 +1882,7 @@ where /// Having the full commitment may enable additional retrieval mechanisms. async fn find_block_by_commitment>( &self, - buffer: &OptionalBuffer, + buffer: &Buf, commitment: V::Commitment, ) -> Option { if let Some(block) = buffer.find_by_commitment(commitment).await { @@ -1888,7 +1907,7 @@ where /// needs a subsequent [`sync_finalized`](Self::sync_finalized). async fn try_repair_gaps>( &mut self, - buffer: &mut OptionalBuffer, + buffer: &mut Buf, resolver: &mut impl Resolver< Key = ResolverRequestFor, Subscriber = Annotation, diff --git a/consensus/src/marshal/core/mod.rs b/consensus/src/marshal/core/mod.rs index 177a1f34ae..7efbaf980d 100644 --- a/consensus/src/marshal/core/mod.rs +++ b/consensus/src/marshal/core/mod.rs @@ -31,10 +31,10 @@ //! // `last_height` is `None` until the application acknowledges a block. //! //! // Start with application and buffer -//! actor.start(application, Some(buffer), resolver); +//! actor.start(application, buffer, resolver); //! //! // Or omit broadcast buffering for follower-only chain tracking -//! actor.start(application, None::, resolver); +//! actor.start_unbuffered(application, resolver); //! ``` //! //! For standard mode, use [`crate::marshal::standard::Standard`] as the variant and diff --git a/consensus/src/marshal/core/subscriptions.rs b/consensus/src/marshal/core/subscriptions.rs index 02231e5ab1..9dc6e9b296 100644 --- a/consensus/src/marshal/core/subscriptions.rs +++ b/consensus/src/marshal/core/subscriptions.rs @@ -1,4 +1,4 @@ -use super::{variant::OptionalBuffer, Buffer, Variant}; +use super::{Buffer, Variant}; use commonware_cryptography::Digestible; use commonware_utils::{ channel::{fallible::OneshotExt, oneshot}, @@ -71,7 +71,7 @@ impl Subscriptions { key: KeyFor, response: oneshot::Sender, waiters: &mut AbortablePool>>, - buffer: &OptionalBuffer, + buffer: &Buf, ) { match self.entries.entry(key) { Entry::Occupied(mut entry) => { @@ -99,7 +99,7 @@ impl Subscriptions { mod tests { use super::*; use crate::{ - marshal::{mocks::block::Block, standard::Standard}, + marshal::{core::variant::NoBuffer, mocks::block::Block, standard::Standard}, types::{Height, Round}, }; use commonware_cryptography::{ @@ -147,16 +147,19 @@ mod tests { None } - fn subscribe_by_digest(&self, _digest: Digest) -> oneshot::Receiver { + fn subscribe_by_digest(&self, _digest: Digest) -> Option> { let (sender, receiver) = oneshot::channel(); self.digest_subscribers.lock().push(sender); - receiver + Some(receiver) } - fn subscribe_by_commitment(&self, _commitment: Digest) -> oneshot::Receiver { + fn subscribe_by_commitment( + &self, + _commitment: Digest, + ) -> Option> { let (sender, receiver) = oneshot::channel(); self.commitment_subscribers.lock().push(sender); - receiver + Some(receiver) } fn finalized(&self, _commitment: Digest) {} @@ -179,7 +182,7 @@ mod tests { #[test] fn insert_coalesces_duplicate_keys() { let test_buffer = TestBuffer::default(); - let buffer = OptionalBuffer::new(Some(test_buffer.clone())); + let buffer = test_buffer.clone(); let mut waiters = TestWaiters::default(); let mut subscriptions = Subscriptions::::new(); let block = block(1, 10); @@ -211,7 +214,7 @@ mod tests { #[test] fn notify_wakes_digest_and_commitment_subscribers() { let test_buffer = TestBuffer::default(); - let buffer = OptionalBuffer::new(Some(test_buffer.clone())); + let buffer = test_buffer.clone(); let mut waiters = TestWaiters::default(); let mut subscriptions = Subscriptions::::new(); let block = block(2, 20); @@ -243,7 +246,7 @@ mod tests { #[test] fn retain_open_drops_closed_subscribers_and_keeps_open_ones() { - let buffer = OptionalBuffer::new(Some(TestBuffer::default())); + let buffer = TestBuffer::default(); let mut waiters = TestWaiters::default(); let mut subscriptions = Subscriptions::::new(); let block = block(3, 30); @@ -279,7 +282,7 @@ mod tests { #[test] fn remove_drops_waiter_and_aborts_buffer_waiter() { deterministic::Runner::default().start(|context| async move { - let buffer = OptionalBuffer::new(Some(TestBuffer::default())); + let buffer = TestBuffer::default(); let mut waiters = TestWaiters::default(); let mut subscriptions = Subscriptions::::new(); let block = block(4, 40); @@ -307,16 +310,11 @@ mod tests { fn insert_without_buffer_keeps_local_subscriber() { let mut waiters = TestWaiters::default(); let mut subscriptions = Subscriptions::::new(); - let buffer = OptionalBuffer::::new(None); + let buffer = NoBuffer::::new(); let block = block(5, 50); let (sender, receiver) = oneshot::channel(); - subscriptions.insert::( - Key::Digest(block.digest()), - sender, - &mut waiters, - &buffer, - ); + subscriptions.insert(Key::Digest(block.digest()), sender, &mut waiters, &buffer); assert_eq!(subscriptions.entries.len(), 1); subscriptions.notify(&block); diff --git a/consensus/src/marshal/core/variant.rs b/consensus/src/marshal/core/variant.rs index 0c995400a7..5c5965c963 100644 --- a/consensus/src/marshal/core/variant.rs +++ b/consensus/src/marshal/core/variant.rs @@ -118,23 +118,28 @@ pub trait Buffer: Clone + Send + Sync + 'static { /// /// Returns a receiver that will resolve when the block becomes available. /// If the block is already cached, the receiver may resolve immediately. + /// Returns `None` when the buffer cannot provide availability notifications. /// /// The returned receiver can be dropped to cancel the subscription. fn subscribe_by_digest( &self, digest: ::Digest, - ) -> oneshot::Receiver; + ) -> Option>; /// Subscribe to a block's availability by its commitment. /// /// Returns a receiver that will resolve when the block becomes available. /// If the block is already cached, the receiver may resolve immediately. + /// Returns `None` when the buffer cannot provide availability notifications. /// /// Having the full commitment may enable additional retrieval mechanisms /// depending on the variant implementation. /// /// The returned receiver can be dropped to cancel the subscription. - fn subscribe_by_commitment(&self, commitment: V::Commitment) -> oneshot::Receiver; + fn subscribe_by_commitment( + &self, + commitment: V::Commitment, + ) -> Option>; /// Notify the buffer that a block has been finalized. /// @@ -145,76 +150,54 @@ pub trait Buffer: Clone + Send + Sync + 'static { fn send(&self, round: Round, block: V::Block, recipients: Recipients); } -/// A broadcast buffer that may be absent. -pub(super) struct OptionalBuffer { - inner: Option, - _variant: PhantomData V>, +/// A buffer implementation that never stores, subscribes, finalizes, or sends blocks. +pub(super) struct NoBuffer

{ + _public_key: PhantomData P>, } -impl OptionalBuffer { - /// Create an optional buffer. - pub(super) const fn new(inner: Option) -> Self { +impl

NoBuffer

{ + pub(super) const fn new() -> Self { Self { - inner, - _variant: PhantomData, + _public_key: PhantomData, } } } -impl OptionalBuffer +impl

Clone for NoBuffer

{ + fn clone(&self) -> Self { + *self + } +} + +impl

Copy for NoBuffer

{} + +impl Buffer for NoBuffer

where V: Variant, - Buf: Buffer, + P: PublicKey, { - /// Attempt to find a block by its digest. - pub(super) async fn find_by_digest( - &self, - digest: ::Digest, - ) -> Option { - self.inner.as_ref()?.find_by_digest(digest).await - } + type PublicKey = P; - /// Attempt to find a block by its commitment. - pub(super) async fn find_by_commitment(&self, commitment: V::Commitment) -> Option { - self.inner.as_ref()?.find_by_commitment(commitment).await + async fn find_by_digest(&self, _: ::Digest) -> Option { + None } - /// Subscribe to a block's availability by its digest, if a buffer exists. - pub(super) fn subscribe_by_digest( - &self, - digest: ::Digest, - ) -> Option> { - self.inner - .as_ref() - .map(|buffer| buffer.subscribe_by_digest(digest)) + async fn find_by_commitment(&self, _: V::Commitment) -> Option { + None } - /// Subscribe to a block's availability by its commitment, if a buffer exists. - pub(super) fn subscribe_by_commitment( + fn subscribe_by_digest( &self, - commitment: V::Commitment, + _: ::Digest, ) -> Option> { - self.inner - .as_ref() - .map(|buffer| buffer.subscribe_by_commitment(commitment)) + None } - /// Notify the buffer that a block has been finalized, if a buffer exists. - pub(super) fn finalized(&self, commitment: V::Commitment) { - if let Some(buffer) = &self.inner { - buffer.finalized(commitment); - } + fn subscribe_by_commitment(&self, _: V::Commitment) -> Option> { + None } - /// Send a block to peers, if a buffer exists. - pub(super) fn send( - &self, - round: Round, - block: V::Block, - recipients: Recipients, - ) { - if let Some(buffer) = &self.inner { - buffer.send(round, block, recipients); - } - } + fn finalized(&self, _: V::Commitment) {} + + fn send(&self, _: Round, _: V::Block, _: Recipients) {} } diff --git a/consensus/src/marshal/mocks/harness.rs b/consensus/src/marshal/mocks/harness.rs index 40b7491c5c..4146ba8b3f 100644 --- a/consensus/src/marshal/mocks/harness.rs +++ b/consensus/src/marshal/mocks/harness.rs @@ -1762,7 +1762,7 @@ impl TestHarness for StandardHarness { config, ) .await; - let actor_handle = actor.start(application.clone(), Some(buffer.clone()), resolver); + let actor_handle = actor.start(application.clone(), buffer.clone(), resolver); ValidatorSetup { application, @@ -1955,7 +1955,7 @@ impl TestHarness for StandardHarness { ) .await; let application = Application::::default(); - actor.start(application.clone(), Some(buffer.clone()), resolver); + actor.start(application.clone(), buffer.clone(), resolver); (mailbox, buffer, application) } @@ -2585,7 +2585,7 @@ impl TestHarness for CodingHarness { config, ) .await; - let actor_handle = actor.start(application.clone(), Some(shard_mailbox.clone()), resolver); + let actor_handle = actor.start(application.clone(), shard_mailbox.clone(), resolver); ValidatorSetup { application, @@ -2821,7 +2821,7 @@ impl TestHarness for CodingHarness { ) .await; let application = Application::::default(); - actor.start(application.clone(), Some(shard_mailbox.clone()), resolver); + actor.start(application.clone(), shard_mailbox.clone(), resolver); (mailbox, shard_mailbox, application) } diff --git a/consensus/src/marshal/standard/mod.rs b/consensus/src/marshal/standard/mod.rs index 125f9715ab..53bcf3c88b 100644 --- a/consensus/src/marshal/standard/mod.rs +++ b/consensus/src/marshal/standard/mod.rs @@ -2559,16 +2559,16 @@ mod tests { .cloned() } - fn subscribe_by_digest(&self, _digest: D) -> oneshot::Receiver { + fn subscribe_by_digest(&self, _digest: D) -> Option> { let (sender, receiver) = oneshot::channel(); self.digest_subscriptions.lock().push(sender); - receiver + Some(receiver) } - fn subscribe_by_commitment(&self, _commitment: D) -> oneshot::Receiver { + fn subscribe_by_commitment(&self, _commitment: D) -> Option> { let (sender, receiver) = oneshot::channel(); self.commitment_subscriptions.lock().push(sender); - receiver + Some(receiver) } fn finalized(&self, _commitment: D) {} @@ -2951,8 +2951,11 @@ mod tests { ) .await; let (resolver_rx, resolver) = RecordingResolver::holding(context.child("mailbox")); - let actor_handle = - actor.start(application, buffer.clone(), (resolver_rx, resolver.clone())); + let actor_handle = if let Some(buffer) = buffer.clone() { + actor.start(application, buffer, (resolver_rx, resolver.clone())) + } else { + actor.start_unbuffered(application, (resolver_rx, resolver.clone())) + }; (mailbox, buffer, resolver, actor_handle) } @@ -5295,7 +5298,7 @@ mod tests { .await; actor.start( Application::::default(), - Some(buffer), + buffer, ( handler::Receiver::new(resolver_rx), RecordingResolver::default(), diff --git a/consensus/src/marshal/standard/variant.rs b/consensus/src/marshal/standard/variant.rs index b0befb4ffb..09d19e5190 100644 --- a/consensus/src/marshal/standard/variant.rs +++ b/consensus/src/marshal/standard/variant.rs @@ -75,13 +75,13 @@ where self.find_by_digest(commitment).await } - fn subscribe_by_digest(&self, digest: B::Digest) -> oneshot::Receiver { + fn subscribe_by_digest(&self, digest: B::Digest) -> Option> { let (tx, rx) = oneshot::channel(); self.subscribe_prepared(digest, tx); - rx + Some(rx) } - fn subscribe_by_commitment(&self, commitment: B::Digest) -> oneshot::Receiver { + fn subscribe_by_commitment(&self, commitment: B::Digest) -> Option> { self.subscribe_by_digest(commitment) } diff --git a/examples/reshare/src/engine.rs b/examples/reshare/src/engine.rs index efb985ef2c..2d6225e5a1 100644 --- a/examples/reshare/src/engine.rs +++ b/examples/reshare/src/engine.rs @@ -404,9 +404,9 @@ where callback, ); let buffer_handle = self.buffer.start(broadcast); - let marshal_handle = - self.marshal - .start(self.dkg_mailbox, Some(self.buffered_mailbox), marshal); + let marshal_handle = self + .marshal + .start(self.dkg_mailbox, self.buffered_mailbox, marshal); let orchestrator_handle = self.orchestrator.start(votes, certificates, resolver); if let Err(e) = try_join_all(vec![