@@ -2,14 +2,15 @@ use anchor_lang::AccountDeserialize;
22use itertools:: Itertools ;
33use jito_priority_fee_distribution_sdk:: PriorityFeeDistributionAccount ;
44use jito_tip_distribution_sdk:: {
5- derive_claim_status_account_address, TipDistributionAccount , CLAIM_STATUS_SIZE , CONFIG_SEED ,
5+ derive_claim_status_account_address, ClaimStatus , TipDistributionAccount , CLAIM_STATUS_SIZE ,
6+ CONFIG_SEED ,
67} ;
78use jito_tip_router_client:: instructions:: ClaimWithPayerBuilder ;
89use jito_tip_router_core:: { account_payer:: AccountPayer , config:: Config } ;
910use legacy_meta_merkle_tree:: generated_merkle_tree:: GeneratedMerkleTreeCollection as LegacyGeneratedMerkleTreeCollection ;
1011use legacy_tip_router_operator_cli:: claim:: ClaimMevError as LegacyClaimMevError ;
1112use log:: { info, warn} ;
12- use meta_merkle_tree:: generated_merkle_tree:: GeneratedMerkleTreeCollection ;
13+ use meta_merkle_tree:: generated_merkle_tree:: { GeneratedMerkleTreeCollection , TreeNode } ;
1314use rand:: { prelude:: SliceRandom , thread_rng} ;
1415use solana_client:: { nonblocking:: rpc_client:: RpcClient , rpc_config:: RpcSimulateTransactionConfig } ;
1516use solana_metrics:: { datapoint_error, datapoint_info} ;
@@ -105,7 +106,7 @@ pub async fn emit_claim_mev_tips_metrics(
105106 return Ok ( ( ) ) ;
106107 }
107108
108- let all_claim_transactions = get_claim_transactions_for_valid_unclaimed (
109+ let ( claims_to_process , validators_processed ) = get_claim_transactions_for_valid_unclaimed (
109110 & rpc_client,
110111 & merkle_trees,
111112 tip_distribution_program_id,
@@ -119,14 +120,16 @@ pub async fn emit_claim_mev_tips_metrics(
119120 )
120121 . await ?;
121122
122- datapoint_info ! (
123- "tip_router_cli.claim_mev_tips-send_summary" ,
124- ( "claim_transactions_left" , all_claim_transactions. len( ) , i64 ) ,
125- ( "epoch" , epoch, i64 ) ,
126- "cluster" => & cli. cluster,
127- ) ;
123+ if validators_processed {
124+ datapoint_info ! (
125+ "tip_router_cli.claim_mev_tips-send_summary" ,
126+ ( "claim_transactions_left" , claims_to_process. len( ) , i64 ) ,
127+ ( "epoch" , epoch, i64 ) ,
128+ "cluster" => & cli. cluster,
129+ ) ;
130+ }
128131
129- if all_claim_transactions . is_empty ( ) {
132+ if validators_processed && claims_to_process . is_empty ( ) {
130133 add_completed_epoch ( epoch, current_epoch, file_path, file_mutex) . await ?;
131134 }
132135
@@ -424,36 +427,39 @@ pub async fn claim_mev_tips(
424427
425428 let start = Instant :: now ( ) ;
426429 while start. elapsed ( ) <= max_loop_duration {
427- let mut all_claim_transactions = get_claim_transactions_for_valid_unclaimed (
428- & rpc_client,
429- merkle_trees,
430- tip_distribution_program_id,
431- priority_fee_distribution_program_id,
432- tip_router_program_id,
433- ncn,
434- micro_lamports,
435- keypair. pubkey ( ) ,
436- operator_address,
437- cluster,
438- )
439- . await ?;
430+ let ( mut claims_to_process, validators_processed) =
431+ get_claim_transactions_for_valid_unclaimed (
432+ & rpc_client,
433+ merkle_trees,
434+ tip_distribution_program_id,
435+ priority_fee_distribution_program_id,
436+ tip_router_program_id,
437+ ncn,
438+ micro_lamports,
439+ keypair. pubkey ( ) ,
440+ operator_address,
441+ cluster,
442+ )
443+ . await ?;
440444
441- datapoint_info ! (
442- "tip_router_cli.claim_mev_tips-send_summary" ,
443- ( "claim_transactions_left" , all_claim_transactions. len( ) , i64 ) ,
444- ( "epoch" , epoch, i64 ) ,
445- ( "operator" , operator_address, String ) ,
446- "cluster" => cluster,
447- ) ;
445+ if validators_processed {
446+ datapoint_info ! (
447+ "tip_router_cli.claim_mev_tips-send_summary" ,
448+ ( "claim_transactions_left" , claims_to_process. len( ) , i64 ) ,
449+ ( "epoch" , epoch, i64 ) ,
450+ ( "operator" , operator_address, String ) ,
451+ "cluster" => cluster,
452+ ) ;
453+ }
448454
449- if all_claim_transactions . is_empty ( ) {
455+ if validators_processed && claims_to_process . is_empty ( ) {
450456 add_completed_epoch ( epoch, current_epoch, file_path, file_mutex) . await ?;
451457 return Ok ( ( ) ) ;
452458 }
453459
454- all_claim_transactions . shuffle ( & mut thread_rng ( ) ) ;
460+ claims_to_process . shuffle ( & mut thread_rng ( ) ) ;
455461
456- for transactions in all_claim_transactions . chunks ( 2_000 ) {
462+ for transactions in claims_to_process . chunks ( 2_000 ) {
457463 let transactions: Vec < _ > = transactions. to_vec ( ) ;
458464 // only check balance for the ones we need to currently send since reclaim rent running in parallel
459465 if let Some ( ( start_balance, desired_balance, sol_to_deposit) ) =
@@ -483,7 +489,7 @@ pub async fn claim_mev_tips(
483489 }
484490 }
485491
486- let transactions = get_claim_transactions_for_valid_unclaimed (
492+ let ( transactions, validators_processed ) = get_claim_transactions_for_valid_unclaimed (
487493 & rpc_client,
488494 merkle_trees,
489495 tip_distribution_program_id,
@@ -496,7 +502,8 @@ pub async fn claim_mev_tips(
496502 cluster,
497503 )
498504 . await ?;
499- if transactions. is_empty ( ) {
505+
506+ if validators_processed && transactions. is_empty ( ) {
500507 add_completed_epoch ( epoch, current_epoch, file_path, file_mutex) . await ?;
501508 return Ok ( ( ) ) ;
502509 }
@@ -561,11 +568,11 @@ pub async fn get_claim_transactions_for_valid_unclaimed(
561568 payer_pubkey : Pubkey ,
562569 operator_address : & String ,
563570 cluster : & str ,
564- ) -> Result < Vec < Transaction > , ClaimMevError > {
571+ ) -> Result < ( Vec < Transaction > , bool ) , ClaimMevError > {
565572 let epoch = merkle_trees. epoch ;
566573 let tip_router_config_address = Config :: find_program_address ( & tip_router_program_id, & ncn) . 0 ;
567574
568- let tree_nodes = merkle_trees
575+ let all_tree_nodes = merkle_trees
569576 . generated_merkle_trees
570577 . iter ( )
571578 . filter_map ( |tree| {
@@ -578,9 +585,32 @@ pub async fn get_claim_transactions_for_valid_unclaimed(
578585 . flatten ( )
579586 . collect_vec ( ) ;
580587
588+ let validator_tree_nodes = merkle_trees
589+ . generated_merkle_trees
590+ . iter ( )
591+ . filter_map ( |tree| {
592+ if tree. merkle_root_upload_authority != tip_router_config_address {
593+ return None ;
594+ }
595+
596+ Some ( vec ! [ & tree. tree_nodes[ 1 ] ] )
597+ } )
598+ . flatten ( )
599+ . collect_vec ( ) ;
600+
601+ let remaining_validator_claims =
602+ get_unprocessed_claims_for_validators ( rpc_client, & validator_tree_nodes) . await ?;
603+
604+ let validators_processed = remaining_validator_claims. is_empty ( ) ;
605+ let tree_nodes = if validators_processed {
606+ all_tree_nodes. to_owned ( )
607+ } else {
608+ validator_tree_nodes. to_owned ( )
609+ } ;
610+
581611 info ! (
582612 "reading {} tip distribution related accounts for epoch {}" ,
583- tree_nodes . len( ) ,
613+ all_tree_nodes . len( ) ,
584614 epoch
585615 ) ;
586616
@@ -602,6 +632,7 @@ pub async fn get_claim_transactions_for_valid_unclaimed(
602632 . iter ( )
603633 . map ( |tree_node| tree_node. claimant )
604634 . collect_vec ( ) ;
635+
605636 let claimants: HashMap < Pubkey , Account > = get_batched_accounts ( rpc_client, & claimant_pubkeys)
606637 . await ?
607638 . into_iter ( )
@@ -622,20 +653,22 @@ pub async fn get_claim_transactions_for_valid_unclaimed(
622653
623654 let elapsed_us = start. elapsed ( ) . as_micros ( ) ;
624655
625- // can be helpful for determining mismatch in state between requested and read
626- datapoint_info ! (
627- "tip_router_cli.get_claim_transactions_account_data" ,
628- ( "elapsed_us" , elapsed_us, i64 ) ,
629- ( "tdas" , tda_pubkeys. len( ) , i64 ) ,
630- ( "tdas_onchain" , tdas. len( ) , i64 ) ,
631- ( "claimants" , claimant_pubkeys. len( ) , i64 ) ,
632- ( "claimants_onchain" , claimants. len( ) , i64 ) ,
633- ( "claim_statuses" , claim_status_pubkeys. len( ) , i64 ) ,
634- ( "claim_statuses_onchain" , claim_statuses. len( ) , i64 ) ,
635- ( "epoch" , epoch, i64 ) ,
636- ( "operator" , operator_address, String ) ,
637- "cluster" => cluster,
638- ) ;
656+ if validators_processed {
657+ // can be helpful for determining mismatch in state between requested and read
658+ datapoint_info ! (
659+ "tip_router_cli.get_claim_transactions_account_data" ,
660+ ( "elapsed_us" , elapsed_us, i64 ) ,
661+ ( "tdas" , tda_pubkeys. len( ) , i64 ) ,
662+ ( "tdas_onchain" , tdas. len( ) , i64 ) ,
663+ ( "epoch" , epoch, i64 ) ,
664+ ( "claimants" , claimant_pubkeys. len( ) , i64 ) ,
665+ ( "claim_statuses" , claim_status_pubkeys. len( ) , i64 ) ,
666+ ( "claimants_onchain" , claimants. len( ) , i64 ) ,
667+ ( "claim_statuses_onchain" , claim_statuses. len( ) , i64 ) ,
668+ ( "operator" , operator_address, String ) ,
669+ "cluster" => cluster,
670+ ) ;
671+ }
639672
640673 let transactions = build_mev_claim_transactions (
641674 tip_distribution_program_id,
@@ -651,7 +684,38 @@ pub async fn get_claim_transactions_for_valid_unclaimed(
651684 cluster,
652685 ) ;
653686
654- Ok ( transactions)
687+ Ok ( ( transactions, validators_processed) )
688+ }
689+
690+ pub async fn get_unprocessed_claims_for_validators (
691+ rpc_client : & RpcClient ,
692+ tree_nodes : & [ & TreeNode ] ,
693+ ) -> Result < Vec < Account > , ClaimMevError > {
694+ let claim_status_pubkeys = tree_nodes
695+ . iter ( )
696+ . map ( |tree_node| tree_node. claim_status_pubkey )
697+ . collect_vec ( ) ;
698+
699+ let claim_statuses: HashMap < Pubkey , Account > =
700+ get_batched_accounts ( rpc_client, & claim_status_pubkeys)
701+ . await ?
702+ . into_iter ( )
703+ . filter_map ( |( pubkey, a) | Some ( ( pubkey, a?) ) )
704+ . collect ( ) ;
705+
706+ let deserialized_claim_statuses = claim_statuses. values ( ) . map ( |a| {
707+ (
708+ ClaimStatus :: try_deserialize ( & mut a. data . as_slice ( ) ) . unwrap ( ) ,
709+ a,
710+ )
711+ } ) ;
712+
713+ let unprocessed_claim_statuses = deserialized_claim_statuses
714+ . filter ( |( c, _) | !c. is_claimed )
715+ . map ( |( _, a) | a. clone ( ) )
716+ . collect ( ) ;
717+
718+ Ok ( unprocessed_claim_statuses)
655719}
656720
657721/// Returns a list of claim transactions for valid, unclaimed MEV tips
0 commit comments