Skip to content

Commit cfee297

Browse files
authored
operator: improve error handling in critical path (#190)
**Problem:** Internal and external operators have seen transient errors crash the tip router operator. These errors are generally related to timed-out RPC requests. Upon examining `loop_stages`, I found errors that are perhaps not handled the way we would like. **Solution:** - Wait for the epoch info and epoch schedule RPC requests to come back, logging any failures and retrying; this state is required to do anything useful with the operator. A failed request should be handled gracefully, since this action is periodic. - We should not handle the `submit_to_ncn` result in CastVote with `?`; log an error here instead. This gives the operator a chance to vote again and recover from any potential RPC issues that were responsible for the failure.
1 parent 3555d94 commit cfee297

File tree

2 files changed

+103
-12
lines changed

2 files changed

+103
-12
lines changed

tip-router-operator-cli/src/process_epoch.rs

Lines changed: 55 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -112,8 +112,44 @@ pub async fn loop_stages(
112112
save_stages: bool,
113113
) -> Result<()> {
114114
let keypair = read_keypair_file(&cli.keypair_path).expect("Failed to read keypair file");
115-
let mut current_epoch_info = rpc_client.get_epoch_info().await?;
116-
let epoch_schedule = rpc_client.get_epoch_schedule().await?;
115+
116+
let mut current_epoch_info = {
117+
loop {
118+
match rpc_client.get_epoch_info().await {
119+
Ok(info) => break info,
120+
Err(e) => {
121+
error!("Error getting epoch info from RPC. Retrying...");
122+
datapoint_error!(
123+
"tip_router_cli.get_epoch_info",
124+
("operator_address", cli.operator_address.clone(), String),
125+
("status", "error", String),
126+
("error", e.to_string(), String),
127+
"cluster" => &cli.cluster,
128+
);
129+
tokio::time::sleep(Duration::from_secs(5)).await;
130+
}
131+
}
132+
}
133+
};
134+
135+
let epoch_schedule = {
136+
loop {
137+
match rpc_client.get_epoch_schedule().await {
138+
Ok(schedule) => break schedule,
139+
Err(e) => {
140+
error!("Error getting epoch schedule from RPC. Retrying...");
141+
datapoint_error!(
142+
"tip_router_cli.get_epoch_schedule",
143+
("operator_address", cli.operator_address.clone(), String),
144+
("status", "error", String),
145+
("error", e.to_string(), String),
146+
"cluster" => &cli.cluster,
147+
);
148+
tokio::time::sleep(Duration::from_secs(5)).await;
149+
}
150+
}
151+
}
152+
};
117153

118154
// Track runs that are starting right at the beginning of a new epoch
119155
let operator_address = cli.operator_address.clone();
@@ -289,7 +325,7 @@ pub async fn loop_stages(
289325
meta_merkle_tree_path(epoch_to_process, &cli.get_save_path());
290326

291327
let operator_address = Pubkey::from_str(&cli.operator_address)?;
292-
submit_to_ncn(
328+
let submit_result = submit_to_ncn(
293329
&rpc_client,
294330
&keypair,
295331
&operator_address,
@@ -305,7 +341,22 @@ pub async fn loop_stages(
305341
cli.vote_microlamports,
306342
&cli.cluster,
307343
)
308-
.await?;
344+
.await;
345+
if let Err(e) = submit_result {
346+
error!(
347+
"Failed to submit epoch {} to NCN: {:?}",
348+
epoch_to_process, e
349+
);
350+
datapoint_error!(
351+
"tip_router_cli.cast_vote",
352+
("operator_address", operator_address.to_string(), String),
353+
("epoch", epoch_to_process, i64),
354+
("status", "error", String),
355+
("error", e.to_string(), String),
356+
("state", "cast_vote", String),
357+
"cluster" => &cli.cluster,
358+
);
359+
}
309360
stage = OperatorState::WaitForNextEpoch;
310361
}
311362
OperatorState::WaitForNextEpoch => {

tip-router-operator-cli/src/submit.rs

Lines changed: 48 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -91,10 +91,27 @@ pub async fn submit_to_ncn(
9191
compute_unit_price: u64,
9292
cluster: &str,
9393
) -> Result<(), anyhow::Error> {
94-
let epoch_info = client.get_epoch_info().await?;
95-
let meta_merkle_tree = MetaMerkleTree::new_from_file(meta_merkle_tree_path)?;
94+
let epoch_info = client
95+
.get_epoch_info()
96+
.await
97+
.map_err(|e| anyhow::anyhow!("Failed to fetch epoch info from RPC client: {:?}", e))?;
98+
let meta_merkle_tree = MetaMerkleTree::new_from_file(meta_merkle_tree_path).map_err(|e| {
99+
anyhow::anyhow!(
100+
"Failed to load Meta Merkle Tree from file {:?}: {:?}",
101+
meta_merkle_tree_path,
102+
e
103+
)
104+
})?;
96105
let config_pda = Config::find_program_address(tip_router_program_id, ncn_address).0;
97-
let config = get_ncn_config(client, tip_router_program_id, ncn_address).await?;
106+
let config = get_ncn_config(client, tip_router_program_id, ncn_address)
107+
.await
108+
.map_err(|e| {
109+
anyhow::anyhow!(
110+
"Failed to fetch Tip Router config for NCN {}: {:?}",
111+
ncn_address,
112+
e
113+
)
114+
})?;
98115

99116
// The meta merkle root files are tagged with the epoch they have created the snapshot for
100117
// Tip router accounts for that merkle root are created in the next epoch
@@ -119,12 +136,35 @@ pub async fn submit_to_ncn(
119136
}
120137
};
121138

122-
let ballot_box = BallotBox::try_from_slice_unchecked(&ballot_box_account.data)?;
139+
let ballot_box =
140+
BallotBox::try_from_slice_unchecked(&ballot_box_account.data).map_err(|e| {
141+
datapoint_error!(
142+
"tip_router_cli.ballot_box_deserialize_error",
143+
("operator_address", operator_address.to_string(), String),
144+
("epoch", tip_router_target_epoch, i64),
145+
("status", "error", String),
146+
("error", format!("{:?}", e), String),
147+
"cluster" => cluster,
148+
);
149+
anyhow::anyhow!("Failed to deserialize ballot box: {:?}", e)
150+
})?;
123151

124-
let is_voting_valid = ballot_box.is_voting_valid(
125-
epoch_info.absolute_slot,
126-
config.valid_slots_after_consensus(),
127-
)?;
152+
let is_voting_valid = ballot_box
153+
.is_voting_valid(
154+
epoch_info.absolute_slot,
155+
config.valid_slots_after_consensus(),
156+
)
157+
.map_err(|e| {
158+
datapoint_error!(
159+
"tip_router_cli.voting_validity_error",
160+
("operator_address", operator_address.to_string(), String),
161+
("epoch", tip_router_target_epoch, i64),
162+
("status", "error", String),
163+
("error", format!("{:?}", e), String),
164+
"cluster" => cluster,
165+
);
166+
anyhow::anyhow!("Failed to determine if voting is valid: {:?}", e)
167+
})?;
128168

129169
// If exists, look for vote from current operator
130170
let vote = ballot_box

0 commit comments

Comments
 (0)