Skip to content

Commit 167caf5

Browse files
authored
feat(ws): retry mechanism in WsConnect (#2303)
* feat(`ws`): retry mechanism in WsConnect * wasm
1 parent e3e6f7f commit 167caf5

File tree

4 files changed

+90
-19
lines changed

4 files changed

+90
-19
lines changed

Diff for: crates/pubsub/src/handle.rs

+34-4
Original file line numberDiff line numberDiff line change
@@ -1,8 +1,11 @@
11
use alloy_json_rpc::PubSubItem;
22
use serde_json::value::RawValue;
3-
use tokio::sync::{
4-
mpsc,
5-
oneshot::{self, error::TryRecvError},
3+
use tokio::{
4+
sync::{
5+
mpsc,
6+
oneshot::{self, error::TryRecvError},
7+
},
8+
time::Duration,
69
};
710

811
/// A handle to a backend. Communicates to a `ConnectionInterface` on the
@@ -23,6 +26,13 @@ pub struct ConnectionHandle {
2326

2427
/// Notify the backend of intentional shutdown.
2528
pub(crate) shutdown: oneshot::Sender<()>,
29+
30+
/// Max number of retries before failing and exiting the connection.
31+
/// Default is 10.
32+
pub(crate) max_retries: u32,
33+
/// The interval between retries.
34+
/// Default is 3 seconds.
35+
pub(crate) retry_interval: Duration,
2636
}
2737

2838
impl ConnectionHandle {
@@ -33,7 +43,14 @@ impl ConnectionHandle {
3343
let (error_tx, error_rx) = oneshot::channel();
3444
let (shutdown_tx, shutdown_rx) = oneshot::channel();
3545

36-
let handle = Self { to_socket, from_socket, error: error_rx, shutdown: shutdown_tx };
46+
let handle = Self {
47+
to_socket,
48+
from_socket,
49+
error: error_rx,
50+
shutdown: shutdown_tx,
51+
max_retries: 10,
52+
retry_interval: Duration::from_secs(3),
53+
};
3754
let interface = ConnectionInterface {
3855
from_frontend,
3956
to_frontend,
@@ -43,6 +60,19 @@ impl ConnectionHandle {
4360
(handle, interface)
4461
}
4562

63+
/// Set the max number of retries before failing and exiting the connection.
64+
/// Default is 10.
65+
pub fn with_max_retries(mut self, max_retries: u32) -> Self {
66+
self.max_retries = max_retries;
67+
self
68+
}
69+
70+
/// Set the interval between retries.
71+
pub fn with_retry_interval(mut self, retry_interval: Duration) -> Self {
72+
self.retry_interval = retry_interval;
73+
self
74+
}
75+
4676
/// Shutdown the backend.
4777
pub fn shutdown(self) {
4878
let _ = self.shutdown.send(());

Diff for: crates/pubsub/src/service.rs

+5-11
Original file line numberDiff line numberDiff line change
@@ -37,11 +37,6 @@ pub(crate) struct PubSubService<T> {
3737

3838
/// The request manager.
3939
pub(crate) in_flights: RequestManager,
40-
41-
/// Number of retries. Default is 10.
42-
///
43-
/// Every retry is made at an interval of 3 seconds.
44-
pub(crate) retries: u32,
4540
}
4641

4742
impl<T: PubSubConnect> PubSubService<T> {
@@ -56,7 +51,6 @@ impl<T: PubSubConnect> PubSubService<T> {
5651
reqs,
5752
subs: SubscriptionManager::default(),
5853
in_flights: Default::default(),
59-
retries: 10,
6054
};
6155
this.spawn();
6256
Ok(PubSubFrontend::new(tx))
@@ -205,7 +199,8 @@ impl<T: PubSubConnect> PubSubService<T> {
205199
/// Attempt to reconnect with retries
206200
async fn reconnect_with_retries(&mut self) -> TransportResult<()> {
207201
let mut retry_count = 0;
208-
let max_retries = self.retries;
202+
let max_retries = self.handle.max_retries;
203+
let interval = self.handle.retry_interval;
209204
loop {
210205
match self.reconnect().await {
211206
Ok(()) => break Ok(()),
@@ -215,13 +210,12 @@ impl<T: PubSubConnect> PubSubService<T> {
215210
error!("Reconnect failed after {max_retries} attempts, shutting down: {e}");
216211
break Err(e);
217212
}
218-
let duration = std::time::Duration::from_secs(3);
219213
warn!(
220214
"Reconnection attempt {retry_count}/{max_retries} failed: {e}. \
221-
Retrying in {:.3}s...",
222-
duration.as_secs_f64(),
215+
Retrying in {:?}s...",
216+
interval.as_secs_f64(),
223217
);
224-
sleep(duration).await;
218+
sleep(interval).await;
225219
}
226220
}
227221
}

Diff for: crates/transport-ws/src/native.rs

+28-2
Original file line numberDiff line numberDiff line change
@@ -25,12 +25,24 @@ pub struct WsConnect {
2525
pub auth: Option<Authorization>,
2626
/// The websocket config.
2727
pub config: Option<WebSocketConfig>,
28+
/// Max number of retries before failing and exiting the connection.
29+
/// Default is 10.
30+
max_retries: u32,
31+
/// The interval between retries.
32+
/// Default is 3 seconds.
33+
retry_interval: Duration,
2834
}
2935

3036
impl WsConnect {
3137
/// Creates a new websocket connection configuration.
3238
pub fn new<S: Into<String>>(url: S) -> Self {
33-
Self { url: url.into(), auth: None, config: None }
39+
Self {
40+
url: url.into(),
41+
auth: None,
42+
config: None,
43+
max_retries: 10,
44+
retry_interval: Duration::from_secs(3),
45+
}
3446
}
3547

3648
/// Sets the authorization header.
@@ -44,6 +56,20 @@ impl WsConnect {
4456
self.config = Some(config);
4557
self
4658
}
59+
60+
/// Sets the max number of retries before failing and exiting the connection.
61+
/// Default is 10.
62+
pub const fn with_max_retries(mut self, max_retries: u32) -> Self {
63+
self.max_retries = max_retries;
64+
self
65+
}
66+
67+
/// Sets the interval between retries.
68+
/// Default is 3 seconds.
69+
pub const fn with_retry_interval(mut self, retry_interval: Duration) -> Self {
70+
self.retry_interval = retry_interval;
71+
self
72+
}
4773
}
4874

4975
impl IntoClientRequest for WsConnect {
@@ -77,7 +103,7 @@ impl PubSubConnect for WsConnect {
77103

78104
backend.spawn();
79105

80-
Ok(handle)
106+
Ok(handle.with_max_retries(self.max_retries).with_retry_interval(self.retry_interval))
81107
}
82108
}
83109

Diff for: crates/transport-ws/src/wasm.rs

+23-2
Original file line numberDiff line numberDiff line change
@@ -6,19 +6,40 @@ use futures::{
66
stream::{Fuse, StreamExt},
77
};
88
use serde_json::value::RawValue;
9+
use std::time::Duration;
910
use ws_stream_wasm::{WsErr, WsMessage, WsMeta, WsStream};
1011

1112
/// Simple connection info for the websocket.
1213
#[derive(Clone, Debug)]
1314
pub struct WsConnect {
1415
/// The URL to connect to.
1516
pub url: String,
17+
/// Max number of retries before failing and exiting the connection.
18+
/// Default is 10.
19+
max_retries: u32,
20+
/// The interval between retries.
21+
/// Default is 3 seconds.
22+
retry_interval: Duration,
1623
}
1724

1825
impl WsConnect {
1926
/// Creates a new websocket connection configuration.
2027
pub fn new<S: Into<String>>(url: S) -> Self {
21-
Self { url: url.into() }
28+
Self { url: url.into(), max_retries: 10, retry_interval: Duration::from_secs(3) }
29+
}
30+
31+
/// Sets the max number of retries before failing and exiting the connection.
32+
/// Default is 10.
33+
pub const fn with_max_retries(mut self, max_retries: u32) -> Self {
34+
self.max_retries = max_retries;
35+
self
36+
}
37+
38+
/// Sets the interval between retries.
39+
/// Default is 3 seconds.
40+
pub const fn with_retry_interval(mut self, retry_interval: Duration) -> Self {
41+
self.retry_interval = retry_interval;
42+
self
2243
}
2344
}
2445

@@ -36,7 +57,7 @@ impl PubSubConnect for WsConnect {
3657

3758
backend.spawn();
3859

39-
Ok(handle)
60+
Ok(handle.with_max_retries(self.max_retries).with_retry_interval(self.retry_interval))
4061
}
4162
}
4263

0 commit comments

Comments
 (0)