Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
170 changes: 117 additions & 53 deletions tip-router-operator-cli/src/claim.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,14 +2,15 @@ use anchor_lang::AccountDeserialize;
use itertools::Itertools;
use jito_priority_fee_distribution_sdk::PriorityFeeDistributionAccount;
use jito_tip_distribution_sdk::{
derive_claim_status_account_address, TipDistributionAccount, CLAIM_STATUS_SIZE, CONFIG_SEED,
derive_claim_status_account_address, ClaimStatus, TipDistributionAccount, CLAIM_STATUS_SIZE,
CONFIG_SEED,
};
use jito_tip_router_client::instructions::ClaimWithPayerBuilder;
use jito_tip_router_core::{account_payer::AccountPayer, config::Config};
use legacy_meta_merkle_tree::generated_merkle_tree::GeneratedMerkleTreeCollection as LegacyGeneratedMerkleTreeCollection;
use legacy_tip_router_operator_cli::claim::ClaimMevError as LegacyClaimMevError;
use log::{info, warn};
use meta_merkle_tree::generated_merkle_tree::GeneratedMerkleTreeCollection;
use meta_merkle_tree::generated_merkle_tree::{GeneratedMerkleTreeCollection, TreeNode};
use rand::{prelude::SliceRandom, thread_rng};
use solana_client::{nonblocking::rpc_client::RpcClient, rpc_config::RpcSimulateTransactionConfig};
use solana_metrics::{datapoint_error, datapoint_info};
Expand Down Expand Up @@ -105,7 +106,7 @@ pub async fn emit_claim_mev_tips_metrics(
return Ok(());
}

let all_claim_transactions = get_claim_transactions_for_valid_unclaimed(
let (claims_to_process, validators_processed) = get_claim_transactions_for_valid_unclaimed(
&rpc_client,
&merkle_trees,
tip_distribution_program_id,
Expand All @@ -119,14 +120,16 @@ pub async fn emit_claim_mev_tips_metrics(
)
.await?;

datapoint_info!(
"tip_router_cli.claim_mev_tips-send_summary",
("claim_transactions_left", all_claim_transactions.len(), i64),
("epoch", epoch, i64),
"cluster" => &cli.cluster,
);
if validators_processed {
datapoint_info!(
"tip_router_cli.claim_mev_tips-send_summary",
("claim_transactions_left", claims_to_process.len(), i64),
("epoch", epoch, i64),
"cluster" => &cli.cluster,
);
}

if all_claim_transactions.is_empty() {
if validators_processed && claims_to_process.is_empty() {
add_completed_epoch(epoch, current_epoch, file_path, file_mutex).await?;
}

Expand Down Expand Up @@ -424,36 +427,39 @@ pub async fn claim_mev_tips(

let start = Instant::now();
while start.elapsed() <= max_loop_duration {
let mut all_claim_transactions = get_claim_transactions_for_valid_unclaimed(
&rpc_client,
merkle_trees,
tip_distribution_program_id,
priority_fee_distribution_program_id,
tip_router_program_id,
ncn,
micro_lamports,
keypair.pubkey(),
operator_address,
cluster,
)
.await?;
let (mut claims_to_process, validators_processed) =
get_claim_transactions_for_valid_unclaimed(
&rpc_client,
merkle_trees,
tip_distribution_program_id,
priority_fee_distribution_program_id,
tip_router_program_id,
ncn,
micro_lamports,
keypair.pubkey(),
operator_address,
cluster,
)
.await?;

datapoint_info!(
"tip_router_cli.claim_mev_tips-send_summary",
("claim_transactions_left", all_claim_transactions.len(), i64),
("epoch", epoch, i64),
("operator", operator_address, String),
"cluster" => cluster,
);
if validators_processed {
datapoint_info!(
"tip_router_cli.claim_mev_tips-send_summary",
("claim_transactions_left", claims_to_process.len(), i64),
("epoch", epoch, i64),
("operator", operator_address, String),
"cluster" => cluster,
);
}

if all_claim_transactions.is_empty() {
if validators_processed && claims_to_process.is_empty() {
add_completed_epoch(epoch, current_epoch, file_path, file_mutex).await?;
return Ok(());
}

all_claim_transactions.shuffle(&mut thread_rng());
claims_to_process.shuffle(&mut thread_rng());

for transactions in all_claim_transactions.chunks(2_000) {
for transactions in claims_to_process.chunks(2_000) {
let transactions: Vec<_> = transactions.to_vec();
// only check balance for the ones we need to currently send since reclaim rent running in parallel
if let Some((start_balance, desired_balance, sol_to_deposit)) =
Expand Down Expand Up @@ -483,7 +489,7 @@ pub async fn claim_mev_tips(
}
}

let transactions = get_claim_transactions_for_valid_unclaimed(
let (transactions, validators_processed) = get_claim_transactions_for_valid_unclaimed(
&rpc_client,
merkle_trees,
tip_distribution_program_id,
Expand All @@ -496,7 +502,8 @@ pub async fn claim_mev_tips(
cluster,
)
.await?;
if transactions.is_empty() {

if validators_processed && transactions.is_empty() {
add_completed_epoch(epoch, current_epoch, file_path, file_mutex).await?;
return Ok(());
}
Expand Down Expand Up @@ -561,11 +568,11 @@ pub async fn get_claim_transactions_for_valid_unclaimed(
payer_pubkey: Pubkey,
operator_address: &String,
cluster: &str,
) -> Result<Vec<Transaction>, ClaimMevError> {
) -> Result<(Vec<Transaction>, bool), ClaimMevError> {
let epoch = merkle_trees.epoch;
let tip_router_config_address = Config::find_program_address(&tip_router_program_id, &ncn).0;

let tree_nodes = merkle_trees
let all_tree_nodes = merkle_trees
.generated_merkle_trees
.iter()
.filter_map(|tree| {
Expand All @@ -578,9 +585,32 @@ pub async fn get_claim_transactions_for_valid_unclaimed(
.flatten()
.collect_vec();

let validator_tree_nodes = merkle_trees
.generated_merkle_trees
.iter()
.filter_map(|tree| {
if tree.merkle_root_upload_authority != tip_router_config_address {
return None;
}

Some(vec![&tree.tree_nodes[1]])
})
.flatten()
.collect_vec();

let remaining_validator_claims =
get_unprocessed_claims_for_validators(rpc_client, &validator_tree_nodes).await?;

let validators_processed = remaining_validator_claims.is_empty();
let tree_nodes = if validators_processed {
all_tree_nodes.to_owned()
} else {
validator_tree_nodes.to_owned()
};

info!(
"reading {} tip distribution related accounts for epoch {}",
tree_nodes.len(),
all_tree_nodes.len(),
epoch
);

Expand All @@ -602,6 +632,7 @@ pub async fn get_claim_transactions_for_valid_unclaimed(
.iter()
.map(|tree_node| tree_node.claimant)
.collect_vec();

let claimants: HashMap<Pubkey, Account> = get_batched_accounts(rpc_client, &claimant_pubkeys)
.await?
.into_iter()
Expand All @@ -622,20 +653,22 @@ pub async fn get_claim_transactions_for_valid_unclaimed(

let elapsed_us = start.elapsed().as_micros();

// can be helpful for determining mismatch in state between requested and read
datapoint_info!(
"tip_router_cli.get_claim_transactions_account_data",
("elapsed_us", elapsed_us, i64),
("tdas", tda_pubkeys.len(), i64),
("tdas_onchain", tdas.len(), i64),
("claimants", claimant_pubkeys.len(), i64),
("claimants_onchain", claimants.len(), i64),
("claim_statuses", claim_status_pubkeys.len(), i64),
("claim_statuses_onchain", claim_statuses.len(), i64),
("epoch", epoch, i64),
("operator", operator_address, String),
"cluster" => cluster,
);
if validators_processed {
// can be helpful for determining mismatch in state between requested and read
datapoint_info!(
"tip_router_cli.get_claim_transactions_account_data",
("elapsed_us", elapsed_us, i64),
("tdas", tda_pubkeys.len(), i64),
("tdas_onchain", tdas.len(), i64),
("epoch", epoch, i64),
("claimants", claimant_pubkeys.len(), i64),
("claim_statuses", claim_status_pubkeys.len(), i64),
("claimants_onchain", claimants.len(), i64),
("claim_statuses_onchain", claim_statuses.len(), i64),
("operator", operator_address, String),
"cluster" => cluster,
);
}

let transactions = build_mev_claim_transactions(
tip_distribution_program_id,
Expand All @@ -651,7 +684,38 @@ pub async fn get_claim_transactions_for_valid_unclaimed(
cluster,
);

Ok(transactions)
Ok((transactions, validators_processed))
}

pub async fn get_unprocessed_claims_for_validators(
rpc_client: &RpcClient,
tree_nodes: &[&TreeNode],
) -> Result<Vec<Account>, ClaimMevError> {
let claim_status_pubkeys = tree_nodes
.iter()
.map(|tree_node| tree_node.claim_status_pubkey)
.collect_vec();

let claim_statuses: HashMap<Pubkey, Account> =
get_batched_accounts(rpc_client, &claim_status_pubkeys)
.await?
.into_iter()
.filter_map(|(pubkey, a)| Some((pubkey, a?)))
.collect();

let deserialized_claim_statuses = claim_statuses.values().map(|a| {
(
ClaimStatus::try_deserialize(&mut a.data.as_slice()).unwrap(),
a,
)
});

let unprocessed_claim_statuses = deserialized_claim_statuses
.filter(|(c, _)| !c.is_claimed)
.map(|(_, a)| a.clone())
.collect();

Ok(unprocessed_claim_statuses)
}

/// Returns a list of claim transactions for valid, unclaimed MEV tips
Expand Down
2 changes: 1 addition & 1 deletion tip_distribution_sdk/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,8 +2,8 @@
use anchor_lang::{declare_program, prelude::Pubkey, solana_program::clock::Epoch};

declare_program!(jito_tip_distribution);
pub use jito_tip_distribution::accounts::ClaimStatus;
pub use jito_tip_distribution::accounts::TipDistributionAccount;

pub mod instruction;

pub const CONFIG_SEED: &[u8] = b"CONFIG_ACCOUNT";
Expand Down
Loading