Skip to content
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
62 changes: 62 additions & 0 deletions consensus/src/marshal/ancestry.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ use futures::{
};
use pin_project::pin_project;
use std::{
collections::VecDeque,
future::Future,
pin::Pin,
sync::Arc,
Expand All @@ -22,6 +23,39 @@ pub trait Ancestry<B: Block>: Stream<Item = B> + Send + Unpin + 'static {
fn peek(&self) -> Option<&B>;
}

/// Creates an ancestry stream from a fixed sequence of blocks.
///
/// Blocks are yielded in iterator order and no parent fetching is performed. This is useful when
/// the caller already has the complete ancestry needed by an application.
Comment thread
patrick-ogrady marked this conversation as resolved.
Outdated
pub fn from_iter<B>(blocks: impl IntoIterator<Item = B>) -> impl Ancestry<B>
where
B: Block,
{
BoundedAncestry {
blocks: blocks.into_iter().collect(),
}
}

struct BoundedAncestry<B: Block> {
blocks: VecDeque<B>,
}

impl<B: Block> Unpin for BoundedAncestry<B> {}

impl<B: Block> Ancestry<B> for BoundedAncestry<B> {
fn peek(&self) -> Option<&B> {
self.blocks.front()
}
}

impl<B: Block> Stream for BoundedAncestry<B> {
type Item = B;

fn poll_next(mut self: Pin<&mut Self>, _: &mut Context<'_>) -> Poll<Option<Self::Item>> {
Poll::Ready(self.blocks.pop_front())
}
}

/// An interface for providing parent blocks.
pub trait BlockProvider: Send + 'static {
/// The block type the provider walks.
Expand Down Expand Up @@ -384,6 +418,34 @@ mod test {
});
}

#[test]
fn test_from_iter_available_through_ancestry_trait() {
fn peek_height(ancestry: impl Ancestry<Block<Sha256Digest, ()>>) -> Option<Height> {
ancestry.peek().map(Heightable::height)
}

let block = Block::new::<Sha256>((), Sha256Digest::EMPTY, Height::new(1), 1);
let ancestry = from_iter([block.clone()]);

assert_eq!(peek_height(ancestry), Some(block.height()));
}

#[test]
fn test_from_iter_yields_blocks_in_order_and_peeks_next() {
deterministic::Runner::default().start(|_| async move {
let parent = Block::new::<Sha256>((), Sha256Digest::EMPTY, Height::new(1), 1);
let child = Block::new::<Sha256>((), parent.digest(), Height::new(2), 2);
let mut ancestry = from_iter([child.clone(), parent.clone()]);

assert_eq!(ancestry.peek(), Some(&child));
assert_eq!(ancestry.next().await, Some(child));
assert_eq!(ancestry.peek(), Some(&parent));
assert_eq!(ancestry.next().await, Some(parent));
assert_eq!(ancestry.peek(), None);
assert_eq!(ancestry.next().await, None);
});
}

#[test]
fn test_yields_genesis_and_stops() {
deterministic::Runner::default().start(|context| async move {
Expand Down
8 changes: 4 additions & 4 deletions consensus/src/marshal/coding/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -156,19 +156,19 @@ mod tests {
None
}

fn subscribe_by_digest(&self, _digest: D) -> oneshot::Receiver<TestCodedBlock> {
fn subscribe_by_digest(&self, _digest: D) -> Option<oneshot::Receiver<TestCodedBlock>> {
let (sender, receiver) = oneshot::channel();
self.digest_subscriptions.lock().push(sender);
receiver
Some(receiver)
}

fn subscribe_by_commitment(
&self,
_commitment: Commitment,
) -> oneshot::Receiver<TestCodedBlock> {
) -> Option<oneshot::Receiver<TestCodedBlock>> {
let (sender, receiver) = oneshot::channel();
self.commitment_subscriptions.lock().push(sender);
receiver
Some(receiver)
}

fn finalized(&self, _commitment: Commitment) {}
Expand Down
8 changes: 4 additions & 4 deletions consensus/src/marshal/coding/variant.rs
Original file line number Diff line number Diff line change
Expand Up @@ -95,15 +95,15 @@ where
fn subscribe_by_digest(
&self,
digest: <CodedBlock<B, C, H> as Digestible>::Digest,
) -> oneshot::Receiver<CodedBlock<B, C, H>> {
self.subscribe_by_digest(digest)
) -> Option<oneshot::Receiver<CodedBlock<B, C, H>>> {
Some(self.subscribe_by_digest(digest))
}

fn subscribe_by_commitment(
&self,
commitment: Commitment,
) -> oneshot::Receiver<CodedBlock<B, C, H>> {
self.subscribe(commitment)
) -> Option<oneshot::Receiver<CodedBlock<B, C, H>>> {
Some(self.subscribe(commitment))
}

fn finalized(&self, commitment: Commitment) {
Expand Down
30 changes: 14 additions & 16 deletions consensus/src/marshal/core/actor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,6 @@ use super::{
mailbox::{CommitmentFallback, Mailbox, Message},
stream::Stream,
subscriptions::{Key as SubscriptionKey, KeyFor as SubscriptionKeyFor, Subscriptions},
variant::OptionalBuffer,
Buffer, Variant,
};
use crate::{
Expand Down Expand Up @@ -296,7 +295,7 @@ where
pub fn start<R, Buf>(
mut self,
application: impl Reporter<Activity = Update<V::ApplicationBlock, A>>,
buffer: Option<Buf>,
buffer: Buf,
resolver: (handler::Receiver<V::Commitment>, R),
) -> Handle<()>
where
Expand All @@ -307,15 +306,14 @@ where
>,
Buf: Buffer<V, PublicKey = <P::Scheme as CertificateScheme>::PublicKey>,
{
let buffer = OptionalBuffer::new(buffer);
spawn_cell!(self.context, self.run(application, buffer, resolver))
}

/// Run the application actor.
async fn run<R, Buf>(
mut self,
mut application: impl Reporter<Activity = Update<V::ApplicationBlock, A>>,
mut buffer: OptionalBuffer<V, Buf>,
mut buffer: Buf,
(mut resolver_rx, mut resolver): (handler::Receiver<V::Commitment>, R),
) where
R: Resolver<
Expand Down Expand Up @@ -436,7 +434,7 @@ where
&mut self,
result: <A::Waiter as Future>::Output,
application: &mut impl Reporter<Activity = Update<V::ApplicationBlock, A>>,
buffer: &mut OptionalBuffer<V, Buf>,
buffer: &mut Buf,
resolver: &mut R,
) where
Buf: Buffer<V>,
Expand Down Expand Up @@ -491,7 +489,7 @@ where
message: Message<P::Scheme, V>,
resolver: &mut R,
waiters: &mut AbortablePool<Result<V::Block, SubscriptionKeyFor<V>>>,
buffer: &mut OptionalBuffer<V, Buf>,
buffer: &mut Buf,
application: &mut impl Reporter<Activity = Update<V::ApplicationBlock, A>>,
) where
Buf: Buffer<V, PublicKey = <P::Scheme as CertificateScheme>::PublicKey>,
Expand Down Expand Up @@ -764,7 +762,7 @@ where
message: handler::Message<V::Commitment>,
resolver_rx: &mut handler::Receiver<V::Commitment>,
resolver: &mut R,
buffer: &mut OptionalBuffer<V, Buf>,
buffer: &mut Buf,
application: &mut impl Reporter<Activity = Update<V::ApplicationBlock, A>>,
) where
Buf: Buffer<V, PublicKey = <P::Scheme as CertificateScheme>::PublicKey>,
Expand Down Expand Up @@ -850,7 +848,7 @@ where
&self,
key: ResolverRequestFor<V>,
response: oneshot::Sender<Bytes>,
buffer: &OptionalBuffer<V, Buf>,
buffer: &Buf,
) {
match key {
Key::Block(commitment) => {
Expand Down Expand Up @@ -898,7 +896,7 @@ where
PublicKey = <P::Scheme as CertificateScheme>::PublicKey,
>,
waiters: &mut AbortablePool<Result<V::Block, SubscriptionKeyFor<V>>>,
buffer: &mut OptionalBuffer<V, Buf>,
buffer: &mut Buf,
) {
let digest = match key {
SubscriptionKey::Digest(digest) => digest,
Expand Down Expand Up @@ -985,7 +983,7 @@ where
finalization: Finalization<P::Scheme, V::Commitment>,
skip_if_superseded: bool,
resolver: &mut R,
buffer: &mut OptionalBuffer<V, Buf>,
buffer: &mut Buf,
application: &mut impl Reporter<Activity = Update<V::ApplicationBlock, A>>,
) where
Buf: Buffer<V, PublicKey = <P::Scheme as CertificateScheme>::PublicKey>,
Expand Down Expand Up @@ -1052,7 +1050,7 @@ where
async fn apply_floor_anchor<Buf: Buffer<V>>(
&mut self,
block: &V::Block,
buffer: &mut OptionalBuffer<V, Buf>,
buffer: &mut Buf,
application: &mut impl Reporter<Activity = Update<V::ApplicationBlock, A>>,
resolver: &mut impl Resolver<
Key = ResolverRequestFor<V>,
Expand Down Expand Up @@ -1171,7 +1169,7 @@ where
&mut self,
message: ResolverDelivery<V>,
delivers: &mut Vec<PendingVerification<P::Scheme, V>>,
buffer: &mut OptionalBuffer<V, Buf>,
buffer: &mut Buf,
application: &mut impl Reporter<Activity = Update<V::ApplicationBlock, A>>,
resolver: &mut impl Resolver<
Key = ResolverRequestFor<V>,
Expand Down Expand Up @@ -1349,7 +1347,7 @@ where
async fn verify_delivered<Buf: Buffer<V>>(
&mut self,
mut delivers: Vec<PendingVerification<P::Scheme, V>>,
buffer: &mut OptionalBuffer<V, Buf>,
buffer: &mut Buf,
application: &mut impl Reporter<Activity = Update<V::ApplicationBlock, A>>,
resolver: &mut impl Resolver<
Key = ResolverRequestFor<V>,
Expand Down Expand Up @@ -1848,7 +1846,7 @@ where
/// parent links).
async fn find_block_by_digest<Buf: Buffer<V>>(
&self,
buffer: &OptionalBuffer<V, Buf>,
buffer: &Buf,
digest: <V::Block as Digestible>::Digest,
) -> Option<V::Block> {
if let Some(block) = buffer.find_by_digest(digest).await {
Expand All @@ -1863,7 +1861,7 @@ where
/// Having the full commitment may enable additional retrieval mechanisms.
async fn find_block_by_commitment<Buf: Buffer<V>>(
&self,
buffer: &OptionalBuffer<V, Buf>,
buffer: &Buf,
commitment: V::Commitment,
) -> Option<V::Block> {
if let Some(block) = buffer.find_by_commitment(commitment).await {
Expand All @@ -1888,7 +1886,7 @@ where
/// needs a subsequent [`sync_finalized`](Self::sync_finalized).
async fn try_repair_gaps<Buf: Buffer<V>>(
&mut self,
buffer: &mut OptionalBuffer<V, Buf>,
buffer: &mut Buf,
resolver: &mut impl Resolver<
Key = ResolverRequestFor<V>,
Subscriber = Annotation,
Expand Down
29 changes: 12 additions & 17 deletions consensus/src/marshal/core/subscriptions.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
use super::{variant::OptionalBuffer, Buffer, Variant};
use super::{Buffer, Variant};
use commonware_cryptography::Digestible;
use commonware_utils::{
channel::{fallible::OneshotExt, oneshot},
Expand Down Expand Up @@ -71,7 +71,7 @@ impl<V: Variant> Subscriptions<V> {
key: KeyFor<V>,
response: oneshot::Sender<V::Block>,
waiters: &mut AbortablePool<Result<V::Block, KeyFor<V>>>,
buffer: &OptionalBuffer<V, Buf>,
buffer: &Buf,
) {
match self.entries.entry(key) {
Entry::Occupied(mut entry) => {
Expand Down Expand Up @@ -147,16 +147,16 @@ mod tests {
None
}

fn subscribe_by_digest(&self, _digest: Digest) -> oneshot::Receiver<TestBlock> {
fn subscribe_by_digest(&self, _digest: Digest) -> Option<oneshot::Receiver<TestBlock>> {
let (sender, receiver) = oneshot::channel();
self.digest_subscribers.lock().push(sender);
receiver
Some(receiver)
}

fn subscribe_by_commitment(&self, _commitment: Digest) -> oneshot::Receiver<TestBlock> {
fn subscribe_by_commitment(&self, _commitment: Digest) -> Option<oneshot::Receiver<TestBlock>> {
let (sender, receiver) = oneshot::channel();
self.commitment_subscribers.lock().push(sender);
receiver
Some(receiver)
}

fn finalized(&self, _commitment: Digest) {}
Expand All @@ -179,7 +179,7 @@ mod tests {
#[test]
fn insert_coalesces_duplicate_keys() {
let test_buffer = TestBuffer::default();
let buffer = OptionalBuffer::new(Some(test_buffer.clone()));
let buffer = Some(test_buffer.clone());
let mut waiters = TestWaiters::default();
let mut subscriptions = Subscriptions::<TestVariant>::new();
let block = block(1, 10);
Expand Down Expand Up @@ -211,7 +211,7 @@ mod tests {
#[test]
fn notify_wakes_digest_and_commitment_subscribers() {
let test_buffer = TestBuffer::default();
let buffer = OptionalBuffer::new(Some(test_buffer.clone()));
let buffer = Some(test_buffer.clone());
let mut waiters = TestWaiters::default();
let mut subscriptions = Subscriptions::<TestVariant>::new();
let block = block(2, 20);
Expand Down Expand Up @@ -243,7 +243,7 @@ mod tests {

#[test]
fn retain_open_drops_closed_subscribers_and_keeps_open_ones() {
let buffer = OptionalBuffer::new(Some(TestBuffer::default()));
let buffer = Some(TestBuffer::default());
let mut waiters = TestWaiters::default();
let mut subscriptions = Subscriptions::<TestVariant>::new();
let block = block(3, 30);
Expand Down Expand Up @@ -279,7 +279,7 @@ mod tests {
#[test]
fn remove_drops_waiter_and_aborts_buffer_waiter() {
deterministic::Runner::default().start(|context| async move {
let buffer = OptionalBuffer::new(Some(TestBuffer::default()));
let buffer = Some(TestBuffer::default());
let mut waiters = TestWaiters::default();
let mut subscriptions = Subscriptions::<TestVariant>::new();
let block = block(4, 40);
Expand Down Expand Up @@ -307,16 +307,11 @@ mod tests {
fn insert_without_buffer_keeps_local_subscriber() {
let mut waiters = TestWaiters::default();
let mut subscriptions = Subscriptions::<TestVariant>::new();
let buffer = OptionalBuffer::<TestVariant, TestBuffer>::new(None);
let buffer: Option<TestBuffer> = None;
let block = block(5, 50);

let (sender, receiver) = oneshot::channel();
subscriptions.insert::<TestBuffer>(
Key::Digest(block.digest()),
sender,
&mut waiters,
&buffer,
);
subscriptions.insert(Key::Digest(block.digest()), sender, &mut waiters, &buffer);

assert_eq!(subscriptions.entries.len(), 1);
subscriptions.notify(&block);
Expand Down
Loading
Loading