Skip to content

Commit c06078b

Browse files
committed
Remove marshal ancestry provider adapter
1 parent 22c45c6 commit c06078b

4 files changed

Lines changed: 69 additions & 48 deletions

File tree

consensus/src/marshal/core/mailbox.rs

Lines changed: 8 additions & 41 deletions
Original file line numberDiff line numberDiff line change
@@ -6,7 +6,7 @@ use crate::{
66
},
77
simplex::types::{Activity, Finalization, Notarization},
88
types::{Height, Round},
9-
Heightable, Reporter,
9+
Reporter,
1010
};
1111
use commonware_actor::{
1212
mailbox::{Overflow, Policy, Sender},
@@ -15,7 +15,7 @@ use commonware_actor::{
1515
use commonware_cryptography::{certificate::Scheme, Digestible};
1616
use commonware_p2p::Recipients;
1717
use commonware_utils::{channel::oneshot, vec::NonEmptyVec};
18-
use futures::{Stream, StreamExt};
18+
use futures::Stream;
1919
use std::collections::{btree_map::Entry, BTreeMap, VecDeque};
2020

2121
/// Messages sent to the marshal [Actor](super::Actor).
@@ -514,12 +514,6 @@ pub struct Mailbox<S: Scheme, V: Variant> {
514514
sender: Sender<Message<S, V>>,
515515
}
516516

517-
/// Provider used by [`Mailbox::ancestor_stream`] to fetch missing certified parents.
518-
#[derive(Clone)]
519-
pub(crate) struct AncestryProvider<S: Scheme, V: Variant> {
520-
mailbox: Mailbox<S, V>,
521-
}
522-
523517
impl<S: Scheme, V: Variant> Mailbox<S, V> {
524518
/// Creates a new mailbox.
525519
pub(crate) const fn new(sender: Sender<Message<S, V>>) -> Self {
@@ -540,15 +534,10 @@ impl<S: Scheme, V: Variant> Mailbox<S, V> {
540534
initial: I,
541535
) -> impl Stream<Item = V::ApplicationBlock> + Send + use<S, V, I>
542536
where
537+
Self: BlockProvider<Block = V::ApplicationBlock>,
543538
I: IntoIterator<Item = V::Block>,
544539
{
545-
AncestorStream::new(
546-
AncestryProvider {
547-
mailbox: self.clone(),
548-
},
549-
initial,
550-
)
551-
.map(V::into_inner)
540+
AncestorStream::new(self.clone(), initial.into_iter().map(V::into_inner))
552541
}
553542

554543
/// A request to retrieve the information about the highest finalized block.
@@ -690,7 +679,10 @@ impl<S: Scheme, V: Variant> Mailbox<S, V> {
690679
pub async fn ancestry(
691680
&self,
692681
(fallback, start_digest): (DigestFallback, <V::Block as Digestible>::Digest),
693-
) -> Option<impl Stream<Item = V::ApplicationBlock> + Send + use<S, V>> {
682+
) -> Option<impl Stream<Item = V::ApplicationBlock> + Send + use<S, V>>
683+
where
684+
Self: BlockProvider<Block = V::ApplicationBlock>,
685+
{
694686
let receiver = self.subscribe_by_digest(start_digest, fallback);
695687
receiver
696688
.await
@@ -786,31 +778,6 @@ impl<S: Scheme, V: Variant> Mailbox<S, V> {
786778
}
787779
}
788780

789-
impl<S: Scheme, V: Variant> BlockProvider for AncestryProvider<S, V> {
790-
type Block = V::Block;
791-
792-
async fn subscribe(self, digest: <V::Block as Digestible>::Digest) -> Option<Self::Block> {
793-
let subscription = self
794-
.mailbox
795-
.subscribe_by_digest(digest, DigestFallback::Wait);
796-
subscription.await.ok()
797-
}
798-
799-
async fn subscribe_parent(self, block: Self::Block) -> Option<Self::Block> {
800-
// Ancestry walking does not carry the certified parent round. By this
801-
// point the stream is walking accepted ancestry, so this height should
802-
// be correct; it remains a local pruning bound rather than a peer
803-
// response validity condition.
804-
let parent_height = block.height().previous()?;
805-
let commitment = V::parent_commitment(&block);
806-
let fallback = CommitmentFallback::FetchByCommitment {
807-
height: parent_height,
808-
};
809-
let subscription = self.mailbox.subscribe_by_commitment(commitment, fallback);
810-
subscription.await.ok()
811-
}
812-
}
813-
814781
impl<S: Scheme, V: Variant> Reporter for Mailbox<S, V> {
815782
type Activity = Activity<S, V::Commitment>;
816783

consensus/src/marshal/mocks/harness.rs

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@
55
66
use crate::{
77
marshal::{
8+
ancestry::BlockProvider,
89
coding::{
910
shards,
1011
types::{coding_config_for_participants, CodedBlock},
@@ -4661,7 +4662,10 @@ pub fn hint_finalized_triggers_fetch<H: TestHarness>() {
46614662
}
46624663

46634664
/// Test ancestry stream.
4664-
pub fn ancestry_stream<H: TestHarness>() {
4665+
pub fn ancestry_stream<H: TestHarness>()
4666+
where
4667+
Mailbox<S, H::Variant>: BlockProvider<Block = H::ApplicationBlock>,
4668+
{
46654669
let runner = deterministic::Runner::timed(Duration::from_secs(60));
46664670
runner.start(|mut context| async move {
46674671
let Fixture {

consensus/src/marshal/standard/mod.rs

Lines changed: 43 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -85,7 +85,8 @@ mod tests {
8585
use commonware_parallel::Sequential;
8686
use commonware_resolver::{Consumer, Delivery, Fetch, Resolver};
8787
use commonware_runtime::{
88-
buffer::paged::CacheRef, deterministic, Clock, Metrics, Quota, Runner, Supervisor as _,
88+
buffer::paged::CacheRef, deterministic, Clock, Metrics, Quota, Runner, Spawner,
89+
Supervisor as _,
8990
};
9091
use commonware_storage::{
9192
archive::{immutable, prunable, Archive as _},
@@ -112,6 +113,38 @@ mod tests {
112113
assert_provider::<Mailbox<S, Standard<B>>>();
113114
}
114115

116+
#[test_traced("WARN")]
117+
fn test_standard_block_provider_parent_fetches_by_commitment() {
118+
let runner = deterministic::Runner::timed(Duration::from_secs(30));
119+
runner.start(|mut context| async move {
120+
let Fixture { schemes, .. } =
121+
bls12381_threshold_vrf::fixture::<V, _>(&mut context, NAMESPACE, NUM_VALIDATORS);
122+
let buffer = RecordingBuffer::default();
123+
let (mailbox, buffer, _resolver, _actor_handle) = start_standard_actor(
124+
context.child("validator"),
125+
"standard-provider-parent-commitment",
126+
ConstantProvider::new(schemes[0].clone()),
127+
Application::<B>::manual_ack(),
128+
buffer,
129+
)
130+
.await;
131+
132+
let parent = make_raw_block(Sha256::hash(b""), Height::new(1), 100);
133+
let child = make_raw_block(parent.digest(), Height::new(2), 200);
134+
let subscription = context
135+
.child("subscribe_parent")
136+
.spawn(move |_| BlockProvider::subscribe_parent(mailbox, child));
137+
138+
context.sleep(Duration::from_millis(100)).await;
139+
assert_eq!(
140+
buffer.commitment_subscription_count(),
141+
1,
142+
"parent walkback should use the standard parent commitment"
143+
);
144+
subscription.abort();
145+
});
146+
}
147+
115148
fn assert_finalize_deterministic<H: TestHarness>(
116149
seed: u64,
117150
link: commonware_p2p::simulated::Link,
@@ -2437,7 +2470,8 @@ mod tests {
24372470
/// other methods are no-ops.
24382471
#[derive(Clone, Default)]
24392472
struct RecordingBuffer {
2440-
subscriptions: Arc<Mutex<Vec<oneshot::Sender<B>>>>,
2473+
digest_subscriptions: Arc<Mutex<Vec<oneshot::Sender<B>>>>,
2474+
commitment_subscriptions: Arc<Mutex<Vec<oneshot::Sender<B>>>>,
24412475
sends: Arc<Mutex<Vec<BufferSend>>>,
24422476
}
24432477

@@ -2447,7 +2481,11 @@ mod tests {
24472481
}
24482482

24492483
fn subscription_count(&self) -> usize {
2450-
self.subscriptions.lock().len()
2484+
self.digest_subscriptions.lock().len() + self.commitment_subscriptions.lock().len()
2485+
}
2486+
2487+
fn commitment_subscription_count(&self) -> usize {
2488+
self.commitment_subscriptions.lock().len()
24512489
}
24522490
}
24532491

@@ -2464,13 +2502,13 @@ mod tests {
24642502

24652503
fn subscribe_by_digest(&self, _digest: D) -> oneshot::Receiver<B> {
24662504
let (sender, receiver) = oneshot::channel();
2467-
self.subscriptions.lock().push(sender);
2505+
self.digest_subscriptions.lock().push(sender);
24682506
receiver
24692507
}
24702508

24712509
fn subscribe_by_commitment(&self, _commitment: D) -> oneshot::Receiver<B> {
24722510
let (sender, receiver) = oneshot::channel();
2473-
self.subscriptions.lock().push(sender);
2511+
self.commitment_subscriptions.lock().push(sender);
24742512
receiver
24752513
}
24762514

consensus/src/marshal/standard/variant.rs

Lines changed: 13 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -6,7 +6,7 @@
66
use crate::{
77
marshal::{
88
ancestry::BlockProvider,
9-
core::{Buffer, DigestFallback, Mailbox, Variant},
9+
core::{Buffer, CommitmentFallback, DigestFallback, Mailbox, Variant},
1010
},
1111
types::Round,
1212
Block,
@@ -105,4 +105,16 @@ where
105105
.await
106106
.ok()
107107
}
108+
109+
async fn subscribe_parent(self, block: Self::Block) -> Option<Self::Block> {
110+
let parent_height = block.height().previous()?;
111+
self.subscribe_by_commitment(
112+
block.parent(),
113+
CommitmentFallback::FetchByCommitment {
114+
height: parent_height,
115+
},
116+
)
117+
.await
118+
.ok()
119+
}
108120
}

0 commit comments

Comments
 (0)