Skip to content

Commit 7d3329c

Browse files
committed
[consensus/marshal] commonware-glue marshal changes
1 parent 14a016a commit 7d3329c

7 files changed

Lines changed: 470 additions & 86 deletions

File tree

consensus/src/marshal/ancestry.rs

Lines changed: 78 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,7 @@ use pin_project::pin_project;
1010
use std::{
1111
future::Future,
1212
pin::Pin,
13+
sync::Arc,
1314
task::{Context, Poll},
1415
};
1516

@@ -54,6 +55,57 @@ pub trait BlockProvider: Clone + Send + 'static {
5455
}
5556
}
5657

58+
/// A type-erased [`BlockProvider`] that wraps any concrete provider behind
59+
/// a shared closure.
60+
///
61+
/// Used by actor mailboxes and other channel-based patterns where a generic
62+
/// `BlockProvider` type parameter must be eliminated before sending across
63+
/// a channel.
64+
#[derive(Clone)]
65+
pub struct ErasedBlockProvider<B: Block> {
66+
subscribe:
67+
Arc<dyn Fn(<B as Digestible>::Digest) -> BoxFuture<'static, Option<B>> + Send + Sync>,
68+
subscribe_parent: Arc<dyn Fn(B) -> BoxFuture<'static, Option<B>> + Send + Sync>,
69+
}
70+
71+
impl<B: Block> ErasedBlockProvider<B> {
72+
/// Erase a concrete [`BlockProvider`] into a type-erased provider.
73+
pub fn new<M: BlockProvider<Block = B> + Sync>(provider: M) -> Self {
74+
let provider = Arc::new(provider);
75+
Self {
76+
subscribe: Arc::new({
77+
let provider = provider.clone();
78+
move |digest| {
79+
let p = provider.as_ref().clone();
80+
Box::pin(p.subscribe(digest))
81+
}
82+
}),
83+
subscribe_parent: Arc::new(move |block| {
84+
let p = provider.as_ref().clone();
85+
Box::pin(p.subscribe_parent(block))
86+
}),
87+
}
88+
}
89+
}
90+
91+
impl<B: Block> BlockProvider for ErasedBlockProvider<B> {
92+
type Block = B;
93+
94+
fn subscribe(
95+
self,
96+
digest: <B as Digestible>::Digest,
97+
) -> impl Future<Output = Option<B>> + Send {
98+
(self.subscribe)(digest)
99+
}
100+
101+
fn subscribe_parent(
102+
self,
103+
block: Self::Block,
104+
) -> impl Future<Output = Option<Self::Block>> + Send {
105+
(self.subscribe_parent)(block)
106+
}
107+
}
108+
57109
/// Yields the ancestors of a block while prefetching parents, _not_ including the genesis block.
58110
///
59111
// TODO(<https://github.com/commonwarexyz/monorepo/issues/2982>): Once marshal can also yield the genesis block,
@@ -67,12 +119,37 @@ pub struct AncestorStream<M: BlockProvider> {
67119
}
68120

69121
impl<M: BlockProvider> AncestorStream<M> {
122+
/// Returns a reference to the next block that will be yielded by the
123+
/// stream, without consuming it.
124+
///
125+
/// Returns `None` if the buffer is empty and the next block is being
126+
/// fetched asynchronously.
127+
pub fn peek(&self) -> Option<&M::Block> {
128+
self.buffered.last()
129+
}
130+
131+
/// Erase the block provider type, producing an
132+
/// `AncestorStream<ErasedBlockProvider<M::Block>>`.
133+
///
134+
/// The returned stream behaves identically but can be sent across
135+
/// channels that require a concrete type.
136+
pub fn erase(self) -> AncestorStream<ErasedBlockProvider<M::Block>>
137+
where
138+
M: Sync,
139+
{
140+
AncestorStream {
141+
buffered: self.buffered,
142+
marshal: ErasedBlockProvider::new(self.marshal),
143+
pending: self.pending,
144+
}
145+
}
146+
70147
/// Creates a new [AncestorStream] starting from the given ancestry.
71148
///
72149
/// # Panics
73150
///
74151
/// Panics if the initial blocks are not contiguous in height.
75-
pub(crate) fn new(marshal: M, initial: impl IntoIterator<Item = M::Block>) -> Self {
152+
pub fn new(marshal: M, initial: impl IntoIterator<Item = M::Block>) -> Self {
76153
let mut buffered = initial.into_iter().collect::<Vec<M::Block>>();
77154
buffered.sort_by_key(Heightable::height);
78155

consensus/src/marshal/coding/mod.rs

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -744,6 +744,11 @@ mod tests {
744744
harness::prune_finalized_archives::<CodingHarness>();
745745
}
746746

747+
#[test_traced("WARN")]
748+
fn test_coding_set_floor_without_pruning_preserves_archives() {
749+
harness::set_floor_without_pruning_preserves_archives::<CodingHarness>();
750+
}
751+
747752
#[test_traced("WARN")]
748753
fn test_coding_rejects_block_delivery_below_floor() {
749754
harness::reject_stale_block_delivery_after_floor_update::<CodingHarness>();

consensus/src/marshal/core/actor.rs

Lines changed: 60 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -748,17 +748,52 @@ where
748748
let finalization = self.get_finalization_by_height(height).await;
749749
response.send_lossy(finalization);
750750
}
751+
Message::GetProcessedHeight { response } => {
752+
response.send_lossy(self.floor.height);
753+
}
751754
Message::HintFinalized { height, targets } => {
752-
// Skip if finalization is already available locally
753-
if self.get_finalization_by_height(height).await.is_some() {
755+
// Skip if height is below the floor
756+
if height < self.floor.height {
754757
continue;
755758
}
756759

757-
self.floor.fetch_targeted_if_permitted(
758-
&mut resolver,
759-
Request::finalized(height),
760-
targets,
761-
);
760+
// Skip if both the finalization and the block are
761+
// already available locally. Both are checked because
762+
// `set_floor` can advance the processed height to
763+
// a height whose block was never finalized locally.
764+
if self.get_finalization_by_height(height).await.is_some()
765+
&& self.get_finalized_block(height).await.is_some()
766+
{
767+
continue;
768+
}
769+
770+
match targets {
771+
Recipients::All => {
772+
self.floor.fetch_if_permitted(
773+
&mut resolver,
774+
Request::<V::Commitment>::finalized(height),
775+
);
776+
}
777+
Recipients::Some(peers) => {
778+
if peers.is_empty() {
779+
continue;
780+
}
781+
let peers = NonEmptyVec::from_unchecked(peers);
782+
self.floor.fetch_targeted_if_permitted(
783+
&mut resolver,
784+
Request::<V::Commitment>::finalized(height),
785+
peers,
786+
);
787+
}
788+
Recipients::One(peer) => {
789+
let peers = NonEmptyVec::new(peer);
790+
self.floor.fetch_targeted_if_permitted(
791+
&mut resolver,
792+
Request::<V::Commitment>::finalized(height),
793+
peers,
794+
);
795+
}
796+
}
762797
}
763798
Message::SubscribeByDigest {
764799
digest,
@@ -799,7 +834,10 @@ where
799834
)
800835
.await;
801836
}
802-
Message::SetFloor { height } => {
837+
Message::SetFloor {
838+
height,
839+
prune_archives,
840+
} => {
803841
if self.floor.height >= height {
804842
warn!(
805843
%height,
@@ -822,10 +860,11 @@ where
822860
// updating `floor.height`.
823861
self.pending_acks.clear();
824862

825-
// The floor is durable, so cache/finalized data below it can be pruned.
826-
if let Err(err) = self.prune_after_floor(height).await {
827-
error!(?err, %height, "failed to prune data below floor");
828-
return;
863+
if prune_archives {
864+
if let Err(err) = self.prune_after_floor(height).await {
865+
error!(?err, %height, "failed to prune data below floor");
866+
return;
867+
}
829868
}
830869

831870
// Intentionally keep existing block subscriptions alive. Canceling
@@ -1638,15 +1677,19 @@ where
16381677
finalization: Option<Finalization<P::Scheme, V::Commitment>>,
16391678
application: &mut impl Reporter<Activity = Update<V::ApplicationBlock, A>>,
16401679
) -> bool {
1641-
// Blocks below the last processed height are not useful to us, so we ignore them (this
1642-
// has the nice byproduct of ensuring we don't call a backing store with a block below the
1643-
// pruning boundary)
1644-
if height <= self.floor.height {
1680+
// Blocks below the last processed height are not useful to us, so we
1681+
// ignore them (this has the nice byproduct of ensuring we don't call
1682+
// a backing store with a block below the pruning boundary).
1683+
//
1684+
// Blocks at exactly the processed height are allowed, however. After
1685+
// a `set_floor`, it may be possible that the floor was set to a height
1686+
// that marshal doesn't have.
1687+
if height < self.floor.height {
16451688
debug!(
16461689
%height,
16471690
floor = %self.floor.height,
16481691
?digest,
1649-
"dropping finalization at or below processed height floor"
1692+
"dropping finalization below processed height floor"
16501693
);
16511694
return false;
16521695
}

0 commit comments

Comments
 (0)