Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
56 changes: 18 additions & 38 deletions consensus/src/marshal/ancestry.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,45 +23,24 @@ where
{
}

/// An interface for providing blocks.
/// An interface for providing parent blocks.
pub trait BlockProvider: Clone + Send + 'static {
/// The block type the provider walks.
type Block: Block;

/// Subscribe to a block by its digest without requesting it from the network.
/// Subscribe to the parent of a known block.
///
/// If the block is found available locally, the block will be returned immediately.
/// If the parent is found available locally, the parent will be returned immediately.
///
/// If the block is not available locally, the subscription will be registered and the caller
/// will be notified when the block is available. If the block is not finalized, it's possible
/// If the parent is not available locally, the subscription will be registered and the caller
/// will be notified when the parent is available. If the parent is not finalized, it's possible
/// that it may never become available.
///
/// Returns `None` when the subscription is canceled or the provider can no longer deliver
/// the block.
/// the parent.
///
/// This is intentionally narrower than [`Self::subscribe_parent`]. A digest is enough to
/// identify a block in local storage, but it may not be enough to form the network request
/// used by a marshal variant. Variants whose consensus commitment contains extra context
/// should keep that logic in [`Self::subscribe_parent`], where the known child block is
/// still available.
fn subscribe(
self,
digest: <Self::Block as Digestible>::Digest,
) -> impl Future<Output = Option<Self::Block>> + Send;

/// Subscribe to the parent of a known block.
///
/// This is a separate hook from [`Self::subscribe`] because the child block can carry
/// variant-specific context needed to retrieve its parent. The default implementation
/// follows the digest link and waits locally, but providers may override this to derive a
/// full parent commitment and issue a fetching subscription.
fn subscribe_parent(
self,
block: Self::Block,
) -> impl Future<Output = Option<Self::Block>> + Send {
let digest = block.parent();
self.subscribe(digest)
}
/// The child block can carry variant-specific context needed to retrieve its parent.
fn subscribe(self, block: Self::Block) -> impl Future<Output = Option<Self::Block>> + Send;
}

/// Yields the ancestors of a block while prefetching parents, _not_ including the genesis block.
Expand Down Expand Up @@ -124,10 +103,10 @@ where
// If a result has been buffered, return it and queue the parent fetch if needed.
if let Some(block) = this.buffered.pop() {
let height = block.height();
let should_subscribe_parent = height > END_BOUND;
let should_walk_parent = height > END_BOUND;
let end_of_buffered = this.buffered.is_empty();
if should_subscribe_parent && end_of_buffered {
let future = this.marshal.clone().subscribe_parent(block.clone()).boxed();
if should_walk_parent && end_of_buffered {
let future = this.marshal.clone().subscribe(block.clone()).boxed();
*this.pending.as_mut() = Some(future).into();

// Explicitly poll the next future to kick off the fetch. If it's already ready,
Expand All @@ -141,7 +120,7 @@ where
}
Poll::Ready(None) | Poll::Pending => {}
}
} else if !should_subscribe_parent {
} else if !should_walk_parent {
// No more parents to fetch; Finish the stream.
*this.pending.as_mut() = None.into();
}
Expand All @@ -157,9 +136,9 @@ where
}
Poll::Ready(Some(Some(block))) => {
let height = block.height();
let should_subscribe_parent = height > END_BOUND;
if should_subscribe_parent {
let future = this.marshal.clone().subscribe_parent(block.clone()).boxed();
let should_walk_parent = height > END_BOUND;
if should_walk_parent {
let future = this.marshal.clone().subscribe(block.clone()).boxed();
*this.pending.as_mut() = Some(future).into();

// Explicitly poll the next future to kick off the fetch. If it's already ready,
Expand Down Expand Up @@ -197,8 +176,9 @@ mod test {
impl BlockProvider for MockProvider {
type Block = Block<Sha256Digest, ()>;

async fn subscribe(self, digest: Sha256Digest) -> Option<Self::Block> {
self.0.into_iter().find(|b| b.digest() == digest)
async fn subscribe(self, block: Self::Block) -> Option<Self::Block> {
let parent = block.parent;
self.0.into_iter().find(|b| b.digest() == parent)
}
}

Expand Down
4 changes: 2 additions & 2 deletions consensus/src/marshal/coding/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -491,8 +491,8 @@ mod tests {
};
let child = make_coding_block(child_ctx, parent.digest(), Height::new(2), 200);
let subscription = context
.child("subscribe_parent")
.spawn(move |_| BlockProvider::subscribe_parent(marshal, child));
.child("subscribe")
.spawn(move |_| BlockProvider::subscribe(marshal, child));

context.sleep(Duration::from_millis(100)).await;
assert_eq!(
Expand Down
11 changes: 2 additions & 9 deletions consensus/src/marshal/coding/variant.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ use crate::{
shards,
types::{CodedBlock, CodedBlockCfg, StoredCodedBlock},
},
core::{Buffer, CommitmentFallback, DigestFallback, Mailbox, Variant},
core::{Buffer, CommitmentFallback, Mailbox, Variant},
},
simplex::types::Context,
types::{coding::Commitment, Round},
Expand Down Expand Up @@ -125,14 +125,7 @@ where
{
type Block = B;

async fn subscribe(self, digest: B::Digest) -> Option<Self::Block> {
self.subscribe_by_digest(digest, DigestFallback::Wait)
.await
.ok()
.map(<Coding<B, C, H, P> as Variant>::into_inner)
}

async fn subscribe_parent(self, block: Self::Block) -> Option<Self::Block> {
async fn subscribe(self, block: Self::Block) -> Option<Self::Block> {
let parent_height = block.height().previous()?;
let commitment = block.context().parent.1;
self.subscribe_by_commitment(
Expand Down
4 changes: 2 additions & 2 deletions consensus/src/marshal/standard/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -132,8 +132,8 @@ mod tests {
let parent = make_raw_block(Sha256::hash(b""), Height::new(1), 100);
let child = make_raw_block(parent.digest(), Height::new(2), 200);
let subscription = context
.child("subscribe_parent")
.spawn(move |_| BlockProvider::subscribe_parent(mailbox, child));
.child("subscribe")
.spawn(move |_| BlockProvider::subscribe(mailbox, child));

context.sleep(Duration::from_millis(100)).await;
assert_eq!(
Expand Down
10 changes: 2 additions & 8 deletions consensus/src/marshal/standard/variant.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@
use crate::{
marshal::{
ancestry::BlockProvider,
core::{Buffer, CommitmentFallback, DigestFallback, Mailbox, Variant},
core::{Buffer, CommitmentFallback, Mailbox, Variant},
},
types::Round,
Block,
Expand Down Expand Up @@ -100,13 +100,7 @@ where
{
type Block = B;

async fn subscribe(self, digest: B::Digest) -> Option<Self::Block> {
self.subscribe_by_digest(digest, DigestFallback::Wait)
.await
.ok()
}

async fn subscribe_parent(self, block: Self::Block) -> Option<Self::Block> {
async fn subscribe(self, block: Self::Block) -> Option<Self::Block> {
let parent_height = block.height().previous()?;
self.subscribe_by_commitment(
block.parent(),
Expand Down
Loading