@@ -28,7 +28,9 @@ use shared::crawler_state::ChainCrawlerState;
2828use shared:: error:: { AsDbError , AsRpcError , ContextDbInteractError , MainError } ;
2929use shared:: id:: Id ;
3030use shared:: token:: Token ;
31+ use shared:: utils:: BalanceChange ;
3132use shared:: validator:: ValidatorSet ;
33+ use tendermint_rpc:: endpoint:: block:: Response as TendermintBlockResponse ;
3234use tendermint_rpc:: HttpClient ;
3335use tokio_retry:: strategy:: { jitter, ExponentialBackoff } ;
3436use tokio_retry:: Retry ;
@@ -140,6 +142,7 @@ async fn main() -> Result<(), MainError> {
140142 initial_query (
141143 & client,
142144 & conn,
145+ checksums. clone ( ) ,
143146 config. initial_query_retry_time ,
144147 config. initial_query_retry_attempts ,
145148 )
@@ -186,46 +189,15 @@ async fn crawling_fn(
186189 return Err ( MainError :: NoAction ) ;
187190 }
188191
189- tracing:: debug!( block = block_height, "Query block..." ) ;
190- let tm_block_response =
191- tendermint_service:: query_raw_block_at_height ( & client, block_height)
192- . await
193- . into_rpc_error ( ) ?;
194- tracing:: debug!(
195- block = block_height,
196- "Raw block contains {} txs..." ,
197- tm_block_response. block. data. len( )
198- ) ;
199-
200- tracing:: debug!( block = block_height, "Query block results..." ) ;
201- let tm_block_results_response =
202- tendermint_service:: query_raw_block_results_at_height (
203- & client,
204- block_height,
205- )
206- . await
207- . into_rpc_error ( ) ?;
208- let block_results = BlockResult :: from ( tm_block_results_response) ;
209-
210- tracing:: debug!( block = block_height, "Query epoch..." ) ;
211- let epoch =
212- namada_service:: get_epoch_at_block_height ( & client, block_height)
213- . await
214- . into_rpc_error ( ) ?;
215-
216192 tracing:: debug!( block = block_height, "Query first block in epoch..." ) ;
217193 let first_block_in_epoch =
218194 namada_service:: get_first_block_in_epoch ( & client)
219195 . await
220196 . into_rpc_error ( ) ?;
221197
222- let block = Block :: from (
223- tm_block_response,
224- & block_results,
225- checksums,
226- epoch,
227- block_height,
228- ) ;
198+ let ( block, tm_block_response, epoch) =
199+ get_block ( block_height, & client, checksums) . await ?;
200+
229201 tracing:: debug!(
230202 block = block_height,
231203 txs = block. transactions. len( ) ,
@@ -243,8 +215,33 @@ async fn crawling_fn(
243215 . map ( Token :: Ibc )
244216 . collect :: < Vec < Token > > ( ) ;
245217
218+ let native_addresses =
219+ namada_service:: query_native_addresses_balance_change ( Token :: Native (
220+ native_token. clone ( ) ,
221+ ) ) ;
246222 let addresses = block. addresses_with_balance_change ( & native_token) ;
247223
224+ let validators_addresses = if first_block_in_epoch. eq ( & block_height) {
225+ namada_service:: get_all_consensus_validators_addresses_at (
226+ & client,
227+ epoch - 1 ,
228+ native_token. clone ( ) ,
229+ )
230+ . await
231+ . into_rpc_error ( ) ?
232+ } else {
233+ HashSet :: default ( )
234+ } ;
235+
236+ let block_proposer_address = block
237+ . header
238+ . proposer_address_namada
239+ . as_ref ( )
240+ . map ( |address| BalanceChange {
241+ address : Id :: Account ( address. clone ( ) ) ,
242+ token : Token :: Native ( native_token. clone ( ) ) ,
243+ } ) ;
244+
248245 let pgf_receipient_addresses = if first_block_in_epoch. eq ( & block_height) {
249246 conn. interact ( move |conn| {
250247 namada_pgf_repository:: get_pgf_receipients_balance_changes (
@@ -260,8 +257,12 @@ async fn crawling_fn(
260257 HashSet :: default ( )
261258 } ;
262259
263- let all_balance_changed_addresses = pgf_receipient_addresses
264- . union ( & addresses)
260+ let all_balance_changed_addresses = addresses
261+ . iter ( )
262+ . chain ( block_proposer_address. iter ( ) )
263+ . chain ( pgf_receipient_addresses. iter ( ) )
264+ . chain ( validators_addresses. iter ( ) )
265+ . chain ( native_addresses. iter ( ) )
265266 . cloned ( )
266267 . collect :: < HashSet < _ > > ( ) ;
267268
@@ -272,7 +273,13 @@ async fn crawling_fn(
272273 )
273274 . await
274275 . into_rpc_error ( ) ?;
275- tracing:: info!( "Updating balance for {} addresses..." , addresses. len( ) ) ;
276+
277+ tracing:: debug!(
278+ block = block_height,
279+ addresses = all_balance_changed_addresses. len( ) ,
280+ "Updating balance for {} addresses..." ,
281+ all_balance_changed_addresses. len( )
282+ ) ;
276283
277284 let next_governance_proposal_id =
278285 namada_service:: query_next_governance_id ( & client, block_height)
@@ -298,12 +305,18 @@ async fn crawling_fn(
298305 proposals_votes. len( )
299306 ) ;
300307
301- let validators = block. validators ( ) ;
308+ let validators = block. new_validators ( ) ;
302309 let validator_set = ValidatorSet {
303310 validators : validators. clone ( ) ,
304311 epoch,
305312 } ;
306313
314+ let validators_state_change = block. update_validators_state ( ) ;
315+ tracing:: debug!(
316+ "Updating {} validators state" ,
317+ validators_state_change. len( )
318+ ) ;
319+
307320 let addresses = block. bond_addresses ( ) ;
308321 let bonds = query_bonds ( & client, addresses) . await . into_rpc_error ( ) ?;
309322 tracing:: debug!(
@@ -368,6 +381,7 @@ async fn crawling_fn(
368381 withdraws = withdraw_addreses. len( ) ,
369382 claimed_rewards = reward_claimers. len( ) ,
370383 revealed_pks = revealed_pks. len( ) ,
384+ validator_state = validators_state_change. len( ) ,
371385 epoch = epoch,
372386 first_block_in_epoch = first_block_in_epoch,
373387 block = block_height,
@@ -383,6 +397,12 @@ async fn crawling_fn(
383397 ibc_tokens,
384398 ) ?;
385399
400+ repository:: block:: upsert_block (
401+ transaction_conn,
402+ block,
403+ tm_block_response,
404+ ) ?;
405+
386406 repository:: balance:: insert_balances (
387407 transaction_conn,
388408 balances,
@@ -402,6 +422,11 @@ async fn crawling_fn(
402422 validator_set,
403423 ) ?;
404424
425+ repository:: pos:: upsert_validator_state (
426+ transaction_conn,
427+ validators_state_change,
428+ ) ?;
429+
405430 // We first remove all the bonds and then insert the new ones
406431 repository:: pos:: clear_bonds (
407432 transaction_conn,
@@ -453,29 +478,35 @@ async fn crawling_fn(
453478async fn initial_query (
454479 client : & HttpClient ,
455480 conn : & Object ,
481+ checksums : Checksums ,
456482 retry_time : u64 ,
457483 retry_attempts : usize ,
458484) -> Result < ( ) , MainError > {
459485 let retry_strategy = ExponentialBackoff :: from_millis ( retry_time)
460486 . map ( jitter)
461487 . take ( retry_attempts) ;
462- Retry :: spawn ( retry_strategy, || try_initial_query ( client, conn) ) . await
488+ Retry :: spawn ( retry_strategy, || {
489+ try_initial_query ( client, conn, checksums. clone ( ) )
490+ } )
491+ . await
463492}
464493
465494async fn try_initial_query (
466495 client : & HttpClient ,
467496 conn : & Object ,
497+ checksums : Checksums ,
468498) -> Result < ( ) , MainError > {
469499 tracing:: debug!( "Querying initial data..." ) ;
470500 let block_height =
471501 query_last_block_height ( client) . await . into_rpc_error ( ) ?;
472- let epoch = namada_service:: get_epoch_at_block_height ( client, block_height)
473- . await
474- . into_rpc_error ( ) ?;
502+
475503 let first_block_in_epoch = namada_service:: get_first_block_in_epoch ( client)
476504 . await
477505 . into_rpc_error ( ) ?;
478506
507+ let ( block, tm_block_response, epoch) =
508+ get_block ( block_height, client, checksums. clone ( ) ) . await ?;
509+
479510 let tokens = query_tokens ( client) . await . into_rpc_error ( ) ?;
480511
481512 // This can sometimes fail if the last block height in the node has moved
@@ -535,6 +566,12 @@ async fn try_initial_query(
535566 . run ( |transaction_conn| {
536567 repository:: balance:: insert_tokens ( transaction_conn, tokens) ?;
537568
569+ repository:: block:: upsert_block (
570+ transaction_conn,
571+ block,
572+ tm_block_response,
573+ ) ?;
574+
538575 tracing:: debug!(
539576 block = block_height,
540577 "Inserting {} balances..." ,
@@ -611,3 +648,60 @@ async fn update_crawler_timestamp(
611648 . and_then ( identity)
612649 . into_db_error ( )
613650}
651+
652+ async fn get_block (
653+ block_height : u32 ,
654+ client : & HttpClient ,
655+ checksums : Checksums ,
656+ ) -> Result < ( Block , TendermintBlockResponse , u32 ) , MainError > {
657+ tracing:: debug!( block = block_height, "Query block..." ) ;
658+ let tm_block_response =
659+ tendermint_service:: query_raw_block_at_height ( client, block_height)
660+ . await
661+ . into_rpc_error ( ) ?;
662+ tracing:: debug!(
663+ block = block_height,
664+ "Raw block contains {} txs..." ,
665+ tm_block_response. block. data. len( )
666+ ) ;
667+
668+ tracing:: debug!( block = block_height, "Query block results..." ) ;
669+ let tm_block_results_response =
670+ tendermint_service:: query_raw_block_results_at_height (
671+ client,
672+ block_height,
673+ )
674+ . await
675+ . into_rpc_error ( ) ?;
676+ let block_results = BlockResult :: from ( tm_block_results_response) ;
677+
678+ tracing:: debug!( block = block_height, "Query epoch..." ) ;
679+ let epoch = namada_service:: get_epoch_at_block_height ( client, block_height)
680+ . await
681+ . into_rpc_error ( ) ?;
682+
683+ let proposer_address_namada = namada_service:: get_validator_namada_address (
684+ client,
685+ & Id :: from ( & tm_block_response. block . header . proposer_address ) ,
686+ )
687+ . await
688+ . into_rpc_error ( ) ?;
689+
690+ tracing:: info!(
691+ block = block_height,
692+ tm_address = tm_block_response. block. header. proposer_address. to_string( ) ,
693+ namada_address = ?proposer_address_namada,
694+ "Got block proposer address"
695+ ) ;
696+
697+ let block = Block :: from (
698+ & tm_block_response,
699+ & block_results,
700+ & proposer_address_namada,
701+ checksums,
702+ epoch,
703+ block_height,
704+ ) ;
705+
706+ Ok ( ( block, tm_block_response, epoch) )
707+ }
0 commit comments