Skip to content

Commit f72fb17

Browse files
committed
[consensus/marshal] commonware-glue marshal changes
1 parent 1c65a16 commit f72fb17

7 files changed

Lines changed: 270 additions & 46 deletions

File tree

consensus/src/marshal/ancestry.rs

Lines changed: 67 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@
22
33
use crate::{types::Height, Block, Heightable};
44
use commonware_cryptography::Digestible;
5+
use commonware_utils::sync::Mutex;
56
use futures::{
67
future::{BoxFuture, OptionFuture},
78
FutureExt, Stream,
@@ -10,6 +11,7 @@ use pin_project::pin_project;
1011
use std::{
1112
future::Future,
1213
pin::Pin,
14+
sync::Arc,
1315
task::{Context, Poll},
1416
};
1517

@@ -34,6 +36,45 @@ pub trait BlockProvider: Clone + Send + 'static {
3436
) -> impl Future<Output = Option<Self::Block>> + Send;
3537
}
3638

39+
/// A type-erased [`BlockProvider`] that wraps any concrete provider behind
40+
/// a shared closure.
41+
///
42+
/// Used by actor mailboxes and other channel-based patterns where a generic
43+
/// `BlockProvider` type parameter must be eliminated before sending across
44+
/// a channel.
45+
#[derive(Clone)]
46+
pub struct ErasedBlockProvider<B: Block> {
47+
fetch: Arc<dyn Fn(<B as Digestible>::Digest) -> BoxFuture<'static, Option<B>> + Send + Sync>,
48+
}
49+
50+
impl<B: Block> ErasedBlockProvider<B> {
51+
/// Erase a concrete [`BlockProvider`] into a type-erased provider.
52+
///
53+
/// The provider is wrapped in a `Mutex` so that the resulting closure
54+
/// is `Sync` (required by `Arc<dyn Fn + Send + Sync>`). The lock is
55+
/// held only for the duration of `clone()`, never across an await.
56+
pub fn new<M: BlockProvider<Block = B>>(provider: M) -> Self {
57+
let provider = Arc::new(Mutex::new(provider));
58+
Self {
59+
fetch: Arc::new(move |digest| {
60+
let p = provider.lock().clone();
61+
Box::pin(async move { p.fetch_block(digest).await })
62+
}),
63+
}
64+
}
65+
}
66+
67+
impl<B: Block> BlockProvider for ErasedBlockProvider<B> {
68+
type Block = B;
69+
70+
fn fetch_block(
71+
self,
72+
digest: <B as Digestible>::Digest,
73+
) -> impl Future<Output = Option<B>> + Send {
74+
(self.fetch)(digest)
75+
}
76+
}
77+
3778
/// Yields the ancestors of a block while prefetching parents, _not_ including the genesis block.
3879
///
3980
// TODO(<https://github.com/commonwarexyz/monorepo/issues/2982>): Once marshal can also yield the genesis block,
@@ -47,12 +88,37 @@ pub struct AncestorStream<M, B: Block> {
4788
}
4889

4990
impl<M, B: Block> AncestorStream<M, B> {
91+
/// Returns a reference to the next block that will be yielded by the
92+
/// stream, without consuming it.
93+
///
94+
/// Returns `None` if the buffer is empty and the next block is being
95+
/// fetched asynchronously.
96+
pub fn peek(&self) -> Option<&B> {
97+
self.buffered.last()
98+
}
99+
100+
/// Erase the block provider type, producing an
101+
/// `AncestorStream<ErasedBlockProvider<B>, B>`.
102+
///
103+
/// The returned stream behaves identically but can be sent across
104+
/// channels that require a concrete type.
105+
pub fn erase(self) -> AncestorStream<ErasedBlockProvider<B>, B>
106+
where
107+
M: BlockProvider<Block = B>,
108+
{
109+
AncestorStream {
110+
buffered: self.buffered,
111+
marshal: ErasedBlockProvider::new(self.marshal),
112+
pending: self.pending,
113+
}
114+
}
115+
50116
/// Creates a new [AncestorStream] starting from the given ancestry.
51117
///
52118
/// # Panics
53119
///
54120
/// Panics if the initial blocks are not contiguous in height.
55-
pub(crate) fn new(marshal: M, initial: impl IntoIterator<Item = B>) -> Self {
121+
pub fn new(marshal: M, initial: impl IntoIterator<Item = B>) -> Self {
56122
let mut buffered = initial.into_iter().collect::<Vec<B>>();
57123
buffered.sort_by_key(Heightable::height);
58124

consensus/src/marshal/coding/mod.rs

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -197,6 +197,11 @@ mod tests {
197197
harness::prune_finalized_archives::<CodingHarness>();
198198
}
199199

200+
#[test_traced("WARN")]
201+
fn test_coding_set_floor_without_pruning_preserves_archives() {
202+
harness::set_floor_without_pruning_preserves_archives::<CodingHarness>();
203+
}
204+
200205
#[test_traced("WARN")]
201206
fn test_coding_rejects_block_delivery_below_floor() {
202207
harness::reject_stale_block_delivery_after_floor_update::<CodingHarness>();

consensus/src/marshal/core/actor.rs

Lines changed: 47 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -40,6 +40,7 @@ use commonware_utils::{
4040
channel::{fallible::OneshotExt, mpsc, oneshot},
4141
futures::{AbortablePool, Aborter, OptionFuture},
4242
sequence::U64,
43+
vec::NonEmptyVec,
4344
Acknowledgement, BoxedError,
4445
};
4546
use futures::{future::join_all, try_join, FutureExt};
@@ -659,20 +660,41 @@ where
659660
let finalization = self.get_finalization_by_height(height).await;
660661
response.send_lossy(finalization);
661662
}
663+
Message::GetProcessedHeight { response } => {
664+
response.send_lossy(self.last_processed_height);
665+
}
662666
Message::HintFinalized { height, targets } => {
663-
// Skip if height is at or below the floor
664-
if height <= self.last_processed_height {
667+
// Skip if height is below the floor
668+
if height < self.last_processed_height {
665669
continue;
666670
}
667671

668-
// Skip if finalization is already available locally
669-
if self.get_finalization_by_height(height).await.is_some() {
672+
// Skip if both the finalization and the block are
673+
// already available locally. Both are checked because
674+
// `set_floor` can advance `last_processed_height` to
675+
// a height whose block was never finalized locally.
676+
if self.get_finalization_by_height(height).await.is_some()
677+
&& self.get_finalized_block(height).await.is_some()
678+
{
670679
continue;
671680
}
672681

673-
// Trigger a targeted fetch via the resolver
682+
// Trigger a fetch via the resolver
674683
let request = Request::<V::Commitment>::Finalized { height };
675-
resolver.fetch_targeted(request, targets).await;
684+
match targets {
685+
Recipients::All => resolver.fetch(request).await,
686+
Recipients::Some(peers) => {
687+
if peers.is_empty() {
688+
continue;
689+
}
690+
let peers = NonEmptyVec::from_unchecked(peers);
691+
resolver.fetch_targeted(request, peers).await;
692+
},
693+
Recipients::One(peer) => {
694+
let peers = NonEmptyVec::new(peer);
695+
resolver.fetch_targeted(request, peers).await;
696+
}
697+
}
676698
}
677699
Message::SubscribeByDigest {
678700
round,
@@ -704,7 +726,10 @@ where
704726
)
705727
.await;
706728
}
707-
Message::SetFloor { height } => {
729+
Message::SetFloor {
730+
height,
731+
prune_archives,
732+
} => {
708733
if self.last_processed_height >= height {
709734
warn!(
710735
%height,
@@ -726,10 +751,12 @@ where
726751
// updating `last_processed_height`.
727752
self.pending_acks.clear();
728753

729-
// Prune data in the finalized archives below the new floor.
730-
if let Err(err) = self.prune_finalized_archives(height).await {
731-
error!(?err, %height, "failed to prune finalized archives");
732-
return;
754+
if prune_archives {
755+
// Prune data in the finalized archives below the new floor.
756+
if let Err(err) = self.prune_finalized_archives(height).await {
757+
error!(?err, %height, "failed to prune finalized archives");
758+
return;
759+
}
733760
}
734761

735762
// Intentionally keep existing block subscriptions alive. Canceling
@@ -1455,15 +1482,19 @@ where
14551482
finalization: Option<Finalization<P::Scheme, V::Commitment>>,
14561483
application: &mut impl Reporter<Activity = Update<V::ApplicationBlock, A>>,
14571484
) -> bool {
1458-
// Blocks below the last processed height are not useful to us, so we ignore them (this
1459-
// has the nice byproduct of ensuring we don't call a backing store with a block below the
1460-
// pruning boundary)
1461-
if height <= self.last_processed_height {
1485+
// Blocks below the last processed height are not useful to us, so we
1486+
// ignore them (this has the nice byproduct of ensuring we don't call
1487+
// a backing store with a block below the pruning boundary).
1488+
//
1489+
// Blocks at exactly the processed height are allowed, however. After
1490+
// a `set_floor`, it may be possible that the floor was set to a height
1491+
// that marshal doesn't have.
1492+
if height < self.last_processed_height {
14621493
debug!(
14631494
%height,
14641495
floor = %self.last_processed_height,
14651496
?digest,
1466-
"dropping finalization at or below processed height floor"
1497+
"dropping finalization below processed height floor"
14671498
);
14681499
return false;
14691500
}

consensus/src/marshal/core/mailbox.rs

Lines changed: 29 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -10,10 +10,7 @@ use crate::{
1010
};
1111
use commonware_cryptography::{certificate::Scheme, Digestible};
1212
use commonware_p2p::Recipients;
13-
use commonware_utils::{
14-
channel::{fallible::AsyncFallibleExt, mpsc, oneshot},
15-
vec::NonEmptyVec,
16-
};
13+
use commonware_utils::channel::{fallible::AsyncFallibleExt, mpsc, oneshot};
1714

1815
/// Messages sent to the marshal [Actor](super::Actor).
1916
///
@@ -46,6 +43,11 @@ pub(crate) enum Message<S: Scheme, V: Variant> {
4643
/// A channel to send the retrieved finalization.
4744
response: oneshot::Sender<Option<Finalization<S, V::Commitment>>>,
4845
},
46+
/// A request to retrieve the latest processed height acknowledged by the application.
47+
GetProcessedHeight {
48+
/// A channel to send the latest processed height.
49+
response: oneshot::Sender<Height>,
50+
},
4951
/// A hint that a finalized block may be available at a given height.
5052
///
5153
/// This triggers a network fetch if the finalization is not available locally.
@@ -63,8 +65,9 @@ pub(crate) enum Message<S: Scheme, V: Variant> {
6365
HintFinalized {
6466
/// The height of the finalization to fetch.
6567
height: Height,
66-
/// Target peers to fetch from. Added to any existing targets for this height.
67-
targets: NonEmptyVec<S::PublicKey>,
68+
/// Target peers to fetch from. Added to any existing targets for this
69+
/// height.
70+
targets: Recipients<S::PublicKey>,
6871
},
6972
/// A request to subscribe to a block by its digest.
7073
SubscribeByDigest {
@@ -132,7 +135,7 @@ pub(crate) enum Message<S: Scheme, V: Variant> {
132135
/// Sets the sync starting point (advances if higher than current).
133136
///
134137
/// Marshal will sync and deliver blocks starting at `floor + 1`. Data below
135-
/// the floor is pruned.
138+
/// the floor is pruned when `prune_archives` is `true`.
136139
///
137140
/// To prune data without affecting the sync starting point (say at some trailing depth
138141
/// from tip), use [Message::Prune] instead.
@@ -141,6 +144,9 @@ pub(crate) enum Message<S: Scheme, V: Variant> {
141144
SetFloor {
142145
/// The candidate floor height.
143146
height: Height,
147+
148+
/// Whether to prune finalized archives below the new floor.
149+
prune_archives: bool,
144150
},
145151
/// Prunes finalized blocks and certificates below the given height.
146152
///
@@ -215,16 +221,18 @@ impl<S: Scheme, V: Variant> Mailbox<S, V> {
215221
.flatten()
216222
}
217223

224+
/// Retrieve the latest processed height acknowledged by the application.
225+
pub async fn get_processed_height(&self) -> Option<Height> {
226+
self.sender
227+
.request(|response| Message::GetProcessedHeight { response })
228+
.await
229+
}
230+
218231
/// Hints that a finalized block may be available at the given height.
219232
///
220233
/// This method will request the finalization from the network via the resolver
221234
/// if it is not available locally.
222235
///
223-
/// Targets are required because this is typically called when a peer claims to be
224-
/// ahead. By targeting only those peers, we limit who we ask. If a target returns
225-
/// invalid data, they will be blocked by the resolver. If targets don't respond
226-
/// or return "no data", they effectively rate-limit themselves.
227-
///
228236
/// Calling this multiple times for the same height with different targets will
229237
/// add to the target set if there is an ongoing fetch, allowing more peers to be tried.
230238
///
@@ -234,7 +242,7 @@ impl<S: Scheme, V: Variant> Mailbox<S, V> {
234242
/// The height must be covered by both the epocher and the provider. If the
235243
/// epocher cannot map the height to an epoch, or the provider cannot supply
236244
/// a scheme for that epoch, the hint is silently dropped.
237-
pub async fn hint_finalized(&self, height: Height, targets: NonEmptyVec<S::PublicKey>) {
245+
pub async fn hint_finalized(&self, height: Height, targets: Recipients<S::PublicKey>) {
238246
self.sender
239247
.send_lossy(Message::HintFinalized { height, targets })
240248
.await;
@@ -346,14 +354,19 @@ impl<S: Scheme, V: Variant> Mailbox<S, V> {
346354
/// Sets the sync starting point (advances if higher than current).
347355
///
348356
/// Marshal will sync and deliver blocks starting at `floor + 1`. Data below
349-
/// the floor is pruned.
357+
/// the floor is pruned when `prune_archives` is `true`.
350358
///
351359
/// To prune data without affecting the sync starting point (say at some trailing depth
352360
/// from tip), use [Self::prune] instead.
353361
///
354362
/// The default floor is 0.
355-
pub async fn set_floor(&self, height: Height) {
356-
self.sender.send_lossy(Message::SetFloor { height }).await;
363+
pub async fn set_floor(&self, height: Height, prune_archives: bool) {
364+
self.sender
365+
.send_lossy(Message::SetFloor {
366+
height,
367+
prune_archives,
368+
})
369+
.await;
357370
}
358371

359372
/// Prunes finalized blocks and certificates below the given height.

0 commit comments

Comments
 (0)