@@ -6,7 +6,7 @@ use crate::{
66 } ,
77 simplex:: types:: { Activity , Finalization , Notarization } ,
88 types:: { Height , Round } ,
9- Heightable , Reporter ,
9+ Reporter ,
1010} ;
1111use commonware_actor:: {
1212 mailbox:: { Overflow , Policy , Sender } ,
@@ -15,7 +15,7 @@ use commonware_actor::{
1515use commonware_cryptography:: { certificate:: Scheme , Digestible } ;
1616use commonware_p2p:: Recipients ;
1717use commonware_utils:: { channel:: oneshot, vec:: NonEmptyVec } ;
18- use futures:: { Stream , StreamExt } ;
18+ use futures:: Stream ;
1919use std:: collections:: { btree_map:: Entry , BTreeMap , VecDeque } ;
2020
2121/// Messages sent to the marshal [Actor](super::Actor).
@@ -514,12 +514,6 @@ pub struct Mailbox<S: Scheme, V: Variant> {
514514 sender : Sender < Message < S , V > > ,
515515}
516516
517- /// Provider used by [`Mailbox::ancestor_stream`] to fetch missing certified parents.
518- #[ derive( Clone ) ]
519- pub ( crate ) struct AncestryProvider < S : Scheme , V : Variant > {
520- mailbox : Mailbox < S , V > ,
521- }
522-
523517impl < S : Scheme , V : Variant > Mailbox < S , V > {
524518 /// Creates a new mailbox.
525519 pub ( crate ) const fn new ( sender : Sender < Message < S , V > > ) -> Self {
@@ -540,15 +534,10 @@ impl<S: Scheme, V: Variant> Mailbox<S, V> {
540534 initial : I ,
541535 ) -> impl Stream < Item = V :: ApplicationBlock > + Send + use < S , V , I >
542536 where
537+ Self : BlockProvider < Block = V :: ApplicationBlock > ,
543538 I : IntoIterator < Item = V :: Block > ,
544539 {
545- AncestorStream :: new (
546- AncestryProvider {
547- mailbox : self . clone ( ) ,
548- } ,
549- initial,
550- )
551- . map ( V :: into_inner)
540+ AncestorStream :: new ( self . clone ( ) , initial. into_iter ( ) . map ( V :: into_inner) )
552541 }
553542
554543 /// A request to retrieve the information about the highest finalized block.
@@ -690,7 +679,10 @@ impl<S: Scheme, V: Variant> Mailbox<S, V> {
690679 pub async fn ancestry (
691680 & self ,
692681 ( fallback, start_digest) : ( DigestFallback , <V :: Block as Digestible >:: Digest ) ,
693- ) -> Option < impl Stream < Item = V :: ApplicationBlock > + Send + use < S , V > > {
682+ ) -> Option < impl Stream < Item = V :: ApplicationBlock > + Send + use < S , V > >
683+ where
684+ Self : BlockProvider < Block = V :: ApplicationBlock > ,
685+ {
694686 let receiver = self . subscribe_by_digest ( start_digest, fallback) ;
695687 receiver
696688 . await
@@ -786,31 +778,6 @@ impl<S: Scheme, V: Variant> Mailbox<S, V> {
786778 }
787779}
788780
789- impl < S : Scheme , V : Variant > BlockProvider for AncestryProvider < S , V > {
790- type Block = V :: Block ;
791-
792- async fn subscribe ( self , digest : <V :: Block as Digestible >:: Digest ) -> Option < Self :: Block > {
793- let subscription = self
794- . mailbox
795- . subscribe_by_digest ( digest, DigestFallback :: Wait ) ;
796- subscription. await . ok ( )
797- }
798-
799- async fn subscribe_parent ( self , block : Self :: Block ) -> Option < Self :: Block > {
800- // Ancestry walking does not carry the certified parent round. By this
801- // point the stream is walking accepted ancestry, so this height should
802- // be correct; it remains a local pruning bound rather than a peer
803- // response validity condition.
804- let parent_height = block. height ( ) . previous ( ) ?;
805- let commitment = V :: parent_commitment ( & block) ;
806- let fallback = CommitmentFallback :: FetchByCommitment {
807- height : parent_height,
808- } ;
809- let subscription = self . mailbox . subscribe_by_commitment ( commitment, fallback) ;
810- subscription. await . ok ( )
811- }
812- }
813-
814781impl < S : Scheme , V : Variant > Reporter for Mailbox < S , V > {
815782 type Activity = Activity < S , V :: Commitment > ;
816783
0 commit comments