Skip to content

Commit ecc8da0

Browse files
committed
Add RPC endpoint pool with failover support
- Accept comma-separated URIs for multiple endpoints (--uri "ws://a,ws://b") - Add runtime failover when subscription stalls or terminates unexpectedly, swapping backend across all tasks
1 parent 17101d6 commit ecc8da0

File tree

6 files changed

+236
-79
lines changed

6 files changed

+236
-79
lines changed

src/client.rs

Lines changed: 121 additions & 51 deletions
Original file line numberDiff line numberDiff line change
@@ -8,75 +8,131 @@ use subxt::backend::{
88
chain_head::{ChainHeadBackend, ChainHeadBackendBuilder},
99
rpc::reconnecting_rpc_client::{ExponentialBackoff, RpcClient as ReconnectingRpcClient},
1010
};
11+
use tokio::sync::RwLock;
1112

1213
/// Timeout for each connection attempt in seconds.
1314
/// If a connection attempt doesn't complete within this time, we retry.
1415
const CONNECTION_ATTEMPT_TIMEOUT_SECS: u64 = 30;
1516

16-
/// Maximum number of connection attempts before giving up.
17+
/// Maximum number of connection attempts per endpoint before trying the next.
1718
const MAX_CONNECTION_ATTEMPTS: u32 = 3;
1819

1920
/// Delay between connection attempts in seconds.
2021
const CONNECTION_RETRY_DELAY_SECS: u64 = 5;
2122

2223
/// Wraps the subxt interfaces to make it easy to use for the staking-miner.
24+
/// Supports multiple RPC endpoints with failover capability.
2325
#[derive(Clone, Debug)]
2426
pub struct Client {
2527
/// Access to chain APIs such as storage, events etc.
26-
chain_api: ChainClient,
28+
/// Uses interior mutability to allow runtime failover.
29+
/// Each task (listenr, miner, clear_old_rounds, era_pruning) holds its own Client and when a
30+
/// failover happens (e.g. RPC endpoint dies) we need to swap out the dead ChainClient with a
31+
/// new one and all tasks must see the new connection. Instead of reconnecting in each task
32+
/// independently, we can leverage RwLock semantics to swap all tasks to use the new
33+
/// connection without explicit coordination. One writer updates the client and after that, all
34+
/// subsequent .read() returns the new client.
35+
chain_api: Arc<RwLock<ChainClient>>,
36+
/// List of RPC endpoints for failover support.
37+
endpoints: Arc<Vec<String>>,
2738
}
2839

2940
impl Client {
30-
pub async fn new(uri: &str) -> Result<Self, Error> {
31-
for attempt in 1..=MAX_CONNECTION_ATTEMPTS {
32-
log::debug!(
33-
target: LOG_TARGET,
34-
"attempting to connect to {uri:?} (attempt {attempt}/{MAX_CONNECTION_ATTEMPTS})"
35-
);
36-
37-
match Self::try_connect(uri).await {
38-
Ok(client) => return Ok(client),
39-
Err(e) => {
40-
if attempt == MAX_CONNECTION_ATTEMPTS {
41-
log::error!(
42-
target: LOG_TARGET,
43-
"Failed to connect after {MAX_CONNECTION_ATTEMPTS} attempts: {e:?}"
44-
);
45-
return Err(e);
46-
}
47-
log::warn!(
48-
target: LOG_TARGET,
49-
"Connection attempt {attempt}/{MAX_CONNECTION_ATTEMPTS} failed: {e:?}, \
50-
retrying in {CONNECTION_RETRY_DELAY_SECS}s..."
51-
);
52-
tokio::time::sleep(Duration::from_secs(CONNECTION_RETRY_DELAY_SECS)).await;
53-
},
41+
/// Create a new client from a comma-separated list of RPC endpoints.
42+
///
43+
/// The client will try each endpoint in sequence until one connects successfully.
44+
/// Multiple endpoints can be specified for failover:
45+
/// "wss://rpc1.example.com,wss://rpc2.example.com"
46+
pub async fn new(uris: &str) -> Result<Self, Error> {
47+
let endpoints: Vec<String> = uris
48+
.split(',')
49+
.map(|s| s.trim().to_string())
50+
.filter(|s| !s.is_empty())
51+
.collect();
52+
53+
if endpoints.is_empty() {
54+
return Err(Error::Other("No RPC endpoints provided".into()));
55+
}
56+
57+
if endpoints.len() > 1 {
58+
log::info!(target: LOG_TARGET, "RPC endpoint pool: {} endpoint(s)", endpoints.len());
59+
}
60+
61+
let chain_api = Self::connect_with_failover(&endpoints).await?;
62+
63+
Ok(Self { chain_api: Arc::new(RwLock::new(chain_api)), endpoints: Arc::new(endpoints) })
64+
}
65+
66+
/// Try to connect to endpoints in sequence until one succeeds.
67+
/// Used for both initial connection and runtime failover.
68+
async fn connect_with_failover(endpoints: &[String]) -> Result<ChainClient, Error> {
69+
let mut last_error = None;
70+
let total = endpoints.len();
71+
72+
for (idx, uri) in endpoints.iter().enumerate() {
73+
let endpoint_num = idx + 1;
74+
75+
for attempt in 1..=MAX_CONNECTION_ATTEMPTS {
76+
log::debug!(
77+
target: LOG_TARGET,
78+
"attempting to connect to {uri:?} (endpoint {endpoint_num}/{total}, attempt {attempt}/{MAX_CONNECTION_ATTEMPTS})"
79+
);
80+
81+
match Self::try_connect(uri).await {
82+
Ok(client) => {
83+
if total > 1 {
84+
log::info!(
85+
target: LOG_TARGET,
86+
"Connected to endpoint {endpoint_num}/{total}: {uri}"
87+
);
88+
}
89+
return Ok(client);
90+
},
91+
Err(e) => {
92+
last_error = Some(e);
93+
if attempt < MAX_CONNECTION_ATTEMPTS {
94+
log::warn!(
95+
target: LOG_TARGET,
96+
"Connection attempt {attempt}/{MAX_CONNECTION_ATTEMPTS} to {uri} failed, \
97+
retrying in {CONNECTION_RETRY_DELAY_SECS}s..."
98+
);
99+
tokio::time::sleep(Duration::from_secs(CONNECTION_RETRY_DELAY_SECS))
100+
.await;
101+
}
102+
},
103+
}
104+
}
105+
106+
if total > 1 {
107+
log::warn!(
108+
target: LOG_TARGET,
109+
"Failed to connect to endpoint {endpoint_num}/{total}: {uri}, trying next..."
110+
);
54111
}
55112
}
56113

57-
unreachable!("Loop should have returned or errored")
114+
log::error!(target: LOG_TARGET, "Failed to connect to any endpoint in the pool");
115+
Err(last_error.unwrap_or_else(|| Error::Other("No endpoints available".into())))
58116
}
59117

60-
async fn try_connect(uri: &str) -> Result<Self, Error> {
61-
// Wrap the entire connection process with a timeout
118+
/// Try to connect to a single endpoint with timeout.
119+
async fn try_connect(uri: &str) -> Result<ChainClient, Error> {
62120
let connect_future = async {
63-
// Create a reconnecting RPC client with exponential backoff
64-
let reconnecting_rpc =
65-
ReconnectingRpcClient::builder()
66-
.retry_policy(
67-
ExponentialBackoff::from_millis(500)
68-
.max_delay(Duration::from_secs(10))
69-
.take(3), // Fewer internal retries since we have outer retry loop
70-
)
71-
.build(uri.to_string())
72-
.await
73-
.map_err(|e| Error::Other(format!("Failed to connect: {e:?}")))?;
121+
let reconnecting_rpc = ReconnectingRpcClient::builder()
122+
.retry_policy(
123+
ExponentialBackoff::from_millis(500).max_delay(Duration::from_secs(10)).take(3),
124+
)
125+
.build(uri.to_string())
126+
.await
127+
.map_err(|e| Error::Other(format!("Failed to connect: {e:?}")))?;
74128

75129
let backend: ChainHeadBackend<Config> =
76130
ChainHeadBackendBuilder::default().build_with_background_driver(reconnecting_rpc);
77131
let chain_api = ChainClient::from_backend(Arc::new(backend)).await?;
78132

79-
Ok::<Self, Error>(Self { chain_api })
133+
log::info!(target: LOG_TARGET, "Connected to {uri} with ChainHead backend");
134+
135+
Ok::<ChainClient, Error>(chain_api)
80136
};
81137

82138
match tokio::time::timeout(
@@ -85,12 +141,7 @@ impl Client {
85141
)
86142
.await
87143
{
88-
Ok(result) => {
89-
if result.is_ok() {
90-
log::info!(target: LOG_TARGET, "Connected to {uri} with ChainHead backend");
91-
}
92-
result
93-
},
144+
Ok(result) => result,
94145
Err(_) => {
95146
prometheus::on_connection_timeout();
96147
log::warn!(
@@ -99,16 +150,35 @@ impl Client {
99150
);
100151
Err(TimeoutError::InitialConnection {
101152
timeout_secs: CONNECTION_ATTEMPT_TIMEOUT_SECS,
102-
attempt: 0, // Will be filled by caller context
153+
attempt: 0,
103154
max_attempts: MAX_CONNECTION_ATTEMPTS,
104155
}
105156
.into())
106157
},
107158
}
108159
}
109160

110-
/// Get a reference to the chain API.
111-
pub fn chain_api(&self) -> &ChainClient {
112-
&self.chain_api
161+
/// Attempt to reconnect using the endpoint pool.
162+
/// This is called when subscription stalls are detected for runtime failover.
163+
pub async fn reconnect(&self) -> Result<(), Error> {
164+
log::info!(
165+
target: LOG_TARGET,
166+
"Attempting runtime failover across {} endpoint(s)...",
167+
self.endpoints.len()
168+
);
169+
170+
let new_client = Self::connect_with_failover(&self.endpoints).await?;
171+
172+
let mut guard = self.chain_api.write().await;
173+
*guard = new_client;
174+
175+
log::info!(target: LOG_TARGET, "Runtime failover successful");
176+
Ok(())
177+
}
178+
179+
/// Get access to the chain API.
180+
/// Returns a read guard that must be held while using the API.
181+
pub async fn chain_api(&self) -> tokio::sync::RwLockReadGuard<'_, ChainClient> {
182+
self.chain_api.read().await
113183
}
114184
}

0 commit comments

Comments
 (0)