Skip to content

Commit 325c42c

Browse files
[marshal] Alto Tweaks (#3862)
1 parent b455f67 commit 325c42c

11 files changed

Lines changed: 180 additions & 115 deletions

File tree

consensus/src/marshal/ancestry.rs

Lines changed: 62 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,7 @@ use futures::{
99
};
1010
use pin_project::pin_project;
1111
use std::{
12+
collections::VecDeque,
1213
future::Future,
1314
pin::Pin,
1415
sync::Arc,
@@ -22,6 +23,39 @@ pub trait Ancestry<B: Block>: Stream<Item = B> + Send + Unpin + 'static {
2223
fn peek(&self) -> Option<&B>;
2324
}
2425

26+
/// Creates an ancestry stream from a fixed sequence of blocks.
27+
///
28+
/// Blocks are yielded in iterator order and no parent fetching is performed. This is useful when
29+
/// the caller wants to bound the ancestry available to the application.
30+
pub fn from_iter<B>(blocks: impl IntoIterator<Item = B>) -> impl Ancestry<B>
31+
where
32+
B: Block,
33+
{
34+
BoundedAncestry {
35+
blocks: blocks.into_iter().collect(),
36+
}
37+
}
38+
39+
struct BoundedAncestry<B: Block> {
40+
blocks: VecDeque<B>,
41+
}
42+
43+
impl<B: Block> Unpin for BoundedAncestry<B> {}
44+
45+
impl<B: Block> Ancestry<B> for BoundedAncestry<B> {
46+
fn peek(&self) -> Option<&B> {
47+
self.blocks.front()
48+
}
49+
}
50+
51+
impl<B: Block> Stream for BoundedAncestry<B> {
52+
type Item = B;
53+
54+
fn poll_next(mut self: Pin<&mut Self>, _: &mut Context<'_>) -> Poll<Option<Self::Item>> {
55+
Poll::Ready(self.blocks.pop_front())
56+
}
57+
}
58+
2559
/// An interface for providing parent blocks.
2660
pub trait BlockProvider: Send + 'static {
2761
/// The block type the provider walks.
@@ -384,6 +418,34 @@ mod test {
384418
});
385419
}
386420

421+
#[test]
422+
fn test_from_iter_available_through_ancestry_trait() {
423+
fn peek_height(ancestry: impl Ancestry<Block<Sha256Digest, ()>>) -> Option<Height> {
424+
ancestry.peek().map(Heightable::height)
425+
}
426+
427+
let block = Block::new::<Sha256>((), Sha256Digest::EMPTY, Height::new(1), 1);
428+
let ancestry = from_iter([block.clone()]);
429+
430+
assert_eq!(peek_height(ancestry), Some(block.height()));
431+
}
432+
433+
#[test]
434+
fn test_from_iter_yields_blocks_in_order_and_peeks_next() {
435+
deterministic::Runner::default().start(|_| async move {
436+
let parent = Block::new::<Sha256>((), Sha256Digest::EMPTY, Height::new(1), 1);
437+
let child = Block::new::<Sha256>((), parent.digest(), Height::new(2), 2);
438+
let mut ancestry = from_iter([child.clone(), parent.clone()]);
439+
440+
assert_eq!(ancestry.peek(), Some(&child));
441+
assert_eq!(ancestry.next().await, Some(child));
442+
assert_eq!(ancestry.peek(), Some(&parent));
443+
assert_eq!(ancestry.next().await, Some(parent));
444+
assert_eq!(ancestry.peek(), None);
445+
assert_eq!(ancestry.next().await, None);
446+
});
447+
}
448+
387449
#[test]
388450
fn test_yields_genesis_and_stops() {
389451
deterministic::Runner::default().start(|context| async move {

consensus/src/marshal/coding/mod.rs

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -156,19 +156,19 @@ mod tests {
156156
None
157157
}
158158

159-
fn subscribe_by_digest(&self, _digest: D) -> oneshot::Receiver<TestCodedBlock> {
159+
fn subscribe_by_digest(&self, _digest: D) -> Option<oneshot::Receiver<TestCodedBlock>> {
160160
let (sender, receiver) = oneshot::channel();
161161
self.digest_subscriptions.lock().push(sender);
162-
receiver
162+
Some(receiver)
163163
}
164164

165165
fn subscribe_by_commitment(
166166
&self,
167167
_commitment: Commitment,
168-
) -> oneshot::Receiver<TestCodedBlock> {
168+
) -> Option<oneshot::Receiver<TestCodedBlock>> {
169169
let (sender, receiver) = oneshot::channel();
170170
self.commitment_subscriptions.lock().push(sender);
171-
receiver
171+
Some(receiver)
172172
}
173173

174174
fn finalized(&self, _commitment: Commitment) {}
@@ -408,7 +408,7 @@ mod tests {
408408
let (resolver_rx, resolver) = RecordingResolver::holding(context.child("resolver"));
409409
let actor_handle = actor.start(
410410
Application::<CodingB>::default(),
411-
Some(buffer),
411+
buffer,
412412
(resolver_rx, resolver.clone()),
413413
);
414414
(mailbox, resolver, actor_handle)

consensus/src/marshal/coding/variant.rs

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -95,15 +95,15 @@ where
9595
fn subscribe_by_digest(
9696
&self,
9797
digest: <CodedBlock<B, C, H> as Digestible>::Digest,
98-
) -> oneshot::Receiver<CodedBlock<B, C, H>> {
99-
self.subscribe_by_digest(digest)
98+
) -> Option<oneshot::Receiver<CodedBlock<B, C, H>>> {
99+
Some(self.subscribe_by_digest(digest))
100100
}
101101

102102
fn subscribe_by_commitment(
103103
&self,
104104
commitment: Commitment,
105-
) -> oneshot::Receiver<CodedBlock<B, C, H>> {
106-
self.subscribe(commitment)
105+
) -> Option<oneshot::Receiver<CodedBlock<B, C, H>>> {
106+
Some(self.subscribe(commitment))
107107
}
108108

109109
fn finalized(&self, commitment: Commitment) {

consensus/src/marshal/core/actor.rs

Lines changed: 35 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -6,7 +6,7 @@ use super::{
66
mailbox::{CommitmentFallback, Mailbox, Message},
77
stream::Stream,
88
subscriptions::{Key as SubscriptionKey, KeyFor as SubscriptionKeyFor, Subscriptions},
9-
variant::OptionalBuffer,
9+
variant::NoBuffer,
1010
Buffer, Variant,
1111
};
1212
use crate::{
@@ -296,7 +296,7 @@ where
296296
pub fn start<R, Buf>(
297297
mut self,
298298
application: impl Reporter<Activity = Update<V::ApplicationBlock, A>>,
299-
buffer: Option<Buf>,
299+
buffer: Buf,
300300
resolver: (handler::Receiver<V::Commitment>, R),
301301
) -> Handle<()>
302302
where
@@ -307,15 +307,34 @@ where
307307
>,
308308
Buf: Buffer<V, PublicKey = <P::Scheme as CertificateScheme>::PublicKey>,
309309
{
310-
let buffer = OptionalBuffer::new(buffer);
311310
spawn_cell!(self.context, self.run(application, buffer, resolver))
312311
}
313312

313+
/// Start the actor without a broadcast buffer.
314+
pub fn start_unbuffered<R>(
315+
self,
316+
application: impl Reporter<Activity = Update<V::ApplicationBlock, A>>,
317+
resolver: (handler::Receiver<V::Commitment>, R),
318+
) -> Handle<()>
319+
where
320+
R: Resolver<
321+
Key = ResolverRequestFor<V>,
322+
Subscriber = Annotation,
323+
PublicKey = <P::Scheme as CertificateScheme>::PublicKey,
324+
>,
325+
{
326+
self.start(
327+
application,
328+
NoBuffer::<<P::Scheme as CertificateScheme>::PublicKey>::new(),
329+
resolver,
330+
)
331+
}
332+
314333
/// Run the application actor.
315334
async fn run<R, Buf>(
316335
mut self,
317336
mut application: impl Reporter<Activity = Update<V::ApplicationBlock, A>>,
318-
mut buffer: OptionalBuffer<V, Buf>,
337+
mut buffer: Buf,
319338
(mut resolver_rx, mut resolver): (handler::Receiver<V::Commitment>, R),
320339
) where
321340
R: Resolver<
@@ -436,7 +455,7 @@ where
436455
&mut self,
437456
result: <A::Waiter as Future>::Output,
438457
application: &mut impl Reporter<Activity = Update<V::ApplicationBlock, A>>,
439-
buffer: &mut OptionalBuffer<V, Buf>,
458+
buffer: &mut Buf,
440459
resolver: &mut R,
441460
) where
442461
Buf: Buffer<V>,
@@ -491,7 +510,7 @@ where
491510
message: Message<P::Scheme, V>,
492511
resolver: &mut R,
493512
waiters: &mut AbortablePool<Result<V::Block, SubscriptionKeyFor<V>>>,
494-
buffer: &mut OptionalBuffer<V, Buf>,
513+
buffer: &mut Buf,
495514
application: &mut impl Reporter<Activity = Update<V::ApplicationBlock, A>>,
496515
) where
497516
Buf: Buffer<V, PublicKey = <P::Scheme as CertificateScheme>::PublicKey>,
@@ -764,7 +783,7 @@ where
764783
message: handler::Message<V::Commitment>,
765784
resolver_rx: &mut handler::Receiver<V::Commitment>,
766785
resolver: &mut R,
767-
buffer: &mut OptionalBuffer<V, Buf>,
786+
buffer: &mut Buf,
768787
application: &mut impl Reporter<Activity = Update<V::ApplicationBlock, A>>,
769788
) where
770789
Buf: Buffer<V, PublicKey = <P::Scheme as CertificateScheme>::PublicKey>,
@@ -850,7 +869,7 @@ where
850869
&self,
851870
key: ResolverRequestFor<V>,
852871
response: oneshot::Sender<Bytes>,
853-
buffer: &OptionalBuffer<V, Buf>,
872+
buffer: &Buf,
854873
) {
855874
match key {
856875
Key::Block(commitment) => {
@@ -898,7 +917,7 @@ where
898917
PublicKey = <P::Scheme as CertificateScheme>::PublicKey,
899918
>,
900919
waiters: &mut AbortablePool<Result<V::Block, SubscriptionKeyFor<V>>>,
901-
buffer: &mut OptionalBuffer<V, Buf>,
920+
buffer: &mut Buf,
902921
) {
903922
let digest = match key {
904923
SubscriptionKey::Digest(digest) => digest,
@@ -985,7 +1004,7 @@ where
9851004
finalization: Finalization<P::Scheme, V::Commitment>,
9861005
skip_if_superseded: bool,
9871006
resolver: &mut R,
988-
buffer: &mut OptionalBuffer<V, Buf>,
1007+
buffer: &mut Buf,
9891008
application: &mut impl Reporter<Activity = Update<V::ApplicationBlock, A>>,
9901009
) where
9911010
Buf: Buffer<V, PublicKey = <P::Scheme as CertificateScheme>::PublicKey>,
@@ -1052,7 +1071,7 @@ where
10521071
async fn apply_floor_anchor<Buf: Buffer<V>>(
10531072
&mut self,
10541073
block: &V::Block,
1055-
buffer: &mut OptionalBuffer<V, Buf>,
1074+
buffer: &mut Buf,
10561075
application: &mut impl Reporter<Activity = Update<V::ApplicationBlock, A>>,
10571076
resolver: &mut impl Resolver<
10581077
Key = ResolverRequestFor<V>,
@@ -1171,7 +1190,7 @@ where
11711190
&mut self,
11721191
message: ResolverDelivery<V>,
11731192
delivers: &mut Vec<PendingVerification<P::Scheme, V>>,
1174-
buffer: &mut OptionalBuffer<V, Buf>,
1193+
buffer: &mut Buf,
11751194
application: &mut impl Reporter<Activity = Update<V::ApplicationBlock, A>>,
11761195
resolver: &mut impl Resolver<
11771196
Key = ResolverRequestFor<V>,
@@ -1349,7 +1368,7 @@ where
13491368
async fn verify_delivered<Buf: Buffer<V>>(
13501369
&mut self,
13511370
mut delivers: Vec<PendingVerification<P::Scheme, V>>,
1352-
buffer: &mut OptionalBuffer<V, Buf>,
1371+
buffer: &mut Buf,
13531372
application: &mut impl Reporter<Activity = Update<V::ApplicationBlock, A>>,
13541373
resolver: &mut impl Resolver<
13551374
Key = ResolverRequestFor<V>,
@@ -1848,7 +1867,7 @@ where
18481867
/// parent links).
18491868
async fn find_block_by_digest<Buf: Buffer<V>>(
18501869
&self,
1851-
buffer: &OptionalBuffer<V, Buf>,
1870+
buffer: &Buf,
18521871
digest: <V::Block as Digestible>::Digest,
18531872
) -> Option<V::Block> {
18541873
if let Some(block) = buffer.find_by_digest(digest).await {
@@ -1863,7 +1882,7 @@ where
18631882
/// Having the full commitment may enable additional retrieval mechanisms.
18641883
async fn find_block_by_commitment<Buf: Buffer<V>>(
18651884
&self,
1866-
buffer: &OptionalBuffer<V, Buf>,
1885+
buffer: &Buf,
18671886
commitment: V::Commitment,
18681887
) -> Option<V::Block> {
18691888
if let Some(block) = buffer.find_by_commitment(commitment).await {
@@ -1888,7 +1907,7 @@ where
18881907
/// needs a subsequent [`sync_finalized`](Self::sync_finalized).
18891908
async fn try_repair_gaps<Buf: Buffer<V>>(
18901909
&mut self,
1891-
buffer: &mut OptionalBuffer<V, Buf>,
1910+
buffer: &mut Buf,
18921911
resolver: &mut impl Resolver<
18931912
Key = ResolverRequestFor<V>,
18941913
Subscriber = Annotation,

consensus/src/marshal/core/mod.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -31,10 +31,10 @@
3131
//! // `last_height` is `None` until the application acknowledges a block.
3232
//!
3333
//! // Start with application and buffer
34-
//! actor.start(application, Some(buffer), resolver);
34+
//! actor.start(application, buffer, resolver);
3535
//!
3636
//! // Or omit broadcast buffering for follower-only chain tracking
37-
//! actor.start(application, None::<MyBuffer>, resolver);
37+
//! actor.start_unbuffered(application, resolver);
3838
//! ```
3939
//!
4040
//! For standard mode, use [`crate::marshal::standard::Standard`] as the variant and

0 commit comments

Comments
 (0)