11//! A stream that yields the ancestors of a block while prefetching parents.
22
33use crate :: { types:: Height , Block , Heightable } ;
4- use commonware_cryptography:: Digestible ;
4+ use commonware_cryptography:: { Digest , Digestible } ;
55use futures:: {
66 future:: { BoxFuture , OptionFuture } ,
77 FutureExt , Stream ,
2424}
2525
2626/// An interface for providing parent blocks.
27- pub trait BlockProvider : Clone + Send + ' static {
27+ pub trait BlockProvider : Send + ' static {
2828 /// The block type the provider walks.
2929 type Block : Block ;
3030
@@ -41,29 +41,56 @@ pub trait BlockProvider: Clone + Send + 'static {
4141 ///
4242 /// The child block can carry variant-specific context needed to retrieve its parent.
4343 fn subscribe_parent (
44- self ,
45- block : Self :: Block ,
46- ) -> impl Future < Output = Option < Self :: Block > > + Send ;
44+ & self ,
45+ block : & Self :: Block ,
46+ ) -> impl Future < Output = Option < Self :: Block > > + Send + ' static ;
4747}
4848
49- /// Yields the ancestors of a block while prefetching parents, _not_ including the genesis block.
50- ///
51- // TODO(<https://github.com/commonwarexyz/monorepo/issues/2982>): Once marshal can also yield the genesis block,
52- // this stream should end at block height 0 rather than 1.
49+ // Expected parent height and digest for a pending fetch.
50+ struct ExpectedParent < D > ( Height , D ) ;
51+
52+ // Pending parent fetch paired with the relationship it must satisfy.
53+ type PendingFetch < B > = BoxFuture < ' static , Option < ( ExpectedParent < <B as Digestible >:: Digest > , B ) > > ;
54+
55+ impl < D : Digest > ExpectedParent < D > {
56+ fn from_child < B : Block < Digest = D > > ( child : & B ) -> Self {
57+ Self (
58+ child. height ( ) . previous ( ) . expect ( "child must have parent" ) ,
59+ child. parent ( ) ,
60+ )
61+ }
62+
63+ fn assert_matches < B : Block < Digest = D > > ( self , parent : & B ) {
64+ let Self ( parent_height, parent_digest) = self ;
65+ assert_eq ! (
66+ parent. height( ) ,
67+ parent_height,
68+ "fetched parent must be contiguous in height"
69+ ) ;
70+ assert_eq ! (
71+ parent. digest( ) ,
72+ parent_digest,
73+ "fetched parent must be contiguous in ancestry"
74+ ) ;
75+ }
76+ }
77+
78+ /// Yields the ancestors of a block while prefetching parents, including the
79+ /// height-zero genesis block if it is available.
5380#[ pin_project]
5481pub struct AncestorStream < M : BlockProvider > {
5582 buffered : Vec < M :: Block > ,
5683 marshal : M ,
5784 #[ pin]
58- pending : OptionFuture < BoxFuture < ' static , Option < M :: Block > > > ,
85+ pending : OptionFuture < PendingFetch < M :: Block > > ,
5986}
6087
6188impl < M : BlockProvider > AncestorStream < M > {
6289 /// Creates a new [AncestorStream] starting from the given ancestry.
6390 ///
6491 /// # Panics
6592 ///
66- /// Panics if the initial blocks are not contiguous in height .
93+ /// Panics if the initial blocks are not contiguous.
6794 pub ( crate ) fn new ( marshal : M , initial : impl IntoIterator < Item = M :: Block > ) -> Self {
6895 let mut buffered = initial. into_iter ( ) . collect :: < Vec < M :: Block > > ( ) ;
6996 buffered. sort_by_key ( Heightable :: height) ;
@@ -93,13 +120,11 @@ impl<M: BlockProvider> AncestorStream<M> {
93120impl < M > Stream for AncestorStream < M >
94121where
95122 M : BlockProvider ,
96- M :: Block : Clone ,
97123{
98124 type Item = M :: Block ;
99125
100126 fn poll_next ( self : Pin < & mut Self > , cx : & mut Context < ' _ > ) -> Poll < Option < Self :: Item > > {
101- // Because marshal cannot currently yield the genesis block, we stop at height 1.
102- const END_BOUND : Height = Height :: new ( 1 ) ;
127+ const END_BOUND : Height = Height :: zero ( ) ;
103128
104129 let mut this = self . project ( ) ;
105130
@@ -109,14 +134,20 @@ where
109134 let should_walk_parent = height > END_BOUND ;
110135 let end_of_buffered = this. buffered . is_empty ( ) ;
111136 if should_walk_parent && end_of_buffered {
112- let future = this. marshal . clone ( ) . subscribe_parent ( block. clone ( ) ) . boxed ( ) ;
137+ let expected = ExpectedParent :: from_child ( & block) ;
138+ let future = this
139+ . marshal
140+ . subscribe_parent ( & block)
141+ . map ( move |parent| parent. map ( |parent| ( expected, parent) ) )
142+ . boxed ( ) ;
113143 * this. pending . as_mut ( ) = Some ( future) . into ( ) ;
114144
115145 // Explicitly poll the next future to kick off the fetch. If it's already ready,
116146 // buffer it for the next poll.
117147 match this. pending . as_mut ( ) . poll ( cx) {
118- Poll :: Ready ( Some ( Some ( block) ) ) => {
119- this. buffered . push ( block) ;
148+ Poll :: Ready ( Some ( Some ( ( expected, parent) ) ) ) => {
149+ expected. assert_matches ( & parent) ;
150+ this. buffered . push ( parent) ;
120151 }
121152 Poll :: Ready ( Some ( None ) ) => {
122153 * this. pending . as_mut ( ) = None . into ( ) ;
@@ -137,18 +168,25 @@ where
137168 * this. pending . as_mut ( ) = None . into ( ) ;
138169 Poll :: Ready ( None )
139170 }
140- Poll :: Ready ( Some ( Some ( block) ) ) => {
171+ Poll :: Ready ( Some ( Some ( ( expected, block) ) ) ) => {
172+ expected. assert_matches ( & block) ;
141173 let height = block. height ( ) ;
142174 let should_walk_parent = height > END_BOUND ;
143175 if should_walk_parent {
144- let future = this. marshal . clone ( ) . subscribe_parent ( block. clone ( ) ) . boxed ( ) ;
176+ let expected = ExpectedParent :: from_child ( & block) ;
177+ let future = this
178+ . marshal
179+ . subscribe_parent ( & block)
180+ . map ( move |parent| parent. map ( |parent| ( expected, parent) ) )
181+ . boxed ( ) ;
145182 * this. pending . as_mut ( ) = Some ( future) . into ( ) ;
146183
147184 // Explicitly poll the next future to kick off the fetch. If it's already ready,
148185 // buffer it for the next poll.
149186 match this. pending . as_mut ( ) . poll ( cx) {
150- Poll :: Ready ( Some ( Some ( block) ) ) => {
151- this. buffered . push ( block) ;
187+ Poll :: Ready ( Some ( Some ( ( expected, parent) ) ) ) => {
188+ expected. assert_matches ( & parent) ;
189+ this. buffered . push ( parent) ;
152190 }
153191 Poll :: Ready ( Some ( None ) ) => {
154192 * this. pending . as_mut ( ) = None . into ( ) ;
@@ -179,9 +217,25 @@ mod test {
179217 impl BlockProvider for MockProvider {
180218 type Block = Block < Sha256Digest , ( ) > ;
181219
182- async fn subscribe_parent ( self , block : Self :: Block ) -> Option < Self :: Block > {
220+ fn subscribe_parent (
221+ & self ,
222+ block : & Self :: Block ,
223+ ) -> impl Future < Output = Option < Self :: Block > > + Send + ' static {
183224 let parent = block. parent ;
184- self . 0 . into_iter ( ) . find ( |b| b. digest ( ) == parent)
225+ std:: future:: ready ( self . 0 . iter ( ) . find ( |b| b. digest ( ) == parent) . cloned ( ) )
226+ }
227+ }
228+
229+ #[ derive( Clone ) ]
230+ struct WrongParentProvider ( Block < Sha256Digest , ( ) > ) ;
231+ impl BlockProvider for WrongParentProvider {
232+ type Block = Block < Sha256Digest , ( ) > ;
233+
234+ fn subscribe_parent (
235+ & self ,
236+ _block : & Self :: Block ,
237+ ) -> impl Future < Output = Option < Self :: Block > > + Send + ' static {
238+ std:: future:: ready ( Some ( self . 0 . clone ( ) ) )
185239 }
186240 }
187241
@@ -209,6 +263,45 @@ mod test {
209263 ) ;
210264 }
211265
266+ #[ test]
267+ #[ should_panic = "fetched parent must be contiguous in height" ]
268+ fn test_panics_on_non_contiguous_fetched_parent_height ( ) {
269+ let parent = Block :: new :: < Sha256 > ( ( ) , Sha256Digest :: EMPTY , Height :: zero ( ) , 0 ) ;
270+ let child = Block :: new :: < Sha256 > ( ( ) , parent. digest ( ) , Height :: new ( 3 ) , 3 ) ;
271+ let stream = AncestorStream :: new ( MockProvider ( vec ! [ parent] ) , [ child] ) ;
272+ futures:: pin_mut!( stream) ;
273+
274+ let waker = futures:: task:: noop_waker_ref ( ) ;
275+ let mut cx = std:: task:: Context :: from_waker ( waker) ;
276+ let _ = futures:: Stream :: poll_next ( stream. as_mut ( ) , & mut cx) ;
277+ }
278+
279+ #[ test]
280+ #[ should_panic = "fetched parent must be contiguous in ancestry" ]
281+ fn test_panics_on_non_contiguous_fetched_parent_digest ( ) {
282+ let expected_parent = Block :: new :: < Sha256 > ( ( ) , Sha256Digest :: EMPTY , Height :: zero ( ) , 0 ) ;
283+ let fetched_parent = Block :: new :: < Sha256 > ( ( ) , Sha256Digest :: EMPTY , Height :: zero ( ) , 1 ) ;
284+ let child = Block :: new :: < Sha256 > ( ( ) , expected_parent. digest ( ) , Height :: new ( 1 ) , 2 ) ;
285+ let stream = AncestorStream :: new ( WrongParentProvider ( fetched_parent) , [ child] ) ;
286+ futures:: pin_mut!( stream) ;
287+
288+ let waker = futures:: task:: noop_waker_ref ( ) ;
289+ let mut cx = std:: task:: Context :: from_waker ( waker) ;
290+ let _ = futures:: Stream :: poll_next ( stream. as_mut ( ) , & mut cx) ;
291+ }
292+
293+ #[ test_async]
294+ async fn test_yields_genesis_and_stops ( ) {
295+ let genesis = Block :: new :: < Sha256 > ( ( ) , Sha256Digest :: EMPTY , Height :: zero ( ) , 0 ) ;
296+ let child = Block :: new :: < Sha256 > ( ( ) , genesis. digest ( ) , Height :: new ( 1 ) , 1 ) ;
297+
298+ let provider = MockProvider ( vec ! [ genesis. clone( ) ] ) ;
299+ let stream = AncestorStream :: new ( provider, [ child. clone ( ) ] ) ;
300+
301+ let results = stream. collect :: < Vec < _ > > ( ) . await ;
302+ assert_eq ! ( results, vec![ child, genesis] ) ;
303+ }
304+
212305 #[ test_async]
213306 async fn test_empty_yields_none ( ) {
214307 let mut stream: AncestorStream < MockProvider > =
0 commit comments