diff --git a/.github/dependabot.yml b/.github/dependabot.yml index 4b90365d..ebf83756 100644 --- a/.github/dependabot.yml +++ b/.github/dependabot.yml @@ -5,6 +5,9 @@ updates: directory: "/backend" schedule: interval: "monthly" + # Cooldown + cooldown: + default-days: 5 open-pull-requests-limit: 10 # Enable version updates for GitHub Actions @@ -12,6 +15,9 @@ updates: directory: "/" schedule: interval: "monthly" + # Cooldown + cooldown: + default-days: 5 open-pull-requests-limit: 10 # Enable version updates for npm @@ -19,4 +25,7 @@ updates: directory: "/frontend" schedule: interval: "monthly" - open-pull-requests-limit: 10 \ No newline at end of file + # Cooldown + cooldown: + default-days: 5 + open-pull-requests-limit: 10 diff --git a/backend/src/bridge/status.rs b/backend/src/bridge/status.rs index a53caaad..1f4efbc9 100644 --- a/backend/src/bridge/status.rs +++ b/backend/src/bridge/status.rs @@ -1,6 +1,8 @@ use axum::Json; use bitcoin::{secp256k1::PublicKey, Txid}; +use jsonrpsee::http_client::HttpClient; +use std::collections::BTreeMap; use std::sync::{atomic::Ordering, Arc}; use strata_bridge_rpc::traits::{StrataBridgeControlApiClient, StrataBridgeMonitoringApiClient}; use strata_bridge_rpc::types::{ @@ -18,9 +20,141 @@ use super::{ }; use tokio::time::{interval, Duration}; -use tracing::{error, info, warn}; +use tracing::{debug, error, info, warn}; -use crate::{config::BridgeMonitoringConfig, utils::rpc_client::create_rpc_client}; +use crate::{ + config::BridgeMonitoringConfig, + utils::rpc_client::{create_rpc_client, execute_with_retries}, +}; + +/// RPC client manager with connection pooling and retry logic +/// +/// This manager maintains a pool of reusable HTTP clients for each bridge operator, +/// preventing connection exhaustion by reusing clients across requests. It implements +/// automatic retry logic with exponential backoff to handle transient network failures. +/// +/// # Design +/// +/// - **Connection Pooling**: Creates one HTTP client per operator and reuses it for all requests +/// - **Retry Logic**: Implements exponential backoff (3 retries over 10 seconds with 1.5x multiplier) +/// - **Failover**: Tries all available operators in deterministic order (sorted by public key) +/// - **Graceful Degradation**: Returns `None` if all operators fail after retries +/// +/// # Example Flow +/// +/// For each RPC request: +/// +/// 1. Try operator 1 with up to 3 retries (exponential backoff between retries) +/// 2. If operator 1 fails after retries, try operator 2 with up to 3 retries +/// 3. Continue until an operator succeeds or all fail +/// 4. Return the first successful result or [`None`] if all fail +struct RpcClientManager { + /// HTTP clients for each operator, keyed by operator public key ([`String`]) + /// [`BTreeMap`] ensures deterministic ordering (sorted by key) + clients: BTreeMap, +} + +impl RpcClientManager { + /// Create a new RPC client manager for the configured bridge operators + /// + /// This initializes one HTTP client per operator with: + /// + /// - 30-second request timeout + /// - 10MB max request size + /// - Connection pooling enabled + /// + /// # Arguments + /// + /// * `config` - Bridge monitoring configuration containing operator RPC URLs + fn new(config: &BridgeMonitoringConfig) -> Self { + let mut clients = BTreeMap::new(); + for operator in config.operators() { + clients.insert( + operator.public_key().to_string(), + create_rpc_client(operator.rpc_url()), + ); + } + + Self { clients } + } + + /// Execute an async operation across all available clients with retry logic + /// + /// This method tries the given operation on each operator sequentially (in sorted order + /// by public key). For each operator, it retries up to 3 times with exponential + /// backoff between attempts using [`execute_with_retries`]. The first successful result + /// is returned immediately. + /// + /// # Type Parameters + /// + /// * `T` - The expected return type of the operation + /// * `F` - A function that takes an [`HttpClient`] and returns a future + /// * `Fut` - The future returned by `F` + /// + /// # Arguments + /// + /// * `operation` - A closure that performs an RPC call using the provided client + /// + /// # Returns + /// + /// * [`Some(T)`](Some) - If any operator succeeds (possibly after retries) + /// * [`None`] - If all operators fail after exhausting their retries + /// + /// # Retry Behavior + /// + /// Uses [`execute_with_retries`] for each operator with exponential backoff: + /// + /// - **Attempt 0**: Immediate (no delay) + /// - **Attempt 1**: After ~2s delay + /// - **Attempt 2**: After ~3s delay + /// - **Attempt 3**: After ~5s delay + /// - Total: ~10 seconds per operator + /// + /// # Example + /// + /// ```ignore + /// let result = rpc_manager + /// .query_clients_with_retry(|client| async move { + /// client.get_deposit_requests().await.map_err(|e| e.into()) + /// }) + /// .await; + /// ``` + async fn query_clients_with_retry(&self, operation: F) -> Option + where + F: Fn(HttpClient) -> Fut, + Fut: std::future::Future>>, + { + // BTreeMap maintains sorted order automatically + for (key, client) in self.clients.iter() { + let client_clone = client.clone(); + let operation_name = format!("RPC request to operator {key}"); + + match execute_with_retries( + || { + let client = client_clone.clone(); + operation(client) + }, + &operation_name, + ) + .await + { + Ok(result) => { + debug!("RPC request succeeded for operator: {}", key); + return Some(result); + } + Err(e) => { + warn!( + "RPC request failed for operator {} after retries: {}", + key, e + ); + // Continue to next operator + } + } + } + + None + } +} /// Get transaction confirmations from esplora async fn get_tx_confirmations(esplora_url: &str, txid: Txid, chain_tip_height: u64) -> Option { @@ -156,6 +290,9 @@ pub async fn bridge_monitoring_task(context: Arc) { context.config.status_refetch_interval(), )); + // Create RPC client manager once and reuse it + let rpc_manager = RpcClientManager::new(&context.config); + loop { // Fetch all data without holding lock @@ -213,12 +350,16 @@ pub async fn bridge_monitoring_task(context: Arc) { }; // Update deposits incrementally - let deposit_updates: Vec<(Txid, DepositInfo, u64)> = - get_deposits(&context.config, chain_tip_height, &active_deposits) - .await - .iter() - .map(|(info, confirmations)| (info.deposit_request_txid, *info, *confirmations)) - .collect(); + let deposit_updates: Vec<(Txid, DepositInfo, u64)> = get_deposits( + &rpc_manager, + &context.config, + chain_tip_height, + &active_deposits, + ) + .await + .iter() + .map(|(info, confirmations)| (info.deposit_request_txid, *info, *confirmations)) + .collect(); { let mut cache = context.status_cache.write().await; @@ -231,12 +372,16 @@ pub async fn bridge_monitoring_task(context: Arc) { } // Update withdrawals incrementally - let withdrawal_updates: Vec<(Buf32, WithdrawalInfo, u64)> = - get_withdrawals(&context.config, chain_tip_height, &active_withdrawals) - .await - .iter() - .map(|(info, confirmations)| (info.withdrawal_request_txid, *info, *confirmations)) - .collect(); + let withdrawal_updates: Vec<(Buf32, WithdrawalInfo, u64)> = get_withdrawals( + &rpc_manager, + &context.config, + chain_tip_height, + &active_withdrawals, + ) + .await + .iter() + .map(|(info, confirmations)| (info.withdrawal_request_txid, *info, *confirmations)) + .collect(); { let mut cache = context.status_cache.write().await; @@ -249,12 +394,16 @@ pub async fn bridge_monitoring_task(context: Arc) { } // Update reimbursements incrementally - let reimbursement_updates: Vec<(Txid, ReimbursementInfo, u64)> = - get_reimbursements(&context.config, chain_tip_height, &active_reimbursements) - .await - .iter() - .map(|(info, confirmations)| (info.claim_txid, *info, *confirmations)) - .collect(); + let reimbursement_updates: Vec<(Txid, ReimbursementInfo, u64)> = get_reimbursements( + &rpc_manager, + &context.config, + chain_tip_height, + &active_reimbursements, + ) + .await + .iter() + .map(|(info, confirmations)| (info.claim_txid, *info, *confirmations)) + .collect(); { let mut cache = context.status_cache.write().await; @@ -302,28 +451,62 @@ async fn get_bitcoin_chain_tip_height( Ok(height) } -/// Fetch deposit requests -async fn get_deposit_requests(config: &BridgeMonitoringConfig) -> Vec { - for operator in config.operators().iter() { - let rpc_client = create_rpc_client(operator.rpc_url()); - - match rpc_client.get_deposit_requests().await { - Ok(txids) if !txids.is_empty() => return txids, - Ok(_) | Err(_) => {} // Try next operator - } - } - - warn!("No deposit requests found"); - Vec::new() +/// Fetch all pending deposit request transaction IDs from bridge operators +/// +/// Uses the RPC client manager with retry logic to query all operators for their +/// list of deposit requests. Returns the first non-empty result, or an empty vector +/// if all operators have no deposits or all fail. +/// +/// # Arguments +/// +/// * `rpc_manager` - RPC client manager with retry/failover logic +/// +/// # Returns +/// +/// Vector of deposit request transaction IDs. Empty if no deposits found or all operators failed. +async fn get_deposit_requests(rpc_manager: &RpcClientManager) -> Vec { + let result = rpc_manager + .query_clients_with_retry(|client| async move { + client.get_deposit_requests().await.map_err(|e| e.into()) + }) + .await; + + result.unwrap_or_else(|| { + warn!("No deposit requests found from any operator"); + Vec::new() + }) } -/// Fetch deposit details +/// Fetch detailed information for all deposit requests +/// +/// Queries bridge operators for deposit details, including both new deposit requests +/// and existing active deposits that need status updates. Filters results based on +/// confirmation count to only include deposits below the maximum confirmation threshold. +/// +/// # Arguments +/// +/// * `rpc_manager` - RPC client manager with retry/failover logic +/// * `config` - Bridge monitoring configuration (contains max confirmations threshold) +/// * `chain_tip_height` - Current Bitcoin blockchain height +/// * `active_deposit_txids` - List of deposit txids already being tracked (for status updates) +/// +/// # Returns +/// +/// Vector of tuples containing: +/// +/// - [`DepositInfo`] - Detailed deposit information +/// - [`u64`] - Number of confirmations (0 for in-progress deposits) +/// +/// Deposits are filtered to only include those with confirmations < [`BridgeMonitoringConfig::max_tx_confirmations`]. async fn get_deposits( + rpc_manager: &RpcClientManager, config: &BridgeMonitoringConfig, chain_tip_height: u64, active_deposit_txids: &[Txid], ) -> Vec<(DepositInfo, u64)> { - let mut deposit_requests = get_deposit_requests(config).await; + let new_deposit_requests = get_deposit_requests(rpc_manager).await; + let new_count = new_deposit_requests.len(); + let mut deposit_requests = new_deposit_requests; // Add existing active deposits that we need to check for status updates for txid in active_deposit_txids { @@ -335,26 +518,24 @@ async fn get_deposits( info!( "Checking {} deposit requests ({} new, {} existing active)", deposit_requests.len(), - get_deposit_requests(config).await.len(), + new_count, active_deposit_txids.len() ); let mut deposit_infos: Vec<(DepositInfo, u64)> = Vec::new(); for deposit_request_txid in deposit_requests.iter() { - let mut rpc_info = None; - for operator in config.operators() { - let rpc_client = create_rpc_client(operator.rpc_url()); - if let Ok(info) = rpc_client - .get_deposit_request_info(*deposit_request_txid) - .await - { - rpc_info = Some(info); - break; - } - } + let txid = *deposit_request_txid; + let rpc_info = rpc_manager + .query_clients_with_retry(|client| async move { + client + .get_deposit_request_info(txid) + .await + .map_err(|e| e.into()) + }) + .await; let Some(dep_info) = rpc_info else { - error!(%deposit_request_txid, "Failed to fetch deposit info"); + error!(%deposit_request_txid, "Failed to fetch deposit info after retries"); continue; }; @@ -388,28 +569,61 @@ async fn get_deposits( deposit_infos } -/// Fetch withdrawal requests -async fn get_withdrawal_requests(config: &BridgeMonitoringConfig) -> Vec { - for operator in config.operators().iter() { - let rpc_client = create_rpc_client(operator.rpc_url()); - - match rpc_client.get_withdrawals().await { - Ok(txids) if !txids.is_empty() => return txids, - Ok(_) | Err(_) => {} // Try next operator - } - } - - warn!("No withdrawal requests found"); - Vec::new() +/// Fetch all pending withdrawal request IDs from bridge operators +/// +/// Uses the RPC client manager with retry logic to query all operators for their +/// list of withdrawal requests. Returns the first non-empty result, or an empty vector +/// if all operators have no withdrawals or all fail. +/// +/// # Arguments +/// +/// * `rpc_manager` - RPC client manager with retry/failover logic +/// +/// # Returns +/// +/// Vector of withdrawal request IDs. Empty if no withdrawals found or all operators failed. +async fn get_withdrawal_requests(rpc_manager: &RpcClientManager) -> Vec { + let result = rpc_manager + .query_clients_with_retry(|client| async move { + client.get_withdrawals().await.map_err(|e| e.into()) + }) + .await; + + result.unwrap_or_else(|| { + warn!("No withdrawal requests found from any operator"); + Vec::new() + }) } -/// Fetch withdrawal/fullfillment details +/// Fetch detailed information for all withdrawal requests and fulfillments +/// +/// Queries bridge operators for withdrawal details, including both new withdrawal requests +/// and existing active withdrawals that need status updates. Filters results based on +/// confirmation count to only include withdrawals below the maximum confirmation threshold. +/// +/// # Arguments +/// +/// * `rpc_manager` - RPC client manager with retry/failover logic +/// * `config` - Bridge monitoring configuration (contains max confirmations threshold) +/// * `chain_tip_height` - Current Bitcoin blockchain height +/// * `active_withdrawal_request_ids` - List of withdrawal request IDs already being tracked +/// +/// # Returns +/// +/// Vector of tuples containing: +/// - `WithdrawalInfo` - Detailed withdrawal information +/// - `u64` - Number of confirmations (0 for in-progress withdrawals) +/// +/// Withdrawals are filtered to only include those with confirmations < max_tx_confirmations. async fn get_withdrawals( + rpc_manager: &RpcClientManager, config: &BridgeMonitoringConfig, chain_tip_height: u64, active_withdrawal_request_ids: &[Buf32], ) -> Vec<(WithdrawalInfo, u64)> { - let mut withdrawal_requests = get_withdrawal_requests(config).await; + let new_withdrawal_requests = get_withdrawal_requests(rpc_manager).await; + let new_count = new_withdrawal_requests.len(); + let mut withdrawal_requests = new_withdrawal_requests; // Add existing active withdrawals that we need to check for status updates for request_id in active_withdrawal_request_ids { @@ -421,26 +635,25 @@ async fn get_withdrawals( info!( "Checking {} withdrawal requests ({} new, {} existing active)", withdrawal_requests.len(), - get_withdrawal_requests(config).await.len(), + new_count, active_withdrawal_request_ids.len() ); let mut withdrawal_infos: Vec<(WithdrawalInfo, u64)> = Vec::new(); for withdrawal_request_txid in withdrawal_requests.iter() { - let mut rpc_info = None; - for operator in config.operators().iter() { - let rpc_client = create_rpc_client(operator.rpc_url()); - if let Ok(info) = rpc_client - .get_withdrawal_info(*withdrawal_request_txid) - .await - { - rpc_info = info; - break; - } - } + let request_id = *withdrawal_request_txid; + let rpc_info = rpc_manager + .query_clients_with_retry(|client| async move { + match client.get_withdrawal_info(request_id).await { + Ok(Some(info)) => Ok(info), + Ok(None) => Err("No withdrawal info found".into()), + Err(e) => Err(e.into()), + } + }) + .await; let Some(wd_info) = rpc_info else { - error!(%withdrawal_request_txid, "Failed to fetch withdrawal info"); + error!(%withdrawal_request_txid, "Failed to fetch withdrawal info after retries"); continue; }; @@ -469,28 +682,62 @@ async fn get_withdrawals( withdrawal_infos } -/// Fetch claims -async fn get_claims(config: &BridgeMonitoringConfig) -> Vec { - for operator in config.operators().iter() { - let rpc_client = create_rpc_client(operator.rpc_url()); - - match rpc_client.get_claims().await { - Ok(txids) if !txids.is_empty() => return txids, - Ok(_) | Err(_) => {} // Try next operator - } - } - - warn!("No claims found"); - Vec::new() +/// Fetch all pending claim transaction IDs from bridge operators +/// +/// Uses the RPC client manager with retry logic to query all operators for their +/// list of claim transactions. Returns the first non-empty result, or an empty vector +/// if all operators have no claims or all fail. +/// +/// # Arguments +/// +/// * `rpc_manager` - RPC client manager with retry/failover logic +/// +/// # Returns +/// +/// Vector of claim transaction IDs. Empty if no claims found or all operators failed. +async fn get_claims(rpc_manager: &RpcClientManager) -> Vec { + let result = rpc_manager + .query_clients_with_retry(|client| async move { + client.get_claims().await.map_err(|e| e.into()) + }) + .await; + + result.unwrap_or_else(|| { + warn!("No claims found from any operator"); + Vec::new() + }) } -/// Fetch claim/reimbursement details +/// Fetch detailed information for all claim and reimbursement transactions +/// +/// Queries bridge operators for claim/reimbursement details, including both new claims +/// and existing active reimbursements that need status updates. Filters results based on +/// confirmation count and status (skips NotStarted claims). +/// +/// # Arguments +/// +/// * `rpc_manager` - RPC client manager with retry/failover logic +/// * `config` - Bridge monitoring configuration (contains max confirmations threshold) +/// * `chain_tip_height` - Current Bitcoin blockchain height +/// * `active_reimbursement_txids` - List of claim txids already being tracked +/// +/// # Returns +/// +/// Vector of tuples containing: +/// - `ReimbursementInfo` - Detailed claim/reimbursement information +/// - `u64` - Number of confirmations (0 for in-progress/challenged claims) +/// +/// Claims with status `NotStarted` are excluded. Completed/cancelled claims are filtered +/// to only include those with confirmations < max_tx_confirmations. async fn get_reimbursements( + rpc_manager: &RpcClientManager, config: &BridgeMonitoringConfig, chain_tip_height: u64, active_reimbursement_txids: &[Txid], ) -> Vec<(ReimbursementInfo, u64)> { - let mut claims = get_claims(config).await; + let new_claims = get_claims(rpc_manager).await; + let new_count = new_claims.len(); + let mut claims = new_claims; // Add existing active reimbursements that we need to check for status updates for txid in active_reimbursement_txids { @@ -502,23 +749,25 @@ async fn get_reimbursements( info!( "Checking {} claims ({} new, {} existing active)", claims.len(), - get_claims(config).await.len(), + new_count, active_reimbursement_txids.len() ); let mut reimbursement_infos = Vec::new(); for claim_txid in claims.iter() { - let mut rpc_info = None; - for operator in config.operators().iter() { - let rpc_client = create_rpc_client(operator.rpc_url()); - if let Ok(info) = rpc_client.get_claim_info(*claim_txid).await { - rpc_info = info; - break; - } - } + let txid = *claim_txid; + let rpc_info = rpc_manager + .query_clients_with_retry(|client| async move { + match client.get_claim_info(txid).await { + Ok(Some(info)) => Ok(info), + Ok(None) => Err("No claim info found".into()), + Err(e) => Err(e.into()), + } + }) + .await; let Some(claim_info) = rpc_info else { - error!(%claim_txid, "Failed to fetch claim info"); + error!(%claim_txid, "Failed to fetch claim info after retries"); continue; }; diff --git a/backend/src/utils/rpc_client.rs b/backend/src/utils/rpc_client.rs index 870c4dc6..7d8ef095 100644 --- a/backend/src/utils/rpc_client.rs +++ b/backend/src/utils/rpc_client.rs @@ -1,8 +1,76 @@ use jsonrpsee::http_client::{HttpClient, HttpClientBuilder}; +use std::time::Duration; +use std::{fmt, future::Future}; +use tokio::time::sleep; +use tracing::warn; -/// Creates a JSON-RPC client with a dynamic URL +use super::retry_policy::ExponentialBackoff; + +/// Creates a JSON-RPC HTTP client with connection pooling and timeout configuration +/// +/// This creates a reusable HTTP client that maintains a connection pool to avoid +/// connection exhaustion. The client is configured with: +/// +/// - 30-second request timeout to prevent hanging requests +/// - 10MB max request size limit +/// - Internal connection pooling (managed by hyper) +/// +/// # Arguments +/// +/// * `rpc_url` - Base URL of the JSON-RPC server +/// +/// # Returns +/// +/// A configured [`HttpClient`] ready to make JSON-RPC requests +/// +/// # Panics +/// +/// Panics if the client cannot be built (e.g., invalid URL format) +/// +/// # Example +/// +/// ```ignore +/// let client = create_rpc_client("http://localhost:8332"); +/// let result = client.request("getblockcount", ()).await?; +/// ``` pub fn create_rpc_client(rpc_url: &str) -> HttpClient { HttpClientBuilder::default() + .request_timeout(Duration::from_secs(30)) + .max_request_size(10 * 1024 * 1024) // 10MB .build(rpc_url) .expect("Failed to create JSON-RPC client") } + +/// Execute an async operation with exponential backoff retry logic +pub async fn execute_with_retries(operation: F, operation_name: &str) -> Result +where + F: Fn() -> Fut, + Fut: Future>, + E: fmt::Display, +{ + let retry_policy = ExponentialBackoff::new(3, 10, 1.5); + let mut last_error = None; + + for attempt in 0..=retry_policy.max_retries() { + match operation().await { + Ok(result) => return Ok(result), + Err(e) => { + if attempt < retry_policy.max_retries() { + let delay = retry_policy.get_delay(attempt + 1); + warn!( + operation = operation_name, + attempt = attempt + 1, + max_retries = retry_policy.max_retries(), + delay_secs = delay, + error = %e, + "Operation failed, retrying..." + ); + sleep(Duration::from_secs(delay)).await; + } + last_error = Some(e); + } + } + } + + Err(last_error.expect("last_error should be set after all retries")) +}