Commit 99cc4cb

Add RPC endpoint pool with failover support (#1209)
* Add RPC endpoint pool with failover support
  - Accept comma-separated URIs for multiple endpoints (--uri "ws://a,ws://b")
  - Add runtime failover when subscription stalls or terminates unexpectedly, swapping backend across all tasks
* clippy
1 parent 4ccb803 commit 99cc4cb

8 files changed (+455, -126 lines)
README.md

Lines changed: 37 additions & 0 deletions
@@ -116,6 +116,32 @@ The miner consists of **four independent tasks** that communicate via bounded ch
 - **Fault Isolation**: Task failures are isolated and handled appropriately
 - **Performance Optimization**: Mining operations are never delayed by cleanup tasks
 
+### RPC Endpoint Pool
+
+The miner supports multiple RPC endpoints for high availability and automatic failover:
+
+```bash
+polkadot-staking-miner --uri "wss://rpc1.example.com,wss://rpc2.example.com" monitor --seed-or-path //Alice
+```
+
+**Key Features:**
+
+- **Round-Robin Failover**: When a connection fails, the miner automatically tries the next endpoint
+  in the pool, cycling through all available endpoints
+- **Fast Failover for Pools**: When multiple endpoints are configured, connection retries are
+  reduced to 1 per endpoint (vs 3 for single endpoint) to fail fast and try alternatives
+- **Zombie Node Detection**: The miner task monitors for consecutive RPC failures (e.g., transaction
+  rejections, timeouts) and triggers failover even when block streaming appears healthy
+- **Unified Client Wrapper**: All tasks (listener, miner, runtime upgrade, etc.) share the same
+  connection pool and benefit from automatic failover
+
+**Prometheus Metrics for Monitoring:**
+
+- `staking_miner_endpoint_switches_total`: Total number of times the client switched endpoints
+- `staking_miner_miner_triggered_failovers_total`: Failovers triggered by consecutive miner failures
+- `staking_miner_listener_subscription_stalls_total`: Listener-detected stalls causing failover
+- `staking_miner_connection_timeouts_total`: Initial connection attempt timeouts
+
 ## Usage
 
 You can check the help with:

@@ -353,6 +379,17 @@ staking_miner_updater_subscription_stalls_total 0
 # HELP staking_miner_block_processing_stalls_total Total number of times block processing was detected as stalled
 # TYPE staking_miner_block_processing_stalls_total counter
 staking_miner_block_processing_stalls_total 2
+
+# RPC Endpoint Pool Metrics
+# HELP staking_miner_endpoint_switches_total Total number of times the client switched from one RPC endpoint to another
+# TYPE staking_miner_endpoint_switches_total counter
+staking_miner_endpoint_switches_total 2
+# HELP staking_miner_miner_triggered_failovers_total Total number of times the miner task triggered a failover due to consecutive RPC failures
+# TYPE staking_miner_miner_triggered_failovers_total counter
+staking_miner_miner_triggered_failovers_total 1
+# HELP staking_miner_connection_timeouts_total Total number of initial connection attempt timeouts
+# TYPE staking_miner_connection_timeouts_total counter
+staking_miner_connection_timeouts_total 0
 # HELP staking_miner_mining_timeouts_total Total number of solution mining timeouts
 # TYPE staking_miner_mining_timeouts_total counter
 staking_miner_mining_timeouts_total 0
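The round-robin order the README section above describes can be made concrete with a small, self-contained sketch. This is illustrative only and not part of the commit; `failover_order` is a hypothetical helper that mirrors the `(start_index + i) % total` arithmetic used by `connect_with_failover` in src/client.rs below.

```rust
// Illustrative sketch of the round-robin failover order described in the README.
// Not part of this commit; mirrors the index arithmetic in `connect_with_failover`.
fn failover_order(total: usize, failed_index: usize) -> Vec<usize> {
    // Start at the endpoint after the failed one and wrap around the pool,
    // so the endpoint that just failed is retried last.
    (1..=total).map(|i| (failed_index + i) % total).collect()
}

fn main() {
    // With three endpoints and endpoint 0 failing, the order is 1, 2, 0.
    assert_eq!(failover_order(3, 0), vec![1, 2, 0]);
    // If endpoint 2 fails, the order wraps to 0, 1, 2.
    assert_eq!(failover_order(3, 2), vec![0, 1, 2]);
}
```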

src/client.rs

Lines changed: 195 additions & 51 deletions
@@ -3,80 +3,170 @@ use crate::{
     prelude::{ChainClient, Config, LOG_TARGET},
     prometheus,
 };
-use std::{sync::Arc, time::Duration};
+use std::{
+    sync::{
+        Arc,
+        atomic::{AtomicUsize, Ordering},
+    },
+    time::Duration,
+};
 use subxt::backend::{
     chain_head::{ChainHeadBackend, ChainHeadBackendBuilder},
     rpc::reconnecting_rpc_client::{ExponentialBackoff, RpcClient as ReconnectingRpcClient},
 };
+use tokio::sync::RwLock;
 
 /// Timeout for each connection attempt in seconds.
 /// If a connection attempt doesn't complete within this time, we retry.
 const CONNECTION_ATTEMPT_TIMEOUT_SECS: u64 = 30;
 
-/// Maximum number of connection attempts before giving up.
+/// Maximum number of connection attempts per endpoint before trying the next.
 const MAX_CONNECTION_ATTEMPTS: u32 = 3;
 
 /// Delay between connection attempts in seconds.
 const CONNECTION_RETRY_DELAY_SECS: u64 = 5;
 
 /// Wraps the subxt interfaces to make it easy to use for the staking-miner.
+/// Supports multiple RPC endpoints with failover capability.
 #[derive(Clone, Debug)]
 pub struct Client {
     /// Access to chain APIs such as storage, events etc.
-    chain_api: ChainClient,
+    /// Uses interior mutability to allow runtime failover.
+    /// Each task (listener, miner, clear_old_rounds, era_pruning) holds its own Client and when a
+    /// failover happens (e.g. RPC endpoint dies) we need to swap out the dead ChainClient with a
+    /// new one and all tasks must see the new connection. Instead of reconnecting in each task
+    /// independently, we can leverage RwLock semantics to swap all tasks to use the new
+    /// connection without explicit coordination. One writer updates the client and after that, all
+    /// subsequent .read() returns the new client.
+    chain_api: Arc<RwLock<ChainClient>>,
+    /// List of RPC endpoints for failover support.
+    endpoints: Arc<Vec<String>>,
+    /// Current endpoint index for round-robin selection.
+    /// When reconnecting, we start from (current_index + 1) % len to avoid wasting time
+    /// on known-bad endpoints.
+    current_endpoint_index: Arc<AtomicUsize>,
+    /// Generation counter for reconnect deduplication.
+    /// Prevents racing reconnects when multiple tasks detect failures simultaneously.
+    /// Each successful reconnect increments this counter.
+    reconnect_generation: Arc<AtomicUsize>,
 }
 
 impl Client {
-    pub async fn new(uri: &str) -> Result<Self, Error> {
-        for attempt in 1..=MAX_CONNECTION_ATTEMPTS {
-            log::debug!(
-                target: LOG_TARGET,
-                "attempting to connect to {uri:?} (attempt {attempt}/{MAX_CONNECTION_ATTEMPTS})"
-            );
+    /// Create a new client from a comma-separated list of RPC endpoints.
+    ///
+    /// The client will try each endpoint in sequence until one connects successfully.
+    /// Multiple endpoints can be specified for failover:
+    /// "wss://rpc1.example.com,wss://rpc2.example.com"
+    pub async fn new(uris: &str) -> Result<Self, Error> {
+        let endpoints: Vec<String> = uris
+            .split(',')
+            .map(|s| s.trim().to_string())
+            .filter(|s| !s.is_empty())
+            .collect();
+
+        if endpoints.is_empty() {
+            return Err(Error::Other("No RPC endpoints provided".into()));
+        }
+
+        if endpoints.len() > 1 {
+            log::info!(target: LOG_TARGET, "RPC endpoint pool: {} endpoint(s)", endpoints.len());
+        }
+
+        let (chain_api, connected_index) = Self::connect_with_failover(&endpoints, 0).await?;
+
+        Ok(Self {
+            chain_api: Arc::new(RwLock::new(chain_api)),
+            endpoints: Arc::new(endpoints),
+            current_endpoint_index: Arc::new(AtomicUsize::new(connected_index)),
+            reconnect_generation: Arc::new(AtomicUsize::new(0)),
+        })
+    }
 
-            match Self::try_connect(uri).await {
-                Ok(client) => return Ok(client),
-                Err(e) => {
-                    if attempt == MAX_CONNECTION_ATTEMPTS {
-                        log::error!(
-                            target: LOG_TARGET,
-                            "Failed to connect after {MAX_CONNECTION_ATTEMPTS} attempts: {e:?}"
-                        );
-                        return Err(e);
-                    }
-                    log::warn!(
-                        target: LOG_TARGET,
-                        "Connection attempt {attempt}/{MAX_CONNECTION_ATTEMPTS} failed: {e:?}, \
-                        retrying in {CONNECTION_RETRY_DELAY_SECS}s..."
-                    );
-                    tokio::time::sleep(Duration::from_secs(CONNECTION_RETRY_DELAY_SECS)).await;
-                },
+    /// Try to connect to endpoints in round-robin sequence until one succeeds.
+    /// Used for both initial connection and runtime failover.
+    ///
+    /// # Arguments
+    /// * `endpoints` - List of RPC endpoints to try
+    /// * `start_index` - Index to start from (for round-robin selection)
+    ///
+    /// # Returns
+    /// * `Ok((ChainClient, usize))` - The connected client and the index of the successful endpoint
+    /// * `Err(Error)` - If all endpoints fail
+    async fn connect_with_failover(
+        endpoints: &[String],
+        start_index: usize,
+    ) -> Result<(ChainClient, usize), Error> {
+        let mut last_error = None;
+        let total = endpoints.len();
+        // When pool has multiple endpoints, reduce retries to fail fast and try next endpoint
+        let max_attempts = if total > 1 { 1 } else { MAX_CONNECTION_ATTEMPTS };
+
+        for i in 0..total {
+            let idx = (start_index + i) % total;
+            let uri = &endpoints[idx];
+            let endpoint_num = idx + 1;
+
+            for attempt in 1..=max_attempts {
+                log::debug!(
+                    target: LOG_TARGET,
+                    "attempting to connect to {uri:?} (endpoint {endpoint_num}/{total}, attempt {attempt}/{max_attempts})"
+                );
+
+                match Self::try_connect(uri).await {
+                    Ok(client) => {
+                        if total > 1 {
+                            log::info!(
+                                target: LOG_TARGET,
+                                "Connected to endpoint {endpoint_num}/{total}: {uri}"
+                            );
+                        }
+                        return Ok((client, idx));
+                    },
+                    Err(e) => {
+                        last_error = Some(e);
+                        if attempt < max_attempts {
+                            log::warn!(
+                                target: LOG_TARGET,
+                                "Connection attempt {attempt}/{max_attempts} to {uri} failed, \
+                                retrying in {CONNECTION_RETRY_DELAY_SECS}s..."
+                            );
+                            tokio::time::sleep(Duration::from_secs(CONNECTION_RETRY_DELAY_SECS))
+                                .await;
+                        }
+                    },
+                }
+            }
+
+            if total > 1 {
+                log::warn!(
+                    target: LOG_TARGET,
+                    "Failed to connect to endpoint {endpoint_num}/{total}: {uri}, trying next..."
+                );
             }
         }
 
-        unreachable!("Loop should have returned or errored")
+        log::error!(target: LOG_TARGET, "Failed to connect to any endpoint in the pool");
+        Err(last_error.unwrap_or_else(|| Error::Other("No endpoints available".into())))
     }
 
-    async fn try_connect(uri: &str) -> Result<Self, Error> {
-        // Wrap the entire connection process with a timeout
+    /// Try to connect to a single endpoint with timeout.
+    async fn try_connect(uri: &str) -> Result<ChainClient, Error> {
         let connect_future = async {
-            // Create a reconnecting RPC client with exponential backoff
-            let reconnecting_rpc =
-                ReconnectingRpcClient::builder()
-                    .retry_policy(
-                        ExponentialBackoff::from_millis(500)
-                            .max_delay(Duration::from_secs(10))
-                            .take(3), // Fewer internal retries since we have outer retry loop
-                    )
-                    .build(uri.to_string())
-                    .await
-                    .map_err(|e| Error::Other(format!("Failed to connect: {e:?}")))?;
+            let reconnecting_rpc = ReconnectingRpcClient::builder()
+                .retry_policy(
+                    ExponentialBackoff::from_millis(500).max_delay(Duration::from_secs(10)).take(3),
+                )
+                .build(uri.to_string())
+                .await
+                .map_err(|e| Error::Other(format!("Failed to connect: {e:?}")))?;
 
             let backend: ChainHeadBackend<Config> =
                 ChainHeadBackendBuilder::default().build_with_background_driver(reconnecting_rpc);
             let chain_api = ChainClient::from_backend(Arc::new(backend)).await?;
 
-            Ok::<Self, Error>(Self { chain_api })
+            log::info!(target: LOG_TARGET, "Connected to {uri} with ChainHead backend");
+
+            Ok::<ChainClient, Error>(chain_api)
         };
 
         match tokio::time::timeout(
@@ -85,12 +175,7 @@ impl Client {
         )
         .await
         {
-            Ok(result) => {
-                if result.is_ok() {
-                    log::info!(target: LOG_TARGET, "Connected to {uri} with ChainHead backend");
-                }
-                result
-            },
+            Ok(result) => result,
             Err(_) => {
                 prometheus::on_connection_timeout();
                 log::warn!(
@@ -99,16 +184,75 @@ impl Client {
                 );
                 Err(TimeoutError::InitialConnection {
                     timeout_secs: CONNECTION_ATTEMPT_TIMEOUT_SECS,
-                    attempt: 0, // Will be filled by caller context
+                    attempt: 0,
                     max_attempts: MAX_CONNECTION_ATTEMPTS,
                 }
                 .into())
             },
         }
     }
 
-    /// Get a reference to the chain API.
-    pub fn chain_api(&self) -> &ChainClient {
-        &self.chain_api
+    /// Attempt to reconnect using the endpoint pool with round-robin selection.
+    /// This is called when subscription stalls are detected for runtime failover.
+    /// Starts from (current_index + 1) % len to avoid retrying the currently-failed endpoint.
+    ///
+    /// Uses a generation counter to prevent racing reconnects when multiple tasks detect
+    /// failures simultaneously. If another task has already reconnected, this call returns
+    /// Ok(()) without doing redundant work.
+    pub async fn reconnect(&self) -> Result<(), Error> {
+        // Capture generation before acquiring lock to detect racing reconnects
+        let generation_before = self.reconnect_generation.load(Ordering::Relaxed);
+
+        let current_idx = self.current_endpoint_index.load(Ordering::Relaxed);
+        let start_idx = (current_idx + 1) % self.endpoints.len();
+
+        log::info!(
+            target: LOG_TARGET,
+            "Attempting runtime failover across {} endpoint(s), starting from index {} (generation {})...",
+            self.endpoints.len(),
+            start_idx,
+            generation_before
+        );
+
+        // Establish new connection before acquiring write lock
+        let (new_client, connected_idx) =
+            Self::connect_with_failover(&self.endpoints, start_idx).await?;
+
+        // Acquire write lock and check if another task already reconnected
+        let mut guard = self.chain_api.write().await;
+
+        // Check if generation changed while we were connecting (another task already reconnected)
+        let generation_now = self.reconnect_generation.load(Ordering::Relaxed);
+        if generation_now != generation_before {
+            log::info!(
+                target: LOG_TARGET,
+                "Reconnect skipped - another task already reconnected (generation {generation_before} -> {generation_now})"
+            );
+            // Connection already updated by another task, drop our new connection
+            return Ok(());
+        }
+
+        // Update the client and generation
+        *guard = new_client;
+        self.current_endpoint_index.store(connected_idx, Ordering::Relaxed);
+        self.reconnect_generation.fetch_add(1, Ordering::Relaxed);
+
+        // Record the endpoint switch
+        prometheus::on_endpoint_switch();
+
+        log::info!(
+            target: LOG_TARGET,
+            "Runtime failover successful, now using endpoint {}/{} (generation {})",
+            connected_idx + 1,
+            self.endpoints.len(),
+            generation_before + 1
+        );
+        Ok(())
+    }
+
+    /// Get access to the chain API.
+    /// Returns a read guard that must be held while using the API.
+    pub async fn chain_api(&self) -> tokio::sync::RwLockReadGuard<'_, ChainClient> {
+        self.chain_api.read().await
     }
 }
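For context, here is a minimal sketch of how a task could consume the new `Client` API, holding the read guard only while the chain API is in use and calling `reconnect()` when its subscription fails. The actual listener/miner task changes live in the other files of this commit; `watch_blocks`, `run_task`, and the `crate::client`/`crate::error` module paths are assumptions made for illustration.

```rust
use crate::{client::Client, error::Error, prelude::ChainClient};

/// Hypothetical stand-in for a task's real subscription loop.
async fn watch_blocks(_api: &ChainClient) -> Result<(), Error> {
    Ok(())
}

/// Sketch: each task holds its own `Client` clone and requests failover on error.
async fn run_task(client: Client) -> Result<(), Error> {
    loop {
        // Scope the read guard to a single call so a concurrent `reconnect()`
        // can take the write lock and swap the backend underneath.
        let outcome = {
            let api = client.chain_api().await;
            watch_blocks(&api).await
        };

        match outcome {
            Ok(()) => return Ok(()),
            Err(e) => {
                log::warn!("subscription failed: {e:?}, requesting failover");
                // After this returns, every clone of `Client` sees the new endpoint.
                client.reconnect().await?;
            },
        }
    }
}
```

Keeping the guard short-lived matters here: a guard held across a long-running subscription would block the write lock that `reconnect()` needs in order to swap the backend for all tasks.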
