Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
227 changes: 192 additions & 35 deletions network/src/network.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ use crate::protocols::{
discovery::{DiscoveryAddressManager, DiscoveryProtocol},
feeler::Feeler,
identify::{Flags, IdentifyCallback, IdentifyProtocol},
penetration::{self, Penetration},
ping::PingHandler,
support_protocols::SupportProtocols,
};
Expand All @@ -25,6 +26,7 @@ use ckb_logger::{debug, error, info, trace, warn};
use ckb_spawn::Spawn;
use ckb_stop_handler::{broadcast_exit_signals, new_tokio_exit_rx, CancellationToken};
use ckb_systemtime::{Duration, Instant};
use ckb_types::{packed, prelude::*};
use ckb_util::{Condvar, Mutex, RwLock};
use futures::{channel::mpsc::Sender, Future};
use ipnetwork::IpNetwork;
Expand All @@ -33,7 +35,10 @@ use p2p::{
builder::ServiceBuilder,
bytes::Bytes,
context::{ServiceContext, SessionContext},
error::{DialerErrorKind, HandshakeErrorKind, ProtocolHandleErrorKind, SendErrorKind},
error::{
DialerErrorKind, HandshakeErrorKind, ProtocolHandleErrorKind, SendErrorKind,
TransportErrorKind,
},
multiaddr::{Multiaddr, Protocol},
secio::{self, error::SecioError, PeerId, SecioKeyPair},
service::{
Expand Down Expand Up @@ -80,6 +85,14 @@ pub struct NetworkState {
/// includes manually public addrs and remote peer observed addrs
public_addrs: RwLock<HashSet<Multiaddr>>,
pending_observed_addrs: RwLock<HashSet<Multiaddr>>,
/// All possible addresses.
/// Different with `public_addrs`:
/// - Include remote peer observed addresses which are failed to dial.
/// - Allow loopback addresses, private addresses, and so on.
/// - So, two peers behind a private NAT could be connected with each others.
pub(crate) possible_addrs: RwLock<HashSet<Multiaddr>>,
penetrated_addrs: RwLock<HashMap<PeerId, Instant>>,

local_private_key: secio::SecioKeyPair,
local_peer_id: PeerId,
pub(crate) bootnodes: Vec<Multiaddr>,
Expand Down Expand Up @@ -116,6 +129,7 @@ impl NetworkState {
}
})
.collect();
let possible_addrs = public_addrs.clone();
info!("Loading the peer store. This process may take a few seconds to complete.");

let peer_store = Mutex::new(PeerStore::load_from_dir_or_default(
Expand All @@ -140,6 +154,8 @@ impl NetworkState {
public_addrs: RwLock::new(public_addrs),
listened_addrs: RwLock::new(Vec::new()),
pending_observed_addrs: RwLock::new(HashSet::default()),
possible_addrs: RwLock::new(possible_addrs),
penetrated_addrs: RwLock::new(HashMap::default()),
local_private_key,
local_peer_id,
active: AtomicBool::new(true),
Expand Down Expand Up @@ -169,6 +185,7 @@ impl NetworkState {
}
})
.collect();
let possible_addrs = public_addrs.clone();
info!("Loading the peer store. This process may take a few seconds to complete.");
let peer_store = Mutex::new(PeerStore::load_from_idb(config.peer_store_path()).await);
let bootnodes = config.bootnodes();
Expand All @@ -189,6 +206,8 @@ impl NetworkState {
public_addrs: RwLock::new(public_addrs),
listened_addrs: RwLock::new(Vec::new()),
pending_observed_addrs: RwLock::new(HashSet::default()),
possible_addrs: RwLock::new(possible_addrs),
penetrated_addrs: RwLock::new(HashMap::default()),
local_private_key,
local_peer_id,
active: AtomicBool::new(true),
Expand Down Expand Up @@ -345,6 +364,15 @@ impl NetworkState {
.collect()
}

pub(crate) fn possible_addrs(&self, count: usize) -> Vec<Multiaddr> {
self.possible_addrs
.read()
.iter()
.take(count)
.cloned()
.collect()
}

pub(crate) fn connection_status(&self) -> ConnectionStatus {
self.peer_registry.read().connection_status()
}
Expand Down Expand Up @@ -542,13 +570,91 @@ impl NetworkState {
/// add observed address for identify protocol
pub(crate) fn add_observed_addrs(&self, iter: impl Iterator<Item = Multiaddr>) {
let mut pending_observed_addrs = self.pending_observed_addrs.write();
pending_observed_addrs.extend(iter)
pending_observed_addrs.extend(iter);
}

/// Add possible address for identify protocol
pub(crate) fn add_possible_addr(&self, addr: Multiaddr) {
let mut possible_addrs = self.possible_addrs.write();
possible_addrs.insert(addr);
}

/// Network message processing controller, default is true, if false, discard any received messages
pub fn is_active(&self) -> bool {
self.active.load(Ordering::Acquire)
}

/// Try to connect to a peer which may behind firewalls or NAT routers.
pub(crate) fn try_penetrating(&self, p2p_control: &ServiceControl, addr: Multiaddr) {
let peer_id = extract_peer_id(&addr);
if peer_id.is_none() {
warn!("Do not penetrate addr without peer id, addr: {}", addr);
return;
}
let to_peer_id = peer_id.as_ref().unwrap();
let from_peer_id = self.local_peer_id();
if from_peer_id == to_peer_id {
trace!("Do not penetrate self: {:?}, {}", peer_id, addr);
return;
}
if self.public_addrs.read().contains(&addr) {
trace!(
"Do not penetrate listened address(self): {:?}, {}",
peer_id,
addr
);
return;
}
if self
.peer_registry
.read()
.get_key_by_peer_id(to_peer_id)
.is_some()
{
trace!(
"Do not penetrate peer which in registry: {:?}, {}",
peer_id,
addr
);
return;
}
if let Some(last_penetrated) = self.penetrated_addrs.read().get(to_peer_id) {
if Instant::now().saturating_duration_since(*last_penetrated)
< penetration::PENETRATED_INTERVAL
{
trace!(
"Do not penetrate peer which already penetrated a moment ago: {:?}, {}",
peer_id,
addr
);
return;
}
}
let conn_req = {
let listen_addrs = {
let reader = self.possible_addrs(penetration::ADDRS_COUNT_LIMIT);
let iter = reader
.iter()
.map(Multiaddr::to_vec)
.map(|v| packed::Address::new_builder().bytes(v.pack()).build());
packed::AddressVec::new_builder().extend(iter).build()
};
let content = packed::ConnectionRequest::new_builder()
.from(from_peer_id.as_bytes().pack())
.to(to_peer_id.as_bytes().pack())
.ttl(penetration::MAX_TTL.into())
.listen_addrs(listen_addrs)
.build();
packed::PenetrationMessage::new_builder()
.set(content)
.build()
};
let proto_id = SupportProtocols::Penetration.protocol_id();
self.penetrated_addrs
.write()
.insert(to_peer_id.clone(), Instant::now());
let _ignore_result = p2p_control.try_broadcast(false, None, proto_id, conn_req.as_bytes());
}
}

/// Used to handle global events of tentacle, such as session open/close
Expand Down Expand Up @@ -622,16 +728,14 @@ impl ServiceHandle for EventHandler {
async fn handle_error(&mut self, context: &mut ServiceContext, error: ServiceError) {
match error {
ServiceError::DialerError { address, error } => {
let mut public_addrs = self.network_state.public_addrs.write();

match error {
DialerErrorKind::HandshakeError(HandshakeErrorKind::SecioError(
SecioError::ConnectSelf,
)) => {
debug!("dial observed address success: {:?}", address);
if let Some(ip) = multiaddr_to_socketaddr(&address) {
if is_reachable(ip.ip()) {
public_addrs.insert(address);
self.network_state.public_addrs.write().insert(address);
}
}
return;
Expand All @@ -641,11 +745,18 @@ impl ServiceHandle for EventHandler {
{
warn!("DialerError({}) {}", address, e);
}
DialerErrorKind::TransportError(TransportErrorKind::Io(e))
if e.kind() == std::io::ErrorKind::TimedOut =>
{
warn!("DialerError({}) {}, try penetrating", address, e);
self.network_state
.try_penetrating(&context.control().clone().into(), address.clone());
}
_ => {
debug!("DialerError({}) {}", address, error);
}
}
public_addrs.remove(&address);
self.network_state.public_addrs.write().remove(&address);
self.network_state.dial_failed(&address);
}
ServiceError::ProtocolError {
Expand Down Expand Up @@ -941,6 +1052,20 @@ impl NetworkService {
protocol_metas.push(disconnect_message_meta);
}

// Penetration protocol
if config
.support_protocols
.contains(&SupportProtocol::Penetration)
{
let feeler_meta = SupportProtocols::Penetration.build_meta_with_service_handle({
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

  1. name typo, it should penetration_meta
  2. build_meta_with_service_handle is not set compress to protocol. does this protocol need to be compressed?

let network_state = Arc::clone(&network_state);
move || {
ProtocolHandle::Callback(Box::new(Penetration::new(Arc::clone(&network_state))))
}
});
protocol_metas.push(feeler_meta);
}

let mut service_builder = ServiceBuilder::default();
let yamux_config = YamuxConfig {
max_stream_count: protocol_metas.len(),
Expand Down Expand Up @@ -1425,35 +1550,8 @@ impl NetworkController {
proto_id: ProtocolId,
data: Bytes,
) -> Result<(), SendErrorKind> {
let now = Instant::now();
loop {
let target = target
.map(TargetSession::Single)
.unwrap_or(TargetSession::All);
let result = if quick {
self.p2p_control
.quick_filter_broadcast(target, proto_id, data.clone())
} else {
self.p2p_control
.filter_broadcast(target, proto_id, data.clone())
};
match result {
Ok(()) => {
return Ok(());
}
Err(SendErrorKind::WouldBlock) => {
if Instant::now().saturating_duration_since(now) > P2P_SEND_TIMEOUT {
warn!("Broadcast message to {} timeout", proto_id);
return Err(SendErrorKind::WouldBlock);
}
thread::sleep(P2P_TRY_SEND_INTERVAL);
}
Err(err) => {
warn!("Broadcast message to {} failed: {:?}", proto_id, err);
return Err(err);
}
}
}
self.p2p_control
.try_broadcast(quick, target, proto_id, data)
}

/// Broadcast a message to all connected peers
Expand Down Expand Up @@ -1557,3 +1655,62 @@ pub(crate) fn find_type(addr: &Multiaddr) -> TransportType {
})
.unwrap_or(TransportType::Tcp)
}

pub(crate) trait ServiceControlExt {
fn try_broadcast(
&self,
quick: bool,
target: Option<SessionId>,
proto_id: ProtocolId,
data: Bytes,
) -> Result<(), SendErrorKind>;

fn try_forward(
&self,
session_id: SessionId,
proto_id: ProtocolId,
data: Bytes,
) -> Result<(), SendErrorKind> {
self.try_broadcast(false, Some(session_id), proto_id, data)
}
}

impl ServiceControlExt for ServiceControl {
fn try_broadcast(
&self,
quick: bool,
target: Option<SessionId>,
proto_id: ProtocolId,
data: Bytes,
) -> Result<(), SendErrorKind> {
let now = Instant::now();
loop {
let target = target
.map(TargetSession::Single)
.unwrap_or(TargetSession::All);
let result = if quick {
self.quick_filter_broadcast(target, proto_id, data.clone())
} else {
self.filter_broadcast(target, proto_id, data.clone())
};
match result {
Ok(()) => {
trace!("Broadcast message to {} success", proto_id);
return Ok(());
}
Err(SendErrorKind::WouldBlock) => {
if Instant::now().saturating_duration_since(now) > P2P_SEND_TIMEOUT {
warn!("Broadcast message to {} timeout", proto_id);
return Err(SendErrorKind::WouldBlock);
}
trace!("Broadcast message to {} pending for an interval", proto_id);
thread::sleep(P2P_TRY_SEND_INTERVAL);
}
Err(err) => {
warn!("Broadcast message to {} failed: {:?}", proto_id, err);
return Err(err);
}
}
}
}
}
7 changes: 7 additions & 0 deletions network/src/protocols/identify/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -79,6 +79,8 @@ pub trait Callback: Clone + Send {
fn add_remote_listen_addrs(&mut self, session: &SessionContext, addrs: Vec<Multiaddr>);
/// Add our address observed by remote peer
fn add_observed_addr(&mut self, addr: Multiaddr, ty: SessionType) -> MisbehaveResult;
/// Add all possible address observed by remote peer
fn add_possible_addr(&mut self, addr: Multiaddr);
/// Report misbehavior
fn misbehave(&mut self, session: &SessionContext, kind: Misbehavior) -> MisbehaveResult;
}
Expand Down Expand Up @@ -165,6 +167,7 @@ impl<T: Callback> IdentifyProtocol<T> {
.remote_infos
.get_mut(&session.id)
.expect("RemoteInfo must exists");
self.callback.add_possible_addr(observed.clone());
let global_ip_only = self.global_ip_only;
if multiaddr_to_socketaddr(&observed)
.map(|socket_addr| socket_addr.ip())
Expand Down Expand Up @@ -575,6 +578,10 @@ impl Callback for IdentifyCallback {
MisbehaveResult::Continue
}

fn add_possible_addr(&mut self, addr: Multiaddr) {
self.network_state.add_possible_addr(addr);
}

fn misbehave(&mut self, session: &SessionContext, reason: Misbehavior) -> MisbehaveResult {
error!(
"IdentifyProtocol detects abnormal behavior, session: {:?}, reason: {:?}",
Expand Down
1 change: 1 addition & 0 deletions network/src/protocols/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ pub(crate) mod disconnect_message;
pub(crate) mod discovery;
pub(crate) mod feeler;
pub(crate) mod identify;
pub(crate) mod penetration;
pub(crate) mod ping;
pub(crate) mod support_protocols;

Expand Down
Loading