diff --git a/src/node/network/bsc_protocol/protocol/handler.rs b/src/node/network/bsc_protocol/protocol/handler.rs index 9788f570..3f5f92d6 100644 --- a/src/node/network/bsc_protocol/protocol/handler.rs +++ b/src/node/network/bsc_protocol/protocol/handler.rs @@ -46,11 +46,7 @@ impl ConnectionHandler for BscConnectionHandlerV2 { ) -> Self::Connection { tracing::debug!(target: "bsc_protocol", "Into connection, direction: {}, peer_id: {}", direction, _peer_id); let (tx, rx) = mpsc::unbounded_channel(); - // Save sender so other components can broadcast BSC messages - // Note: PeerId is not exposed directly here, so we rely on the local peer id for keying - // when available. However, reth passes `_peer_id` which we can use. - // Even if the connection drops, failed sends will lazily clean up entries. - registry::register_peer(_peer_id, tx); + let registry_conn_token = registry::register_peer(_peer_id, tx); // EVN: mark this peer if present in whitelist and mark as trusted at runtime crate::node::network::evn_peers::mark_evn_if_whitelisted(_peer_id); if crate::node::network::evn_peers::is_evn_peer(_peer_id) { @@ -61,7 +57,14 @@ impl ConnectionHandler for BscConnectionHandlerV2 { // Ensure EVN refresh listener is running to handle post-sync EVN updates // for existing peers. crate::node::network::bsc_protocol::registry::spawn_evn_refresh_listener(); - BscProtocolConnection::new(conn, rx, direction.is_outgoing(), 2, Some(_peer_id)) + BscProtocolConnection::new( + conn, + rx, + direction.is_outgoing(), + 2, + Some(_peer_id), + registry_conn_token, + ) } } @@ -104,7 +107,7 @@ impl ConnectionHandler for BscConnectionHandlerV1 { conn: ProtocolConnection, ) -> Self::Connection { let (tx, rx) = mpsc::unbounded_channel(); - registry::register_peer(_peer_id, tx); + let registry_conn_token = registry::register_peer(_peer_id, tx); crate::node::network::evn_peers::mark_evn_if_whitelisted(_peer_id); if crate::node::network::evn_peers::is_evn_peer(_peer_id) { if let Some(net) = crate::shared::get_network_handle() { @@ -112,6 +115,13 @@ impl ConnectionHandler for BscConnectionHandlerV1 { } } crate::node::network::bsc_protocol::registry::spawn_evn_refresh_listener(); - BscProtocolConnection::new(conn, rx, direction.is_outgoing(), 1, Some(_peer_id)) + BscProtocolConnection::new( + conn, + rx, + direction.is_outgoing(), + 1, + Some(_peer_id), + registry_conn_token, + ) } } diff --git a/src/node/network/bsc_protocol/registry.rs b/src/node/network/bsc_protocol/registry.rs index 2dd53efc..5baeae39 100644 --- a/src/node/network/bsc_protocol/registry.rs +++ b/src/node/network/bsc_protocol/registry.rs @@ -20,8 +20,19 @@ use std::sync::atomic::{AtomicU64, Ordering}; use std::time::Duration; use tokio::time::timeout; +/// Per-connection token for [`PeerRegistryEntry`]. Starts at 1; `0` means +/// "not registered" (e.g. poisoned lock during `register_peer`). +static PEER_CONN_TOKEN: Lazy = Lazy::new(|| AtomicU64::new(1)); + +struct PeerRegistryEntry { + tx: UnboundedSender, + /// Monotonic id for this BSC subprotocol session; used so `Drop` / failed + /// sends only remove the matching entry after same-`PeerId` reconnect. + conn_token: u64, +} + /// Global registry of active BSC protocol senders per peer. -static REGISTRY: Lazy>>> = +static REGISTRY: Lazy>> = Lazy::new(|| RwLock::new(HashMap::new())); /// Optional background task handle for EVN post-sync peer refresh. @@ -32,22 +43,53 @@ static EVN_REFRESH_TASK: Lazy>>> = Lazy::new(|| RwL static PROXYED_PEER_IDS_MAP: Lazy>> = Lazy::new(|| RwLock::new(HashSet::new())); -/// Register a new peer's sender channel. -pub fn register_peer(peer: PeerId, tx: UnboundedSender) { +/// Register a new peer's sender channel. Returns a per-connection token for +/// [`unregister_peer_if_current`]; `0` if the registry lock was poisoned. +pub fn register_peer(peer: PeerId, tx: UnboundedSender) -> u64 { let tx_for_sync = tx.clone(); - let mut inserted = false; - let guard = REGISTRY.write(); - match guard { - Ok(mut g) => { - g.insert(peer, tx); - inserted = true; - } + let mut g = match REGISTRY.write() { + Ok(g) => g, Err(e) => { - tracing::error!(target: "bsc::registry", error=%e, "Registry lock poisoned (register)"); + tracing::error!( + target: "bsc::registry", + error=%e, + "Registry lock poisoned (register)" + ); + return 0; } + }; + let conn_token = PEER_CONN_TOKEN.fetch_add(1, Ordering::Relaxed); + g.insert( + peer, + PeerRegistryEntry { + tx, + conn_token, + }, + ); + sync_pending_votes_to_peer(peer, tx_for_sync); + conn_token +} + +/// Remove `peer` from the registry only if the stored session matches +/// `conn_token`. Safe across same-`PeerId` reconnects and replaced entries. +pub fn unregister_peer_if_current(peer: PeerId, conn_token: u64) { + if conn_token == 0 { + return; } - if inserted { - sync_pending_votes_to_peer(peer, tx_for_sync); + let Ok(mut g) = REGISTRY.write() else { + return; + }; + let remove = g + .get(&peer) + .is_some_and(|entry| entry.conn_token == conn_token); + if remove { + g.remove(&peer); + tracing::debug!( + target: "bsc::registry", + %peer, + conn_token, + "Unregistered BSC protocol peer (session ended)" + ); } } @@ -132,10 +174,10 @@ pub async fn request_blocks_by_range( return Err(format!("invalid count {}", count)); } - let tx = { + let (tx, conn_token) = { let guard = REGISTRY.read(); match guard { - Ok(g) => g.get(&peer).cloned(), + Ok(g) => g.get(&peer).map(|e| (e.tx.clone(), e.conn_token)), Err(_) => None, } } @@ -149,8 +191,13 @@ pub async fn request_blocks_by_range( start_block_hash: start_hash, count, }; - tx.send(BscCommand::GetBlocksByRange(packet, resp_tx)) - .map_err(|_| "failed to send GetBlocksByRange command".to_string())?; + if tx + .send(BscCommand::GetBlocksByRange(packet, resp_tx)) + .is_err() + { + unregister_peer_if_current(peer, conn_token); + return Err("failed to send GetBlocksByRange command".to_string()); + } match timeout(timeout_dur, resp_rx).await { Ok(Ok(Ok(res))) => Ok(res), @@ -166,8 +213,11 @@ pub fn broadcast_votes(votes: Vec) tokio::spawn(async move { let votes_arc = Arc::new(votes); // Snapshot registry to avoid holding lock during await - let reg_snapshot: Vec<(PeerId, UnboundedSender)> = match REGISTRY.read() { - Ok(guard) => guard.iter().map(|(p, tx)| (*p, tx.clone())).collect(), + let reg_snapshot: Vec<(PeerId, UnboundedSender, u64)> = match REGISTRY.read() { + Ok(guard) => guard + .iter() + .map(|(p, e)| (*p, e.tx.clone(), e.conn_token)) + .collect(), Err(e) => { tracing::error!(target: "bsc::registry", error=%e, "Registry lock poisoned (broadcast snapshot)"); return; @@ -198,8 +248,8 @@ pub fn broadcast_votes(votes: Vec) std::collections::HashMap::new() }; - let mut to_remove: Vec = Vec::new(); - for (peer, tx) in reg_snapshot { + let mut to_remove: Vec<(PeerId, u64)> = Vec::new(); + for (peer, tx, conn_token) in reg_snapshot { let peer_best_td = peer_info_map.get(&peer).and_then(|info| info.best_td); let allow = should_allow_vote_broadcast( is_evn(&peer) || is_proxyed_peer(&peer), @@ -223,21 +273,12 @@ pub fn broadcast_votes(votes: Vec) tracing::trace!(target: "bsc::vote", peer=%peer, allow=allow, is_proxyed=is_proxyed_peer(&peer), "broadcast votes to peer"); if allow && tx.send(BscCommand::Votes(Arc::clone(&votes_arc))).is_err() { tracing::trace!(target: "bsc::vote", peer=%peer, "failed to send votes to peer, remove from registry"); - to_remove.push(peer); + to_remove.push((peer, conn_token)); } } - if !to_remove.is_empty() { - match REGISTRY.write() { - Ok(mut guard) => { - for peer in to_remove { - guard.remove(&peer); - } - } - Err(e) => { - tracing::error!(target: "bsc::registry", error=%e, "Registry lock poisoned (cleanup)"); - } - } + for (peer, conn_token) in to_remove { + unregister_peer_if_current(peer, conn_token); } }); } diff --git a/src/node/network/bsc_protocol/stream.rs b/src/node/network/bsc_protocol/stream.rs index 111f5621..54c5a1a7 100644 --- a/src/node/network/bsc_protocol/stream.rs +++ b/src/node/network/bsc_protocol/stream.rs @@ -59,6 +59,9 @@ pub struct BscProtocolConnection { proto_version: u64, /// PeerId for this connection, if known _peer_id: Option, + /// Token returned by [`crate::node::network::bsc_protocol::registry::register_peer`]; + /// used to remove only this session's registry entry on drop. + registry_conn_token: u64, /// Last time we pruned pending requests last_prune: std::time::Instant, } @@ -70,6 +73,7 @@ impl BscProtocolConnection { is_dialer: bool, proto_version: u64, peer_id: Option, + registry_conn_token: u64, ) -> Self { let handshake_deadline = Some(Box::pin(tokio::time::sleep(HANDSHAKE_TIMEOUT))); // Both sides should send initial capability in BSC protocol @@ -89,6 +93,7 @@ impl BscProtocolConnection { pending_range_reqs: HashMap::new(), proto_version, _peer_id: peer_id, + registry_conn_token, last_prune: std::time::Instant::now(), } } @@ -354,6 +359,17 @@ impl BscProtocolConnection { } } +impl Drop for BscProtocolConnection { + fn drop(&mut self) { + if let Some(peer) = self._peer_id { + crate::node::network::bsc_protocol::registry::unregister_peer_if_current( + peer, + self.registry_conn_token, + ); + } + } +} + impl Stream for BscProtocolConnection { type Item = BytesMut;