Skip to content

Commit 58262e8

Browse files
sanityclaude
andauthored
fix: clear stale crypto state on peer reconnection (#2236)
Co-authored-by: Claude Opus 4.5 <[email protected]>
1 parent 30854f1 commit 58262e8

File tree

1 file changed

+261
-0
lines changed

1 file changed

+261
-0
lines changed

crates/core/src/transport/connection_handler.rs

Lines changed: 261 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -457,6 +457,33 @@ impl<S: Socket> UdpPacketsListener<S> {
457457
continue;
458458
}
459459

460+
// Issue #2235: Clean up stale CLOSED connections from the same IP but different port.
461+
// When a peer reconnects (e.g., after restart), it may get a new ephemeral port.
462+
// The old connection's crypto state is stale and will cause AEAD decryption
463+
// failures if we try to reuse it.
464+
//
465+
// IMPORTANT: We only remove connections where the channel is closed, to avoid
466+
// breaking scenarios where multiple legitimate peers are behind the same NAT
467+
// (sharing the same public IP but using different source ports).
468+
let remote_ip = remote_addr.ip();
469+
let stale_addrs: Vec<_> = self.remote_connections
470+
.iter()
471+
.filter(|(addr, conn)| {
472+
addr.ip() == remote_ip
473+
&& **addr != remote_addr
474+
&& conn.inbound_packet_sender.is_closed()
475+
})
476+
.map(|(addr, _)| *addr)
477+
.collect();
478+
for stale_addr in stale_addrs {
479+
self.remote_connections.remove(&stale_addr);
480+
tracing::debug!(
481+
%stale_addr,
482+
%remote_addr,
483+
"removed stale closed connection from same IP"
484+
);
485+
}
486+
460487
let inbound_key_bytes = key_from_addr(&remote_addr);
461488
let (gw_ongoing_connection, packets_sender) = self.gateway_connection(packet_data, remote_addr, inbound_key_bytes);
462489
let task = tokio::spawn(gw_ongoing_connection
@@ -2033,4 +2060,238 @@ mod test {
20332060

20342061
Ok(())
20352062
}
2063+
2064+
/// Test that a peer reconnecting from a different port is handled correctly.
2065+
///
2066+
/// This simulates the scenario from issue #2235 where:
2067+
/// 1. Peer connects to gateway from port A - succeeds
2068+
/// 2. Peer disconnects (connection dropped)
2069+
/// 3. Peer reconnects from port B (new ephemeral port)
2070+
/// 4. Gateway should recognize this as a new session and accept immediately
2071+
///
2072+
/// The bug was that the gateway retained stale crypto state, causing AEAD
2073+
/// decryption failures on the new connection.
2074+
#[tokio::test]
2075+
async fn gateway_handles_peer_reconnection_from_different_port() -> anyhow::Result<()> {
2076+
let channels = Arc::new(DashMap::new());
2077+
2078+
// Create the gateway
2079+
let (gw_pub, (gw_outbound, mut gw_conn), gw_addr) =
2080+
set_gateway_connection(Default::default(), channels.clone()).await?;
2081+
2082+
// Create peer A at port X
2083+
let (_peer_a_pub, mut peer_a, peer_a_addr) =
2084+
set_peer_connection(Default::default(), channels.clone()).await?;
2085+
2086+
tracing::info!(
2087+
"Step 1: Peer A at {} connecting to gateway at {}",
2088+
peer_a_addr,
2089+
gw_addr
2090+
);
2091+
2092+
// Peer A connects to gateway
2093+
let gw_task = tokio::spawn(async move {
2094+
let conn = tokio::time::timeout(Duration::from_secs(10), gw_conn.recv())
2095+
.await?
2096+
.ok_or(anyhow::anyhow!("gateway: no inbound connection"))?;
2097+
tracing::info!("Gateway received connection from {}", conn.remote_addr());
2098+
Ok::<_, anyhow::Error>((gw_conn, conn))
2099+
});
2100+
2101+
let peer_a_conn = tokio::time::timeout(
2102+
Duration::from_secs(10),
2103+
peer_a.connect(gw_pub.clone(), gw_addr).await,
2104+
)
2105+
.await??;
2106+
tracing::info!("Peer A connected successfully");
2107+
2108+
let (mut gw_conn, _gw_peer_a_conn) = gw_task.await??;
2109+
2110+
// Step 2: Simulate peer disconnect by dropping the connection
2111+
// The gateway still has state for the old connection in remote_connections
2112+
tracing::info!("Step 2: Dropping peer A connection");
2113+
drop(peer_a_conn);
2114+
drop(peer_a);
2115+
2116+
// Give a moment for cleanup (in reality this could be much longer)
2117+
tokio::time::sleep(Duration::from_millis(100)).await;
2118+
2119+
// Step 3: Create a new peer at a DIFFERENT port (simulating restart with new ephemeral port)
2120+
// This peer uses the same shared channels so packets can still route
2121+
tracing::info!("Step 3: Creating peer B at different port");
2122+
let (_peer_b_pub, mut peer_b, peer_b_addr) =
2123+
set_peer_connection(Default::default(), channels.clone()).await?;
2124+
2125+
// Verify this is testing the same-IP-different-port scenario
2126+
assert_eq!(
2127+
peer_a_addr.ip(),
2128+
peer_b_addr.ip(),
2129+
"Test requires both peers to have same IP (simulating reconnection)"
2130+
);
2131+
assert_ne!(
2132+
peer_a_addr.port(),
2133+
peer_b_addr.port(),
2134+
"Peer B should have different port than peer A"
2135+
);
2136+
tracing::info!(
2137+
"Peer B created at {} (different from peer A at {})",
2138+
peer_b_addr,
2139+
peer_a_addr
2140+
);
2141+
2142+
// Step 4: Peer B connects to gateway - this should work on first attempt
2143+
tracing::info!("Step 4: Peer B connecting to gateway");
2144+
2145+
let gw_task = tokio::spawn(async move {
2146+
let conn = tokio::time::timeout(Duration::from_secs(5), gw_conn.recv())
2147+
.await?
2148+
.ok_or(anyhow::anyhow!("gateway: no inbound connection for peer B"))?;
2149+
tracing::info!(
2150+
"Gateway received connection from {} (peer B)",
2151+
conn.remote_addr()
2152+
);
2153+
Ok::<_, anyhow::Error>(conn)
2154+
});
2155+
2156+
let start = std::time::Instant::now();
2157+
let peer_b_conn = tokio::time::timeout(
2158+
Duration::from_secs(5),
2159+
peer_b.connect(gw_pub.clone(), gw_addr).await,
2160+
)
2161+
.await??;
2162+
let elapsed = start.elapsed();
2163+
2164+
tracing::info!("Peer B connected in {:?}", elapsed);
2165+
2166+
let _gw_peer_b_conn = gw_task.await??;
2167+
2168+
// Verify the connection works by checking addresses match
2169+
assert_eq!(peer_b_conn.remote_addr(), gw_addr);
2170+
2171+
// The connection should have completed quickly (within the first attempt window)
2172+
// If the bug exists, this would take multiple retries (16+ attempts at 200ms each = 3+ seconds)
2173+
assert!(
2174+
elapsed < Duration::from_secs(2),
2175+
"Connection took {:?}, which suggests multiple retry attempts were needed. \
2176+
This indicates stale crypto state issue from #2235.",
2177+
elapsed
2178+
);
2179+
2180+
// Cleanup
2181+
drop(peer_b_conn);
2182+
drop(peer_b);
2183+
drop(gw_outbound);
2184+
2185+
Ok(())
2186+
}
2187+
2188+
/// Test that multiple peers behind the same NAT (same IP, different ports) can
2189+
/// connect simultaneously without interfering with each other.
2190+
///
2191+
/// This verifies that the stale connection cleanup (issue #2235) doesn't break
2192+
/// legitimate multi-peer scenarios where multiple devices share the same public IP.
2193+
#[tokio::test]
2194+
async fn multiple_peers_behind_same_nat() -> anyhow::Result<()> {
2195+
let channels = Arc::new(DashMap::new());
2196+
2197+
// Create the gateway
2198+
let (gw_pub, (_gw_outbound, mut gw_conn), gw_addr) =
2199+
set_gateway_connection(Default::default(), channels.clone()).await?;
2200+
2201+
// Create peer A (first device behind NAT)
2202+
let (_peer_a_pub, mut peer_a, peer_a_addr) =
2203+
set_peer_connection(Default::default(), channels.clone()).await?;
2204+
2205+
// Create peer B (second device behind NAT, same IP different port)
2206+
let (_peer_b_pub, mut peer_b, peer_b_addr) =
2207+
set_peer_connection(Default::default(), channels.clone()).await?;
2208+
2209+
// Verify both peers have same IP but different ports (simulating NAT)
2210+
assert_eq!(
2211+
peer_a_addr.ip(),
2212+
peer_b_addr.ip(),
2213+
"Both peers should have same IP (behind same NAT)"
2214+
);
2215+
assert_ne!(
2216+
peer_a_addr.port(),
2217+
peer_b_addr.port(),
2218+
"Peers should have different ports"
2219+
);
2220+
2221+
tracing::info!(
2222+
"Testing multiple peers behind same NAT: Peer A at {}, Peer B at {}",
2223+
peer_a_addr,
2224+
peer_b_addr
2225+
);
2226+
2227+
// Step 1: Peer A connects to gateway
2228+
let gw_task_a = tokio::spawn(async move {
2229+
let conn = tokio::time::timeout(Duration::from_secs(10), gw_conn.recv())
2230+
.await?
2231+
.ok_or(anyhow::anyhow!("gateway: no connection from peer A"))?;
2232+
tracing::info!(
2233+
"Gateway received connection from peer A: {}",
2234+
conn.remote_addr()
2235+
);
2236+
Ok::<_, anyhow::Error>((gw_conn, conn))
2237+
});
2238+
2239+
let peer_a_conn = tokio::time::timeout(
2240+
Duration::from_secs(10),
2241+
peer_a.connect(gw_pub.clone(), gw_addr).await,
2242+
)
2243+
.await??;
2244+
tracing::info!("Peer A connected successfully");
2245+
2246+
let (mut gw_conn, gw_peer_a_conn) = gw_task_a.await??;
2247+
2248+
// Step 2: Peer B connects to gateway (while peer A is still connected)
2249+
let gw_task_b = tokio::spawn(async move {
2250+
let conn = tokio::time::timeout(Duration::from_secs(10), gw_conn.recv())
2251+
.await?
2252+
.ok_or(anyhow::anyhow!("gateway: no connection from peer B"))?;
2253+
tracing::info!(
2254+
"Gateway received connection from peer B: {}",
2255+
conn.remote_addr()
2256+
);
2257+
Ok::<_, anyhow::Error>((gw_conn, conn))
2258+
});
2259+
2260+
let peer_b_conn = tokio::time::timeout(
2261+
Duration::from_secs(10),
2262+
peer_b.connect(gw_pub.clone(), gw_addr).await,
2263+
)
2264+
.await??;
2265+
tracing::info!("Peer B connected successfully");
2266+
2267+
let (_gw_conn, gw_peer_b_conn) = gw_task_b.await??;
2268+
2269+
// Step 3: Verify BOTH connections are still valid
2270+
// The critical check: peer A's connection should NOT have been disrupted
2271+
// when peer B connected from the same IP
2272+
assert_eq!(
2273+
peer_a_conn.remote_addr(),
2274+
gw_addr,
2275+
"Peer A connection should still be valid"
2276+
);
2277+
assert_eq!(
2278+
peer_b_conn.remote_addr(),
2279+
gw_addr,
2280+
"Peer B connection should be valid"
2281+
);
2282+
assert_eq!(
2283+
gw_peer_a_conn.remote_addr(),
2284+
peer_a_addr,
2285+
"Gateway should still have peer A's connection"
2286+
);
2287+
assert_eq!(
2288+
gw_peer_b_conn.remote_addr(),
2289+
peer_b_addr,
2290+
"Gateway should have peer B's connection"
2291+
);
2292+
2293+
tracing::info!("Both peers successfully connected and remain active");
2294+
2295+
Ok(())
2296+
}
20362297
}

0 commit comments

Comments
 (0)