11use std:: marker:: PhantomData ;
2- use std:: process;
32use std:: sync:: Arc ;
43
54use alloy:: consensus:: proofs:: { calculate_transaction_root, calculate_withdrawals_root} ;
@@ -20,7 +19,7 @@ use url::Url;
2019use tokio:: sync:: mpsc:: channel;
2120use tokio:: sync:: mpsc:: Receiver ;
2221use tokio:: sync:: mpsc:: Sender ;
23- use tokio:: sync:: watch;
22+ use tokio:: sync:: { watch, Mutex } ;
2423
2524use helios_consensus_core:: {
2625 apply_bootstrap, apply_finality_update, apply_update, calc_sync_period,
@@ -40,10 +39,18 @@ use crate::constants::MAX_REQUEST_LIGHT_CLIENT_UPDATES;
4039use crate :: database:: Database ;
4140use crate :: rpc:: ConsensusRpc ;
4241
42+ #[ derive( Debug , Clone ) ]
43+ pub enum ConsensusSyncStatus {
44+ Syncing ,
45+ Synced ,
46+ Error ( String ) ,
47+ }
48+
4349pub struct ConsensusClient < S : ConsensusSpec , R : ConsensusRpc < S > , DB : Database > {
4450 pub block_recv : Option < Receiver < Block < Transaction > > > ,
4551 pub finalized_block_recv : Option < watch:: Receiver < Option < Block < Transaction > > > > ,
4652 pub checkpoint_recv : watch:: Receiver < Option < B256 > > ,
53+ sync_status_recv : Mutex < watch:: Receiver < ConsensusSyncStatus > > ,
4754 shutdown_send : watch:: Sender < bool > ,
4855 genesis_time : u64 ,
4956 config : Arc < Config > ,
@@ -62,6 +69,7 @@ pub struct Inner<S: ConsensusSpec, R: ConsensusRpc<S>> {
6269 phantom : PhantomData < S > ,
6370}
6471
72+ #[ async_trait:: async_trait]
6573impl < S : ConsensusSpec , R : ConsensusRpc < S > , DB : Database > Consensus < Block >
6674 for ConsensusClient < S , R , DB >
6775{
@@ -85,13 +93,30 @@ impl<S: ConsensusSpec, R: ConsensusRpc<S>, DB: Database> Consensus<Block>
8593 self . shutdown_send . send ( true ) ?;
8694 Ok ( ( ) )
8795 }
96+
97+ async fn wait_synced ( & self ) -> Result < ( ) > {
98+ let mut sync_status_recv = self . sync_status_recv . lock ( ) . await ;
99+
100+ loop {
101+ let status = sync_status_recv. borrow ( ) . clone ( ) ;
102+ match status {
103+ ConsensusSyncStatus :: Synced => return Ok ( ( ) ) ,
104+ ConsensusSyncStatus :: Error ( err) => return Err ( eyre ! ( "sync failed: {}" , err) ) ,
105+ ConsensusSyncStatus :: Syncing => {
106+ // Wait for status to change
107+ sync_status_recv. changed ( ) . await ?;
108+ }
109+ }
110+ }
111+ }
88112}
89113
90114impl < S : ConsensusSpec , R : ConsensusRpc < S > , DB : Database > ConsensusClient < S , R , DB > {
91115 pub fn new ( rpc : & Url , config : Arc < Config > ) -> Result < ConsensusClient < S , R , DB > > {
92116 let ( block_send, block_recv) = channel ( 256 ) ;
93117 let ( finalized_block_send, finalized_block_recv) = watch:: channel ( None ) ;
94118 let ( checkpoint_send, checkpoint_recv) = watch:: channel ( None ) ;
119+ let ( sync_status_send, sync_status_recv) = watch:: channel ( ConsensusSyncStatus :: Syncing ) ;
95120 let ( shutdown_send, shutdown_recv) = watch:: channel ( false ) ;
96121
97122 let config_clone = config. clone ( ) ;
@@ -125,20 +150,24 @@ impl<S: ConsensusSpec, R: ConsensusRpc<S>, DB: Database> ConsensusClient<S, R, D
125150 let res = sync_all_fallbacks ( & mut inner, config. chain . chain_id ) . await ;
126151 if let Err ( err) = res {
127152 error ! ( target: "helios::consensus" , err = %err, "sync failed" ) ;
128- process:: exit ( 1 ) ;
153+ _ = sync_status_send. send ( ConsensusSyncStatus :: Error ( err. to_string ( ) ) ) ;
154+ return ;
129155 }
130156 } else if let Some ( fallback) = & config. fallback {
131157 let res = sync_fallback ( & mut inner, fallback) . await ;
132158 if let Err ( err) = res {
133159 error ! ( target: "helios::consensus" , err = %err, "sync failed" ) ;
134- process:: exit ( 1 ) ;
160+ _ = sync_status_send. send ( ConsensusSyncStatus :: Error ( err. to_string ( ) ) ) ;
161+ return ;
135162 }
136163 } else {
137164 error ! ( target: "helios::consensus" , err = %err, "sync failed" ) ;
138- process:: exit ( 1 ) ;
165+ _ = sync_status_send. send ( ConsensusSyncStatus :: Error ( err. to_string ( ) ) ) ;
166+ return ;
139167 }
140168 }
141169
170+ _ = sync_status_send. send ( ConsensusSyncStatus :: Synced ) ;
142171 _ = inner. send_blocks ( ) . await ;
143172
144173 let start = Instant :: now ( ) + inner. duration_until_next_update ( ) . to_std ( ) . unwrap ( ) ;
@@ -180,6 +209,7 @@ impl<S: ConsensusSpec, R: ConsensusRpc<S>, DB: Database> ConsensusClient<S, R, D
180209 block_recv : Some ( block_recv) ,
181210 finalized_block_recv : Some ( finalized_block_recv) ,
182211 checkpoint_recv,
212+ sync_status_recv : Mutex :: new ( sync_status_recv) ,
183213 shutdown_send,
184214 genesis_time,
185215 config : config_clone,
0 commit comments