Commit 17101d6
Add Timeout and Retry Logic for Initial Connection (#1206)
The staking miner can hang indefinitely during the initial connection attempt. When this happens, only this log appears:

```
attempting to connect to "wss://polkadot-asset-hub-rpc.polkadot.io"
```

and the process never progresses or errors out. This is problematic in production because:

1. The miner appears to be running but is actually stuck.
2. No alerts are triggered (the prometheus endpoint is not yet started).
3. The only remedy is to manually detect the stall and restart.

The `ReconnectingRpcClient` has retry logic (10 attempts with exponential backoff), but if each individual connection attempt hangs internally, the retry logic never gets a chance to kick in.

Add timeout and retry logic to `Client::new()`, following the existing patterns used in `monitor.rs` for stall detection.
1 parent 7d56771 commit 17101d6
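For orientation, the shape of the change is a bounded retry loop wrapped around a timeout-guarded connection future. The sketch below is a minimal, standalone illustration of that pattern using `tokio::time::timeout`; the constants and the `connect_once`/`connect_with_retry` helpers are illustrative stand-ins, not this crate's API — the real implementation is in `src/client.rs` in the diff below.

```rust
use std::time::Duration;

// Illustrative constants mirroring the ones added in src/client.rs.
const ATTEMPT_TIMEOUT: Duration = Duration::from_secs(30);
const MAX_ATTEMPTS: u32 = 3;
const RETRY_DELAY: Duration = Duration::from_secs(5);

/// Hypothetical stand-in for a single connection attempt.
async fn connect_once(uri: &str) -> Result<(), String> {
    // In the real code this builds the ReconnectingRpcClient and backend.
    let _ = uri;
    Ok(())
}

/// Retry a timeout-bounded connection attempt up to MAX_ATTEMPTS times.
async fn connect_with_retry(uri: &str) -> Result<(), String> {
    for attempt in 1..=MAX_ATTEMPTS {
        match tokio::time::timeout(ATTEMPT_TIMEOUT, connect_once(uri)).await {
            // Attempt finished in time and succeeded.
            Ok(Ok(client)) => return Ok(client),
            // Attempt finished in time but failed; give up on the last attempt.
            Ok(Err(e)) if attempt == MAX_ATTEMPTS => return Err(e),
            Ok(Err(e)) => eprintln!("attempt {attempt}/{MAX_ATTEMPTS} failed: {e}"),
            // Attempt hung past the timeout; give up on the last attempt.
            Err(_) if attempt == MAX_ATTEMPTS => {
                return Err(format!("timed out after {ATTEMPT_TIMEOUT:?}"))
            },
            Err(_) => eprintln!("attempt {attempt}/{MAX_ATTEMPTS} timed out"),
        }
        tokio::time::sleep(RETRY_DELAY).await;
    }
    unreachable!("the loop either returns a client or an error")
}

#[tokio::main]
async fn main() {
    if let Err(e) = connect_with_retry("wss://example-rpc.invalid").await {
        eprintln!("giving up: {e}");
    }
}
```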

4 files changed: +116 -26 lines

src/client.rs

Lines changed: 93 additions & 23 deletions
@@ -1,10 +1,24 @@
-use crate::prelude::{ChainClient, Config, LOG_TARGET};
+use crate::{
+    error::{Error, TimeoutError},
+    prelude::{ChainClient, Config, LOG_TARGET},
+    prometheus,
+};
 use std::{sync::Arc, time::Duration};
 use subxt::backend::{
     chain_head::{ChainHeadBackend, ChainHeadBackendBuilder},
     rpc::reconnecting_rpc_client::{ExponentialBackoff, RpcClient as ReconnectingRpcClient},
 };
 
+/// Timeout for each connection attempt in seconds.
+/// If a connection attempt doesn't complete within this time, we retry.
+const CONNECTION_ATTEMPT_TIMEOUT_SECS: u64 = 30;
+
+/// Maximum number of connection attempts before giving up.
+const MAX_CONNECTION_ATTEMPTS: u32 = 3;
+
+/// Delay between connection attempts in seconds.
+const CONNECTION_RETRY_DELAY_SECS: u64 = 5;
+
 /// Wraps the subxt interfaces to make it easy to use for the staking-miner.
 #[derive(Clone, Debug)]
 pub struct Client {
@@ -13,28 +27,84 @@ pub struct Client {
 }
 
 impl Client {
-    pub async fn new(uri: &str) -> Result<Self, subxt::Error> {
-        log::debug!(target: LOG_TARGET, "attempting to connect to {uri:?}");
-
-        // Create a reconnecting RPC client with exponential backoff
-        let reconnecting_rpc =
-            ReconnectingRpcClient::builder()
-                .retry_policy(
-                    ExponentialBackoff::from_millis(500)
-                        .max_delay(Duration::from_secs(30))
-                        .take(10), // Allow up to 10 retry attempts before giving up
-                )
-                .build(uri.to_string())
-                .await
-                .map_err(|e| subxt::Error::Other(format!("Failed to connect: {e:?}")))?;
-
-        let backend: ChainHeadBackend<Config> =
-            ChainHeadBackendBuilder::default().build_with_background_driver(reconnecting_rpc);
-        let chain_api = ChainClient::from_backend(Arc::new(backend)).await?;
-
-        log::info!(target: LOG_TARGET, "Connected to {uri} with ChainHead backend");
-
-        Ok(Self { chain_api })
+    pub async fn new(uri: &str) -> Result<Self, Error> {
+        for attempt in 1..=MAX_CONNECTION_ATTEMPTS {
+            log::debug!(
+                target: LOG_TARGET,
+                "attempting to connect to {uri:?} (attempt {attempt}/{MAX_CONNECTION_ATTEMPTS})"
+            );
+
+            match Self::try_connect(uri).await {
+                Ok(client) => return Ok(client),
+                Err(e) => {
+                    if attempt == MAX_CONNECTION_ATTEMPTS {
+                        log::error!(
+                            target: LOG_TARGET,
+                            "Failed to connect after {MAX_CONNECTION_ATTEMPTS} attempts: {e:?}"
+                        );
+                        return Err(e);
+                    }
+                    log::warn!(
+                        target: LOG_TARGET,
+                        "Connection attempt {attempt}/{MAX_CONNECTION_ATTEMPTS} failed: {e:?}, \
+                        retrying in {CONNECTION_RETRY_DELAY_SECS}s..."
+                    );
+                    tokio::time::sleep(Duration::from_secs(CONNECTION_RETRY_DELAY_SECS)).await;
+                },
+            }
+        }
+
+        unreachable!("Loop should have returned or errored")
+    }
+
+    async fn try_connect(uri: &str) -> Result<Self, Error> {
+        // Wrap the entire connection process with a timeout
+        let connect_future = async {
+            // Create a reconnecting RPC client with exponential backoff
+            let reconnecting_rpc =
+                ReconnectingRpcClient::builder()
+                    .retry_policy(
+                        ExponentialBackoff::from_millis(500)
+                            .max_delay(Duration::from_secs(10))
+                            .take(3), // Fewer internal retries since we have outer retry loop
+                    )
+                    .build(uri.to_string())
+                    .await
+                    .map_err(|e| Error::Other(format!("Failed to connect: {e:?}")))?;
+
+            let backend: ChainHeadBackend<Config> =
+                ChainHeadBackendBuilder::default().build_with_background_driver(reconnecting_rpc);
+            let chain_api = ChainClient::from_backend(Arc::new(backend)).await?;
+
+            Ok::<Self, Error>(Self { chain_api })
+        };
+
+        match tokio::time::timeout(
+            Duration::from_secs(CONNECTION_ATTEMPT_TIMEOUT_SECS),
+            connect_future,
+        )
+        .await
+        {
+            Ok(result) => {
+                if result.is_ok() {
+                    log::info!(target: LOG_TARGET, "Connected to {uri} with ChainHead backend");
+                }
+                result
+            },
+            Err(_) => {
+                prometheus::on_connection_timeout();
+                log::warn!(
+                    target: LOG_TARGET,
+                    "Connection attempt timed out after {CONNECTION_ATTEMPT_TIMEOUT_SECS}s"
+                );
+                Err(TimeoutError::InitialConnection {
+                    timeout_secs: CONNECTION_ATTEMPT_TIMEOUT_SECS,
+                    attempt: 0, // Will be filled by caller context
+                    max_attempts: MAX_CONNECTION_ATTEMPTS,
+                }
+                .into())
+            },
+        }
     }
 
     /// Get a reference to the chain API.

src/error.rs

Lines changed: 4 additions & 0 deletions
@@ -30,6 +30,10 @@ pub enum TimeoutError {
     ScoreCheck { timeout_secs: u64 },
     #[error("Missing pages submission timed out after {timeout_secs} seconds")]
     MissingPages { timeout_secs: u64 },
+    #[error(
+        "Initial connection timed out after {timeout_secs} seconds (attempt {attempt}/{max_attempts})"
+    )]
+    InitialConnection { timeout_secs: u64, attempt: u32, max_attempts: u32 },
 }
 
 #[derive(thiserror::Error, Debug)]

src/main.rs

Lines changed: 7 additions & 3 deletions
@@ -93,6 +93,13 @@ async fn main() -> Result<(), Error> {
     let filter = EnvFilter::from_default_env().add_directive(log.parse()?);
     tracing_subscriber::fmt().with_env_filter(filter).init();
 
+    // Start prometheus endpoint early so metrics are available during connection attempts.
+    if let Err(e) = prometheus::run(prometheus_port).await {
+        log::warn!("Failed to start prometheus endpoint: {e}");
+    }
+    // Initialize the timestamp so that if connection hangs, the stall detection alert can fire.
+    prometheus::set_last_block_processing_time();
+
     let client = Client::new(&uri).await?;
 
     let version_bytes = client
@@ -106,9 +113,6 @@ async fn main() -> Result<(), Error> {
     Decode::decode(&mut &version_bytes[..])?;
 
     let chain = opt::Chain::try_from(&runtime_version)?;
-    if let Err(e) = prometheus::run(prometheus_port).await {
-        log::warn!("Failed to start prometheus endpoint: {e}");
-    }
     log::info!(target: LOG_TARGET, "Connected to chain: {chain}");
 
     SHARED_CLIENT.set(client.clone()).expect("shared client only set once; qed");

src/prometheus.rs

Lines changed: 12 additions & 0 deletions
@@ -391,6 +391,14 @@ mod hidden {
        .unwrap()
    });
 
+    static CONNECTION_TIMEOUTS: Lazy<Counter> = Lazy::new(|| {
+        register_counter!(opts!(
+            "staking_miner_connection_timeouts_total",
+            "Total number of initial connection attempt timeouts"
+        ))
+        .unwrap()
+    });
+
     pub fn on_runtime_upgrade() {
         RUNTIME_UPGRADES.inc();
     }
@@ -526,4 +534,8 @@ mod hidden {
     pub fn on_era_pruning_timeout() {
         ERA_PRUNING_TIMEOUTS.inc();
     }
+
+    pub fn on_connection_timeout() {
+        CONNECTION_TIMEOUTS.inc();
+    }
 }
