Skip to content

Commit b76fd1c

Browse files
committed
fix: heartbeat race condition, lock contention, add resilience layer, bump to v0.1.5
1 parent 9c7f818 commit b76fd1c

File tree

12 files changed

+648
-49
lines changed

12 files changed

+648
-49
lines changed

CHANGELOG.md

Lines changed: 22 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,28 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
77

88
## [Unreleased]
99

10+
## [0.1.5] - 2026-02-14
11+
12+
### Added
13+
- Resilience module with circuit breaker to prevent infinite demux restart loops
14+
- Demux task watchdog for automatic recovery from task failures
15+
- Tunnel metrics tracking for observability (demux restarts, channel full events, frames dropped, lock wait times, heartbeat timeouts)
16+
- Configurable channel sizes for connection management (`connection_channel_size` in source and dest configs)
17+
- Circuit breaker configuration options (`circuit_breaker_window_secs`, `circuit_breaker_max_restarts`)
18+
- Dedicated writer task for lock-free frame sending via unbounded write queue
19+
20+
### Fixed
21+
- Heartbeat race condition where a pong could arrive after the timeout check had begun; pong detection now uses an atomic flag that is checked atomically before a timeout is declared
22+
- Potential deadlock in the destination response channel, resolved by switching to an unbounded channel (monitored via metrics for backpressure)
23+
- Lock contention during frame writes, resolved by implementing a dedicated writer task that minimizes the critical section to just the write/flush operations
24+
- Clippy `dead_code` warnings, silenced by adding `#[allow(dead_code)]` attributes to config fields and public API methods that are not yet used
25+
26+
### Changed
27+
- Source demux task now takes ownership of tunnel read half for lock-free operation (no mutex on read path)
28+
- Connection channels now use configurable size (default 1024) instead of hardcoded 100
29+
- Heartbeat pong detection now atomic (flag cleared before ping sent, checked atomically after timeout)
30+
- Frame sending now uses unbounded write queue instead of direct writes (eliminates per-frame lock acquisition)
31+
1032
## [0.1.4] - 2026-02-13
1133

1234
### Added

Cargo.lock

Lines changed: 1 addition & 1 deletion
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

Cargo.toml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
[package]
22
name = "utun"
3-
version = "0.1.4"
3+
version = "0.1.5"
44
edition = "2021"
55
authors = ["UTun Contributors"]
66
description = "Quantum-safe tunnel system"

src/config.rs

Lines changed: 34 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@ pub enum ConfigError {
1818
}
1919

2020
#[derive(Debug, Deserialize)]
21+
#[allow(dead_code)] // Config fields loaded from TOML
2122
pub struct Config {
2223
pub source: Option<SourceConfig>,
2324
pub dest: Option<DestConfig>,
@@ -28,6 +29,7 @@ pub struct Config {
2829
}
2930

3031
#[derive(Debug, Clone, Deserialize)]
32+
#[allow(dead_code)] // Config fields loaded from TOML
3133
pub struct SourceConfig {
3234
#[serde(default = "default_listen_ip")]
3335
pub listen_ip: String,
@@ -76,6 +78,17 @@ pub struct SourceConfig {
7678
#[serde(default = "default_frame_buffer_size")]
7779
pub frame_buffer_size: usize,
7880

81+
// Channel configuration
82+
#[serde(default = "default_connection_channel_size")]
83+
pub connection_channel_size: usize,
84+
85+
// Circuit breaker configuration
86+
#[serde(default = "default_circuit_breaker_window_secs")]
87+
pub circuit_breaker_window_secs: u64,
88+
89+
#[serde(default = "default_circuit_breaker_max_restarts")]
90+
pub circuit_breaker_max_restarts: usize,
91+
7992
#[serde(default)]
8093
pub allowed_outbound: AllowedOutboundConfig,
8194

@@ -92,6 +105,7 @@ pub struct SourceConfig {
92105
}
93106

94107
#[derive(Debug, Clone, Deserialize)]
108+
#[allow(dead_code)] // Config fields loaded from TOML
95109
pub struct DestConfig {
96110
#[serde(default = "default_listen_ip")]
97111
pub listen_ip: String,
@@ -109,6 +123,10 @@ pub struct DestConfig {
109123
#[serde(default = "default_target_timeout")]
110124
pub target_connect_timeout_ms: u64,
111125

126+
// Channel configuration
127+
#[serde(default = "default_connection_channel_size")]
128+
pub connection_channel_size: usize,
129+
112130
#[serde(default)]
113131
pub connection_filter: ConnectionFilterConfig,
114132

@@ -122,6 +140,7 @@ pub struct DestConfig {
122140
}
123141

124142
#[derive(Debug, Deserialize)]
143+
#[allow(dead_code)] // Config fields loaded from TOML
125144
pub struct AuthConfig {
126145
#[serde(default = "default_mtls")]
127146
pub use_mtls: bool,
@@ -139,6 +158,7 @@ pub struct AuthConfig {
139158
}
140159

141160
#[derive(Debug, Deserialize)]
161+
#[allow(dead_code)] // Config fields loaded from TOML
142162
pub struct CryptoConfig {
143163
#[serde(default = "default_kem_mode")]
144164
pub kem_mode: KemMode,
@@ -220,6 +240,7 @@ pub struct ExposedPortConfig {
220240
}
221241

222242
#[derive(Debug, Clone, Deserialize)]
243+
#[allow(dead_code)] // Config fields loaded from TOML
223244
pub struct ServiceConfig {
224245
pub name: String,
225246
pub port: u16,
@@ -231,6 +252,7 @@ pub struct ServiceConfig {
231252
}
232253

233254
impl ServiceConfig {
255+
#[allow(dead_code)] // Public API method
234256
pub fn get_protocol(&self) -> crate::tunnel::Protocol {
235257
match self.protocol.to_lowercase().as_str() {
236258
"udp" => crate::tunnel::Protocol::Udp,
@@ -246,6 +268,7 @@ pub struct ConnectionFilterConfig {
246268
}
247269

248270
#[derive(Debug, Deserialize)]
271+
#[allow(dead_code)] // Config fields loaded from TOML
249272
pub struct LoggingConfig {
250273
#[serde(default = "default_log_level")]
251274
pub level: String,
@@ -361,6 +384,15 @@ fn default_max_reconnect_delay() -> u64 {
361384
fn default_frame_buffer_size() -> usize {
362385
1000
363386
}
387+
fn default_connection_channel_size() -> usize {
388+
1024
389+
}
390+
fn default_circuit_breaker_window_secs() -> u64 {
391+
60
392+
}
393+
fn default_circuit_breaker_max_restarts() -> usize {
394+
5
395+
}
364396

365397
/// Load configuration from file
366398
pub fn load_config(path: &Path) -> Result<Config, ConfigError> {
@@ -455,6 +487,7 @@ fn validate_config(config: &Config) -> Result<(), ConfigError> {
455487

456488
impl SourceConfig {
457489
/// Check if IP is allowed for outbound connections
490+
#[allow(dead_code)] // Public API method
458491
pub fn is_ip_allowed(&self, ip: std::net::IpAddr) -> bool {
459492
for cidr in &self.allowed_outbound.allowed_ips {
460493
if let Ok(network) = cidr.parse::<IpNetwork>() {
@@ -469,6 +502,7 @@ impl SourceConfig {
469502

470503
impl DestConfig {
471504
/// Check if source IP is allowed
505+
#[allow(dead_code)] // Public API method
472506
pub fn is_source_allowed(&self, ip: std::net::IpAddr) -> bool {
473507
for cidr in &self.connection_filter.allowed_source_ips {
474508
if let Ok(network) = cidr.parse::<IpNetwork>() {

src/lib.rs

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,9 @@
44
//! post-quantum cryptography (ML-KEM-768 + Classic McEliece 460896) for
55
//! key exchange and AES-256-GCM for symmetric encryption.
66
7+
// Allow dead code for library API methods not yet used
8+
#![allow(dead_code)]
9+
710
pub mod cert;
811
pub mod config;
912
pub mod crypto;

src/main.rs

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,6 @@
1+
// Allow dead code for library API methods not yet used
2+
#![allow(dead_code)]
3+
14
use clap::Parser;
25
use std::path::{Path, PathBuf};
36
use std::sync::Arc;

src/tunnel/connection.rs

Lines changed: 36 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,7 @@ pub enum ConnectionState {
1616
Closed,
1717
}
1818

19+
#[allow(dead_code)] // Fields used by public API methods
1920
pub struct Connection {
2021
id: u32,
2122
remote_addr: SocketAddr,
@@ -36,8 +37,18 @@ impl Connection {
3637
_protocol: Protocol,
3738
_service_port: u16,
3839
) -> (Self, mpsc::Sender<Frame>, mpsc::Receiver<Frame>) {
39-
let (tx_to_tunnel, rx_to_tunnel) = mpsc::channel(100);
40-
let (tx_from_tunnel, rx_from_tunnel) = mpsc::channel(100);
40+
Self::new_with_channel_size(id, remote_addr, _protocol, _service_port, 1024)
41+
}
42+
43+
pub fn new_with_channel_size(
44+
id: u32,
45+
remote_addr: SocketAddr,
46+
_protocol: Protocol,
47+
_service_port: u16,
48+
channel_size: usize,
49+
) -> (Self, mpsc::Sender<Frame>, mpsc::Receiver<Frame>) {
50+
let (tx_to_tunnel, rx_to_tunnel) = mpsc::channel(channel_size);
51+
let (tx_from_tunnel, rx_from_tunnel) = mpsc::channel(channel_size);
4152

4253
let conn = Self {
4354
id,
@@ -85,32 +96,38 @@ impl Connection {
8596
self.id
8697
}
8798

99+
#[allow(dead_code)] // Public API method
88100
pub fn remote_addr(&self) -> SocketAddr {
89101
self.remote_addr
90102
}
91103

104+
#[allow(dead_code)] // Public API method
92105
pub async fn set_service(&self, name: String, target: SocketAddr) {
93106
let mut sn = self.service_name.write().await;
94107
*sn = Some(name);
95108
let mut ta = self.target_addr.write().await;
96109
*ta = Some(target);
97110
}
98111

112+
#[allow(dead_code)] // Public API method
99113
pub async fn service_name(&self) -> Option<String> {
100114
self.service_name.read().await.clone()
101115
}
102116

117+
#[allow(dead_code)] // Public API method
103118
pub async fn target_addr(&self) -> Option<SocketAddr> {
104119
*self.target_addr.read().await
105120
}
106121

122+
#[allow(dead_code)] // Public API method
107123
pub async fn send_to_tunnel(&self, frame: Frame) -> Result<(), ConnectionError> {
108124
self.tx_to_tunnel
109125
.send(frame)
110126
.await
111127
.map_err(|_| ConnectionError::ChannelClosed)
112128
}
113129

130+
#[allow(dead_code)] // Public API method
114131
pub async fn recv_from_tunnel(&self) -> Result<Frame, ConnectionError> {
115132
let mut rx_guard = self.rx_from_tunnel.write().await;
116133
if let Some(ref mut rx) = *rx_guard {
@@ -166,15 +183,25 @@ pub struct ConnectionManager {
166183
connections: RwLock<HashMap<u32, Arc<Connection>>>,
167184
max_connections: usize,
168185
connection_timeout: Duration,
186+
channel_size: usize,
169187
}
170188

171189
impl ConnectionManager {
172190
pub fn new(max_connections: usize, connection_timeout_ms: u64) -> Self {
191+
Self::new_with_channel_size(max_connections, connection_timeout_ms, 1024)
192+
}
193+
194+
pub fn new_with_channel_size(
195+
max_connections: usize,
196+
connection_timeout_ms: u64,
197+
channel_size: usize,
198+
) -> Self {
173199
Self {
174200
next_id: AtomicU32::new(1),
175201
connections: RwLock::new(HashMap::new()),
176202
max_connections,
177203
connection_timeout: Duration::from_millis(connection_timeout_ms),
204+
channel_size,
178205
}
179206
}
180207

@@ -194,7 +221,13 @@ impl ConnectionManager {
194221
}
195222

196223
let id = self.next_id.fetch_add(1, Ordering::Relaxed);
197-
let (conn, tx, rx) = Connection::new(id, remote_addr, protocol, service_port);
224+
let (conn, tx, rx) = Connection::new_with_channel_size(
225+
id,
226+
remote_addr,
227+
protocol,
228+
service_port,
229+
self.channel_size,
230+
);
198231
let conn = Arc::new(conn);
199232

200233
connections.insert(id, conn.clone());

src/tunnel/dest.rs

Lines changed: 21 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -115,17 +115,22 @@ pub struct DestContainer {
115115
/// Maps connection_id -> target connection write half for data forwarding
116116
target_connections: Arc<RwLock<HashMap<u32, TargetConnection>>>,
117117
/// Channel to send response frames back to the tunnel
118-
response_tx: Arc<RwLock<Option<mpsc::Sender<Frame>>>>,
118+
response_tx: Arc<RwLock<Option<mpsc::UnboundedSender<Frame>>>>,
119119
/// Maximum handshake message size (depends on KEM mode)
120120
max_handshake_size: u32,
121+
/// Channel capacity taken from `DestConfig::connection_channel_size` and passed to the connection manager
122+
channel_size: usize,
121123
}
122124

123125
impl DestContainer {
124126
pub async fn new(config: DestConfig, crypto_config: CryptoConfig) -> Result<Self, DestError> {
127+
let channel_size = config.connection_channel_size;
128+
125129
let key_manager = Arc::new(KeyManager::new(3600, 300)); // 1 hour rotation, 5 min window
126-
let connection_manager = Arc::new(ConnectionManager::new(
130+
let connection_manager = Arc::new(ConnectionManager::new_with_channel_size(
127131
config.max_connections_per_service * config.exposed_services.len(),
128132
config.connection_timeout_ms,
133+
channel_size,
129134
));
130135
let service_registry = Arc::new(ServiceRegistry::new(config.exposed_services.clone()));
131136

@@ -149,6 +154,7 @@ impl DestContainer {
149154
target_connections: Arc::new(RwLock::new(HashMap::new())),
150155
response_tx: Arc::new(RwLock::new(None)),
151156
max_handshake_size,
157+
channel_size,
152158
})
153159
}
154160

@@ -211,14 +217,21 @@ impl DestContainer {
211217
drop(codec);
212218

213219
// Create channel for response frames from target connections
214-
let (response_tx, mut response_rx) = mpsc::channel::<Frame>(256);
220+
// Use unbounded to prevent backpressure deadlock (monitored via metrics)
221+
let (response_tx, mut response_rx) = mpsc::unbounded_channel::<Frame>();
215222

216223
// Store the response sender
217224
{
218225
let mut tx = self.response_tx.write().await;
219226
*tx = Some(response_tx.clone());
220227
}
221228

229+
// Log channel configuration
230+
tracing::info!(
231+
"Response channel configured as unbounded for connection from {}",
232+
addr
233+
);
234+
222235
// Handle incoming frames
223236
let connection_manager = self.connection_manager.clone();
224237
let service_registry = self.service_registry.clone();
@@ -326,7 +339,7 @@ impl DestContainer {
326339

327340
// Send response if any (through the channel)
328341
if let Some(resp_frame) = response {
329-
if response_tx.send(resp_frame).await.is_err() {
342+
if response_tx.send(resp_frame).is_err() {
330343
tracing::error!("Failed to send response - channel closed");
331344
break;
332345
}
@@ -460,7 +473,7 @@ impl DestContainer {
460473
connection_manager: &Arc<ConnectionManager>,
461474
service_registry: &Arc<ServiceRegistry>,
462475
metrics: &Arc<DestMetrics>,
463-
response_tx: &mpsc::Sender<Frame>,
476+
response_tx: &mpsc::UnboundedSender<Frame>,
464477
) -> Option<Frame> {
465478
match frame.frame_type() {
466479
FrameType::Connect => {
@@ -544,7 +557,7 @@ impl DestContainer {
544557
if let Ok(mut close_frame) = Frame::new_data(connection_id, 0, &[])
545558
{
546559
close_frame.set_fin();
547-
let _ = response_tx_clone.send(close_frame).await;
560+
let _ = response_tx_clone.send(close_frame);
548561
}
549562
break;
550563
}
@@ -553,7 +566,7 @@ impl DestContainer {
553566
conn_clone.record_receive(n);
554567
if let Ok(data_frame) = Frame::new_data(connection_id, 0, &buf[..n])
555568
{
556-
if response_tx_clone.send(data_frame).await.is_err() {
569+
if response_tx_clone.send(data_frame).is_err() {
557570
tracing::error!(
558571
"Failed to send data frame - channel closed"
559572
);
@@ -715,6 +728,7 @@ impl Clone for DestContainer {
715728
target_connections: self.target_connections.clone(),
716729
response_tx: self.response_tx.clone(),
717730
max_handshake_size: self.max_handshake_size,
731+
channel_size: self.channel_size,
718732
}
719733
}
720734
}

0 commit comments

Comments
 (0)