Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
59 changes: 55 additions & 4 deletions tip-router-operator-cli/src/process_epoch.rs
Original file line number Diff line number Diff line change
Expand Up @@ -112,8 +112,44 @@ pub async fn loop_stages(
save_stages: bool,
) -> Result<()> {
let keypair = read_keypair_file(&cli.keypair_path).expect("Failed to read keypair file");
let mut current_epoch_info = rpc_client.get_epoch_info().await?;
let epoch_schedule = rpc_client.get_epoch_schedule().await?;

let mut current_epoch_info = {
loop {
match rpc_client.get_epoch_info().await {
Ok(info) => break info,
Err(e) => {
error!("Error getting epoch info from RPC. Retrying...");
datapoint_error!(
"tip_router_cli.get_epoch_info",
("operator_address", cli.operator_address.clone(), String),
("status", "error", String),
("error", e.to_string(), String),
"cluster" => &cli.cluster,
);
tokio::time::sleep(Duration::from_secs(5)).await;
}
}
}
};

let epoch_schedule = {
loop {
match rpc_client.get_epoch_schedule().await {
Ok(schedule) => break schedule,
Err(e) => {
error!("Error getting epoch schedule from RPC. Retrying...");
datapoint_error!(
"tip_router_cli.get_epoch_schedule",
("operator_address", cli.operator_address.clone(), String),
("status", "error", String),
("error", e.to_string(), String),
"cluster" => &cli.cluster,
);
tokio::time::sleep(Duration::from_secs(5)).await;
}
}
}
};

// Track runs that are starting right at the beginning of a new epoch
let operator_address = cli.operator_address.clone();
Expand Down Expand Up @@ -289,7 +325,7 @@ pub async fn loop_stages(
meta_merkle_tree_path(epoch_to_process, &cli.get_save_path());

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is the big one: every panic that killed the main process on an external validator originated here. The tokio task that submits to NCN from main.rs handles its errors properly; this call path did not.

let operator_address = Pubkey::from_str(&cli.operator_address)?;
submit_to_ncn(
let submit_result = submit_to_ncn(
&rpc_client,
&keypair,
&operator_address,
Expand All @@ -305,7 +341,22 @@ pub async fn loop_stages(
cli.vote_microlamports,
&cli.cluster,
)
.await?;
.await;
if let Err(e) = submit_result {
error!(
"Failed to submit epoch {} to NCN: {:?}",
epoch_to_process, e
);
datapoint_error!(
"tip_router_cli.cast_vote",
("operator_address", operator_address.to_string(), String),
("epoch", epoch_to_process, i64),
("status", "error", String),
("error", e.to_string(), String),
("state", "cast_vote", String),
"cluster" => &cli.cluster,
);
}
stage = OperatorState::WaitForNextEpoch;
}
OperatorState::WaitForNextEpoch => {
Expand Down
56 changes: 48 additions & 8 deletions tip-router-operator-cli/src/submit.rs
Original file line number Diff line number Diff line change
Expand Up @@ -91,10 +91,27 @@ pub async fn submit_to_ncn(
compute_unit_price: u64,
cluster: &str,
) -> Result<(), anyhow::Error> {
let epoch_info = client.get_epoch_info().await?;
let meta_merkle_tree = MetaMerkleTree::new_from_file(meta_merkle_tree_path)?;
let epoch_info = client
.get_epoch_info()
.await
.map_err(|e| anyhow::anyhow!("Failed to fetch epoch info from RPC client: {:?}", e))?;
let meta_merkle_tree = MetaMerkleTree::new_from_file(meta_merkle_tree_path).map_err(|e| {
anyhow::anyhow!(
"Failed to load Meta Merkle Tree from file {:?}: {:?}",
meta_merkle_tree_path,
e
)
})?;
let config_pda = Config::find_program_address(tip_router_program_id, ncn_address).0;
let config = get_ncn_config(client, tip_router_program_id, ncn_address).await?;
let config = get_ncn_config(client, tip_router_program_id, ncn_address)
.await
.map_err(|e| {
anyhow::anyhow!(
"Failed to fetch Tip Router config for NCN {}: {:?}",
ncn_address,
e
)
})?;

// The meta merkle root files are tagged with the epoch they have created the snapshot for
// Tip router accounts for that merkle root are created in the next epoch
Expand All @@ -119,12 +136,35 @@ pub async fn submit_to_ncn(
}
};

let ballot_box = BallotBox::try_from_slice_unchecked(&ballot_box_account.data)?;
let ballot_box =
BallotBox::try_from_slice_unchecked(&ballot_box_account.data).map_err(|e| {
datapoint_error!(
"tip_router_cli.ballot_box_deserialize_error",
("operator_address", operator_address.to_string(), String),
("epoch", tip_router_target_epoch, i64),
("status", "error", String),
("error", format!("{:?}", e), String),
"cluster" => cluster,
);
anyhow::anyhow!("Failed to deserialize ballot box: {:?}", e)
})?;

let is_voting_valid = ballot_box.is_voting_valid(
epoch_info.absolute_slot,
config.valid_slots_after_consensus(),
)?;
let is_voting_valid = ballot_box
.is_voting_valid(
epoch_info.absolute_slot,
config.valid_slots_after_consensus(),
)
.map_err(|e| {
datapoint_error!(
"tip_router_cli.voting_validity_error",
("operator_address", operator_address.to_string(), String),
("epoch", tip_router_target_epoch, i64),
("status", "error", String),
("error", format!("{:?}", e), String),
"cluster" => cluster,
);
anyhow::anyhow!("Failed to determine if voting is valid: {:?}", e)
})?;

// If exists, look for vote from current operator
let vote = ballot_box
Expand Down
Loading