Skip to content

Commit 4518a88

Browse files
committed
ckb_network: prevent EventHandler::handle_error removed user configured public_addrs
Signed-off-by: Eval EXEC <[email protected]>
1 parent 2d30744 commit 4518a88

File tree

1 file changed

+77
-12
lines changed

1 file changed

+77
-12
lines changed

network/src/network.rs

Lines changed: 77 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -28,6 +28,7 @@ use ckb_systemtime::{Duration, Instant};
2828
use ckb_util::{Condvar, Mutex, RwLock};
2929
use futures::{Future, channel::mpsc::Sender};
3030
use ipnetwork::IpNetwork;
31+
use p2p::multiaddr::MultiAddr;
3132
use p2p::{
3233
SessionId, async_trait,
3334
builder::ServiceBuilder,
@@ -67,6 +68,54 @@ const P2P_TRY_SEND_INTERVAL: Duration = Duration::from_millis(100);
6768
// After 5 minutes we consider this dial hang
6869
const DIAL_HANG_TIMEOUT: Duration = Duration::from_secs(300);
6970

71+
/// CKB node's public addresses:
72+
///
73+
/// This struct holds the public addresses of the CKB node, categorized by how they were obtained.
74+
pub struct PublicAddresses {
75+
/// Addresses explicitly configured by the user in the ckb.toml configuration file.
76+
/// These addresses are considered static and represent the node's intended public endpoints.
77+
configured: HashSet<MultiAddr>,
78+
79+
/// Addresses discovered dynamically at runtime through observing successful outbound connections.
80+
/// These addresses may change over time and are managed behind a `RwLock` to allow concurrent
81+
/// read access while providing exclusive write access for updates. Addresses that fail to connect
82+
/// are removed from this set.
83+
discovered: RwLock<HashSet<Multiaddr>>,
84+
}
85+
86+
impl PublicAddresses {
87+
fn new(configured: HashSet<MultiAddr>, discovered: HashSet<Multiaddr>) -> Self {
88+
Self {
89+
configured,
90+
discovered: RwLock::new(discovered),
91+
}
92+
}
93+
94+
fn all(&self) -> Vec<MultiAddr> {
95+
self.configured
96+
.iter()
97+
.chain(self.discovered.read().iter())
98+
.cloned()
99+
.collect()
100+
}
101+
102+
fn contains(&self, addr: &MultiAddr) -> bool {
103+
self.discovered.read().contains(addr) || self.configured.contains(addr)
104+
}
105+
106+
fn count(&self) -> usize {
107+
self.configured.len() + self.discovered.read().len()
108+
}
109+
110+
fn random_choose(&self) -> Option<MultiAddr> {
111+
let addrs = self.all();
112+
if addrs.is_empty() {
113+
return None;
114+
}
115+
addrs.into_iter().choose(&mut rand::thread_rng())
116+
}
117+
}
118+
70119
/// The global shared state of the network module
71120
pub struct NetworkState {
72121
pub(crate) peer_registry: RwLock<PeerRegistry>,
@@ -76,7 +125,7 @@ pub struct NetworkState {
76125
dialing_addrs: RwLock<HashMap<PeerId, Instant>>,
77126
/// Node public addresses,
78127
/// includes manually public addrs and remote peer observed addrs
79-
public_addrs: RwLock<HashSet<Multiaddr>>,
128+
public_addrs: PublicAddresses,
80129
pending_observed_addrs: RwLock<HashSet<Multiaddr>>,
81130
local_private_key: secio::SecioKeyPair,
82131
local_peer_id: PeerId,
@@ -99,7 +148,7 @@ impl NetworkState {
99148
let local_private_key = config.fetch_private_key()?;
100149
let local_peer_id = local_private_key.peer_id();
101150
// set max score to public addresses
102-
let public_addrs: HashSet<Multiaddr> = config
151+
let configured_public_addrs: HashSet<Multiaddr> = config
103152
.listen_addresses
104153
.iter()
105154
.chain(config.public_addresses.iter())
@@ -114,6 +163,9 @@ impl NetworkState {
114163
}
115164
})
116165
.collect();
166+
167+
let discovered_public_addrs = HashSet::new();
168+
let public_addrs = PublicAddresses::new(configured_public_addrs, discovered_public_addrs);
117169
info!("Loading the peer store. This process may take a few seconds to complete.");
118170

119171
let peer_store = Mutex::new(PeerStore::load_from_dir_or_default(
@@ -134,7 +186,7 @@ impl NetworkState {
134186
bootnodes,
135187
peer_registry: RwLock::new(peer_registry),
136188
dialing_addrs: RwLock::new(HashMap::default()),
137-
public_addrs: RwLock::new(public_addrs),
189+
public_addrs,
138190
listened_addrs: RwLock::new(Vec::new()),
139191
pending_observed_addrs: RwLock::new(HashSet::default()),
140192
local_private_key,
@@ -334,7 +386,7 @@ impl NetworkState {
334386

335387
pub(crate) fn public_addrs(&self, count: usize) -> Vec<Multiaddr> {
336388
self.public_addrs
337-
.read()
389+
.all()
338390
.iter()
339391
.take(count)
340392
.cloned()
@@ -387,7 +439,7 @@ impl NetworkState {
387439
trace!("Do not dial self: {:?}, {}", peer_id, addr);
388440
return false;
389441
}
390-
if self.public_addrs.read().contains(addr) {
442+
if self.public_addrs.contains(addr) {
391443
trace!(
392444
"Do not dial listened address(self): {:?}, {}",
393445
peer_id, addr
@@ -499,12 +551,12 @@ impl NetworkState {
499551
pub(crate) fn try_dial_observed_addrs(&self, p2p_control: &ServiceControl) {
500552
let mut pending_observed_addrs = self.pending_observed_addrs.write();
501553
if pending_observed_addrs.is_empty() {
502-
let addrs = self.public_addrs.read();
503-
if addrs.is_empty() {
554+
let addrs = &self.public_addrs;
555+
if addrs.count() == 0 {
504556
return;
505557
}
506558
// random get addr
507-
if let Some(addr) = addrs.iter().choose(&mut rand::thread_rng()) {
559+
if let Some(addr) = addrs.random_choose() {
508560
if let Err(err) = p2p_control.dial(
509561
addr.clone(),
510562
TargetProtocol::Single(SupportProtocols::Identify.protocol_id()),
@@ -608,7 +660,11 @@ impl ServiceHandle for EventHandler {
608660
async fn handle_error(&mut self, context: &mut ServiceContext, error: ServiceError) {
609661
match error {
610662
ServiceError::DialerError { address, error } => {
611-
let mut public_addrs = self.network_state.public_addrs.write();
663+
let mut discovered_public_addrs =
664+
self.network_state.public_addrs.discovered.write();
665+
666+
let mut user_configured_public_addrs =
667+
self.network_state.public_addrs.configured.write();
612668

613669
match error {
614670
DialerErrorKind::HandshakeError(HandshakeErrorKind::SecioError(
@@ -617,7 +673,7 @@ impl ServiceHandle for EventHandler {
617673
debug!("dial observed address success: {:?}", address);
618674
if let Some(ip) = multiaddr_to_socketaddr(&address) {
619675
if is_reachable(ip.ip()) {
620-
public_addrs.insert(address);
676+
discovered_public_addrs.insert(address);
621677
}
622678
}
623679
return;
@@ -631,9 +687,18 @@ impl ServiceHandle for EventHandler {
631687
debug!("DialerError({}) {}", address, error);
632688
}
633689
}
634-
if public_addrs.remove(&address) {
690+
691+
if user_configured_public_addrs.contains(&address) {
692+
// don't remove the public_addr, sicne its user configred in ckb.toml
693+
warn!(
694+
"Dial the user configured public addr: {} failed, keep it.",
695+
address
696+
);
697+
}
698+
699+
if discovered_public_addrs.remove(&address) {
635700
info!(
636-
"Dial {} failed, remove it from network_state.public_addrs",
701+
"Dial {} failed, remove it from network_state.public_addrs.discovered",
637702
address
638703
);
639704
}

0 commit comments

Comments
 (0)