22
33use crate :: { types:: Height , Block , Heightable } ;
44use commonware_cryptography:: Digestible ;
5+ use commonware_utils:: sync:: Mutex ;
56use futures:: {
67 future:: { BoxFuture , OptionFuture } ,
78 FutureExt , Stream ,
@@ -10,6 +11,7 @@ use pin_project::pin_project;
1011use std:: {
1112 future:: Future ,
1213 pin:: Pin ,
14+ sync:: Arc ,
1315 task:: { Context , Poll } ,
1416} ;
1517
@@ -34,6 +36,45 @@ pub trait BlockProvider: Clone + Send + 'static {
3436 ) -> impl Future < Output = Option < Self :: Block > > + Send ;
3537}
3638
39+ /// A type-erased [`BlockProvider`] that wraps any concrete provider behind
40+ /// a shared closure.
41+ ///
42+ /// Used by actor mailboxes and other channel-based patterns where a generic
43+ /// `BlockProvider` type parameter must be eliminated before sending across
44+ /// a channel.
45+ #[ derive( Clone ) ]
46+ pub struct ErasedBlockProvider < B : Block > {
47+ fetch : Arc < dyn Fn ( <B as Digestible >:: Digest ) -> BoxFuture < ' static , Option < B > > + Send + Sync > ,
48+ }
49+
50+ impl < B : Block > ErasedBlockProvider < B > {
51+ /// Erase a concrete [`BlockProvider`] into a type-erased provider.
52+ ///
53+ /// The provider is wrapped in a `Mutex` so that the resulting closure
54+ /// is `Sync` (required by `Arc<dyn Fn + Send + Sync>`). The lock is
55+ /// held only for the duration of `clone()`, never across an await.
56+ pub fn new < M : BlockProvider < Block = B > > ( provider : M ) -> Self {
57+ let provider = Arc :: new ( Mutex :: new ( provider) ) ;
58+ Self {
59+ fetch : Arc :: new ( move |digest| {
60+ let p = provider. lock ( ) . clone ( ) ;
61+ Box :: pin ( async move { p. fetch_block ( digest) . await } )
62+ } ) ,
63+ }
64+ }
65+ }
66+
67+ impl < B : Block > BlockProvider for ErasedBlockProvider < B > {
68+ type Block = B ;
69+
70+ fn fetch_block (
71+ self ,
72+ digest : <B as Digestible >:: Digest ,
73+ ) -> impl Future < Output = Option < B > > + Send {
74+ ( self . fetch ) ( digest)
75+ }
76+ }
77+
3778/// Yields the ancestors of a block while prefetching parents, _not_ including the genesis block.
3879///
3980// TODO(<https://github.com/commonwarexyz/monorepo/issues/2982>): Once marshal can also yield the genesis block,
@@ -47,6 +88,31 @@ pub struct AncestorStream<M, B: Block> {
4788}
4889
4990impl < M , B : Block > AncestorStream < M , B > {
91+ /// Returns a reference to the next block that will be yielded by the
92+ /// stream, without consuming it.
93+ ///
94+ /// Returns `None` if the buffer is empty and the next block is being
95+ /// fetched asynchronously.
96+ pub fn peek ( & self ) -> Option < & B > {
97+ self . buffered . last ( )
98+ }
99+
100+ /// Erase the block provider type, producing an
101+ /// `AncestorStream<ErasedBlockProvider<B>, B>`.
102+ ///
103+ /// The returned stream behaves identically but can be sent across
104+ /// channels that require a concrete type.
105+ pub fn erase ( self ) -> AncestorStream < ErasedBlockProvider < B > , B >
106+ where
107+ M : BlockProvider < Block = B > ,
108+ {
109+ AncestorStream {
110+ buffered : self . buffered ,
111+ marshal : ErasedBlockProvider :: new ( self . marshal ) ,
112+ pending : self . pending ,
113+ }
114+ }
115+
50116 /// Creates a new [AncestorStream] starting from the given ancestry.
51117 ///
52118 /// # Panics
0 commit comments