
Commit ccb7372

fix: address review comments
1 parent c741b40 commit ccb7372

2 files changed: 86 additions & 120 deletions

backend/src/bridge/status.rs

Lines changed: 49 additions & 89 deletions
@@ -19,12 +19,12 @@ use super::{
     },
 };
 
-use tokio::time::{interval, sleep, Duration};
+use tokio::time::{interval, Duration};
 use tracing::{debug, error, info, warn};
 
 use crate::{
     config::BridgeMonitoringConfig,
-    utils::{retry_policy::ExponentialBackoff, rpc_client::create_rpc_client},
+    utils::rpc_client::{create_rpc_client, execute_with_retries},
 };
 
 /// RPC client manager with connection pooling and retry logic
@@ -52,8 +52,6 @@ struct RpcClientManager {
     /// HTTP clients for each operator, keyed by operator public key ([`String`])
     /// [`BTreeMap`] ensures deterministic ordering (sorted by key)
     clients: BTreeMap<String, HttpClient>,
-    /// Retry policy configuration ([`ExponentialBackoff`]) (3 retries over 10 seconds with exponential backoff)
-    retry_policy: ExponentialBackoff,
 }
 
 impl RpcClientManager {
@@ -65,9 +63,6 @@ impl RpcClientManager {
     /// - 10MB max request size
     /// - Connection pooling enabled
     ///
-    /// The retry policy is configured for 3 retries (4 total attempts) over 10 seconds
-    /// with a 1.5x exponential backoff multiplier.
-    ///
    /// # Arguments
    ///
    /// * `config` - Bridge monitoring configuration containing operator RPC URLs
@@ -80,20 +75,15 @@ impl RpcClientManager {
             );
         }
 
-        // Configure retry policy: 3 retries (4 total attempts) over 10 seconds
-        let retry_policy = ExponentialBackoff::new(3, 10, 1.5);
-
-        Self {
-            clients,
-            retry_policy,
-        }
+        Self { clients }
     }
 
     /// Execute an async operation across all available clients with retry logic
     ///
     /// This method tries the given operation on each operator sequentially (in sorted order
-    /// by public key). For each operator, it retries up to `max_retries` times with exponential
-    /// backoff between attempts. The first successful result is returned immediately.
+    /// by public key). For each operator, it retries up to 3 times with exponential
+    /// backoff between attempts using [`execute_with_retries`]. The first successful result
+    /// is returned immediately.
     ///
     /// # Type Parameters
     ///
@@ -112,18 +102,14 @@ impl RpcClientManager {
     ///
     /// # Retry Behavior
     ///
+    /// Uses [`execute_with_retries`] for each operator with exponential backoff:
+    ///
     /// - **Attempt 0**: Immediate (no delay)
     /// - **Attempt 1**: After ~2s delay
     /// - **Attempt 2**: After ~3s delay
     /// - **Attempt 3**: After ~5s delay
     /// - Total: ~10 seconds per operator
     ///
-    /// # Logging
-    ///
-    /// - `debug`: Successful requests and retry attempts with delays
-    /// - `info`: Requests that succeed after retries
-    /// - `warn`: Operators that fail after all retries
-    ///
     /// # Example
     ///
     /// ```ignore
@@ -138,45 +124,30 @@ impl RpcClientManager {
         F: Fn(HttpClient) -> Fut,
         Fut: std::future::Future<Output = Result<T, Box<dyn std::error::Error + Send + Sync>>>,
     {
-        let max_retries = self.retry_policy.max_retries();
-
         // BTreeMap maintains sorted order automatically
         for (key, client) in self.clients.iter() {
-            let mut attempt = 0;
-
-            // Retry loop: attempt 0, 1, 2, ..., max_retries (inclusive)
-            loop {
-                match operation(client.clone()).await {
-                    Ok(result) => {
-                        if attempt > 0 {
-                            info!(
-                                "RPC request succeeded for operator {} after {} retries",
-                                key, attempt
-                            );
-                        } else {
-                            debug!("RPC request succeeded for operator: {}", key);
-                        }
-                        return Some(result);
-                    }
-                    Err(e) => {
-                        if attempt < max_retries {
-                            let delay_secs = self.retry_policy.get_delay(attempt);
-                            if delay_secs > 0 {
-                                debug!(
-                                    "RPC request failed for operator {}, retry {}/{}, waiting {} seconds: {}",
-                                    key, attempt + 1, max_retries, delay_secs, e
-                                );
-                                sleep(Duration::from_secs(delay_secs)).await;
-                            }
-                            attempt += 1;
-                        } else {
-                            warn!(
-                                "RPC request failed for operator {} after {} retries: {}",
-                                key, max_retries, e
-                            );
-                            break;
-                        }
-                    }
+            let client_clone = client.clone();
+            let operation_name = format!("RPC request to operator {}", key);
+
+            match execute_with_retries(
+                || {
+                    let client = client_clone.clone();
+                    operation(client)
+                },
+                &operation_name,
+            )
+            .await
+            {
+                Ok(result) => {
+                    debug!("RPC request succeeded for operator: {}", key);
+                    return Some(result);
+                }
+                Err(e) => {
+                    warn!(
+                        "RPC request failed for operator {} after retries: {}",
+                        key, e
+                    );
+                    // Continue to next operator
                 }
             }
         }
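
For reference, a minimal caller sketch of the refactored try_with_retry: the closure shape follows the F: Fn(HttpClient) -> Fut bound above, the Option<T> return is implied by `return Some(result)`, and the get_deposit_requests call mirrors the fetchers later in this diff.

    // Sketch only: the first successful operator response wins; None if all
    // operators fail after retries.
    let txids: Option<Vec<Txid>> = rpc_manager
        .try_with_retry(|client| async move {
            // Errors are boxed into Box<dyn Error + Send + Sync> per the Fut bound
            client.get_deposit_requests().await.map_err(|e| e.into())
        })
        .await;
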
@@ -332,7 +303,7 @@ pub async fn bridge_monitoring_task(context: Arc<BridgeMonitoringContext>) {
         let operator_id = format!("Alpen Labs #{}", index + 1);
         let pk_bytes = hex::decode(operator.public_key()).expect("decode to succeed");
         let operator_pk = PublicKey::from_slice(&pk_bytes).expect("conversion to succeed");
-        let status = get_operator_status(&rpc_manager, operator.rpc_url()).await;
+        let status = get_operator_status(operator.rpc_url()).await;
         operator_statuses.push(OperatorStatus::new(operator_id, operator_pk, status));
     }
 
@@ -456,7 +427,7 @@ pub async fn bridge_monitoring_task(context: Arc<BridgeMonitoringContext>) {
 }
 
 /// Fetch operator status
-async fn get_operator_status(_rpc_manager: &RpcClientManager, rpc_url: &str) -> RpcOperatorStatus {
+async fn get_operator_status(rpc_url: &str) -> RpcOperatorStatus {
     let rpc_client = create_rpc_client(rpc_url);
 
     // Directly use `get_uptime`
@@ -496,12 +467,7 @@ async fn get_bitcoin_chain_tip_height(
 async fn get_deposit_requests(rpc_manager: &RpcClientManager) -> Vec<Txid> {
     let result = rpc_manager
         .try_with_retry(|client| async move {
-            let txids = client.get_deposit_requests().await?;
-            if txids.is_empty() {
-                Err("No deposit requests found".into())
-            } else {
-                Ok(txids)
-            }
+            client.get_deposit_requests().await.map_err(|e| e.into())
         })
         .await;
 
@@ -538,7 +504,9 @@ async fn get_deposits(
     chain_tip_height: u64,
     active_deposit_txids: &[Txid],
 ) -> Vec<(DepositInfo, u64)> {
-    let mut deposit_requests = get_deposit_requests(rpc_manager).await;
+    let new_deposit_requests = get_deposit_requests(rpc_manager).await;
+    let new_count = new_deposit_requests.len();
+    let mut deposit_requests = new_deposit_requests;
 
     // Add existing active deposits that we need to check for status updates
     for txid in active_deposit_txids {
@@ -550,7 +518,7 @@
     info!(
         "Checking {} deposit requests ({} new, {} existing active)",
         deposit_requests.len(),
-        get_deposit_requests(rpc_manager).await.len(),
+        new_count,
         active_deposit_txids.len()
     );
 
@@ -616,14 +584,9 @@
 /// Vector of withdrawal request IDs. Empty if no withdrawals found or all operators failed.
 async fn get_withdrawal_requests(rpc_manager: &RpcClientManager) -> Vec<Buf32> {
     let result = rpc_manager
-        .try_with_retry(|client| async move {
-            let txids = client.get_withdrawals().await?;
-            if txids.is_empty() {
-                Err("No withdrawal requests found".into())
-            } else {
-                Ok(txids)
-            }
-        })
+        .try_with_retry(
+            |client| async move { client.get_withdrawals().await.map_err(|e| e.into()) },
+        )
         .await;
 
 result.unwrap_or_else(|| {
@@ -658,7 +621,9 @@ async fn get_withdrawals(
     chain_tip_height: u64,
     active_withdrawal_request_ids: &[Buf32],
 ) -> Vec<(WithdrawalInfo, u64)> {
-    let mut withdrawal_requests = get_withdrawal_requests(rpc_manager).await;
+    let new_withdrawal_requests = get_withdrawal_requests(rpc_manager).await;
+    let new_count = new_withdrawal_requests.len();
+    let mut withdrawal_requests = new_withdrawal_requests;
 
     // Add existing active withdrawals that we need to check for status updates
     for request_id in active_withdrawal_request_ids {
@@ -670,7 +635,7 @@
     info!(
         "Checking {} withdrawal requests ({} new, {} existing active)",
         withdrawal_requests.len(),
-        get_withdrawal_requests(rpc_manager).await.len(),
+        new_count,
         active_withdrawal_request_ids.len()
     );
 

@@ -732,14 +697,7 @@ async fn get_withdrawals(
732697
/// Vector of claim transaction IDs. Empty if no claims found or all operators failed.
733698
async fn get_claims(rpc_manager: &RpcClientManager) -> Vec<Txid> {
734699
let result = rpc_manager
735-
.try_with_retry(|client| async move {
736-
let txids = client.get_claims().await?;
737-
if txids.is_empty() {
738-
Err("No claims found".into())
739-
} else {
740-
Ok(txids)
741-
}
742-
})
700+
.try_with_retry(|client| async move { client.get_claims().await.map_err(|e| e.into()) })
743701
.await;
744702

745703
result.unwrap_or_else(|| {
@@ -775,7 +733,9 @@ async fn get_reimbursements(
     chain_tip_height: u64,
     active_reimbursement_txids: &[Txid],
 ) -> Vec<(ReimbursementInfo, u64)> {
-    let mut claims = get_claims(rpc_manager).await;
+    let new_claims = get_claims(rpc_manager).await;
+    let new_count = new_claims.len();
+    let mut claims = new_claims;
 
     // Add existing active reimbursements that we need to check for status updates
     for txid in active_reimbursement_txids {
@@ -787,7 +747,7 @@
     info!(
         "Checking {} claims ({} new, {} existing active)",
         claims.len(),
-        get_claims(rpc_manager).await.len(),
+        new_count,
         active_reimbursement_txids.len()
     );

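The same refactor is applied at all three fetch sites (deposits, withdrawals, claims): capture the count of freshly fetched items before merging in the already-active ones, so the info! log no longer re-issues the RPC call just to report the "new" count. In general form (fetch_items and active_ids are illustrative names; the de-duplicating merge sits outside the hunks shown):

    let new_items = fetch_items(rpc_manager).await; // stand-in for get_deposit_requests etc.
    let new_count = new_items.len();
    let mut items = new_items;
    for id in active_ids {
        // Hypothetical merge; avoids double-counting items already tracked
        if !items.contains(id) {
            items.push(*id);
        }
    }
    info!(
        "Checking {} items ({} new, {} existing active)",
        items.len(),
        new_count,
        active_ids.len()
    );
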
backend/src/utils/rpc_client.rs

Lines changed: 37 additions & 31 deletions
@@ -1,5 +1,10 @@
 use jsonrpsee::http_client::{HttpClient, HttpClientBuilder};
 use std::time::Duration;
+use std::{fmt, future::Future};
+use tokio::time::sleep;
+use tracing::warn;
+
+use super::retry_policy::ExponentialBackoff;
 
 /// Creates a JSON-RPC HTTP client with connection pooling and timeout configuration
 ///
@@ -36,35 +41,36 @@ pub fn create_rpc_client(rpc_url: &str) -> HttpClient {
         .expect("Failed to create JSON-RPC client")
 }
 
-/// Creates a JSON-RPC HTTP client with a custom timeout duration
-///
-/// Similar to [`create_rpc_client`] but allows specifying a custom request timeout.
-/// This is useful for operations that may take longer than the default 30 seconds.
-///
-/// # Arguments
-///
-/// * `rpc_url` - Base URL of the JSON-RPC server
-/// * `timeout_secs` - Request timeout in seconds
-///
-/// # Returns
-///
-/// A configured [`HttpClient`] with the specified timeout
-///
-/// # Panics
-///
-/// Panics if the client cannot be built (e.g., invalid URL format)
-///
-/// # Example
-///
-/// ```ignore
-/// // Create a client with 60-second timeout for slow operations
-/// let client = create_rpc_client_with_timeout("http://localhost:8332", 60);
-/// ```
-#[allow(dead_code)]
-pub fn create_rpc_client_with_timeout(rpc_url: &str, timeout_secs: u64) -> HttpClient {
-    HttpClientBuilder::default()
-        .request_timeout(Duration::from_secs(timeout_secs))
-        .max_request_size(10 * 1024 * 1024) // 10MB
-        .build(rpc_url)
-        .expect("Failed to create JSON-RPC client")
+/// Execute an async operation with exponential backoff retry logic
+pub async fn execute_with_retries<F, Fut, T, E>(operation: F, operation_name: &str) -> Result<T, E>
+where
+    F: Fn() -> Fut,
+    Fut: Future<Output = Result<T, E>>,
+    E: fmt::Display,
+{
+    let retry_policy = ExponentialBackoff::new(3, 10, 1.5);
+    let mut last_error = None;
+
+    for attempt in 0..=retry_policy.max_retries() {
+        match operation().await {
+            Ok(result) => return Ok(result),
+            Err(e) => {
+                if attempt < retry_policy.max_retries() {
+                    let delay = retry_policy.get_delay(attempt + 1);
+                    warn!(
+                        operation = operation_name,
+                        attempt = attempt + 1,
+                        max_retries = retry_policy.max_retries(),
+                        delay_secs = delay,
+                        error = %e,
+                        "Operation failed, retrying..."
+                    );
+                    sleep(Duration::from_secs(delay)).await;
+                }
+                last_error = Some(e);
+            }
+        }
+    }
+
+    Err(last_error.expect("last_error should be set after all retries"))
 }
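
A standalone usage sketch for execute_with_retries, matching the signature above; the URL and the get_uptime call are placeholders rather than part of this commit. With ExponentialBackoff::new(3, 10, 1.5), the waits should correspond to the ~2s/~3s/~5s schedule documented in status.rs.

    // Sketch only: retry a single RPC call up to 3 times with exponential backoff.
    // String satisfies the E: fmt::Display bound.
    let client = create_rpc_client("http://localhost:8332");
    let uptime = execute_with_retries(
        || {
            let client = client.clone();
            async move { client.get_uptime().await.map_err(|e| e.to_string()) }
        },
        "get_uptime",
    )
    .await;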
