Skip to content

Commit 6ed2e53

Browse files
committed
[consensus/marshal] commonware-glue marshal changes
1 parent 1c65a16 commit 6ed2e53

6 files changed

Lines changed: 268 additions & 35 deletions

File tree

consensus/src/marshal/ancestry.rs

Lines changed: 67 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@
22
33
use crate::{types::Height, Block, Heightable};
44
use commonware_cryptography::Digestible;
5+
use commonware_utils::sync::Mutex;
56
use futures::{
67
future::{BoxFuture, OptionFuture},
78
FutureExt, Stream,
@@ -10,6 +11,7 @@ use pin_project::pin_project;
1011
use std::{
1112
future::Future,
1213
pin::Pin,
14+
sync::Arc,
1315
task::{Context, Poll},
1416
};
1517

@@ -34,6 +36,45 @@ pub trait BlockProvider: Clone + Send + 'static {
3436
) -> impl Future<Output = Option<Self::Block>> + Send;
3537
}
3638

39+
/// A type-erased [`BlockProvider`] that wraps any concrete provider behind
40+
/// a shared closure.
41+
///
42+
/// Used by actor mailboxes and other channel-based patterns where a generic
43+
/// `BlockProvider` type parameter must be eliminated before sending across
44+
/// a channel.
45+
#[derive(Clone)]
46+
pub struct ErasedBlockProvider<B: Block> {
47+
fetch: Arc<dyn Fn(<B as Digestible>::Digest) -> BoxFuture<'static, Option<B>> + Send + Sync>,
48+
}
49+
50+
impl<B: Block> ErasedBlockProvider<B> {
51+
/// Erase a concrete [`BlockProvider`] into a type-erased provider.
52+
///
53+
/// The provider is wrapped in a `Mutex` so that the resulting closure
54+
/// is `Sync` (required by `Arc<dyn Fn + Send + Sync>`). The lock is
55+
/// held only for the duration of `clone()`, never across an await.
56+
pub fn new<M: BlockProvider<Block = B>>(provider: M) -> Self {
57+
let provider = Arc::new(Mutex::new(provider));
58+
Self {
59+
fetch: Arc::new(move |digest| {
60+
let p = provider.lock().clone();
61+
Box::pin(async move { p.fetch_block(digest).await })
62+
}),
63+
}
64+
}
65+
}
66+
67+
impl<B: Block> BlockProvider for ErasedBlockProvider<B> {
68+
type Block = B;
69+
70+
fn fetch_block(
71+
self,
72+
digest: <B as Digestible>::Digest,
73+
) -> impl Future<Output = Option<B>> + Send {
74+
(self.fetch)(digest)
75+
}
76+
}
77+
3778
/// Yields the ancestors of a block while prefetching parents, _not_ including the genesis block.
3879
///
3980
// TODO(<https://github.com/commonwarexyz/monorepo/issues/2982>): Once marshal can also yield the genesis block,
@@ -47,12 +88,37 @@ pub struct AncestorStream<M, B: Block> {
4788
}
4889

4990
impl<M, B: Block> AncestorStream<M, B> {
91+
/// Returns a reference to the next block that will be yielded by the
92+
/// stream, without consuming it.
93+
///
94+
/// Returns `None` if the buffer is empty and the next block is being
95+
/// fetched asynchronously.
96+
pub fn peek(&self) -> Option<&B> {
97+
self.buffered.last()
98+
}
99+
100+
/// Erase the block provider type, producing an
101+
/// `AncestorStream<ErasedBlockProvider<B>, B>`.
102+
///
103+
/// The returned stream behaves identically but can be sent across
104+
/// channels that require a concrete type.
105+
pub fn erase(self) -> AncestorStream<ErasedBlockProvider<B>, B>
106+
where
107+
M: BlockProvider<Block = B>,
108+
{
109+
AncestorStream {
110+
buffered: self.buffered,
111+
marshal: ErasedBlockProvider::new(self.marshal),
112+
pending: self.pending,
113+
}
114+
}
115+
50116
/// Creates a new [AncestorStream] starting from the given ancestry.
51117
///
52118
/// # Panics
53119
///
54120
/// Panics if the initial blocks are not contiguous in height.
55-
pub(crate) fn new(marshal: M, initial: impl IntoIterator<Item = B>) -> Self {
121+
pub fn new(marshal: M, initial: impl IntoIterator<Item = B>) -> Self {
56122
let mut buffered = initial.into_iter().collect::<Vec<B>>();
57123
buffered.sort_by_key(Heightable::height);
58124

consensus/src/marshal/coding/mod.rs

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -197,6 +197,11 @@ mod tests {
197197
harness::prune_finalized_archives::<CodingHarness>();
198198
}
199199

200+
#[test_traced("WARN")]
201+
fn test_coding_set_floor_without_pruning_preserves_archives() {
202+
harness::set_floor_without_pruning_preserves_archives::<CodingHarness>();
203+
}
204+
200205
#[test_traced("WARN")]
201206
fn test_coding_rejects_block_delivery_below_floor() {
202207
harness::reject_stale_block_delivery_after_floor_update::<CodingHarness>();

consensus/src/marshal/core/actor.rs

Lines changed: 40 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -659,20 +659,35 @@ where
659659
let finalization = self.get_finalization_by_height(height).await;
660660
response.send_lossy(finalization);
661661
}
662+
Message::GetProcessedHeight { response } => {
663+
response.send_lossy(self.last_processed_height);
664+
}
662665
Message::HintFinalized { height, targets } => {
663-
// Skip if height is at or below the floor
664-
if height <= self.last_processed_height {
666+
// Skip if height is below the floor
667+
if height < self.last_processed_height {
665668
continue;
666669
}
667670

668-
// Skip if finalization is already available locally
669-
if self.get_finalization_by_height(height).await.is_some() {
671+
// Skip if both the finalization and the block are
672+
// already available locally. Both are checked because
673+
// `set_floor` can advance `last_processed_height` to
674+
// a height whose block was never finalized locally.
675+
if self.get_finalization_by_height(height).await.is_some()
676+
&& self.get_finalized_block(height).await.is_some()
677+
{
670678
continue;
671679
}
672680

673-
// Trigger a targeted fetch via the resolver
681+
// Trigger a fetch via the resolver
674682
let request = Request::<V::Commitment>::Finalized { height };
675-
resolver.fetch_targeted(request, targets).await;
683+
match targets {
684+
Some(targets) => {
685+
resolver.fetch_targeted(request, targets).await;
686+
}
687+
None => {
688+
resolver.fetch(request).await;
689+
}
690+
}
676691
}
677692
Message::SubscribeByDigest {
678693
round,
@@ -704,7 +719,10 @@ where
704719
)
705720
.await;
706721
}
707-
Message::SetFloor { height } => {
722+
Message::SetFloor {
723+
height,
724+
prune_archives,
725+
} => {
708726
if self.last_processed_height >= height {
709727
warn!(
710728
%height,
@@ -726,10 +744,12 @@ where
726744
// updating `last_processed_height`.
727745
self.pending_acks.clear();
728746

729-
// Prune data in the finalized archives below the new floor.
730-
if let Err(err) = self.prune_finalized_archives(height).await {
731-
error!(?err, %height, "failed to prune finalized archives");
732-
return;
747+
if prune_archives {
748+
// Prune data in the finalized archives below the new floor.
749+
if let Err(err) = self.prune_finalized_archives(height).await {
750+
error!(?err, %height, "failed to prune finalized archives");
751+
return;
752+
}
733753
}
734754

735755
// Intentionally keep existing block subscriptions alive. Canceling
@@ -1455,15 +1475,19 @@ where
14551475
finalization: Option<Finalization<P::Scheme, V::Commitment>>,
14561476
application: &mut impl Reporter<Activity = Update<V::ApplicationBlock, A>>,
14571477
) -> bool {
1458-
// Blocks below the last processed height are not useful to us, so we ignore them (this
1459-
// has the nice byproduct of ensuring we don't call a backing store with a block below the
1460-
// pruning boundary)
1461-
if height <= self.last_processed_height {
1478+
// Blocks below the last processed height are not useful to us, so we
1479+
// ignore them (this has the nice byproduct of ensuring we don't call
1480+
// a backing store with a block below the pruning boundary).
1481+
//
1482+
// Blocks at exactly the processed height are allowed, however. After
1483+
// a `set_floor`, it may be possible that the floor was set to a height
1484+
// that marshal doesn't have.
1485+
if height < self.last_processed_height {
14621486
debug!(
14631487
%height,
14641488
floor = %self.last_processed_height,
14651489
?digest,
1466-
"dropping finalization at or below processed height floor"
1490+
"dropping finalization below processed height floor"
14671491
);
14681492
return false;
14691493
}

consensus/src/marshal/core/mailbox.rs

Lines changed: 33 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -46,6 +46,11 @@ pub(crate) enum Message<S: Scheme, V: Variant> {
4646
/// A channel to send the retrieved finalization.
4747
response: oneshot::Sender<Option<Finalization<S, V::Commitment>>>,
4848
},
49+
/// A request to retrieve the latest processed height acknowledged by the application.
50+
GetProcessedHeight {
51+
/// A channel to send the latest processed height.
52+
response: oneshot::Sender<Height>,
53+
},
4954
/// A hint that a finalized block may be available at a given height.
5055
///
5156
/// This triggers a network fetch if the finalization is not available locally.
@@ -63,8 +68,9 @@ pub(crate) enum Message<S: Scheme, V: Variant> {
6368
HintFinalized {
6469
/// The height of the finalization to fetch.
6570
height: Height,
66-
/// Target peers to fetch from. Added to any existing targets for this height.
67-
targets: NonEmptyVec<S::PublicKey>,
71+
/// Target peers to fetch from. Added to any existing targets for this
72+
/// height. When `None`, the resolver may ask any peer.
73+
targets: Option<NonEmptyVec<S::PublicKey>>,
6874
},
6975
/// A request to subscribe to a block by its digest.
7076
SubscribeByDigest {
@@ -132,7 +138,7 @@ pub(crate) enum Message<S: Scheme, V: Variant> {
132138
/// Sets the sync starting point (advances if higher than current).
133139
///
134140
/// Marshal will sync and deliver blocks starting at `floor + 1`. Data below
135-
/// the floor is pruned.
141+
/// the floor is pruned when `prune_archives` is `true`.
136142
///
137143
/// To prune data without affecting the sync starting point (say at some trailing depth
138144
/// from tip), use [Message::Prune] instead.
@@ -141,6 +147,9 @@ pub(crate) enum Message<S: Scheme, V: Variant> {
141147
SetFloor {
142148
/// The candidate floor height.
143149
height: Height,
150+
151+
/// Whether to prune finalized archives below the new floor.
152+
prune_archives: bool,
144153
},
145154
/// Prunes finalized blocks and certificates below the given height.
146155
///
@@ -215,15 +224,23 @@ impl<S: Scheme, V: Variant> Mailbox<S, V> {
215224
.flatten()
216225
}
217226

227+
/// Retrieve the latest processed height acknowledged by the application.
228+
pub async fn get_processed_height(&self) -> Option<Height> {
229+
self.sender
230+
.request(|response| Message::GetProcessedHeight { response })
231+
.await
232+
}
233+
218234
/// Hints that a finalized block may be available at the given height.
219235
///
220236
/// This method will request the finalization from the network via the resolver
221237
/// if it is not available locally.
222238
///
223-
/// Targets are required because this is typically called when a peer claims to be
224-
/// ahead. By targeting only those peers, we limit who we ask. If a target returns
225-
/// invalid data, they will be blocked by the resolver. If targets don't respond
226-
/// or return "no data", they effectively rate-limit themselves.
239+
/// When `targets` is `Some`, only those peers are tried. This is useful when
240+
/// a specific peer claims to be ahead. If a target returns invalid data, it
241+
/// will be blocked by the resolver.
242+
///
243+
/// When `targets` is `None`, the resolver may ask any peer.
227244
///
228245
/// Calling this multiple times for the same height with different targets will
229246
/// add to the target set if there is an ongoing fetch, allowing more peers to be tried.
@@ -234,7 +251,7 @@ impl<S: Scheme, V: Variant> Mailbox<S, V> {
234251
/// The height must be covered by both the epocher and the provider. If the
235252
/// epocher cannot map the height to an epoch, or the provider cannot supply
236253
/// a scheme for that epoch, the hint is silently dropped.
237-
pub async fn hint_finalized(&self, height: Height, targets: NonEmptyVec<S::PublicKey>) {
254+
pub async fn hint_finalized(&self, height: Height, targets: Option<NonEmptyVec<S::PublicKey>>) {
238255
self.sender
239256
.send_lossy(Message::HintFinalized { height, targets })
240257
.await;
@@ -346,14 +363,19 @@ impl<S: Scheme, V: Variant> Mailbox<S, V> {
346363
/// Sets the sync starting point (advances if higher than current).
347364
///
348365
/// Marshal will sync and deliver blocks starting at `floor + 1`. Data below
349-
/// the floor is pruned.
366+
/// the floor is pruned when `prune_archives` is `true`.
350367
///
351368
/// To prune data without affecting the sync starting point (say at some trailing depth
352369
/// from tip), use [Self::prune] instead.
353370
///
354371
/// The default floor is 0.
355-
pub async fn set_floor(&self, height: Height) {
356-
self.sender.send_lossy(Message::SetFloor { height }).await;
372+
pub async fn set_floor(&self, height: Height, prune_archives: bool) {
373+
self.sender
374+
.send_lossy(Message::SetFloor {
375+
height,
376+
prune_archives,
377+
})
378+
.await;
357379
}
358380

359381
/// Prunes finalized blocks and certificates below the given height.

0 commit comments

Comments
 (0)