Skip to content

Commit 45a649b

Browse files
authored
Add connection timeout, TCP keepalive, and pool health features (#283)
* feat: add connection timeout, TCP keepalive, and pool health features - Add connection_timeout (default 30s) wrapping TcpStream::connect and recv - Add tcp_keepalive (default 60s) via socket2 on new connections - Add idle_timeout and max_lifetime config options wired to deadpool - Add 5s timeout on pool recycle() to prevent broken connection poisoning - Add ConnectionTimedOut error variant Fixes indefinite hangs when TCP connections become half-open (container restarts, network interruptions), which previously exhausted the deadpool connection pool permanently. * test/docs: add tests and documentation for timeout/keepalive features - Add 7 unit tests covering config defaults, custom timeouts, pool creation, and error display - Update lib.rs Configurations doc section with new options - All 290 tests pass * style: `cargo fmt` * fix: change function definitions to requested * fix: update MSRV lockfiles and add missing Config fields in tests Rebase onto upstream/main which includes MSRV pin updates (#284). Re-ran `cargo xtask msrv min` to regenerate lockfiles and added missing timeout/keepalive fields to Config initializers in tests.
1 parent 2c55357 commit 45a649b

12 files changed

Lines changed: 306 additions & 24 deletions

File tree

ci/Cargo.lock.min

Lines changed: 18 additions & 4 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

ci/Cargo.lock.msrv

Lines changed: 18 additions & 4 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

lib/Cargo.toml

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -61,7 +61,7 @@ features = ["std", "serde"]
6161
[dependencies.deadpool]
6262
version = "0.12.0" # 0.13 requires bumping MSRV to 1.85
6363
default-features = false
64-
features = ["managed"]
64+
features = ["managed", "rt_tokio_1"]
6565

6666
[dependencies.rustls]
6767
version = "0.23.29"
@@ -77,6 +77,10 @@ version = "0.26.0"
7777
default-features = false
7878
features = ["tls12", "ring"]
7979

80+
[dependencies.socket2]
81+
version = "0.5"
82+
features = ["all"]
83+
8084
[dev-dependencies]
8185
pretty_env_logger = "0.5.0"
8286
serde_bytes = "0.11.0"

lib/src/config.rs

Lines changed: 114 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -3,10 +3,12 @@ use crate::errors::{Error, Result};
33
#[cfg(feature = "unstable-bolt-protocol-impl-v2")]
44
use serde::{Deserialize, Deserializer, Serialize};
55
use std::path::Path;
6-
use std::{ops::Deref, sync::Arc};
6+
use std::{ops::Deref, sync::Arc, time::Duration};
77

88
const DEFAULT_FETCH_SIZE: usize = 200;
99
const DEFAULT_MAX_CONNECTIONS: usize = 16;
10+
const DEFAULT_CONNECTION_TIMEOUT: Duration = Duration::from_secs(30);
11+
const DEFAULT_TCP_KEEPALIVE: Option<Duration> = Some(Duration::from_secs(60));
1012

1113
/// Newtype for the name of the database.
1214
/// Stores the name as an `Arc<str>` to avoid cloning the name around.
@@ -131,6 +133,14 @@ pub struct Config {
131133
pub(crate) fetch_size: usize,
132134
pub(crate) imp_user: Option<ImpersonateUser>,
133135
pub(crate) tls_config: ConnectionTLSConfig,
136+
/// Timeout for establishing a new connection and for read operations.
137+
pub(crate) connection_timeout: Duration,
138+
/// TCP keepalive interval. If `Some`, TCP keepalive is enabled on the socket.
139+
pub(crate) tcp_keepalive: Option<Duration>,
140+
/// Maximum idle time for a connection in the pool before it is discarded.
141+
pub(crate) idle_timeout: Option<Duration>,
142+
/// Maximum lifetime of a connection in the pool before it is discarded.
143+
pub(crate) max_lifetime: Option<Duration>,
134144
}
135145

136146
impl Config {
@@ -153,6 +163,10 @@ pub struct ConfigBuilder {
153163
max_connections: usize,
154164
imp_user: Option<ImpersonateUser>,
155165
tls_config: ConnectionTLSConfig,
166+
connection_timeout: Duration,
167+
tcp_keepalive: Option<Duration>,
168+
idle_timeout: Option<Duration>,
169+
max_lifetime: Option<Duration>,
156170
}
157171

158172
impl ConfigBuilder {
@@ -240,6 +254,39 @@ impl ConfigBuilder {
240254
self
241255
}
242256

257+
/// The timeout for establishing a new connection and for read operations.
258+
///
259+
/// Defaults to 30 seconds if not set.
260+
pub fn connection_timeout(mut self, timeout: Duration) -> Self {
261+
self.connection_timeout = timeout;
262+
self
263+
}
264+
265+
/// The TCP keepalive interval. Set to `Some(duration)` to enable TCP keepalive
266+
/// on the underlying socket, or `None` to disable it.
267+
///
268+
/// Defaults to `Some(60 seconds)` if not set.
269+
pub fn tcp_keepalive(mut self, interval: impl Into<Option<Duration>>) -> Self {
270+
self.tcp_keepalive = interval.into();
271+
self
272+
}
273+
274+
/// The maximum idle time for a connection in the pool before it is discarded.
275+
///
276+
/// Defaults to `None` (no idle timeout) if not set.
277+
pub fn idle_timeout(mut self, timeout: impl Into<Option<Duration>>) -> Self {
278+
self.idle_timeout = timeout.into();
279+
self
280+
}
281+
282+
/// The maximum lifetime of a connection in the pool before it is discarded.
283+
///
284+
/// Defaults to `None` (no maximum lifetime) if not set.
285+
pub fn max_lifetime(mut self, lifetime: impl Into<Option<Duration>>) -> Self {
286+
self.max_lifetime = lifetime.into();
287+
self
288+
}
289+
243290
pub fn build(self) -> Result<Config> {
244291
if let (Some(uri), Some(user), Some(password)) = (self.uri, self.user, self.password) {
245292
Ok(Config {
@@ -251,6 +298,10 @@ impl ConfigBuilder {
251298
db: self.db,
252299
imp_user: self.imp_user,
253300
tls_config: self.tls_config,
301+
connection_timeout: self.connection_timeout,
302+
tcp_keepalive: self.tcp_keepalive,
303+
idle_timeout: self.idle_timeout,
304+
max_lifetime: self.max_lifetime,
254305
})
255306
} else {
256307
Err(Error::InvalidConfig)
@@ -269,6 +320,10 @@ impl Default for ConfigBuilder {
269320
imp_user: None,
270321
fetch_size: DEFAULT_FETCH_SIZE,
271322
tls_config: ConnectionTLSConfig::None,
323+
connection_timeout: DEFAULT_CONNECTION_TIMEOUT,
324+
tcp_keepalive: DEFAULT_TCP_KEEPALIVE,
325+
idle_timeout: None,
326+
max_lifetime: None,
272327
}
273328
}
274329
}
@@ -332,6 +387,64 @@ mod tests {
332387
assert_eq!(config.tls_config, ConnectionTLSConfig::NoSSLValidation);
333388
}
334389

390+
#[test]
391+
fn should_build_with_timeout_defaults() {
392+
let config = ConfigBuilder::default()
393+
.uri("127.0.0.1:7687")
394+
.user("some_user")
395+
.password("some_password")
396+
.build()
397+
.unwrap();
398+
assert_eq!(config.connection_timeout, Duration::from_secs(30));
399+
assert_eq!(config.tcp_keepalive, Some(Duration::from_secs(60)));
400+
assert_eq!(config.idle_timeout, None);
401+
assert_eq!(config.max_lifetime, None);
402+
}
403+
404+
#[test]
405+
fn should_build_with_custom_timeouts() {
406+
let config = ConfigBuilder::default()
407+
.uri("127.0.0.1:7687")
408+
.user("some_user")
409+
.password("some_password")
410+
.connection_timeout(Duration::from_secs(10))
411+
.tcp_keepalive(Some(Duration::from_secs(120)))
412+
.idle_timeout(Some(Duration::from_secs(300)))
413+
.max_lifetime(Some(Duration::from_secs(3600)))
414+
.build()
415+
.unwrap();
416+
assert_eq!(config.connection_timeout, Duration::from_secs(10));
417+
assert_eq!(config.tcp_keepalive, Some(Duration::from_secs(120)));
418+
assert_eq!(config.idle_timeout, Some(Duration::from_secs(300)));
419+
assert_eq!(config.max_lifetime, Some(Duration::from_secs(3600)));
420+
}
421+
422+
#[test]
423+
fn should_disable_tcp_keepalive() {
424+
let config = ConfigBuilder::default()
425+
.uri("127.0.0.1:7687")
426+
.user("some_user")
427+
.password("some_password")
428+
.tcp_keepalive(None)
429+
.build()
430+
.unwrap();
431+
assert_eq!(config.tcp_keepalive, None);
432+
}
433+
434+
#[test]
435+
fn should_set_idle_and_max_lifetime() {
436+
let config = ConfigBuilder::default()
437+
.uri("127.0.0.1:7687")
438+
.user("some_user")
439+
.password("some_password")
440+
.idle_timeout(Some(Duration::from_secs(600)))
441+
.max_lifetime(Some(Duration::from_secs(1800)))
442+
.build()
443+
.unwrap();
444+
assert_eq!(config.idle_timeout, Some(Duration::from_secs(600)));
445+
assert_eq!(config.max_lifetime, Some(Duration::from_secs(1800)));
446+
}
447+
335448
#[test]
336449
fn should_reject_invalid_config() {
337450
assert!(ConfigBuilder::default()

0 commit comments

Comments
 (0)