Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
26 changes: 18 additions & 8 deletions src/node/network/bsc_protocol/protocol/handler.rs
Original file line number Diff line number Diff line change
Expand Up @@ -46,11 +46,7 @@ impl ConnectionHandler for BscConnectionHandlerV2 {
) -> Self::Connection {
tracing::debug!(target: "bsc_protocol", "Into connection, direction: {}, peer_id: {}", direction, _peer_id);
let (tx, rx) = mpsc::unbounded_channel();
// Save sender so other components can broadcast BSC messages
// Note: PeerId is not exposed directly here, so we rely on the local peer id for keying
// when available. However, reth passes `_peer_id` which we can use.
// Even if the connection drops, failed sends will lazily clean up entries.
registry::register_peer(_peer_id, tx);
let registry_conn_token = registry::register_peer(_peer_id, tx);
// EVN: mark this peer if present in whitelist and mark as trusted at runtime
crate::node::network::evn_peers::mark_evn_if_whitelisted(_peer_id);
if crate::node::network::evn_peers::is_evn_peer(_peer_id) {
Expand All @@ -61,7 +57,14 @@ impl ConnectionHandler for BscConnectionHandlerV2 {
// Ensure EVN refresh listener is running to handle post-sync EVN updates
// for existing peers.
crate::node::network::bsc_protocol::registry::spawn_evn_refresh_listener();
BscProtocolConnection::new(conn, rx, direction.is_outgoing(), 2, Some(_peer_id))
BscProtocolConnection::new(
conn,
rx,
direction.is_outgoing(),
2,
Some(_peer_id),
registry_conn_token,
)
}
}

Expand Down Expand Up @@ -104,14 +107,21 @@ impl ConnectionHandler for BscConnectionHandlerV1 {
conn: ProtocolConnection,
) -> Self::Connection {
let (tx, rx) = mpsc::unbounded_channel();
registry::register_peer(_peer_id, tx);
let registry_conn_token = registry::register_peer(_peer_id, tx);
crate::node::network::evn_peers::mark_evn_if_whitelisted(_peer_id);
if crate::node::network::evn_peers::is_evn_peer(_peer_id) {
if let Some(net) = crate::shared::get_network_handle() {
net.add_trusted_peer_id(_peer_id);
}
}
crate::node::network::bsc_protocol::registry::spawn_evn_refresh_listener();
BscProtocolConnection::new(conn, rx, direction.is_outgoing(), 1, Some(_peer_id))
BscProtocolConnection::new(
conn,
rx,
direction.is_outgoing(),
1,
Some(_peer_id),
registry_conn_token,
)
}
}
107 changes: 74 additions & 33 deletions src/node/network/bsc_protocol/registry.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,8 +20,19 @@ use std::sync::atomic::{AtomicU64, Ordering};
use std::time::Duration;
use tokio::time::timeout;

/// Per-connection token for [`PeerRegistryEntry`]. Starts at 1; `0` means
/// "not registered" (e.g. poisoned lock during `register_peer`).
static PEER_CONN_TOKEN: Lazy<AtomicU64> = Lazy::new(|| AtomicU64::new(1));

struct PeerRegistryEntry {
tx: UnboundedSender<BscCommand>,
/// Monotonic id for this BSC subprotocol session; used so `Drop` / failed
/// sends only remove the matching entry after same-`PeerId` reconnect.
conn_token: u64,
}

/// Global registry of active BSC protocol senders per peer.
static REGISTRY: Lazy<RwLock<HashMap<PeerId, UnboundedSender<BscCommand>>>> =
static REGISTRY: Lazy<RwLock<HashMap<PeerId, PeerRegistryEntry>>> =
Lazy::new(|| RwLock::new(HashMap::new()));

/// Optional background task handle for EVN post-sync peer refresh.
Expand All @@ -32,22 +43,53 @@ static EVN_REFRESH_TASK: Lazy<RwLock<Option<JoinHandle<()>>>> = Lazy::new(|| RwL
static PROXYED_PEER_IDS_MAP: Lazy<RwLock<HashSet<PeerId>>> =
Lazy::new(|| RwLock::new(HashSet::new()));

/// Register a new peer's sender channel.
pub fn register_peer(peer: PeerId, tx: UnboundedSender<BscCommand>) {
/// Register a new peer's sender channel. Returns a per-connection token for
/// [`unregister_peer_if_current`]; `0` if the registry lock was poisoned.
pub fn register_peer(peer: PeerId, tx: UnboundedSender<BscCommand>) -> u64 {
let tx_for_sync = tx.clone();
let mut inserted = false;
let guard = REGISTRY.write();
match guard {
Ok(mut g) => {
g.insert(peer, tx);
inserted = true;
}
let mut g = match REGISTRY.write() {
Ok(g) => g,
Err(e) => {
tracing::error!(target: "bsc::registry", error=%e, "Registry lock poisoned (register)");
tracing::error!(
target: "bsc::registry",
error=%e,
"Registry lock poisoned (register)"
);
return 0;
}
};
let conn_token = PEER_CONN_TOKEN.fetch_add(1, Ordering::Relaxed);
g.insert(
peer,
PeerRegistryEntry {
tx,
conn_token,
},
);
sync_pending_votes_to_peer(peer, tx_for_sync);
conn_token
}

/// Remove `peer` from the registry only if the stored session matches
/// `conn_token`. Safe across same-`PeerId` reconnects and replaced entries.
pub fn unregister_peer_if_current(peer: PeerId, conn_token: u64) {
if conn_token == 0 {
return;
}
if inserted {
sync_pending_votes_to_peer(peer, tx_for_sync);
let Ok(mut g) = REGISTRY.write() else {
return;
};
let remove = g
.get(&peer)
.is_some_and(|entry| entry.conn_token == conn_token);
if remove {
g.remove(&peer);
tracing::debug!(
target: "bsc::registry",
%peer,
conn_token,
"Unregistered BSC protocol peer (session ended)"
);
}
}

Expand Down Expand Up @@ -132,10 +174,10 @@ pub async fn request_blocks_by_range(
return Err(format!("invalid count {}", count));
}

let tx = {
let (tx, conn_token) = {
let guard = REGISTRY.read();
match guard {
Ok(g) => g.get(&peer).cloned(),
Ok(g) => g.get(&peer).map(|e| (e.tx.clone(), e.conn_token)),
Err(_) => None,
}
}
Expand All @@ -149,8 +191,13 @@ pub async fn request_blocks_by_range(
start_block_hash: start_hash,
count,
};
tx.send(BscCommand::GetBlocksByRange(packet, resp_tx))
.map_err(|_| "failed to send GetBlocksByRange command".to_string())?;
if tx
.send(BscCommand::GetBlocksByRange(packet, resp_tx))
.is_err()
{
unregister_peer_if_current(peer, conn_token);
return Err("failed to send GetBlocksByRange command".to_string());
}

match timeout(timeout_dur, resp_rx).await {
Ok(Ok(Ok(res))) => Ok(res),
Expand All @@ -166,8 +213,11 @@ pub fn broadcast_votes(votes: Vec<crate::consensus::parlia::vote::VoteEnvelope>)
tokio::spawn(async move {
let votes_arc = Arc::new(votes);
// Snapshot registry to avoid holding lock during await
let reg_snapshot: Vec<(PeerId, UnboundedSender<BscCommand>)> = match REGISTRY.read() {
Ok(guard) => guard.iter().map(|(p, tx)| (*p, tx.clone())).collect(),
let reg_snapshot: Vec<(PeerId, UnboundedSender<BscCommand>, u64)> = match REGISTRY.read() {
Ok(guard) => guard
.iter()
.map(|(p, e)| (*p, e.tx.clone(), e.conn_token))
.collect(),
Err(e) => {
tracing::error!(target: "bsc::registry", error=%e, "Registry lock poisoned (broadcast snapshot)");
return;
Expand Down Expand Up @@ -198,8 +248,8 @@ pub fn broadcast_votes(votes: Vec<crate::consensus::parlia::vote::VoteEnvelope>)
std::collections::HashMap::new()
};

let mut to_remove: Vec<PeerId> = Vec::new();
for (peer, tx) in reg_snapshot {
let mut to_remove: Vec<(PeerId, u64)> = Vec::new();
for (peer, tx, conn_token) in reg_snapshot {
let peer_best_td = peer_info_map.get(&peer).and_then(|info| info.best_td);
let allow = should_allow_vote_broadcast(
is_evn(&peer) || is_proxyed_peer(&peer),
Expand All @@ -223,21 +273,12 @@ pub fn broadcast_votes(votes: Vec<crate::consensus::parlia::vote::VoteEnvelope>)
tracing::trace!(target: "bsc::vote", peer=%peer, allow=allow, is_proxyed=is_proxyed_peer(&peer), "broadcast votes to peer");
if allow && tx.send(BscCommand::Votes(Arc::clone(&votes_arc))).is_err() {
tracing::trace!(target: "bsc::vote", peer=%peer, "failed to send votes to peer, remove from registry");
to_remove.push(peer);
to_remove.push((peer, conn_token));
}
}

if !to_remove.is_empty() {
match REGISTRY.write() {
Ok(mut guard) => {
for peer in to_remove {
guard.remove(&peer);
}
}
Err(e) => {
tracing::error!(target: "bsc::registry", error=%e, "Registry lock poisoned (cleanup)");
}
}
for (peer, conn_token) in to_remove {
unregister_peer_if_current(peer, conn_token);
}
});
}
Expand Down
16 changes: 16 additions & 0 deletions src/node/network/bsc_protocol/stream.rs
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,9 @@ pub struct BscProtocolConnection {
proto_version: u64,
/// PeerId for this connection, if known
_peer_id: Option<PeerId>,
/// Token returned by [`crate::node::network::bsc_protocol::registry::register_peer`];
/// used to remove only this session's registry entry on drop.
registry_conn_token: u64,
/// Last time we pruned pending requests
last_prune: std::time::Instant,
}
Expand All @@ -70,6 +73,7 @@ impl BscProtocolConnection {
is_dialer: bool,
proto_version: u64,
peer_id: Option<PeerId>,
registry_conn_token: u64,
) -> Self {
let handshake_deadline = Some(Box::pin(tokio::time::sleep(HANDSHAKE_TIMEOUT)));
// Both sides should send initial capability in BSC protocol
Expand All @@ -89,6 +93,7 @@ impl BscProtocolConnection {
pending_range_reqs: HashMap::new(),
proto_version,
_peer_id: peer_id,
registry_conn_token,
last_prune: std::time::Instant::now(),
}
}
Expand Down Expand Up @@ -354,6 +359,17 @@ impl BscProtocolConnection {
}
}

impl Drop for BscProtocolConnection {
fn drop(&mut self) {
if let Some(peer) = self._peer_id {
crate::node::network::bsc_protocol::registry::unregister_peer_if_current(
peer,
self.registry_conn_token,
);
}
}
}

impl Stream for BscProtocolConnection {
type Item = BytesMut;

Expand Down