Skip to content

Commit 88421c6

Browse files
committed
ckb_network: prevent EventHandler::handle_error removed user configured public_addrs
Signed-off-by: Eval EXEC <[email protected]>
1 parent 8444466 commit 88421c6

File tree

1 file changed

+42
-15
lines changed

1 file changed

+42
-15
lines changed

network/src/network.rs

Lines changed: 42 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -28,6 +28,7 @@ use ckb_systemtime::{Duration, Instant};
2828
use ckb_util::{Condvar, Mutex, RwLock};
2929
use futures::{channel::mpsc::Sender, Future};
3030
use ipnetwork::IpNetwork;
31+
use p2p::multiaddr::MultiAddr;
3132
use p2p::{
3233
async_trait,
3334
builder::ServiceBuilder,
@@ -68,6 +69,33 @@ const P2P_TRY_SEND_INTERVAL: Duration = Duration::from_millis(100);
6869
// After 5 minutes we consider this dial hang
6970
const DIAL_HANG_TIMEOUT: Duration = Duration::from_secs(300);
7071

72+
/// CKB node's public addresses:
73+
///
74+
/// This struct holds the public addresses of the CKB node, categorized by how they were obtained.
75+
pub struct PublicAddresses {
76+
/// Addresses explicitly configured by the user in the ckb.toml configuration file.
77+
/// These addresses are considered static and represent the node's intended public endpoints.
78+
configured: HashSet<MultiAddr>,
79+
80+
/// Addresses discovered dynamically at runtime through observing successful outbound connections.
81+
/// These addresses may change over time and are managed behind a `RwLock` to allow concurrent
82+
/// read access while providing exclusive write access for updates. Addresses that fail to connect
83+
/// are removed from this set.
84+
discovered: RwLock<HashSet<Multiaddr>>,
85+
}
86+
87+
impl PublicAddresses {
88+
fn new(configured: HashSet<MultiAddr>, discoverd: HashSet<Multiaddr>) -> Self {
89+
Self {
90+
configured,
91+
discovered: RwLock::new(discovered),
92+
}
93+
}
94+
fn iter(&self) -> impl Iterator<Item = &Multiaddr> {
95+
self.configured.iter().chain(self.discovered.read().iter())
96+
}
97+
}
98+
7199
/// The global shared state of the network module
72100
pub struct NetworkState {
73101
pub(crate) peer_registry: RwLock<PeerRegistry>,
@@ -77,7 +105,7 @@ pub struct NetworkState {
77105
dialing_addrs: RwLock<HashMap<PeerId, Instant>>,
78106
/// Node public addresses,
79107
/// includes manually public addrs and remote peer observed addrs
80-
public_addrs: RwLock<HashSet<Multiaddr>>,
108+
public_addrs: PublicAddresses,
81109
pending_observed_addrs: RwLock<HashSet<Multiaddr>>,
82110
local_private_key: secio::SecioKeyPair,
83111
local_peer_id: PeerId,
@@ -100,7 +128,7 @@ impl NetworkState {
100128
let local_private_key = config.fetch_private_key()?;
101129
let local_peer_id = local_private_key.peer_id();
102130
// set max score to public addresses
103-
let public_addrs: HashSet<Multiaddr> = config
131+
let configured_public_addrs: HashSet<Multiaddr> = config
104132
.listen_addresses
105133
.iter()
106134
.chain(config.public_addresses.iter())
@@ -115,6 +143,9 @@ impl NetworkState {
115143
}
116144
})
117145
.collect();
146+
147+
let discovered_public_addrs = HashSet::new();
148+
let public_addrs = PublicAddresses::new(configured_public_addrs, discovered_public_addrs);
118149
info!("Loading the peer store. This process may take a few seconds to complete.");
119150

120151
let peer_store = Mutex::new(PeerStore::load_from_dir_or_default(
@@ -135,7 +166,7 @@ impl NetworkState {
135166
bootnodes,
136167
peer_registry: RwLock::new(peer_registry),
137168
dialing_addrs: RwLock::new(HashMap::default()),
138-
public_addrs: RwLock::new(public_addrs),
169+
public_addrs,
139170
listened_addrs: RwLock::new(Vec::new()),
140171
pending_observed_addrs: RwLock::new(HashSet::default()),
141172
local_private_key,
@@ -334,12 +365,7 @@ impl NetworkState {
334365
}
335366

336367
pub(crate) fn public_addrs(&self, count: usize) -> Vec<Multiaddr> {
337-
self.public_addrs
338-
.read()
339-
.iter()
340-
.take(count)
341-
.cloned()
342-
.collect()
368+
self.public_addrs.iter().take(count).cloned().collect()
343369
}
344370

345371
pub(crate) fn connection_status(&self) -> ConnectionStatus {
@@ -388,7 +414,7 @@ impl NetworkState {
388414
trace!("Do not dial self: {:?}, {}", peer_id, addr);
389415
return false;
390416
}
391-
if self.public_addrs.read().contains(addr) {
417+
if self.public_addrs.iter().contains(addr) {
392418
trace!(
393419
"Do not dial listened address(self): {:?}, {}",
394420
peer_id,
@@ -502,7 +528,7 @@ impl NetworkState {
502528
pub(crate) fn try_dial_observed_addrs(&self, p2p_control: &ServiceControl) {
503529
let mut pending_observed_addrs = self.pending_observed_addrs.write();
504530
if pending_observed_addrs.is_empty() {
505-
let addrs = self.public_addrs.read();
531+
let addrs = self.public_addrs;
506532
if addrs.is_empty() {
507533
return;
508534
}
@@ -611,7 +637,8 @@ impl ServiceHandle for EventHandler {
611637
async fn handle_error(&mut self, context: &mut ServiceContext, error: ServiceError) {
612638
match error {
613639
ServiceError::DialerError { address, error } => {
614-
let mut public_addrs = self.network_state.public_addrs.write();
640+
let mut discovered_public_addrs =
641+
self.network_state.public_addrs.discovered.write();
615642

616643
match error {
617644
DialerErrorKind::HandshakeError(HandshakeErrorKind::SecioError(
@@ -620,7 +647,7 @@ impl ServiceHandle for EventHandler {
620647
debug!("dial observed address success: {:?}", address);
621648
if let Some(ip) = multiaddr_to_socketaddr(&address) {
622649
if is_reachable(ip.ip()) {
623-
public_addrs.insert(address);
650+
discovered_public_addrs.insert(address);
624651
}
625652
}
626653
return;
@@ -634,9 +661,9 @@ impl ServiceHandle for EventHandler {
634661
debug!("DialerError({}) {}", address, error);
635662
}
636663
}
637-
if public_addrs.remove(&address) {
664+
if discovered_public_addrs.remove(&address) {
638665
info!(
639-
"Dial {} failed, remove it from network_state.public_addrs",
666+
"Dial {} failed, remove it from network_state.public_addrs.discovered",
640667
address
641668
);
642669
}

0 commit comments

Comments
 (0)