Skip to content

Commit e39dc5a

Browse files
committed
fix: Prevent SSH idle disconnects via proper keepalive wiring
Idle SSH sessions were disconnecting inconsistently - sometimes after a few minutes, sometimes after ~10 minutes. There were three underlying issues:

1. russh's default Config::inactivity_timeout is 10 minutes and was inherited verbatim by to_russh_config(), imposing a hard ceiling on every session regardless of keepalive liveness. It is now set to None when keepalive is enabled, so the keepalive mechanism alone decides when a peer is dead; when keepalive is disabled it is set to one hour so that truly dead sockets are still reaped.

2. No TCP-level SO_KEEPALIVE was set on the underlying socket, so NAT and stateful-firewall conntrack entries could expire silently between SSH keepalive packets. The connect path (the new shared helper connect_with_config_inner) now builds the TcpStream manually and hands it to russh::client::connect_stream. When the connection is made through connect_with_ssh_config, a socket2::TcpKeepalive derived from the SSH keepalive config is applied to the socket first; plain connect_with_config passes no TCP keepalive.

3. The exec-mode path threaded an SshConnectionConfig through the executor but dropped it before the connection was made: the ExecutionConfig field was marked dead_code and ConnectionConfig had no corresponding field, so a user-configured server_alive_interval never reached Client::connect_with_ssh_config. The field is now live and flows through ConnectionConfig into connect_direct / connect_via_jump_hosts / the jump chain.

Adds socket2 as a direct dependency (already a transitive dependency via tokio).
1 parent 293217f commit e39dc5a

9 files changed

Lines changed: 140 additions & 13 deletions

File tree

Cargo.lock

Lines changed: 1 addition & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

Cargo.toml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -61,6 +61,7 @@ lru = "0.16.2"
6161
uuid = { version = "1.23.0", features = ["v4"] }
6262
fastrand = "2.3.0"
6363
tokio-util = "0.7.17"
64+
socket2 = "0.6"
6465
shell-words = "1.1.1"
6566
libc = "0.2"
6667
ipnetwork = "0.21"

src/executor/connection_manager.rs

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -42,9 +42,8 @@ pub(crate) struct ExecutionConfig<'a> {
4242
pub sudo_password: Option<Arc<SudoPassword>>,
4343
pub ssh_config: Option<&'a SshConfig>,
4444
/// SSH connection configuration (keepalive settings).
45-
/// Note: This field is currently passed through the executor for future use.
46-
/// Keepalive is applied at the Client::connect_with_ssh_config level.
47-
#[allow(dead_code)]
45+
/// Threaded through to `Client::connect_with_ssh_config` so user-configured
46+
/// `server_alive_interval` / `server_alive_count_max` apply to exec mode.
4847
pub ssh_connection_config: Option<&'a SshConnectionConfig>,
4948
}
5049

@@ -82,6 +81,7 @@ pub(crate) async fn execute_on_node_with_jump_hosts(
8281
timeout_seconds: config.timeout,
8382
connect_timeout_seconds: config.connect_timeout,
8483
jump_hosts_spec: effective_jump_hosts,
84+
ssh_connection_config: config.ssh_connection_config,
8585
};
8686

8787
// If sudo password is provided, use streaming execution to handle prompts

src/executor/parallel.rs

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1132,6 +1132,7 @@ impl ParallelExecutor {
11321132
let jump_hosts = self.jump_hosts.clone();
11331133
let sudo_password = self.sudo_password.clone();
11341134
let semaphore = Arc::clone(&semaphore);
1135+
let ssh_connection_config = self.ssh_connection_config.clone();
11351136

11361137
let handle = tokio::spawn(async move {
11371138
// Use defer pattern to ensure cleanup even on panic
@@ -1177,6 +1178,7 @@ impl ParallelExecutor {
11771178
timeout_seconds: timeout,
11781179
connect_timeout_seconds: connect_timeout,
11791180
jump_hosts_spec: jump_hosts.as_deref(),
1181+
ssh_connection_config: Some(&ssh_connection_config),
11801182
};
11811183

11821184
// Execute with or without sudo password support

src/ssh/client/command.rs

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -62,6 +62,7 @@ impl SshClient {
6262
timeout_seconds,
6363
connect_timeout_seconds: None, // Use default
6464
jump_hosts_spec: None, // No jump hosts
65+
ssh_connection_config: None,
6566
};
6667

6768
self.connect_and_execute_with_jump_hosts(command, &config)
@@ -101,6 +102,7 @@ impl SshClient {
101102
config.use_agent,
102103
config.use_password,
103104
config.connect_timeout_seconds,
105+
config.ssh_connection_config,
104106
)
105107
.await?;
106108

@@ -211,6 +213,7 @@ impl SshClient {
211213
config.use_agent,
212214
config.use_password,
213215
config.connect_timeout_seconds,
216+
config.ssh_connection_config,
214217
)
215218
.await?;
216219

@@ -322,6 +325,7 @@ impl SshClient {
322325
config.use_agent,
323326
config.use_password,
324327
config.connect_timeout_seconds,
328+
config.ssh_connection_config,
325329
)
326330
.await?;
327331

src/ssh/client/config.rs

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,7 @@
1313
// limitations under the License.
1414

1515
use crate::ssh::known_hosts::StrictHostKeyChecking;
16+
use crate::ssh::tokio_client::SshConnectionConfig;
1617
use std::path::Path;
1718

1819
/// Configuration for SSH connection and command execution
@@ -27,4 +28,6 @@ pub struct ConnectionConfig<'a> {
2728
pub timeout_seconds: Option<u64>,
2829
pub connect_timeout_seconds: Option<u64>,
2930
pub jump_hosts_spec: Option<&'a str>,
31+
/// SSH keepalive / inactivity settings. `None` falls back to defaults.
32+
pub ssh_connection_config: Option<&'a SshConnectionConfig>,
3033
}

src/ssh/client/connection.rs

Lines changed: 39 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -15,7 +15,7 @@
1515
use super::core::SshClient;
1616
use crate::jump::{parse_jump_hosts, JumpHostChain};
1717
use crate::ssh::known_hosts::StrictHostKeyChecking;
18-
use crate::ssh::tokio_client::{AuthMethod, Client};
18+
use crate::ssh::tokio_client::{AuthMethod, Client, SshConnectionConfig};
1919
use anyhow::{Context, Result};
2020
use std::path::Path;
2121
use std::time::Duration;
@@ -65,6 +65,7 @@ impl SshClient {
6565
auth_method: &AuthMethod,
6666
strict_mode: StrictHostKeyChecking,
6767
connect_timeout_seconds: Option<u64>,
68+
ssh_connection_config: Option<&SshConnectionConfig>,
6869
) -> Result<Client> {
6970
// SECURITY: Add rate limiting before connection attempts
7071
const RATE_LIMIT_DELAY: Duration = Duration::from_millis(100);
@@ -79,9 +80,24 @@ impl SshClient {
7980
let connect_timeout =
8081
Duration::from_secs(connect_timeout_seconds.unwrap_or(SSH_CONNECT_TIMEOUT_SECS));
8182

83+
let default_conn_cfg;
84+
let conn_cfg = match ssh_connection_config {
85+
Some(c) => c,
86+
None => {
87+
default_conn_cfg = SshConnectionConfig::default();
88+
&default_conn_cfg
89+
}
90+
};
91+
8292
let result = match tokio::time::timeout(
8393
connect_timeout,
84-
Client::connect(addr, &self.username, auth_method.clone(), check_method),
94+
Client::connect_with_ssh_config(
95+
addr,
96+
&self.username,
97+
auth_method.clone(),
98+
check_method,
99+
conn_cfg,
100+
),
85101
)
86102
.await
87103
{
@@ -148,13 +164,17 @@ impl SshClient {
148164
use_agent: bool,
149165
use_password: bool,
150166
connect_timeout_seconds: Option<u64>,
167+
ssh_connection_config: Option<&SshConnectionConfig>,
151168
) -> Result<Client> {
152169
// Create jump host chain with user-specified or default connect timeout
153170
let connect_timeout =
154171
Duration::from_secs(connect_timeout_seconds.unwrap_or(SSH_CONNECT_TIMEOUT_SECS));
155-
let chain = JumpHostChain::new(jump_hosts.to_vec())
172+
let mut chain = JumpHostChain::new(jump_hosts.to_vec())
156173
.with_connect_timeout(connect_timeout)
157174
.with_command_timeout(Duration::from_secs(300));
175+
if let Some(cfg) = ssh_connection_config {
176+
chain = chain.with_ssh_connection_config(cfg.clone());
177+
}
158178

159179
// Connect through the chain
160180
let connection = chain
@@ -195,6 +215,7 @@ impl SshClient {
195215
use_agent: bool,
196216
use_password: bool,
197217
connect_timeout_seconds: Option<u64>,
218+
ssh_connection_config: Option<&SshConnectionConfig>,
198219
) -> Result<Client> {
199220
if let Some(jump_spec) = jump_hosts_spec {
200221
// Parse jump hosts
@@ -204,8 +225,13 @@ impl SshClient {
204225

205226
if jump_hosts.is_empty() {
206227
tracing::debug!("No valid jump hosts found, using direct connection");
207-
self.connect_direct(auth_method, strict_mode, connect_timeout_seconds)
208-
.await
228+
self.connect_direct(
229+
auth_method,
230+
strict_mode,
231+
connect_timeout_seconds,
232+
ssh_connection_config,
233+
)
234+
.await
209235
} else {
210236
tracing::info!(
211237
"Connecting to {}:{} via {} jump host(s): {}",
@@ -227,14 +253,20 @@ impl SshClient {
227253
use_agent,
228254
use_password,
229255
connect_timeout_seconds,
256+
ssh_connection_config,
230257
)
231258
.await
232259
}
233260
} else {
234261
// Direct connection
235262
tracing::debug!("Using direct connection (no jump hosts)");
236-
self.connect_direct(auth_method, strict_mode, connect_timeout_seconds)
237-
.await
263+
self.connect_direct(
264+
auth_method,
265+
strict_mode,
266+
connect_timeout_seconds,
267+
ssh_connection_config,
268+
)
269+
.await
238270
}
239271
}
240272
}

src/ssh/client/file_transfer.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -700,6 +700,7 @@ impl SshClient {
700700
use_agent,
701701
use_password,
702702
connect_timeout_seconds,
703+
None,
703704
)
704705
.await
705706
}

src/ssh/tokio_client/connection.rs

Lines changed: 86 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -100,13 +100,53 @@ impl SshConnectionConfig {
100100
}
101101

102102
/// Convert this configuration to a russh client Config.
103+
///
104+
/// When keepalive is enabled, `inactivity_timeout` is set to `None` so the
105+
/// keepalive mechanism is the sole dead-peer detector. russh's default
106+
/// `inactivity_timeout` is 10 minutes and would otherwise tear down an
107+
/// otherwise-healthy idle session at that mark regardless of keepalive
108+
/// liveness. When keepalive is disabled, we preserve a generous
109+
/// inactivity timeout so truly dead sockets are still reaped.
103110
pub fn to_russh_config(&self) -> Config {
111+
let inactivity_timeout = if self.keepalive_interval.is_some() {
112+
None
113+
} else {
114+
Some(Duration::from_secs(3600))
115+
};
104116
Config {
105117
keepalive_interval: self.keepalive_interval.map(Duration::from_secs),
106118
keepalive_max: self.keepalive_max,
119+
inactivity_timeout,
107120
..Default::default()
108121
}
109122
}
123+
124+
/// Derive a TCP-level keepalive configuration from this SSH keepalive
125+
/// configuration. Returns `None` if SSH keepalive is disabled.
126+
///
127+
/// TCP keepalive is a belt-and-suspenders mechanism: it lets the kernel
128+
/// detect a broken TCP path even when no application data is flowing and
129+
/// even if SSH-level keepalive replies are dropped by a middlebox.
130+
pub fn to_tcp_keepalive(&self) -> Option<socket2::TcpKeepalive> {
131+
let interval = self.keepalive_interval?;
132+
// Start probing after `interval` seconds of idleness, probe every
133+
// half-interval, up to keepalive_max retries.
134+
let probe_interval = (interval / 2).max(1);
135+
let ka = socket2::TcpKeepalive::new()
136+
.with_time(Duration::from_secs(interval))
137+
.with_interval(Duration::from_secs(probe_interval));
138+
#[cfg(any(
139+
target_os = "linux",
140+
target_os = "macos",
141+
target_os = "freebsd",
142+
target_os = "netbsd",
143+
target_os = "tvos",
144+
target_os = "watchos",
145+
target_os = "ios",
146+
))]
147+
let ka = ka.with_retries(self.keepalive_max.max(1) as u32);
148+
Some(ka)
149+
}
110150
}
111151
use super::ToSocketAddrsWithHostname;
112152

@@ -213,7 +253,16 @@ impl Client {
213253
ssh_config: &SshConnectionConfig,
214254
) -> Result<Self, super::Error> {
215255
let config = ssh_config.to_russh_config();
216-
Self::connect_with_config(addr, username, auth, server_check, config).await
256+
let tcp_keepalive = ssh_config.to_tcp_keepalive();
257+
Self::connect_with_config_inner(
258+
addr,
259+
username,
260+
auth,
261+
server_check,
262+
config,
263+
tcp_keepalive.as_ref(),
264+
)
265+
.await
217266
}
218267

219268
/// Same as `connect`, but with the option to specify a non default
@@ -227,14 +276,28 @@ impl Client {
227276
auth: AuthMethod,
228277
server_check: ServerCheckMethod,
229278
config: Config,
279+
) -> Result<Self, super::Error> {
280+
Self::connect_with_config_inner(addr, username, auth, server_check, config, None).await
281+
}
282+
283+
async fn connect_with_config_inner(
284+
addr: impl ToSocketAddrsWithHostname,
285+
username: &str,
286+
auth: AuthMethod,
287+
server_check: ServerCheckMethod,
288+
config: Config,
289+
tcp_keepalive: Option<&socket2::TcpKeepalive>,
230290
) -> Result<Self, super::Error> {
231291
let config = Arc::new(config);
232292

233293
// Connection code inspired from std::net::TcpStream::connect and std::net::each_addr
234294
let socket_addrs = addr
235295
.to_socket_addrs()
236296
.map_err(super::Error::AddressInvalid)?;
237-
let mut connect_res = Err(super::Error::AddressInvalid(io::Error::new(
297+
let mut connect_res: Result<
298+
(SocketAddr, russh::client::Handle<ClientHandler>),
299+
super::Error,
300+
> = Err(super::Error::AddressInvalid(io::Error::new(
238301
io::ErrorKind::InvalidInput,
239302
"could not resolve to any addresses",
240303
)));
@@ -244,7 +307,27 @@ impl Client {
244307
host: socket_addr,
245308
server_check: server_check.clone(),
246309
};
247-
match russh::client::connect(config.clone(), socket_addr, handler).await {
310+
311+
let stream = match tokio::net::TcpStream::connect(socket_addr).await {
312+
Ok(s) => s,
313+
Err(e) => {
314+
connect_res = Err(super::Error::IoError(e));
315+
continue;
316+
}
317+
};
318+
319+
if let Some(ka) = tcp_keepalive {
320+
let sock_ref = socket2::SockRef::from(&stream);
321+
if let Err(e) = sock_ref.set_tcp_keepalive(ka) {
322+
tracing::debug!(
323+
"Failed to set TCP keepalive on socket to {}: {}",
324+
socket_addr,
325+
e
326+
);
327+
}
328+
}
329+
330+
match russh::client::connect_stream(config.clone(), stream, handler).await {
248331
Ok(h) => {
249332
connect_res = Ok((socket_addr, h));
250333
break;

0 commit comments

Comments
 (0)