Skip to content

Commit 16f2c4c

Browse files
committed
feat(net-svc): testing & correctness improvements
1 parent 3729c6f commit 16f2c4c

8 files changed

Lines changed: 473 additions & 189 deletions

File tree

crates/net-svc/src/lib.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -20,7 +20,7 @@
2020
//! let config = NetServiceConfig::new(signing_key, bind_addr, peers);
2121
//!
2222
//! // Start the service (spawns background thread)
23-
//! let (handle, controller) = NetService::new(config);
23+
//! let (handle, controller) = NetService::new(config)?;
2424
//!
2525
//! // Use handle from any thread
2626
//! let stream = handle.open_protocol_stream(peer_id, 0).await?;

crates/net-svc/src/svc/handlers.rs

Lines changed: 89 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -17,7 +17,9 @@ use kanal::{AsyncReceiver, bounded_async};
1717
use crate::api::{ExpectError, NetCommand, OpenStreamError, Stream};
1818
use crate::tls::PeerId;
1919

20-
use super::state::{PendingStreamRequest, ServiceEvent, ServiceState};
20+
use super::state::{
21+
ConnectionDirection, PendingStreamRequest, ServiceEvent, ServiceState, TrackedConnection,
22+
};
2123
use super::tasks;
2224

2325
/// Handle a command from a NetServiceHandle.
@@ -77,11 +79,11 @@ fn handle_open_stream_request(
7779
}
7880

7981
// Check for existing active connection
80-
if let Some(connection) = state.connections.get(&peer).cloned()
81-
&& connection.close_reason().is_none()
82+
if let Some(conn) = state.connections.get(&peer).cloned()
83+
&& conn.connection.close_reason().is_none()
8284
{
8385
// Have active connection - spawn stream opener
84-
tasks::spawn_stream_opener(peer, connection, stream_type, priority, respond_to);
86+
tasks::spawn_stream_opener(peer, conn.connection, stream_type, priority, respond_to);
8587
return;
8688
}
8789

@@ -160,19 +162,54 @@ fn handle_expect_bulk_transfer(
160162
pub fn handle_event(event: ServiceEvent, state: &mut ServiceState) {
161163
match event {
162164
ServiceEvent::IncomingConnectionReady { peer, connection } => {
163-
tracing::info!(peer = %hex::encode(peer), "storing incoming connection");
165+
tracing::info!(peer = %hex::encode(peer), "incoming connection ready");
164166

165167
// Remove from connecting set if we were also trying to connect outbound
166168
state.connecting.remove(&peer);
167169

168-
// Close any existing connection first to ensure its monitor exits cleanly
169-
// This prevents duplicate monitors when both peers connect simultaneously
170+
// Deterministic connection selection based on peer_id ordering.
171+
//
172+
// Rule: prefer the connection initiated by the peer with the LOWER peer_id.
173+
// - Incoming = they initiated the connection
174+
// - Outgoing = we initiated the connection
175+
//
176+
// This ensures both sides agree on which connection to keep during
177+
// simultaneous connect, avoiding the race where each side closes
178+
// the other's chosen connection.
179+
let our_id = state.config.our_peer_id();
180+
let incoming_is_preferred = our_id > peer; // they have lower id, prefer their connection
181+
182+
if !incoming_is_preferred {
183+
// This Incoming is NOT preferred (we have lower id, prefer Outgoing).
184+
// Always reject it - we'll use our outbound connection instead.
185+
// This avoids the race where we accept temporarily, process pending
186+
// requests, then close when the preferred connection arrives.
187+
tracing::debug!(
188+
peer = %hex::encode(peer),
189+
"rejecting incoming; prefer outgoing (we have lower peer_id)"
190+
);
191+
connection.close(0u32.into(), b"redundant");
192+
return;
193+
}
194+
195+
// This Incoming is the preferred type. Accept it, replacing any existing.
170196
if let Some(old_conn) = state.connections.remove(&peer) {
171-
old_conn.close(0u32.into(), b"replaced");
197+
tracing::debug!(
198+
peer = %hex::encode(peer),
199+
old_direction = ?old_conn.direction,
200+
"replacing existing connection with preferred incoming"
201+
);
202+
old_conn.connection.close(0u32.into(), b"replaced");
172203
}
173204

174205
// Store new connection
175-
state.connections.insert(peer, connection.clone());
206+
state.connections.insert(
207+
peer,
208+
TrackedConnection {
209+
connection: connection.clone(),
210+
direction: ConnectionDirection::Incoming,
211+
},
212+
);
176213

177214
// Remove from pending reconnects
178215
state.pending_reconnects.retain(|(p, _)| *p != peer);
@@ -204,14 +241,48 @@ pub fn handle_event(event: ServiceEvent, state: &mut ServiceState) {
204241
// Remove from connecting set
205242
state.connecting.remove(&peer);
206243

207-
// Close any existing connection first to ensure its monitor exits cleanly
208-
// This prevents duplicate monitors when both peers connect simultaneously
244+
// Deterministic connection selection based on peer_id ordering.
245+
//
246+
// Rule: prefer the connection initiated by the peer with the LOWER peer_id.
247+
// - Incoming = they initiated the connection
248+
// - Outgoing = we initiated the connection
249+
//
250+
// This ensures both sides agree on which connection to keep during
251+
// simultaneous connect.
252+
let our_id = state.config.our_peer_id();
253+
let outgoing_is_preferred = our_id < peer; // we have lower id, prefer our connection
254+
255+
if !outgoing_is_preferred {
256+
// This Outgoing is NOT preferred (they have lower id, prefer Incoming).
257+
// Always reject it - we'll use their inbound connection instead.
258+
// This avoids the race where we accept temporarily, process pending
259+
// requests, then close when the preferred connection arrives.
260+
tracing::debug!(
261+
peer = %hex::encode(peer),
262+
"rejecting outgoing; prefer incoming (they have lower peer_id)"
263+
);
264+
connection.close(0u32.into(), b"redundant");
265+
return;
266+
}
267+
268+
// This Outgoing is the preferred type. Accept it, replacing any existing.
209269
if let Some(old_conn) = state.connections.remove(&peer) {
210-
old_conn.close(0u32.into(), b"replaced");
270+
tracing::debug!(
271+
peer = %hex::encode(peer),
272+
old_direction = ?old_conn.direction,
273+
"replacing existing connection with preferred outgoing"
274+
);
275+
old_conn.connection.close(0u32.into(), b"replaced");
211276
}
212277

213278
// Store new connection
214-
state.connections.insert(peer, connection.clone());
279+
state.connections.insert(
280+
peer,
281+
TrackedConnection {
282+
connection: connection.clone(),
283+
direction: ConnectionDirection::Outgoing,
284+
},
285+
);
215286

216287
// Remove from pending reconnects
217288
state.pending_reconnects.retain(|(p, _)| *p != peer);
@@ -257,15 +328,15 @@ pub fn handle_event(event: ServiceEvent, state: &mut ServiceState) {
257328
}
258329

259330
ServiceEvent::ConnectionLost { peer, reason } => {
260-
// Check if we already have a valid connection (can happen if connection was replaced)
261-
// In that case, this is a stale event from the old connection's monitor
331+
// Check if we already have a valid connection.
332+
// If so, this is a stale event from an older connection monitor.
262333
if let Some(existing) = state.connections.get(&peer)
263-
&& existing.close_reason().is_none()
334+
&& existing.connection.close_reason().is_none()
264335
{
265336
tracing::debug!(
266337
peer = %hex::encode(peer),
267338
reason = %reason,
268-
"ignoring connection lost for replaced connection"
339+
"ignoring connection lost for non-current connection"
269340
);
270341
return;
271342
}
@@ -350,7 +421,7 @@ pub fn process_pending_reconnects(state: &mut ServiceState) {
350421
for peer in to_reconnect {
351422
// Skip if already connected
352423
if let Some(conn) = state.connections.get(&peer)
353-
&& conn.close_reason().is_none()
424+
&& conn.connection.close_reason().is_none()
354425
{
355426
continue;
356427
}

0 commit comments

Comments
 (0)