Skip to content

Commit 585d566

Browse files
committed
[consensus/marshal] commonware-glue marshal changes
1 parent 25bd841 commit 585d566

8 files changed

Lines changed: 403 additions & 87 deletions

File tree

consensus/src/lib.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -236,8 +236,8 @@ stability_scope!(ALPHA {
236236
pub mod ordered_broadcast;
237237
});
238238
stability_scope!(ALPHA, cfg(not(target_arch = "wasm32")) {
239-
use commonware_cryptography::certificate::Scheme;
240239
use crate::marshal::ancestry::Ancestry;
240+
use commonware_cryptography::certificate::Scheme;
241241
use commonware_runtime::{Clock, Metrics, Spawner};
242242
use rand::Rng;
243243

consensus/src/marshal/ancestry.rs

Lines changed: 10 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -59,12 +59,21 @@ pub struct AncestorStream<M: BlockProvider> {
5959
}
6060

6161
impl<M: BlockProvider> AncestorStream<M> {
62+
/// Returns a reference to the next block that will be yielded by the
63+
/// stream, without consuming it.
64+
///
65+
/// Returns `None` if the buffer is empty and the next block is being
66+
/// fetched asynchronously.
67+
pub fn peek(&self) -> Option<&M::Block> {
68+
self.buffered.last()
69+
}
70+
6271
/// Creates a new [AncestorStream] starting from the given ancestry.
6372
///
6473
/// # Panics
6574
///
6675
/// Panics if the initial blocks are not contiguous in height.
67-
pub(crate) fn new(marshal: M, initial: impl IntoIterator<Item = M::Block>) -> Self {
76+
pub fn new(marshal: M, initial: impl IntoIterator<Item = M::Block>) -> Self {
6877
let mut buffered = initial.into_iter().collect::<Vec<M::Block>>();
6978
buffered.sort_by_key(Heightable::height);
7079

consensus/src/marshal/coding/mod.rs

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -796,6 +796,11 @@ mod tests {
796796
harness::prune_finalized_archives::<CodingHarness>();
797797
}
798798

799+
#[test_traced("WARN")]
800+
fn test_coding_set_floor_without_pruning_preserves_archives() {
801+
harness::set_floor_without_pruning_preserves_archives::<CodingHarness>();
802+
}
803+
799804
#[test_traced("WARN")]
800805
fn test_coding_rejects_block_delivery_below_floor() {
801806
harness::reject_stale_block_delivery_after_floor_update::<CodingHarness>();

consensus/src/marshal/core/actor.rs

Lines changed: 60 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -748,17 +748,52 @@ where
748748
let finalization = self.get_finalization_by_height(height).await;
749749
response.send_lossy(finalization);
750750
}
751+
Message::GetProcessedHeight { response } => {
752+
response.send_lossy(self.floor.height);
753+
}
751754
Message::HintFinalized { height, targets } => {
752-
// Skip if finalization is already available locally
753-
if self.get_finalization_by_height(height).await.is_some() {
755+
// Skip if height is below the floor
756+
if height < self.floor.height {
754757
continue;
755758
}
756759

757-
self.floor.fetch_targeted_if_permitted(
758-
&mut resolver,
759-
Request::finalized(height),
760-
targets,
761-
);
760+
// Skip if both the finalization and the block are
761+
// already available locally. Both are checked because
762+
// `set_floor` can advance the processed height to
763+
// a height whose block was never finalized locally.
764+
if self.get_finalization_by_height(height).await.is_some()
765+
&& self.get_finalized_block(height).await.is_some()
766+
{
767+
continue;
768+
}
769+
770+
match targets {
771+
Recipients::All => {
772+
self.floor.fetch_if_permitted(
773+
&mut resolver,
774+
Request::<V::Commitment>::finalized(height),
775+
);
776+
}
777+
Recipients::Some(peers) => {
778+
if peers.is_empty() {
779+
continue;
780+
}
781+
let peers = NonEmptyVec::from_unchecked(peers);
782+
self.floor.fetch_targeted_if_permitted(
783+
&mut resolver,
784+
Request::<V::Commitment>::finalized(height),
785+
peers,
786+
);
787+
}
788+
Recipients::One(peer) => {
789+
let peers = NonEmptyVec::new(peer);
790+
self.floor.fetch_targeted_if_permitted(
791+
&mut resolver,
792+
Request::<V::Commitment>::finalized(height),
793+
peers,
794+
);
795+
}
796+
}
762797
}
763798
Message::SubscribeByDigest {
764799
digest,
@@ -799,7 +834,10 @@ where
799834
)
800835
.await;
801836
}
802-
Message::SetFloor { height } => {
837+
Message::SetFloor {
838+
height,
839+
prune_archives,
840+
} => {
803841
if self.floor.height >= height {
804842
warn!(
805843
%height,
@@ -822,10 +860,11 @@ where
822860
// updating `floor.height`.
823861
self.pending_acks.clear();
824862

825-
// The floor is durable, so cache/finalized data below it can be pruned.
826-
if let Err(err) = self.prune_after_floor(height).await {
827-
error!(?err, %height, "failed to prune data below floor");
828-
return;
863+
if prune_archives {
864+
if let Err(err) = self.prune_after_floor(height).await {
865+
error!(?err, %height, "failed to prune data below floor");
866+
return;
867+
}
829868
}
830869

831870
// Intentionally keep existing block subscriptions alive. Canceling
@@ -1638,15 +1677,19 @@ where
16381677
finalization: Option<Finalization<P::Scheme, V::Commitment>>,
16391678
application: &mut impl Reporter<Activity = Update<V::ApplicationBlock, A>>,
16401679
) -> bool {
1641-
// Blocks below the last processed height are not useful to us, so we ignore them (this
1642-
// has the nice byproduct of ensuring we don't call a backing store with a block below the
1643-
// pruning boundary)
1644-
if height <= self.floor.height {
1680+
// Blocks below the last processed height are not useful to us, so we
1681+
// ignore them (this has the nice byproduct of ensuring we don't call
1682+
// a backing store with a block below the pruning boundary).
1683+
//
1684+
// Blocks at exactly the processed height are allowed, however. After
1685+
// a `set_floor`, it may be possible that the floor was set to a height
1686+
// that marshal doesn't have.
1687+
if height < self.floor.height {
16451688
debug!(
16461689
%height,
16471690
floor = %self.floor.height,
16481691
?digest,
1649-
"dropping finalization at or below processed height floor"
1692+
"dropping finalization below processed height floor"
16501693
);
16511694
return false;
16521695
}

0 commit comments

Comments
 (0)