diff --git a/Cargo.lock b/Cargo.lock index 4513ec8d74..cda7e16567 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -27,12 +27,6 @@ dependencies = [ "generic-array", ] -[[package]] -name = "ahash" -version = "0.3.8" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "e8fd72866655d1904d6b0997d0b07ba561047d070fbe29de039031c641b61217" - [[package]] name = "ahash" version = "0.7.8" @@ -1368,6 +1362,7 @@ dependencies = [ "criterion", "faster-hex", "futures", + "governor", "hickory-resolver", "idb", "ipnetwork", @@ -2414,12 +2409,16 @@ dependencies = [ [[package]] name = "dashmap" -version = "4.0.2" +version = "6.1.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "e77a43b28d0668df09411cb0bc9a8c2adc40f9a048afe863e05fd43251e8e39c" +checksum = "5041cc499144891f3790297212f32a74fb938e5136a14943f338ef9e0ae276cf" dependencies = [ "cfg-if", - "num_cpus", + "crossbeam-utils", + "hashbrown 0.14.5", + "lock_api", + "once_cell", + "parking_lot_core 0.9.10", ] [[package]] @@ -2967,8 +2966,10 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "43a49c392881ce6d5c3b8cb70f98717b7c07aabbdff06687b9030dbfbe2725f8" dependencies = [ "cfg-if", + "js-sys", "libc", "wasi 0.13.3+wasi-0.2.2", + "wasm-bindgen", "windows-targets 0.52.6", ] @@ -3053,19 +3054,24 @@ dependencies = [ [[package]] name = "governor" -version = "0.3.2" +version = "0.10.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "06c5d2f987ee8f6dff3fa1a352058dc59b990e447e4c7846aa7d804971314f7b" +checksum = "3cbe789d04bf14543f03c4b60cd494148aa79438c8440ae7d81a7778147745c3" dependencies = [ - "dashmap", - "futures", + "cfg-if", + "futures-sink", "futures-timer", - "no-std-compat", + "futures-util", + "getrandom 0.3.1", + "hashbrown 0.15.2", "nonzero_ext", - "parking_lot 0.11.2", + "parking_lot 0.12.3", + "portable-atomic", "quanta", - "rand 0.8.5", + "rand 0.9.0", "smallvec", + "spinning_top", + "web-time", ] [[package]] @@ -3097,16 +3103,6 @@ dependencies = [ "crunchy", ] -[[package]] -name = "hashbrown" -version = "0.8.2" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "e91b62f79061a0bc2e046024cb7ba44b08419ed238ecbd9adbd787434b9e8c25" -dependencies = [ - "ahash 0.3.8", - "autocfg", -] - [[package]] name = "hashbrown" version = "0.12.3" @@ -3122,6 +3118,12 @@ version = "0.13.2" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "43a3c133739dddd0d2990f9a4bdf8eb4b21ef50e4851ca85ab661199821d510e" +[[package]] +name = "hashbrown" +version = "0.14.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e5274423e17b7c9fc20b6e7e208532f9b19825d82dfd615708b70edd83df41f1" + [[package]] name = "hashbrown" version = "0.15.2" @@ -4205,7 +4207,6 @@ version = "0.4.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "b93853da6d84c2e3c7d730d6473e8817692dd89be387eb01b94d7f108ecb5b8c" dependencies = [ - "hashbrown 0.8.2", "spin 0.5.2", ] @@ -4227,9 +4228,9 @@ dependencies = [ [[package]] name = "nonzero_ext" -version = "0.2.0" +version = "0.3.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "44a1290799eababa63ea60af0cbc3f03363e328e58f32fb0294798ed3e85f444" +checksum = "38bf9645c8b145698bb0b18a4637dcacbc421ea49bef2317e4fd8065a387cf21" [[package]] name = "num-bigint" @@ -4966,11 +4967,16 @@ checksum = "106dd99e98437432fed6519dedecfade6a06a73bb7b2a1e019fdd2bee5778d94" [[package]] name = "quanta" -version = "0.4.1" +version = "0.12.5" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "d98dc777a7a39b76b1a26ae9d3f691f4c1bc0455090aa0b64dfa8cb7fc34c135" +checksum = "3bd1fe6824cea6538803de3ff1bc0cf3949024db3d43c9643024bfb33a807c0e" dependencies = [ + "crossbeam-utils", "libc", + "once_cell", + "raw-cpuid", + "wasi 0.11.0+wasi-snapshot-preview1", + "web-sys", "winapi", ] @@ -5119,6 +5125,15 @@ dependencies = [ "rand_core 0.6.4", ] +[[package]] +name = "raw-cpuid" +version = "11.5.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "c6df7ab838ed27997ba19a4664507e6f82b41fe6e20be42929332156e5e85146" +dependencies = [ + "bitflags 2.9.0", +] + [[package]] name = "rayon" version = "1.10.0" @@ -5907,6 +5922,15 @@ dependencies = [ "lock_api", ] +[[package]] +name = "spinning_top" +version = "0.3.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d96d2d1d716fb500937168cc09353ffdc7a012be8475ac7308e1bdf0e3923300" +dependencies = [ + "lock_api", +] + [[package]] name = "spki" version = "0.7.3" diff --git a/Cargo.toml b/Cargo.toml index 1c5fc8237a..f995050f76 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -201,7 +201,7 @@ crossbeam = "0.8.2" crossbeam-channel = "0.5.1" ctrlc = "3.1" daggy = "0.8.0" -dashmap = "4.0" +dashmap = "6.0" derive_more = { version = "1", default-features = false } eaglesong = "0.1" env_logger = "0.10" @@ -213,7 +213,11 @@ fs2 = "0.4.3" futures = "0.3" futures-util = "0.3.21" golomb-coded-set = "0.2.0" -governor = "0.3.1" +governor = { version = "0.10", default-features = false, features = [ + "std", + "jitter", + "quanta", +] } hex = "0.4" hickory-resolver = "0.24.2" http-body-util = "0.1" diff --git a/network/Cargo.toml b/network/Cargo.toml index 244b1d76c2..62248f3b1c 100644 --- a/network/Cargo.toml +++ b/network/Cargo.toml @@ -46,11 +46,10 @@ p2p = { workspace = true, default-features = false, features = [ "ws", ] } socket2 = "0.5" +governor.workspace = true [target.'cfg(target_family = "wasm")'.dependencies] -p2p = { workspace = true, default-features = false, features = [ - "wasm-timer", -] } +p2p = { workspace = true, default-features = false, features = ["wasm-timer"] } idb = "0.6" serde-wasm-bindgen = "0.6.5" diff --git a/network/src/network.rs b/network/src/network.rs index 334f657b1e..2c9cb05551 100644 --- a/network/src/network.rs +++ b/network/src/network.rs @@ -74,10 +74,9 @@ pub struct NetworkState { /// Node listened addresses pub(crate) listened_addrs: RwLock>, dialing_addrs: RwLock>, - /// Node public addresses, - /// includes manually public addrs and remote peer observed addrs - public_addrs: RwLock>, - pending_observed_addrs: RwLock>, + /// Node public addresses by config + public_addrs: HashSet, + observed_addrs: RwLock>, local_private_key: secio::SecioKeyPair, local_peer_id: PeerId, pub(crate) bootnodes: Vec, @@ -139,9 +138,9 @@ impl NetworkState { bootnodes, peer_registry: RwLock::new(peer_registry), dialing_addrs: RwLock::new(HashMap::default()), - public_addrs: RwLock::new(public_addrs), + public_addrs, listened_addrs: RwLock::new(Vec::new()), - pending_observed_addrs: RwLock::new(HashSet::default()), + observed_addrs: RwLock::new(HashMap::default()), local_private_key, local_peer_id, active: AtomicBool::new(true), @@ -187,9 +186,9 @@ impl NetworkState { bootnodes, peer_registry: RwLock::new(peer_registry), dialing_addrs: RwLock::new(HashMap::default()), - public_addrs: RwLock::new(public_addrs), + public_addrs, listened_addrs: RwLock::new(Vec::new()), - pending_observed_addrs: RwLock::new(HashSet::default()), + observed_addrs: RwLock::new(HashMap::default()), local_private_key, local_peer_id, active: AtomicBool::new(true), @@ -338,12 +337,14 @@ impl NetworkState { } pub(crate) fn public_addrs(&self, count: usize) -> Vec { - self.public_addrs - .read() - .iter() - .take(count) - .cloned() - .collect() + if self.public_addrs.len() <= count { + return self.public_addrs.iter().cloned().collect(); + } else { + self.public_addrs + .iter() + .cloned() + .choose_multiple(&mut rand::thread_rng(), count) + } } pub(crate) fn connection_status(&self) -> ConnectionStatus { @@ -392,7 +393,7 @@ impl NetworkState { trace!("Do not dial self: {:?}, {}", peer_id, addr); return false; } - if self.public_addrs.read().contains(addr) { + if self.public_addrs.contains(addr) { trace!( "Do not dial listened address(self): {:?}, {}", peer_id, addr @@ -500,40 +501,27 @@ impl NetworkState { } } - /// this method is intent to check observed addr by dial to self - pub(crate) fn try_dial_observed_addrs(&self, p2p_control: &ServiceControl) { - let mut pending_observed_addrs = self.pending_observed_addrs.write(); - if pending_observed_addrs.is_empty() { - let addrs = self.public_addrs.read(); - if addrs.is_empty() { - return; - } - // random get addr - if let Some(addr) = addrs.iter().choose(&mut rand::thread_rng()) { - if let Err(err) = p2p_control.dial( - addr.clone(), - TargetProtocol::Single(SupportProtocols::Identify.protocol_id()), - ) { - trace!("try_dial_observed_addrs {err} failed in public address") - } - } - } else { - for addr in pending_observed_addrs.drain() { - trace!("try dial observed addr: {:?}", addr); - if let Err(err) = p2p_control.dial( - addr, - TargetProtocol::Single(SupportProtocols::Identify.protocol_id()), - ) { - trace!("try_dial_observed_addrs {err} failed in pending observed addresses") - } - } - } + /// add observed address for identify protocol + pub(crate) fn add_observed_addr(&self, session_id: SessionId, addr: Multiaddr) { + let mut pending_observed_addrs = self.observed_addrs.write(); + pending_observed_addrs.insert(session_id, addr); } - /// add observed address for identify protocol - pub(crate) fn add_observed_addrs(&self, iter: impl Iterator) { - let mut pending_observed_addrs = self.pending_observed_addrs.write(); - pending_observed_addrs.extend(iter) + // randomly select count addresses from observed_addrs + pub(crate) fn observed_addrs(&self, count: usize) -> Vec { + let observed_addrs = self + .observed_addrs + .read() + .values() + .cloned() + .collect::>(); + if observed_addrs.len() <= count { + return observed_addrs.into_iter().collect(); + } else { + observed_addrs + .into_iter() + .choose_multiple(&mut rand::thread_rng(), count) + } } /// Network message processing controller, default is true, if false, discard any received messages @@ -613,19 +601,11 @@ impl ServiceHandle for EventHandler { async fn handle_error(&mut self, context: &mut ServiceContext, error: ServiceError) { match error { ServiceError::DialerError { address, error } => { - let mut public_addrs = self.network_state.public_addrs.write(); - match error { DialerErrorKind::HandshakeError(HandshakeErrorKind::SecioError( SecioError::ConnectSelf, )) => { debug!("dial observed address success: {:?}", address); - if let Some(ip) = multiaddr_to_socketaddr(&address) { - if is_reachable(ip.ip()) { - public_addrs.insert(address); - } - } - return; } DialerErrorKind::IoError(e) if e.kind() == std::io::ErrorKind::AddrNotAvailable => @@ -636,12 +616,6 @@ impl ServiceHandle for EventHandler { debug!("DialerError({}) {}", address, error); } } - if public_addrs.remove(&address) { - info!( - "Dial {} failed, remove it from network_state.public_addrs", - address - ); - } self.network_state.dial_failed(&address); } ServiceError::ProtocolError { @@ -815,6 +789,10 @@ impl ServiceHandle for EventHandler { peer_store.remove_disconnected_peer(&session_context.address); }); } + self.network_state + .observed_addrs + .write() + .remove(&session_context.id); } _ => { info!("p2p service event: {:?}", event); @@ -937,6 +915,22 @@ impl NetworkService { protocol_metas.push(disconnect_message_meta); } + // HolePunching protocol + #[cfg(not(target_family = "wasm"))] + if config + .support_protocols + .contains(&SupportProtocol::HolePunching) + { + let hole_punching_state = Arc::clone(&network_state); + let hole_punching_meta = + SupportProtocols::HolePunching.build_meta_with_service_handle(move || { + ProtocolHandle::Callback(Box::new( + crate::protocols::hole_punching::HolePunching::new(hole_punching_state), + )) + }); + protocol_metas.push(hole_punching_meta); + } + let mut service_builder = ServiceBuilder::default(); let yamux_config = YamuxConfig { max_stream_count: protocol_metas.len(), diff --git a/network/src/peer_store/peer_store_impl.rs b/network/src/peer_store/peer_store_impl.rs index 3b2f1f9f0d..cecf01da22 100644 --- a/network/src/peer_store/peer_store_impl.rs +++ b/network/src/peer_store/peer_store_impl.rs @@ -217,6 +217,32 @@ impl PeerStore { self.addr_manager.fetch_random(count, filter) } + /// Return address that we never connected to, used for hole punching. + pub fn fetch_nat_addrs(&mut self, count: usize, required_flags: Flags) -> Vec { + // Get info: + // 1. Never connected + // 2. Not already connected + // 3. Ip4 / Ip6 address only + + let peers = &self.connected_peers; + + let filter = |peer_addr: &AddrInfo| { + required_flags_filter(required_flags, Flags::from_bits_truncate(peer_addr.flags)) + && extract_peer_id(&peer_addr.addr) + .map(|peer_id| !peers.contains_key(&peer_id)) + .unwrap_or_default() + && peer_addr.addr.iter().any(|p| { + matches!( + p, + p2p::multiaddr::Protocol::Ip4(_) | p2p::multiaddr::Protocol::Ip6(_) + ) + }) + && peer_addr.last_connected_at_ms == 0 + }; + + self.addr_manager.fetch_random(count, filter) + } + /// Return valid addrs that success connected, used for discovery. pub fn fetch_random_addrs(&mut self, count: usize, required_flags: Flags) -> Vec { // Get info: diff --git a/network/src/protocols/hole_punching/component/connection_request.rs b/network/src/protocols/hole_punching/component/connection_request.rs new file mode 100644 index 0000000000..88203b940b --- /dev/null +++ b/network/src/protocols/hole_punching/component/connection_request.rs @@ -0,0 +1,308 @@ +use std::borrow::Cow; + +use ckb_logger::debug; +use ckb_systemtime::unix_time_as_millis; +use ckb_types::{packed, prelude::*}; +use p2p::{ + multiaddr::{Multiaddr, Protocol}, + service::{ServiceAsyncControl, TargetSession}, + utils::{TransportType, extract_peer_id, find_type}, +}; + +use crate::{ + PeerId, PeerIndex, + protocols::{ + SupportProtocols, + hole_punching::{ + ADDRS_COUNT_LIMIT, HOLE_PUNCHING_INTERVAL, HolePunching, MAX_HOPS, + component::{forward_request, init_delivered}, + status::{Status, StatusCode}, + }, + }, +}; + +struct RequestContent { + from: PeerId, + to: PeerId, + listen_addrs: Vec, + route: Vec, + max_hops: u8, +} + +impl TryFrom<&packed::ConnectionRequestReader<'_>> for RequestContent { + type Error = Status; + + fn try_from(value: &packed::ConnectionRequestReader<'_>) -> Result { + let from = PeerId::from_bytes(value.from().raw_data().to_vec()).map_err(|_| { + StatusCode::InvalidFromPeerId.with_context("the from peer id is invalid") + })?; + let to = PeerId::from_bytes(value.to().raw_data().to_vec()) + .map_err(|_| StatusCode::InvalidToPeerId.with_context("the to peer id is invalid"))?; + let listen_addrs: Vec = value + .listen_addrs() + .iter() + .map( + |raw| match Multiaddr::try_from(raw.bytes().raw_data().to_vec()) { + Ok(mut addr) => { + if let Some(peer_id) = extract_peer_id(&addr) { + if peer_id != from { + return Err(StatusCode::InvalidListenAddrLen + .with_context("peer id in listen address is invalid")); + } + } else { + addr.push(Protocol::P2P(Cow::Borrowed(from.as_bytes()))); + } + Ok(addr) + } + Err(_) => Err(StatusCode::InvalidListenAddrLen + .with_context("the listen address is invalid")), + }, + ) + .collect::, _>>()?; + + let route: Vec = value + .route() + .iter() + .map(|raw| { + PeerId::from_bytes(raw.raw_data().to_vec()).map_err(|_| { + StatusCode::InvalidRoute.with_context("the route peer id is invalid") + }) + }) + .collect::, _>>()?; + + let max_hops: u8 = value.max_hops().into(); + + Ok(Self { + from, + to, + listen_addrs, + route, + max_hops, + }) + } +} + +pub(crate) struct ConnectionRequestProcess<'a> { + message: packed::ConnectionRequestReader<'a>, + protocol: &'a mut HolePunching, + peer: PeerIndex, + p2p_control: &'a ServiceAsyncControl, + msg_item_id: u32, +} + +impl<'a> ConnectionRequestProcess<'a> { + pub(crate) fn new( + message: packed::ConnectionRequestReader<'a>, + protocol: &'a mut HolePunching, + peer: PeerIndex, + p2p_control: &'a ServiceAsyncControl, + msg_item_id: u32, + ) -> Self { + Self { + message, + protocol, + peer, + p2p_control, + msg_item_id, + } + } + + pub(crate) async fn execute(mut self) -> Status { + let content = match RequestContent::try_from(&self.message) { + Ok(content) => content, + Err(status) => return status, + }; + if content.listen_addrs.len() > ADDRS_COUNT_LIMIT || content.listen_addrs.is_empty() { + return StatusCode::InvalidListenAddrLen + .with_context("the listen address count is too large or empty"); + } + + if content.max_hops > MAX_HOPS { + return StatusCode::InvalidMaxTTL.into(); + } + if content.route.len() > MAX_HOPS as usize { + return StatusCode::InvalidRoute.with_context("the route length is too long"); + } + + let self_peer_id = self.protocol.network_state.local_peer_id(); + if content.route.contains(self_peer_id) { + return StatusCode::Ignore.with_context("the message is passed, ignore it"); + } + + if self + .protocol + .forward_rate_limiter + .check_key(&(content.from.clone(), content.to.clone(), self.msg_item_id)) + .is_err() + { + debug!( + "from: {}, to {}, item_name: {}, rate limit is reached", + content.from, content.to, "ConnectionRequest", + ); + return StatusCode::TooManyRequests.with_context("ConnectionRequest"); + } + + if self_peer_id == &content.to { + self.respond_delivered(content.from, &content.to, content.listen_addrs) + .await + } else if content.max_hops == 0u8 { + StatusCode::ReachedMaxHops.into() + } else { + self.forward_message(self_peer_id, &content.to).await + } + } + + async fn respond_delivered( + &mut self, + from_peer_id: PeerId, + to_peer_id: &PeerId, + remote_listens: Vec, + ) -> Status { + if let Some((_, t)) = self.protocol.pending_delivered.get(&from_peer_id) { + let now = unix_time_as_millis(); + if now - t < HOLE_PUNCHING_INTERVAL { + return StatusCode::Ignore + .with_context("a same message is already replied in a moment ago"); + } + } + let listen_addrs = { + let public_addr = self.protocol.network_state.public_addrs(ADDRS_COUNT_LIMIT); + if public_addr.len() < ADDRS_COUNT_LIMIT { + let observed_addrs = self + .protocol + .network_state + .observed_addrs(ADDRS_COUNT_LIMIT - public_addr.len()); + let iter = public_addr + .iter() + .chain(observed_addrs.iter()) + .map(Multiaddr::to_vec) + .map(|v| packed::Address::new_builder().bytes(v.pack()).build()); + packed::AddressVec::new_builder().extend(iter).build() + } else { + let iter = public_addr + .iter() + .map(Multiaddr::to_vec) + .map(|v| packed::Address::new_builder().bytes(v.pack()).build()); + packed::AddressVec::new_builder().extend(iter).build() + } + }; + let content = init_delivered(self.message, listen_addrs); + let new_message = packed::HolePunchingMessage::new_builder() + .set(content) + .build() + .as_bytes(); + let proto_id = SupportProtocols::HolePunching.protocol_id(); + + let remote_listens: Vec = remote_listens + .into_iter() + .filter_map(|addr| match find_type(&addr) { + TransportType::Memory + | TransportType::Onion + | TransportType::Ws + | TransportType::Wss + | TransportType::Tls => None, + TransportType::Tcp => { + if addr + .iter() + .any(|p| matches!(p, Protocol::Ip4(_) | Protocol::Ip6(_))) + { + Some(addr) + } else { + None + } + } + }) + .collect(); + + if remote_listens.is_empty() { + return StatusCode::Ignore.with_context("remote listen address is empty"); + } + + debug!( + "current peer is the target peer {}, send a response back", + to_peer_id + ); + + if let Err(error) = self + .p2p_control + .send_message_to(self.peer, proto_id, new_message) + .await + { + return StatusCode::ForwardError.with_context(error); + } + + let now = unix_time_as_millis(); + self.protocol + .pending_delivered + .insert(from_peer_id, (remote_listens, now)); + + Status::ok() + } + + async fn forward_message(&self, self_peer_id: &PeerId, to_peer_id: &PeerId) -> Status { + let content = forward_request(self.message, self_peer_id); + let new_message = packed::HolePunchingMessage::new_builder() + .set(content) + .build() + .as_bytes(); + let proto_id = SupportProtocols::HolePunching.protocol_id(); + + let target_sid = self + .protocol + .network_state + .peer_registry + .read() + .get_key_by_peer_id(to_peer_id); + + match target_sid { + Some(to_peer) => { + debug!( + "target peer {} is found, forward the request to it", + to_peer_id + ); + if let Err(error) = self + .p2p_control + .send_message_to(to_peer, proto_id, new_message) + .await + { + StatusCode::ForwardError.with_context(error) + } else { + Status::ok() + } + } + None => { + debug!( + "target peer {} is not found, broadcast the request to more peers", + to_peer_id + ); + + // Broadcast to a number of nodes equal to the square root of the total connection count using gossip. + let sid = self.peer; + let mut total = self + .protocol + .network_state + .with_peer_registry(|p| p.peers().len()) + .isqrt(); + if let Err(error) = self + .p2p_control + .filter_broadcast( + TargetSession::Filter(Box::new(move |id| { + if id == &sid { + return false; + } + total = total.saturating_sub(1); + total != 0 + })), + proto_id, + new_message, + ) + .await + { + StatusCode::BroadcastError.with_context(error) + } else { + Status::ok() + } + } + } + } +} diff --git a/network/src/protocols/hole_punching/component/connection_request_delivered.rs b/network/src/protocols/hole_punching/component/connection_request_delivered.rs new file mode 100644 index 0000000000..32e851aac4 --- /dev/null +++ b/network/src/protocols/hole_punching/component/connection_request_delivered.rs @@ -0,0 +1,279 @@ +use std::{borrow::Cow, net::SocketAddr}; + +use ckb_logger::debug; +use ckb_systemtime::unix_time_as_millis; +use ckb_types::{packed, prelude::*}; +use futures::future::select_ok; +use p2p::{ + multiaddr::{Multiaddr, Protocol}, + runtime, + service::{RawSessionInfo, ServiceAsyncControl, TargetProtocol}, + utils::{TransportType, extract_peer_id, find_type}, +}; + +use crate::{ + PeerId, PeerIndex, + protocols::{ + SupportProtocols, + hole_punching::{ + ADDRS_COUNT_LIMIT, HolePunching, MAX_HOPS, + component::{forward_delivered, init_sync, try_nat_traversal}, + status::{Status, StatusCode}, + }, + }, +}; + +struct DeliverdContent { + from: PeerId, + to: PeerId, + route: Vec, + listen_addrs: Vec, + sync_route: Vec, +} + +impl TryFrom<&packed::ConnectionRequestDeliveredReader<'_>> for DeliverdContent { + type Error = Status; + + fn try_from(value: &packed::ConnectionRequestDeliveredReader<'_>) -> Result { + let from = PeerId::from_bytes(value.from().raw_data().to_vec()).map_err(|_| { + StatusCode::InvalidFromPeerId.with_context("the from peer id is invalid") + })?; + let to = PeerId::from_bytes(value.to().raw_data().to_vec()) + .map_err(|_| StatusCode::InvalidToPeerId.with_context("the to peer id is invalid"))?; + let route = value + .route() + .iter() + .map(|peer_id| { + PeerId::from_bytes(peer_id.raw_data().to_vec()) + .map_err(|_| StatusCode::InvalidRoute) + }) + .collect::, _>>()?; + let listen_addrs = value + .listen_addrs() + .iter() + .map( + |raw| match Multiaddr::try_from(raw.bytes().raw_data().to_vec()) { + Ok(mut addr) => { + if let Some(peer_id) = extract_peer_id(&addr) { + if peer_id != to { + return Err(StatusCode::InvalidListenAddrLen + .with_context("peer id in listen address is invalid")); + } + } else { + addr.push(Protocol::P2P(Cow::Borrowed(to.as_bytes()))); + } + Ok(addr) + } + Err(_) => Err(StatusCode::InvalidListenAddrLen + .with_context("the listen address is invalid")), + }, + ) + .collect::, _>>()?; + + let sync_route = value + .sync_route() + .iter() + .map(|peer_id| { + PeerId::from_bytes(peer_id.raw_data().to_vec()) + .map_err(|_| StatusCode::InvalidRoute) + }) + .collect::, _>>()?; + + Ok(DeliverdContent { + from, + to, + route, + listen_addrs, + sync_route, + }) + } +} + +pub struct ConnectionRequestDeliveredProcess<'a> { + message: packed::ConnectionRequestDeliveredReader<'a>, + protocol: &'a mut HolePunching, + p2p_control: &'a ServiceAsyncControl, + peer: PeerIndex, + bind_addr: Option, + msg_item_id: u32, +} + +impl<'a> ConnectionRequestDeliveredProcess<'a> { + pub(crate) fn new( + message: packed::ConnectionRequestDeliveredReader<'a>, + protocol: &'a mut HolePunching, + p2p_control: &'a ServiceAsyncControl, + peer: PeerIndex, + bind_addr: Option, + msg_item_id: u32, + ) -> Self { + Self { + message, + protocol, + p2p_control, + bind_addr, + peer, + msg_item_id, + } + } + + pub(crate) async fn execute(self) -> Status { + let content = match DeliverdContent::try_from(&self.message) { + Ok(content) => content, + Err(status) => return status, + }; + if content.listen_addrs.len() > ADDRS_COUNT_LIMIT || content.listen_addrs.is_empty() { + return StatusCode::InvalidListenAddrLen + .with_context("the listen address count is too large or empty"); + } + + if content.route.len() > MAX_HOPS as usize || content.sync_route.len() > MAX_HOPS as usize { + return StatusCode::InvalidRoute.with_context("the route length is too long"); + } + + if self + .protocol + .forward_rate_limiter + .check_key(&(content.from.clone(), content.to.clone(), self.msg_item_id)) + .is_err() + { + debug!( + "from: {}, to {}, item_name: {}, rate limit is reached", + content.from, content.to, "ConnectionRequestDelivered", + ); + return StatusCode::TooManyRequests.with_context("ConnectionRequestDelivered"); + } + + match content.route.last() { + Some(next_peer_id) => self.forward_delivered(next_peer_id).await, + None => { + let self_peer_id = self.protocol.network_state.local_peer_id(); + if self_peer_id != &content.from { + // forward the message to the `from` peer + self.forward_delivered(&content.from).await + } else { + // the current peer is the target peer, respond the sync back + let request_start = self.protocol.inflight_requests.remove(&content.to); + + match request_start { + Some(start) => { + let res = self.respond_sync(content.from).await; + if !res.is_ok() { + return res; + } + let now = unix_time_as_millis(); + let ttl = now - start; + + self.try_nat_traversal(ttl, content.listen_addrs); + + Status::ok() + } + None => StatusCode::Ignore.with_context("the request is not in flight"), + } + } + } + } + } + + async fn forward_delivered(&self, peer_id: &PeerId) -> Status { + let target_sid = self + .protocol + .network_state + .peer_registry + .read() + .get_key_by_peer_id(peer_id); + match target_sid { + Some(next_peer) => { + let content = forward_delivered(self.message); + let new_message = packed::HolePunchingMessage::new_builder() + .set(content) + .build() + .as_bytes(); + let proto_id = SupportProtocols::HolePunching.protocol_id(); + debug!( + "forward the delivery to next peer {} (id: {})", + next_peer, peer_id + ); + if let Err(error) = self + .p2p_control + .send_message_to(next_peer, proto_id, new_message) + .await + { + StatusCode::ForwardError.with_context(error) + } else { + Status::ok() + } + } + None => StatusCode::Ignore.with_context("the next peer in the route is disconnected"), + } + } + + async fn respond_sync(&self, from_peer_id: PeerId) -> Status { + let content = init_sync(self.message); + let new_message = packed::HolePunchingMessage::new_builder() + .set(content) + .build() + .as_bytes(); + let proto_id = SupportProtocols::HolePunching.protocol_id(); + debug!( + "current peer is the target peer {}, respond the sync back", + from_peer_id + ); + if let Err(error) = self + .p2p_control + .send_message_to(self.peer, proto_id, new_message) + .await + { + StatusCode::ForwardError.with_context(error) + } else { + Status::ok() + } + } + + fn try_nat_traversal(&self, ttl: u64, remote_addrs: Vec) { + let tasks = remote_addrs + .into_iter() + .filter_map(|listen_addr| match find_type(&listen_addr) { + TransportType::Tcp => { + if listen_addr + .iter() + .any(|p| matches!(p, Protocol::Ip4(_) | Protocol::Ip6(_))) + { + Some(Box::pin(try_nat_traversal(self.bind_addr, listen_addr))) + } else { + None + } + } + TransportType::Memory + | TransportType::Onion + | TransportType::Ws + | TransportType::Wss + | TransportType::Tls => None, + }) + .collect::>(); + + if tasks.is_empty() { + return; + } + + debug!("start NAT traversal"); + + let control = self.p2p_control.clone(); + + runtime::spawn(async move { + runtime::delay_for(std::time::Duration::from_millis(ttl / 2)).await; + if let Ok(((stream, addr), _)) = select_ok(tasks).await { + debug!("NAT traversal success, addr: {:?}", addr); + let _ignore = control + .raw_session( + stream, + addr, + RawSessionInfo::outbound(TargetProtocol::Single( + SupportProtocols::Identify.protocol_id(), + )), + ) + .await; + } + }); + } +} diff --git a/network/src/protocols/hole_punching/component/connection_sync.rs b/network/src/protocols/hole_punching/component/connection_sync.rs new file mode 100644 index 0000000000..8234bc001a --- /dev/null +++ b/network/src/protocols/hole_punching/component/connection_sync.rs @@ -0,0 +1,201 @@ +use std::net::SocketAddr; + +use ckb_logger::debug; +use ckb_types::{packed, prelude::*}; +use futures::future::select_ok; +use p2p::{ + runtime, + service::{RawSessionInfo, ServiceAsyncControl}, +}; + +use crate::{ + PeerId, + protocols::{ + SupportProtocols, + hole_punching::{ + HolePunching, MAX_HOPS, + component::{forward_sync, try_nat_traversal}, + status::{Status, StatusCode}, + }, + }, +}; + +struct SyncContent { + route: Vec, + from: PeerId, + to: PeerId, +} + +impl TryFrom<&packed::ConnectionSyncReader<'_>> for SyncContent { + type Error = Status; + + fn try_from(value: &packed::ConnectionSyncReader<'_>) -> Result { + let route = value + .route() + .iter() + .map(|id| { + PeerId::from_bytes(id.raw_data().to_vec()).map_err(|_| { + StatusCode::InvalidRoute.with_context("the route peer id is invalid") + }) + }) + .collect::, _>>()?; + let from = PeerId::from_bytes(value.from().raw_data().to_vec()).map_err(|_| { + StatusCode::InvalidFromPeerId.with_context("the from peer id is invalid") + })?; + let to = PeerId::from_bytes(value.to().raw_data().to_vec()) + .map_err(|_| StatusCode::InvalidToPeerId.with_context("the to peer id is invalid"))?; + Ok(SyncContent { route, from, to }) + } +} + +pub(crate) struct ConnectionSyncProcess<'a> { + message: packed::ConnectionSyncReader<'a>, + protocol: &'a HolePunching, + p2p_control: &'a ServiceAsyncControl, + bind_addr: Option, + msg_item_id: u32, +} + +impl<'a> ConnectionSyncProcess<'a> { + pub(crate) fn new( + message: packed::ConnectionSyncReader<'a>, + protocol: &'a HolePunching, + p2p_control: &'a ServiceAsyncControl, + bind_addr: Option, + msg_item_id: u32, + ) -> Self { + Self { + message, + protocol, + p2p_control, + bind_addr, + msg_item_id, + } + } + + pub(crate) async fn execute(self) -> Status { + let content = match SyncContent::try_from(&self.message) { + Ok(content) => content, + Err(status) => return status, + }; + + if content.route.len() > MAX_HOPS as usize { + return StatusCode::InvalidRoute.with_context("the route length is too long"); + } + if self + .protocol + .forward_rate_limiter + .check_key(&(content.from.clone(), content.to.clone(), self.msg_item_id)) + .is_err() + { + debug!( + "from: {}, to {}, item_name: {}, rate limit is reached", + content.from, content.to, "ConnectionSync", + ); + return StatusCode::TooManyRequests.with_context("ConnectionSync"); + } + + match content.route.last() { + Some(next_peer_id) => self.forward_sync(next_peer_id).await, + None => { + let self_peer_id = self.protocol.network_state.local_peer_id(); + if self_peer_id != &content.to { + // forward the message to the `to` peer + self.forward_sync(&content.to).await + } else { + // Current node should be the `to` target. + let listens_info = self + .protocol + .pending_delivered + .get(&content.from) + .map(|info| info.0.clone()); + + match listens_info { + Some(listens) => { + let tasks = listens + .into_iter() + .map(|listen_addr| { + Box::pin(try_nat_traversal(self.bind_addr, listen_addr)) + }) + .collect::>(); + + if tasks.is_empty() { + return StatusCode::Ignore.with_context("no valid listen address"); + } + + debug!( + "current peer is the target peer {}, start NAT traversal", + content.to + ); + + match self + .protocol + .network_state + .config + .listen_addresses + .first() + .cloned() + { + Some(listen_addr) => { + let control: ServiceAsyncControl = self.p2p_control.clone(); + runtime::spawn(async move { + if let Ok(((stream, addr), _)) = select_ok(tasks).await { + debug!("NAT traversal success, addr: {:?}", addr); + let _ignore = control + .raw_session( + stream, + addr, + RawSessionInfo::inbound(listen_addr), + ) + .await; + } + }); + Status::ok() + } + None => { + StatusCode::Ignore.with_context("no listen address configured") + } + } + } + None => StatusCode::Ignore + .with_context("the from peer id is not in the pending list"), + } + } + } + } + } + + async fn forward_sync(&self, peer_id: &PeerId) -> Status { + let target_sid = self + .protocol + .network_state + .peer_registry + .read() + .get_key_by_peer_id(peer_id); + + match target_sid { + Some(next_peer) => { + let content = forward_sync(self.message); + let new_message = packed::HolePunchingMessage::new_builder() + .set(content) + .build() + .as_bytes(); + let proto_id = SupportProtocols::HolePunching.protocol_id(); + debug!( + "forward the sync to next peer {} (id: {})", + next_peer, peer_id + ); + if let Err(error) = self + .p2p_control + .send_message_to(next_peer, proto_id, new_message) + .await + { + StatusCode::ForwardError.with_context(error) + } else { + Status::ok() + } + } + None => StatusCode::Ignore.with_context("the next peer in the route is disconnected"), + } + } +} diff --git a/network/src/protocols/hole_punching/component/mod.rs b/network/src/protocols/hole_punching/component/mod.rs new file mode 100644 index 0000000000..b7e7e3102d --- /dev/null +++ b/network/src/protocols/hole_punching/component/mod.rs @@ -0,0 +1,460 @@ +mod connection_request; +mod connection_request_delivered; +mod connection_sync; + +pub(crate) use connection_request::ConnectionRequestProcess; +pub(crate) use connection_request_delivered::ConnectionRequestDeliveredProcess; +pub(crate) use connection_sync::ConnectionSyncProcess; + +use std::{ + net::{IpAddr, SocketAddr}, + time::Duration, +}; + +use ckb_logger::debug; +use ckb_systemtime::Instant; +use ckb_types::{packed, prelude::*}; +use p2p::{multiaddr::Multiaddr, runtime, utils::multiaddr_to_socketaddr}; +use tokio::net::{TcpSocket, TcpStream}; + +use crate::{PeerId, protocols::hole_punching::MAX_HOPS}; + +// Attempt to establish a TCP connection with NAT traversal +// +// Why is random jitter time added in NAT traversal? +// +// 1. Prevents synchronization problems +// - Without jitter, both parties might always send connection requests simultaneously +// - When requests collide rather than complement each other, connection establishment fails +// +// 2. Avoids NAT filtering +// - NAT devices often restrict or block perfectly regular connection attempts +// - Random intervals make connection attempts appear more natural, avoiding detection +// - Helps bypass NAT devices that might interpret regular patterns as scanning or attacks +// +// 3. Compensates for network uncertainties +// - Real networks have inherent variations in packet delivery times +// - System scheduling and network congestion create unpredictable delays +// - Jitter accounts for these natural timing variations +// +// 4. Increases connection success probability +// - Different system clocks and startup times can cause connection attempts to miss each other +// - Random jitter expands the time window when connection attempts might overlap +// - This "window expansion" strategy improves connection success rates +// +// 5. Breaks repetitive failure patterns +// - If a specific timing pattern causes connection failure +// - Using the same fixed interval would repeat the same failure +// - Randomness helps break out of these failure modes +pub(crate) async fn try_nat_traversal( + bind_addr: Option, + addr: Multiaddr, +) -> Result<(TcpStream, Multiaddr), std::io::Error> { + let net_addr = match multiaddr_to_socketaddr(&addr) { + Some(addr) => addr, + None => { + debug!("Failed to convert multiaddr to socketaddr"); + return Err(std::io::ErrorKind::InvalidInput.into()); + } + }; + + // Use a fixed interval but add a small amount of randomness + let base_retry_interval = Duration::from_millis(200); + + // total time + let timeout_duration = Duration::from_secs(30); + let start_time = Instant::now(); + let mut retry_count = 0u32; + while start_time.elapsed() < timeout_duration { + retry_count += 1; + + // Add a small amount of random jitter (±25ms) to avoid conflicts + // caused by continuous precise synchronization + let jitter = Duration::from_millis(rand::random::() % 50); + let actual_interval = if rand::random::() { + base_retry_interval + jitter + } else { + base_retry_interval.saturating_sub(jitter) + }; + + let socket = create_socket(bind_addr, net_addr)?; + + match runtime::timeout( + std::time::Duration::from_millis(200), + socket.connect(net_addr), + ) + .await + { + Ok(Ok(stream)) => { + // try get the stored error in the underlying socket + // if the socket is not connected, it will return an error + if let Err(err) = check_connection(&stream) { + debug!("Failed to connect to NAT(base check): {}", err); + } + return Ok((stream, addr)); + } + Err(err) => { + debug!("Failed to connect to NAT(timeout): {}", err); + } + Ok(Err(err)) => { + if err.kind() == std::io::ErrorKind::AddrNotAvailable { + return Err(err); + } + debug!( + "Failed to connect to NAT(other error): {}, {}", + err.kind(), + err + ); + } + } + runtime::delay_for(actual_interval).await; + } + + debug!("Failed to connect to NAT after {} retries", retry_count); + Err(std::io::ErrorKind::TimedOut.into()) +} + +fn create_socket( + bind_addr: Option, + target_addr: SocketAddr, +) -> Result { + let socket = match bind_addr { + Some(listen_addr) => match (listen_addr.ip(), target_addr.ip()) { + (IpAddr::V4(_), IpAddr::V4(_)) => { + let socket = TcpSocket::new_v4()?; + socket.set_reuseaddr(true)?; + #[cfg(all(unix, not(target_os = "solaris"), not(target_os = "illumos")))] + socket.set_reuseport(true)?; + socket.bind(listen_addr)?; + socket + } + (IpAddr::V6(_), IpAddr::V6(_)) => { + let socket = TcpSocket::new_v6()?; + socket.set_reuseaddr(true)?; + #[cfg(all(unix, not(target_os = "solaris"), not(target_os = "illumos")))] + socket.set_reuseport(true)?; + socket.bind(listen_addr)?; + socket + } + (IpAddr::V4(_), IpAddr::V6(_)) => TcpSocket::new_v6()?, + (IpAddr::V6(_), IpAddr::V4(_)) => TcpSocket::new_v4()?, + }, + None => match target_addr.ip() { + IpAddr::V4(_) => TcpSocket::new_v4()?, + IpAddr::V6(_) => TcpSocket::new_v6()?, + }, + }; + Ok(socket) +} + +fn check_connection(stream: &TcpStream) -> Result<(), std::io::Error> { + match stream.take_error() { + Ok(Some(err)) => Err(err), + Ok(None) => Ok(()), + Err(err) => Err(err), + } +} + +pub(crate) fn init_request( + from: &PeerId, + to: &PeerId, + listen_addrs: packed::AddressVec, +) -> packed::ConnectionRequest { + packed::ConnectionRequest::new_builder() + .from(from.as_bytes().pack()) + .to(to.as_bytes().pack()) + .max_hops(MAX_HOPS.into()) + .listen_addrs(listen_addrs) + .build() +} + +pub(crate) fn forward_request( + request: packed::ConnectionRequestReader<'_>, + current_id: &PeerId, +) -> packed::ConnectionRequest { + let max_hops: u8 = request.max_hops().into(); + let message = request.to_entity(); + let new_route = message + .route() + .as_builder() + .push(current_id.as_bytes().pack()) + .build(); + message + .as_builder() + .max_hops((max_hops.saturating_sub(1)).into()) + .route(new_route) + .build() +} + +pub(crate) fn init_delivered( + request: packed::ConnectionRequestReader<'_>, + listen_addrs: packed::AddressVec, +) -> packed::ConnectionRequestDelivered { + let route = request.route(); + let message = request.to_entity(); + let new_route = packed::BytesVec::new_builder() + .extend( + message + .route() + .into_iter() + .take(route.len().saturating_sub(1)), + ) + .build(); + let sync_route = packed::BytesVec::new_builder() + .extend( + message + .route() + .into_iter() + .collect::>() + .into_iter() + .rev() + .collect::>(), + ) + .build(); + packed::ConnectionRequestDelivered::new_builder() + .from(message.from()) + .to(message.to()) + .route(new_route) + .sync_route(sync_route) + .listen_addrs(listen_addrs) + .build() +} + +pub(crate) fn forward_delivered( + delivered: packed::ConnectionRequestDeliveredReader<'_>, +) -> packed::ConnectionRequestDelivered { + let route = delivered.route(); + let message = delivered.to_entity(); + let new_route = if route.is_empty() { + packed::BytesVec::new_builder().build() + } else { + packed::BytesVec::new_builder() + .extend( + message + .route() + .into_iter() + .take(route.len().saturating_sub(1)), + ) + .build() + }; + message.as_builder().route(new_route).build() +} + +pub(crate) fn init_sync( + delivered: packed::ConnectionRequestDeliveredReader<'_>, +) -> packed::ConnectionSync { + let sync_route = delivered.sync_route(); + let message = delivered.to_entity(); + let new_route = packed::BytesVec::new_builder() + .extend( + message + .sync_route() + .into_iter() + .take(sync_route.len().saturating_sub(1)), + ) + .build(); + packed::ConnectionSync::new_builder() + .from(message.from()) + .to(message.to()) + .route(new_route) + .build() +} + +pub(crate) fn forward_sync(sync: packed::ConnectionSyncReader<'_>) -> packed::ConnectionSync { + let route = sync.route(); + let message = sync.to_entity(); + let new_route = if route.is_empty() { + packed::BytesVec::new_builder().build() + } else { + packed::BytesVec::new_builder() + .extend( + message + .route() + .into_iter() + .take(route.len().saturating_sub(1)), + ) + .build() + }; + message.as_builder().route(new_route).build() +} + +#[cfg(test)] +mod test { + use super::*; + use crate::protocols::hole_punching::MAX_HOPS; + use ckb_types::packed; + + #[test] + fn test_route() { + // Simulate the entire message flow from from to to, passing through forward_a, forward_b. + let from = PeerId::random(); + let to = PeerId::random(); + let forward_a = PeerId::random(); + let forward_b = PeerId::random(); + + // empty listen addrs + let listen_addrs = packed::AddressVec::new_builder().build(); + + let init_request = init_request(&from, &to, listen_addrs.clone()); + + assert_eq!(init_request.from(), from.as_bytes().pack()); + assert_eq!(init_request.to(), to.as_bytes().pack()); + assert_eq!(init_request.max_hops(), MAX_HOPS.into()); + // from is not in the route + assert_eq!( + init_request.route().as_bytes(), + packed::BytesVec::new_builder().build().as_bytes() + ); + + // in forward_a + let forward_request_a = forward_request(init_request.as_reader(), &forward_a); + assert_eq!(forward_request_a.from(), from.as_bytes().pack()); + assert_eq!(forward_request_a.to(), to.as_bytes().pack()); + assert_eq!(forward_request_a.max_hops(), (MAX_HOPS - 1).into()); + // forward_a is in the route + assert_eq!( + forward_request_a.route().as_bytes(), + packed::BytesVec::new_builder() + .push(forward_a.as_bytes().pack()) + .build() + .as_bytes() + ); + + // in forward_b + let forward_request_b = forward_request(forward_request_a.as_reader(), &forward_b); + assert_eq!(forward_request_b.from(), from.as_bytes().pack()); + assert_eq!(forward_request_b.to(), to.as_bytes().pack()); + assert_eq!(forward_request_b.max_hops(), (MAX_HOPS - 2).into()); + // forward_b is in the route + assert_eq!( + forward_request_b.route().as_bytes(), + packed::BytesVec::new_builder() + .push(forward_a.as_bytes().pack()) + .push(forward_b.as_bytes().pack()) + .build() + .as_bytes() + ); + + // in to + let init_delivered = init_delivered(forward_request_b.as_reader(), listen_addrs); + assert_eq!(init_delivered.from(), from.as_bytes().pack()); + assert_eq!(init_delivered.to(), to.as_bytes().pack()); + // forward_b is not in the route + assert_eq!( + init_delivered.route().as_bytes(), + packed::BytesVec::new_builder() + .push(forward_a.as_bytes().pack()) + .build() + .as_bytes() + ); + // sync route is forward_b <- forward_a + assert_eq!( + init_delivered.sync_route().as_bytes(), + packed::BytesVec::new_builder() + .push(forward_b.as_bytes().pack()) + .push(forward_a.as_bytes().pack()) + .build() + .as_bytes() + ); + + // now we can start to send back the delivered message to the from + + // in forward_b + assert_eq!( + init_delivered + .as_reader() + .route() + .iter() + .last() + .unwrap() + .as_slice(), + forward_a.as_bytes().pack().as_slice() + ); + let forward_delivered_b = forward_delivered(init_delivered.as_reader()); + assert_eq!(forward_delivered_b.from(), from.as_bytes().pack()); + assert_eq!(forward_delivered_b.to(), to.as_bytes().pack()); + assert_eq!( + forward_delivered_b.route().as_bytes(), + packed::BytesVec::new_builder().build().as_bytes() + ); + assert_eq!( + forward_delivered_b.sync_route().as_bytes(), + init_delivered.sync_route().as_bytes() + ); + + // in forward_a + assert!( + forward_delivered_b + .as_reader() + .route() + .iter() + .last() + .is_none() + ); + let forward_delivered_a = forward_delivered(forward_delivered_b.as_reader()); + assert_eq!(forward_delivered_a.from(), from.as_bytes().pack()); + assert_eq!(forward_delivered_a.to(), to.as_bytes().pack()); + assert_eq!( + forward_delivered_a.route().as_bytes(), + packed::BytesVec::new_builder().build().as_bytes() + ); + assert_eq!( + forward_delivered_a.sync_route().as_bytes(), + init_delivered.sync_route().as_bytes() + ); + + // in from + assert!( + forward_delivered_a + .as_reader() + .route() + .iter() + .last() + .is_none() + ); + let init_sync = init_sync(forward_delivered_a.as_reader()); + assert_eq!(init_sync.from(), from.as_bytes().pack()); + assert_eq!(init_sync.to(), to.as_bytes().pack()); + assert_eq!( + init_sync.route().as_bytes(), + packed::BytesVec::new_builder() + .push(forward_b.as_bytes().pack()) + .build() + .as_bytes() + ); + + // now we can start to send back the sync message to the to + + // in forward_a + assert_eq!( + init_sync + .as_reader() + .route() + .iter() + .last() + .unwrap() + .as_slice(), + forward_b.as_bytes().pack().as_slice() + ); + let forward_sync_a = forward_sync(init_sync.as_reader()); + assert_eq!(forward_sync_a.from(), from.as_bytes().pack()); + assert_eq!(forward_sync_a.to(), to.as_bytes().pack()); + assert_eq!( + forward_sync_a.route().as_bytes(), + packed::BytesVec::new_builder().build().as_bytes() + ); + + // in forward_b + assert!(forward_sync_a.as_reader().route().iter().last().is_none()); + let forward_sync_b = forward_sync(forward_sync_a.as_reader()); + assert_eq!(forward_sync_b.from(), from.as_bytes().pack()); + assert_eq!(forward_sync_b.to(), to.as_bytes().pack()); + assert_eq!( + forward_sync_b.route().as_bytes(), + packed::BytesVec::new_builder().build().as_bytes() + ); + + // in to + assert!(forward_sync_b.as_reader().route().iter().last().is_none()); + } +} diff --git a/network/src/protocols/hole_punching/mod.rs b/network/src/protocols/hole_punching/mod.rs new file mode 100644 index 0000000000..0c8cb9804b --- /dev/null +++ b/network/src/protocols/hole_punching/mod.rs @@ -0,0 +1,286 @@ +use std::{collections::HashMap, net::SocketAddr, sync::Arc, time::Duration}; + +use ckb_logger::{debug, error, trace, warn}; +use ckb_systemtime::unix_time_as_millis; +use ckb_types::{packed, prelude::*}; +use p2p::{ + async_trait, bytes, + context::{ProtocolContext, ProtocolContextMutRef}, + multiaddr::Multiaddr, + service::TargetSession, + traits::ServiceProtocol, + utils::extract_peer_id, +}; + +use crate::{ + PeerId, PeerIndex, SupportProtocols, network::NetworkState, + protocols::hole_punching::status::BAD_MESSAGE_BAN_TIME, +}; + +mod component; +pub(crate) mod status; + +pub(crate) const MAX_HOPS: u8 = 6; +pub(crate) const HOLE_PUNCHING_INTERVAL: u64 = 2 * 60 * 1000; // 2 minutes +const CHECK_INTERVAL: Duration = Duration::from_secs(5 * 60); +const CHECK_TOKEN: u64 = 0; +const ADDRS_COUNT_LIMIT: usize = 24; +const TIMEOUT: u64 = 5 * 60 * 1000; // 5 minutes + +type PendingDeliveredInfo = (Vec, u64); +type RateLimiter = governor::RateLimiter< + T, + governor::state::keyed::HashMapStateStore, + governor::clock::DefaultClock, +>; + +/// Hole Punching Protocol +pub(crate) struct HolePunching { + network_state: Arc, + bind_addr: Option, + // Request timestamp recorded + inflight_requests: HashMap, + // Delivered timestamp recorded + pending_delivered: HashMap, + rate_limiter: RateLimiter<(PeerIndex, u32)>, + forward_rate_limiter: RateLimiter<(PeerId, PeerId, u32)>, +} + +#[async_trait] +impl ServiceProtocol for HolePunching { + async fn init(&mut self, context: &mut ProtocolContext) { + context + .set_service_notify(context.proto_id, CHECK_INTERVAL, CHECK_TOKEN) + .await + .expect("set discovery notify fail") + } + + async fn connected(&mut self, context: ProtocolContextMutRef<'_>, version: &str) { + self.network_state.with_peer_registry_mut(|reg| { + reg.get_peer_mut(context.session.id).map(|peer| { + peer.protocols.insert(context.proto_id, version.to_owned()); + }) + }); + } + + async fn disconnected(&mut self, context: ProtocolContextMutRef<'_>) { + self.rate_limiter.retain_recent(); + self.forward_rate_limiter.retain_recent(); + debug!("HolePunching.disconnected session={}", context.session.id); + } + + async fn received(&mut self, context: ProtocolContextMutRef<'_>, data: bytes::Bytes) { + let session_id = context.session.id; + trace!("HolePunching.received session={}", session_id); + + let msg = match packed::HolePunchingMessageReader::from_slice(&data) { + Ok(msg) => msg.to_enum(), + _ => { + warn!( + "HolePunching.received a malformed message from {}", + session_id + ); + self.network_state.ban_session( + &context.control().clone().into(), + session_id, + BAD_MESSAGE_BAN_TIME, + String::from("send us a malformed message"), + ); + return; + } + }; + + let item_name = msg.item_name(); + + if self + .rate_limiter + .check_key(&(session_id, msg.item_id())) + .is_err() + { + debug!( + "process {} from {}; result is {}", + item_name, + session_id, + status::StatusCode::TooManyRequests.with_context(msg.item_name()) + ); + return; + } + + let status = match msg { + packed::HolePunchingMessageUnionReader::ConnectionRequest(reader) => { + component::ConnectionRequestProcess::new( + reader, + self, + context.session.id, + context.control(), + msg.item_id(), + ) + .execute() + .await + } + packed::HolePunchingMessageUnionReader::ConnectionRequestDelivered(reader) => { + component::ConnectionRequestDeliveredProcess::new( + reader, + self, + context.control(), + context.session.id, + self.bind_addr, + msg.item_id(), + ) + .execute() + .await + } + packed::HolePunchingMessageUnionReader::ConnectionSync(reader) => { + component::ConnectionSyncProcess::new( + reader, + self, + context.control(), + self.bind_addr, + msg.item_id(), + ) + .execute() + .await + } + }; + if let Some(ban_time) = status.should_ban() { + error!( + "process {} from {}; ban {:?} since result is {}", + item_name, session_id, ban_time, status + ); + self.network_state.ban_session( + &context.control().clone().into(), + session_id, + ban_time, + status.to_string(), + ); + } else if status.should_warn() { + warn!( + "process {} from {}; result is {}", + item_name, session_id, status + ); + } else if !status.is_ok() { + debug!( + "process {} from {}; result is {}", + item_name, session_id, status + ); + } + } + + async fn notify(&mut self, context: &mut ProtocolContext, _token: u64) { + let status = self.network_state.connection_status(); + + let now = unix_time_as_millis(); + self.pending_delivered + .retain(|_, (_, t)| (now - *t) < TIMEOUT); + self.inflight_requests.retain(|_, t| (now - *t) < TIMEOUT); + + if status.non_whitelist_outbound < status.max_outbound && status.total > 0 { + let target = &self.network_state.required_flags; + let addrs = self.network_state.with_peer_store_mut(|p| { + p.fetch_nat_addrs( + (status.max_outbound - status.non_whitelist_outbound) as usize, + *target, + ) + }); + + let from_peer_id = self.network_state.local_peer_id(); + let listen_addrs = { + let public_addr = self.network_state.public_addrs(ADDRS_COUNT_LIMIT); + if public_addr.len() < ADDRS_COUNT_LIMIT { + let observed_addrs = self + .network_state + .observed_addrs(ADDRS_COUNT_LIMIT - public_addr.len()); + let iter = public_addr + .iter() + .chain(observed_addrs.iter()) + .map(Multiaddr::to_vec) + .map(|v| packed::Address::new_builder().bytes(v.pack()).build()); + packed::AddressVec::new_builder().extend(iter).build() + } else { + let iter = public_addr + .iter() + .map(Multiaddr::to_vec) + .map(|v| packed::Address::new_builder().bytes(v.pack()).build()); + packed::AddressVec::new_builder().extend(iter).build() + } + }; + + let mut inflight = Vec::new(); + for i in addrs { + if let Some(to_peer_id) = extract_peer_id(&i.addr) { + let conn_req = { + let content = component::init_request( + from_peer_id, + &to_peer_id, + listen_addrs.clone(), + ); + packed::HolePunchingMessage::new_builder() + .set(content) + .build() + }; + let proto_id = SupportProtocols::HolePunching.protocol_id(); + + // Broadcast to a number of nodes equal to the square root of the total connection count using gossip. + let mut total = status.total.isqrt(); + let _ignore = context + .filter_broadcast( + TargetSession::Filter(Box::new(move |_| { + total = total.saturating_sub(1); + total != 0 + })), + proto_id, + conn_req.as_bytes(), + ) + .await; + inflight.push(to_peer_id); + } + } + + let now = unix_time_as_millis(); + for peer_id in inflight { + self.inflight_requests.insert(peer_id, now); + } + } + } +} + +impl HolePunching { + pub(crate) fn new(network_state: Arc) -> Self { + // setup a rate limiter keyed by peer and message type that lets through 30 requests per second + // current max rps is 10 (CHECK_TOKEN), 30 is a flexible hard cap with buffer + let quota = governor::Quota::per_second(std::num::NonZeroU32::new(30).unwrap()); + let rate_limiter = RateLimiter::hashmap(quota); + + // In the request forwarding process, the same group of from/to should not be received by the same + // node more than 1 times within one second. + let quota = governor::Quota::per_second(std::num::NonZeroU32::new(1).unwrap()); + let forward_rate_limiter = RateLimiter::hashmap(quota); + + Self { + #[cfg(not(target_os = "linux"))] + bind_addr: None, + #[cfg(target_os = "linux")] + bind_addr: { + let mut bind_addr = None; + if network_state.config.reuse_port_on_linux { + for multi_addr in &network_state.config.listen_addresses { + if let crate::network::TransportType::Tcp = + crate::network::find_type(multi_addr) + { + if let Some(addr) = p2p::utils::multiaddr_to_socketaddr(multi_addr) { + bind_addr = Some(addr); + break; + } + } + } + } + bind_addr + }, + network_state, + pending_delivered: HashMap::new(), + inflight_requests: HashMap::new(), + rate_limiter, + forward_rate_limiter, + } + } +} diff --git a/network/src/protocols/hole_punching/status.rs b/network/src/protocols/hole_punching/status.rs new file mode 100644 index 0000000000..77db3fe13f --- /dev/null +++ b/network/src/protocols/hole_punching/status.rs @@ -0,0 +1,118 @@ +use std::{fmt, time::Duration}; + +pub(crate) const BAD_MESSAGE_BAN_TIME: Duration = Duration::from_secs(60 * 60 * 24); + +/// StatusCodes indicate whether a specific operation has been successfully completed. +/// +/// The StatusCode element is a 3-digit integer. +/// +/// The first digest of the StatusCode defines the class of result: +/// - 1xx: Informational response – the request was received, continuing process. +/// - 2xx: Success - The action requested by the client was received, understood, and accepted. +/// - 4xx: Client errors - The error seems to have been caused by the client. +/// - 5xx: Server errors - The server failed to fulfil a request. +#[repr(u16)] +#[derive(Clone, Copy, Debug, PartialEq, Eq)] +pub enum StatusCode { + /// OK + OK = 200, + + /// Generic rate limit error + TooManyRequests = 110, + + /// The max TTL is larger than the limit. + InvalidMaxTTL = 410, + /// The peer id of `from` peer is invalid. + InvalidFromPeerId = 411, + /// The peer id of `to` peer is invalid. + InvalidToPeerId = 412, + /// At least one peer id in route is invalid. + InvalidRoute = 413, + /// The listen address len is invalid. + InvalidListenAddrLen = 414, + + /// Ignore messages. + Ignore = 501, + /// Failed to broadcast a message. + BroadcastError = 502, + /// Failed to broadcast a message. + ForwardError = 503, + /// The max hops is reached. + ReachedMaxHops = 504, +} + +/// Process message status. +#[derive(Clone, Debug, Eq)] +pub struct Status { + code: StatusCode, + context: Option, +} + +impl PartialEq for Status { + fn eq(&self, other: &Self) -> bool { + self.code == other.code + } +} + +impl fmt::Display for Status { + fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { + match self.context { + Some(ref context) => write!(f, "{:?}({}): {}", self.code, self.code as u16, context), + None => write!(f, "{:?}({})", self.code, self.code as u16), + } + } +} + +impl From for Status { + fn from(code: StatusCode) -> Self { + Self::new::<&str>(code, None) + } +} + +impl StatusCode { + /// Convert a status code into a status which has a context. + pub fn with_context(self, context: S) -> Status { + Status::new(self, Some(context)) + } +} + +impl Status { + /// Creates a new status. + pub fn new(code: StatusCode, context: Option) -> Self { + Self { + code, + context: context.map(|c| c.to_string()), + } + } + + /// Returns a `OK` status. + pub fn ok() -> Self { + Self::new::<&str>(StatusCode::OK, None) + } + + /// Whether the code is `OK` or not. + pub fn is_ok(&self) -> bool { + self.code == StatusCode::OK + } + + /// Whether the session should be banned. + pub fn should_ban(&self) -> Option { + let code = self.code() as u16; + if (400..500).contains(&code) { + Some(BAD_MESSAGE_BAN_TIME) + } else { + None + } + } + + /// Whether a warning log should be output. + pub fn should_warn(&self) -> bool { + let code = self.code() as u16; + (500..600).contains(&code) + } + + /// Returns the status code. + pub fn code(&self) -> StatusCode { + self.code + } +} diff --git a/network/src/protocols/identify/mod.rs b/network/src/protocols/identify/mod.rs index bc939b8340..88d81d4e65 100644 --- a/network/src/protocols/identify/mod.rs +++ b/network/src/protocols/identify/mod.rs @@ -9,7 +9,7 @@ use p2p::{ bytes::Bytes, context::{ProtocolContext, ProtocolContextMutRef, SessionContext}, multiaddr::{Multiaddr, Protocol}, - service::{SessionType, TargetProtocol}, + service::TargetProtocol, traits::ServiceProtocol, utils::{extract_peer_id, is_reachable, multiaddr_to_socketaddr}, }; @@ -77,7 +77,7 @@ pub trait Callback: Clone + Send { /// Add remote peer's listen addresses fn add_remote_listen_addrs(&mut self, session: &SessionContext, addrs: Vec); /// Add our address observed by remote peer - fn add_observed_addr(&mut self, addr: Multiaddr, ty: SessionType) -> MisbehaveResult; + fn add_observed_addr(&mut self, addr: Multiaddr, session_id: SessionId) -> MisbehaveResult; /// Report misbehavior fn misbehave(&mut self, session: &SessionContext, kind: Misbehavior) -> MisbehaveResult; } @@ -164,16 +164,8 @@ impl IdentifyProtocol { .remote_infos .get_mut(&session.id) .expect("RemoteInfo must exists"); - let global_ip_only = self.global_ip_only; - if multiaddr_to_socketaddr(&observed) - .map(|socket_addr| socket_addr.ip()) - .filter(|ip_addr| !global_ip_only || is_reachable(*ip_addr)) - .is_none() - { - return MisbehaveResult::Continue; - } - - self.callback.add_observed_addr(observed, info.session.ty) + self.callback.add_observed_addr(observed, info.session.id); + MisbehaveResult::Continue } } @@ -500,7 +492,17 @@ impl Callback for IdentifyCallback { /// Get local listen addresses fn local_listen_addrs(&mut self) -> Vec { - self.listen_addrs() + let mut listens = self.listen_addrs(); + + if listens.len() < MAX_RETURN_LISTEN_ADDRS { + let observe_addrs = self + .network_state + .observed_addrs(MAX_RETURN_LISTEN_ADDRS - listens.len()); + listens.extend(observe_addrs); + listens + } else { + listens + } } fn add_remote_listen_addrs(&mut self, session: &SessionContext, addrs: Vec) { @@ -528,42 +530,14 @@ impl Callback for IdentifyCallback { }) } - fn add_observed_addr(&mut self, mut addr: Multiaddr, ty: SessionType) -> MisbehaveResult { - if ty.is_inbound() { - // The address already been discovered by other peer - return MisbehaveResult::Continue; - } - - // observed addr is not a reachable ip - if !multiaddr_to_socketaddr(&addr) - .map(|socket_addr| is_reachable(socket_addr.ip())) - .unwrap_or(false) - { - return MisbehaveResult::Continue; - } - + fn add_observed_addr(&mut self, mut addr: Multiaddr, session_id: SessionId) -> MisbehaveResult { if extract_peer_id(&addr).is_none() { addr.push(Protocol::P2P(Cow::Borrowed( self.network_state.local_peer_id().as_bytes(), ))) } - let source_addr = addr.clone(); - let observed_addrs_iter = self - .listen_addrs() - .into_iter() - .filter_map(|listen_addr| multiaddr_to_socketaddr(&listen_addr)) - .map(|socket_addr| { - addr.iter() - .map(|proto| match proto { - Protocol::Tcp(_) => Protocol::Tcp(socket_addr.port()), - value => value, - }) - .collect::() - }) - .chain(::std::iter::once(source_addr)); - - self.network_state.add_observed_addrs(observed_addrs_iter); + self.network_state.add_observed_addr(session_id, addr); // NOTE: for future usage MisbehaveResult::Continue } diff --git a/network/src/protocols/mod.rs b/network/src/protocols/mod.rs index 1ed98d633a..aa18f89583 100644 --- a/network/src/protocols/mod.rs +++ b/network/src/protocols/mod.rs @@ -5,6 +5,9 @@ pub(crate) mod identify; pub(crate) mod ping; pub(crate) mod support_protocols; +#[cfg(not(target_family = "wasm"))] +pub(crate) mod hole_punching; + #[cfg(test)] mod tests; diff --git a/network/src/protocols/support_protocols.rs b/network/src/protocols/support_protocols.rs index 8c56b3ab88..6515f436b6 100644 --- a/network/src/protocols/support_protocols.rs +++ b/network/src/protocols/support_protocols.rs @@ -56,6 +56,8 @@ pub enum SupportProtocols { LightClient, /// Filter: A protocol used for client side block data filtering. Filter, + /// HolePunching: A protocol used to connect peers behind firewalls or NAT routers. + HolePunching, } impl SupportProtocols { @@ -74,6 +76,7 @@ impl SupportProtocols { SupportProtocols::Alert => 110, SupportProtocols::LightClient => 120, SupportProtocols::Filter => 121, + SupportProtocols::HolePunching => 130, } .into() } @@ -93,6 +96,7 @@ impl SupportProtocols { SupportProtocols::Alert => "/ckb/alt", SupportProtocols::LightClient => "/ckb/lightclient", SupportProtocols::Filter => "/ckb/filter", + SupportProtocols::HolePunching => "/ckb/holepunching", } .to_owned() } @@ -117,6 +121,7 @@ impl SupportProtocols { SupportProtocols::RelayV3 => vec!["2".to_owned(), LASTEST_VERSION.to_owned()], SupportProtocols::LightClient => vec!["2".to_owned(), LASTEST_VERSION.to_owned()], SupportProtocols::Filter => vec!["2".to_owned(), LASTEST_VERSION.to_owned()], + SupportProtocols::HolePunching => vec!["2".to_owned(), LASTEST_VERSION.to_owned()], } } @@ -134,6 +139,7 @@ impl SupportProtocols { SupportProtocols::Alert => 128 * 1024, // 128 KB SupportProtocols::LightClient => 2 * 1024 * 1024, // 2 MB SupportProtocols::Filter => 2 * 1024 * 1024, // 2 MB + SupportProtocols::HolePunching => 512 * 1024, // 512 KB } } diff --git a/network/src/services/outbound_peer.rs b/network/src/services/outbound_peer.rs index ee4f6b023c..5fe8db0bee 100644 --- a/network/src/services/outbound_peer.rs +++ b/network/src/services/outbound_peer.rs @@ -196,11 +196,6 @@ impl OutboundPeerService { } } - fn try_dial_observed(&self) { - self.network_state - .try_dial_observed_addrs(&self.p2p_control); - } - fn update_outbound_connected_ms(&mut self) { if self.update_outbound_connected_count > 10 { let connected_outbounds: Vec = @@ -256,8 +251,6 @@ impl Future for OutboundPeerService { self.dial_feeler(); // keep outbound peer is enough self.try_dial_peers(); - // try dial observed addrs - self.try_dial_observed(); // Keep connected nodes up to date in the peer store self.update_outbound_connected_ms(); } diff --git a/resource/ckb.toml b/resource/ckb.toml index 95240520df..43a046023c 100644 --- a/resource/ckb.toml +++ b/resource/ckb.toml @@ -105,7 +105,7 @@ discovery_local_address = false # {{ bootnode_mode = false # Supported protocols list, only "Sync" and "Identify" are mandatory, others are optional -support_protocols = ["Ping", "Discovery", "Identify", "Feeler", "DisconnectMessage", "Sync", "Relay", "Time", "Alert", "LightClient", "Filter"] +support_protocols = ["Ping", "Discovery", "Identify", "Feeler", "DisconnectMessage", "Sync", "Relay", "Time", "Alert", "LightClient", "Filter", "HolePunching"] # [network.sync.header_map] # memory_limit = "256MB" diff --git a/sync/src/relayer/mod.rs b/sync/src/relayer/mod.rs index 9d8acd3009..f4410a75f9 100644 --- a/sync/src/relayer/mod.rs +++ b/sync/src/relayer/mod.rs @@ -44,7 +44,6 @@ use ckb_types::{ packed::{self, Byte32, ProposalShortId}, prelude::*, }; -use ckb_util::Mutex; use itertools::Itertools; use std::collections::{HashMap, HashSet}; use std::sync::Arc; @@ -60,7 +59,7 @@ pub const MAX_RELAY_TXS_BYTES_PER_BATCH: usize = 1024 * 1024; type RateLimiter = governor::RateLimiter< T, - governor::state::keyed::DefaultKeyedStateStore, + governor::state::keyed::HashMapStateStore, governor::clock::DefaultClock, >; @@ -76,7 +75,7 @@ pub enum ReconstructionResult { pub struct Relayer { chain: ChainController, pub(crate) shared: Arc, - rate_limiter: Arc>>, + rate_limiter: RateLimiter<(PeerIndex, u32)>, v3: bool, } @@ -88,7 +87,7 @@ impl Relayer { // setup a rate limiter keyed by peer and message type that lets through 30 requests per second // current max rps is 10 (ASK_FOR_TXS_TOKEN / TX_PROPOSAL_TOKEN), 30 is a flexible hard cap with buffer let quota = governor::Quota::per_second(std::num::NonZeroU32::new(30).unwrap()); - let rate_limiter = Arc::new(Mutex::new(RateLimiter::keyed(quota))); + let rate_limiter = RateLimiter::hashmap(quota); Relayer { chain, @@ -122,7 +121,6 @@ impl Relayer { if should_check_rate && self .rate_limiter - .lock() .check_key(&(peer, message.item_id())) .is_err() { @@ -961,7 +959,7 @@ impl CKBProtocolHandler for Relayer { peer_index ); // Retains all keys in the rate limiter that were used recently enough. - self.rate_limiter.lock().retain_recent(); + self.rate_limiter.retain_recent(); } async fn notify(&mut self, nc: Arc, token: u64) { diff --git a/util/app-config/src/configs/network.rs b/util/app-config/src/configs/network.rs index b29de05146..f9c2792e34 100644 --- a/util/app-config/src/configs/network.rs +++ b/util/app-config/src/configs/network.rs @@ -155,6 +155,7 @@ pub enum SupportProtocol { Alert, LightClient, Filter, + HolePunching, } #[allow(missing_docs)] @@ -171,6 +172,7 @@ pub fn default_support_all_protocols() -> Vec { SupportProtocol::Alert, SupportProtocol::LightClient, SupportProtocol::Filter, + SupportProtocol::HolePunching, ] } diff --git a/util/gen-types/schemas/protocols.mol b/util/gen-types/schemas/protocols.mol index 8515599237..4e1897081e 100644 --- a/util/gen-types/schemas/protocols.mol +++ b/util/gen-types/schemas/protocols.mol @@ -81,3 +81,47 @@ table IdentifyMessage { // Custom message to indicate self ability, such as list protocols supported identify: Bytes, } + +union HolePunchingMessage { + // A node is to attempt to connect to another node through a side channel. + ConnectionRequest, + // If a connection request is delivered, reply this response to the sender. + ConnectionRequestDelivered, + // If sender received delivered message, reply this response to remote. + ConnectionSync, +} + +table ConnectionRequest { + // Peer Id. + from: Bytes, + // Peer Id. + to: Bytes, + // Limit the max count of hops: the max count of peers in the message delivery route. + max_hops: byte, + // The message delivery route (type: `Vec`). + route: BytesVec, + // These are the addresses on which the "from" peer is listening as multi-addresses. + listen_addrs: AddressVec, +} + +table ConnectionRequestDelivered { + // Peer Id. + from: Bytes, + // Peer Id. + to: Bytes, + // The message delivery route (type: `Vec`). + route: BytesVec, + // The message sync route (type: `Vec`). + sync_route: BytesVec, + // These are the addresses on which the "to" peer is listening as multi-addresses. + listen_addrs: AddressVec, +} + +table ConnectionSync { + // Peer Id. + from: Bytes, + // Peer Id. + to: Bytes, + // The message sync route (type: `Vec`). + route: BytesVec, +} diff --git a/util/gen-types/src/generated/protocols.rs b/util/gen-types/src/generated/protocols.rs index 030d4e6b48..2542ccc4ec 100644 --- a/util/gen-types/src/generated/protocols.rs +++ b/util/gen-types/src/generated/protocols.rs @@ -5028,3 +5028,1314 @@ impl molecule::prelude::Builder for IdentifyMessageBuilder { IdentifyMessage::new_unchecked(inner.into()) } } +#[derive(Clone)] +pub struct HolePunchingMessage(molecule::bytes::Bytes); +impl ::core::fmt::LowerHex for HolePunchingMessage { + fn fmt(&self, f: &mut ::core::fmt::Formatter) -> ::core::fmt::Result { + use molecule::hex_string; + if f.alternate() { + write!(f, "0x")?; + } + write!(f, "{}", hex_string(self.as_slice())) + } +} +impl ::core::fmt::Debug for HolePunchingMessage { + fn fmt(&self, f: &mut ::core::fmt::Formatter) -> ::core::fmt::Result { + write!(f, "{}({:#x})", Self::NAME, self) + } +} +impl ::core::fmt::Display for HolePunchingMessage { + fn fmt(&self, f: &mut ::core::fmt::Formatter) -> ::core::fmt::Result { + write!(f, "{}(", Self::NAME)?; + self.to_enum().display_inner(f)?; + write!(f, ")") + } +} +impl ::core::default::Default for HolePunchingMessage { + fn default() -> Self { + let v = molecule::bytes::Bytes::from_static(&Self::DEFAULT_VALUE); + HolePunchingMessage::new_unchecked(v) + } +} +impl HolePunchingMessage { + const DEFAULT_VALUE: [u8; 45] = [ + 0, 0, 0, 0, 41, 0, 0, 0, 24, 0, 0, 0, 28, 0, 0, 0, 32, 0, 0, 0, 33, 0, 0, 0, 37, 0, 0, 0, + 0, 0, 0, 0, 0, 0, 0, 0, 0, 4, 0, 0, 0, 4, 0, 0, 0, + ]; + pub const ITEMS_COUNT: usize = 3; + pub fn item_id(&self) -> molecule::Number { + molecule::unpack_number(self.as_slice()) + } + pub fn to_enum(&self) -> HolePunchingMessageUnion { + let inner = self.0.slice(molecule::NUMBER_SIZE..); + match self.item_id() { + 0 => ConnectionRequest::new_unchecked(inner).into(), + 1 => ConnectionRequestDelivered::new_unchecked(inner).into(), + 2 => ConnectionSync::new_unchecked(inner).into(), + _ => panic!("{}: invalid data", Self::NAME), + } + } + pub fn as_reader<'r>(&'r self) -> HolePunchingMessageReader<'r> { + HolePunchingMessageReader::new_unchecked(self.as_slice()) + } +} +impl molecule::prelude::Entity for HolePunchingMessage { + type Builder = HolePunchingMessageBuilder; + const NAME: &'static str = "HolePunchingMessage"; + fn new_unchecked(data: molecule::bytes::Bytes) -> Self { + HolePunchingMessage(data) + } + fn as_bytes(&self) -> molecule::bytes::Bytes { + self.0.clone() + } + fn as_slice(&self) -> &[u8] { + &self.0[..] + } + fn from_slice(slice: &[u8]) -> molecule::error::VerificationResult { + HolePunchingMessageReader::from_slice(slice).map(|reader| reader.to_entity()) + } + fn from_compatible_slice(slice: &[u8]) -> molecule::error::VerificationResult { + HolePunchingMessageReader::from_compatible_slice(slice).map(|reader| reader.to_entity()) + } + fn new_builder() -> Self::Builder { + ::core::default::Default::default() + } + fn as_builder(self) -> Self::Builder { + Self::new_builder().set(self.to_enum()) + } +} +#[derive(Clone, Copy)] +pub struct HolePunchingMessageReader<'r>(&'r [u8]); +impl<'r> ::core::fmt::LowerHex for HolePunchingMessageReader<'r> { + fn fmt(&self, f: &mut ::core::fmt::Formatter) -> ::core::fmt::Result { + use molecule::hex_string; + if f.alternate() { + write!(f, "0x")?; + } + write!(f, "{}", hex_string(self.as_slice())) + } +} +impl<'r> ::core::fmt::Debug for HolePunchingMessageReader<'r> { + fn fmt(&self, f: &mut ::core::fmt::Formatter) -> ::core::fmt::Result { + write!(f, "{}({:#x})", Self::NAME, self) + } +} +impl<'r> ::core::fmt::Display for HolePunchingMessageReader<'r> { + fn fmt(&self, f: &mut ::core::fmt::Formatter) -> ::core::fmt::Result { + write!(f, "{}(", Self::NAME)?; + self.to_enum().display_inner(f)?; + write!(f, ")") + } +} +impl<'r> HolePunchingMessageReader<'r> { + pub const ITEMS_COUNT: usize = 3; + pub fn item_id(&self) -> molecule::Number { + molecule::unpack_number(self.as_slice()) + } + pub fn to_enum(&self) -> HolePunchingMessageUnionReader<'r> { + let inner = &self.as_slice()[molecule::NUMBER_SIZE..]; + match self.item_id() { + 0 => ConnectionRequestReader::new_unchecked(inner).into(), + 1 => ConnectionRequestDeliveredReader::new_unchecked(inner).into(), + 2 => ConnectionSyncReader::new_unchecked(inner).into(), + _ => panic!("{}: invalid data", Self::NAME), + } + } +} +impl<'r> molecule::prelude::Reader<'r> for HolePunchingMessageReader<'r> { + type Entity = HolePunchingMessage; + const NAME: &'static str = "HolePunchingMessageReader"; + fn to_entity(&self) -> Self::Entity { + Self::Entity::new_unchecked(self.as_slice().to_owned().into()) + } + fn new_unchecked(slice: &'r [u8]) -> Self { + HolePunchingMessageReader(slice) + } + fn as_slice(&self) -> &'r [u8] { + self.0 + } + fn verify(slice: &[u8], compatible: bool) -> molecule::error::VerificationResult<()> { + use molecule::verification_error as ve; + let slice_len = slice.len(); + if slice_len < molecule::NUMBER_SIZE { + return ve!(Self, HeaderIsBroken, molecule::NUMBER_SIZE, slice_len); + } + let item_id = molecule::unpack_number(slice); + let inner_slice = &slice[molecule::NUMBER_SIZE..]; + match item_id { + 0 => ConnectionRequestReader::verify(inner_slice, compatible), + 1 => ConnectionRequestDeliveredReader::verify(inner_slice, compatible), + 2 => ConnectionSyncReader::verify(inner_slice, compatible), + _ => ve!(Self, UnknownItem, Self::ITEMS_COUNT, item_id), + }?; + Ok(()) + } +} +#[derive(Debug, Default)] +pub struct HolePunchingMessageBuilder(pub(crate) HolePunchingMessageUnion); +impl HolePunchingMessageBuilder { + pub const ITEMS_COUNT: usize = 3; + pub fn set(mut self, v: I) -> Self + where + I: ::core::convert::Into, + { + self.0 = v.into(); + self + } +} +impl molecule::prelude::Builder for HolePunchingMessageBuilder { + type Entity = HolePunchingMessage; + const NAME: &'static str = "HolePunchingMessageBuilder"; + fn expected_length(&self) -> usize { + molecule::NUMBER_SIZE + self.0.as_slice().len() + } + fn write(&self, writer: &mut W) -> molecule::io::Result<()> { + writer.write_all(&molecule::pack_number(self.0.item_id()))?; + writer.write_all(self.0.as_slice()) + } + fn build(&self) -> Self::Entity { + let mut inner = Vec::with_capacity(self.expected_length()); + self.write(&mut inner) + .unwrap_or_else(|_| panic!("{} build should be ok", Self::NAME)); + HolePunchingMessage::new_unchecked(inner.into()) + } +} +#[derive(Debug, Clone)] +pub enum HolePunchingMessageUnion { + ConnectionRequest(ConnectionRequest), + ConnectionRequestDelivered(ConnectionRequestDelivered), + ConnectionSync(ConnectionSync), +} +#[derive(Debug, Clone, Copy)] +pub enum HolePunchingMessageUnionReader<'r> { + ConnectionRequest(ConnectionRequestReader<'r>), + ConnectionRequestDelivered(ConnectionRequestDeliveredReader<'r>), + ConnectionSync(ConnectionSyncReader<'r>), +} +impl ::core::default::Default for HolePunchingMessageUnion { + fn default() -> Self { + HolePunchingMessageUnion::ConnectionRequest(::core::default::Default::default()) + } +} +impl ::core::fmt::Display for HolePunchingMessageUnion { + fn fmt(&self, f: &mut ::core::fmt::Formatter) -> ::core::fmt::Result { + match self { + HolePunchingMessageUnion::ConnectionRequest(ref item) => { + write!(f, "{}::{}({})", Self::NAME, ConnectionRequest::NAME, item) + } + HolePunchingMessageUnion::ConnectionRequestDelivered(ref item) => { + write!( + f, + "{}::{}({})", + Self::NAME, + ConnectionRequestDelivered::NAME, + item + ) + } + HolePunchingMessageUnion::ConnectionSync(ref item) => { + write!(f, "{}::{}({})", Self::NAME, ConnectionSync::NAME, item) + } + } + } +} +impl<'r> ::core::fmt::Display for HolePunchingMessageUnionReader<'r> { + fn fmt(&self, f: &mut ::core::fmt::Formatter) -> ::core::fmt::Result { + match self { + HolePunchingMessageUnionReader::ConnectionRequest(ref item) => { + write!(f, "{}::{}({})", Self::NAME, ConnectionRequest::NAME, item) + } + HolePunchingMessageUnionReader::ConnectionRequestDelivered(ref item) => { + write!( + f, + "{}::{}({})", + Self::NAME, + ConnectionRequestDelivered::NAME, + item + ) + } + HolePunchingMessageUnionReader::ConnectionSync(ref item) => { + write!(f, "{}::{}({})", Self::NAME, ConnectionSync::NAME, item) + } + } + } +} +impl HolePunchingMessageUnion { + pub(crate) fn display_inner(&self, f: &mut ::core::fmt::Formatter) -> ::core::fmt::Result { + match self { + HolePunchingMessageUnion::ConnectionRequest(ref item) => write!(f, "{}", item), + HolePunchingMessageUnion::ConnectionRequestDelivered(ref item) => write!(f, "{}", item), + HolePunchingMessageUnion::ConnectionSync(ref item) => write!(f, "{}", item), + } + } +} +impl<'r> HolePunchingMessageUnionReader<'r> { + pub(crate) fn display_inner(&self, f: &mut ::core::fmt::Formatter) -> ::core::fmt::Result { + match self { + HolePunchingMessageUnionReader::ConnectionRequest(ref item) => write!(f, "{}", item), + HolePunchingMessageUnionReader::ConnectionRequestDelivered(ref item) => { + write!(f, "{}", item) + } + HolePunchingMessageUnionReader::ConnectionSync(ref item) => write!(f, "{}", item), + } + } +} +impl ::core::convert::From for HolePunchingMessageUnion { + fn from(item: ConnectionRequest) -> Self { + HolePunchingMessageUnion::ConnectionRequest(item) + } +} +impl ::core::convert::From for HolePunchingMessageUnion { + fn from(item: ConnectionRequestDelivered) -> Self { + HolePunchingMessageUnion::ConnectionRequestDelivered(item) + } +} +impl ::core::convert::From for HolePunchingMessageUnion { + fn from(item: ConnectionSync) -> Self { + HolePunchingMessageUnion::ConnectionSync(item) + } +} +impl<'r> ::core::convert::From> for HolePunchingMessageUnionReader<'r> { + fn from(item: ConnectionRequestReader<'r>) -> Self { + HolePunchingMessageUnionReader::ConnectionRequest(item) + } +} +impl<'r> ::core::convert::From> + for HolePunchingMessageUnionReader<'r> +{ + fn from(item: ConnectionRequestDeliveredReader<'r>) -> Self { + HolePunchingMessageUnionReader::ConnectionRequestDelivered(item) + } +} +impl<'r> ::core::convert::From> for HolePunchingMessageUnionReader<'r> { + fn from(item: ConnectionSyncReader<'r>) -> Self { + HolePunchingMessageUnionReader::ConnectionSync(item) + } +} +impl HolePunchingMessageUnion { + pub const NAME: &'static str = "HolePunchingMessageUnion"; + pub fn as_bytes(&self) -> molecule::bytes::Bytes { + match self { + HolePunchingMessageUnion::ConnectionRequest(item) => item.as_bytes(), + HolePunchingMessageUnion::ConnectionRequestDelivered(item) => item.as_bytes(), + HolePunchingMessageUnion::ConnectionSync(item) => item.as_bytes(), + } + } + pub fn as_slice(&self) -> &[u8] { + match self { + HolePunchingMessageUnion::ConnectionRequest(item) => item.as_slice(), + HolePunchingMessageUnion::ConnectionRequestDelivered(item) => item.as_slice(), + HolePunchingMessageUnion::ConnectionSync(item) => item.as_slice(), + } + } + pub fn item_id(&self) -> molecule::Number { + match self { + HolePunchingMessageUnion::ConnectionRequest(_) => 0, + HolePunchingMessageUnion::ConnectionRequestDelivered(_) => 1, + HolePunchingMessageUnion::ConnectionSync(_) => 2, + } + } + pub fn item_name(&self) -> &str { + match self { + HolePunchingMessageUnion::ConnectionRequest(_) => "ConnectionRequest", + HolePunchingMessageUnion::ConnectionRequestDelivered(_) => "ConnectionRequestDelivered", + HolePunchingMessageUnion::ConnectionSync(_) => "ConnectionSync", + } + } + pub fn as_reader<'r>(&'r self) -> HolePunchingMessageUnionReader<'r> { + match self { + HolePunchingMessageUnion::ConnectionRequest(item) => item.as_reader().into(), + HolePunchingMessageUnion::ConnectionRequestDelivered(item) => item.as_reader().into(), + HolePunchingMessageUnion::ConnectionSync(item) => item.as_reader().into(), + } + } +} +impl<'r> HolePunchingMessageUnionReader<'r> { + pub const NAME: &'r str = "HolePunchingMessageUnionReader"; + pub fn as_slice(&self) -> &'r [u8] { + match self { + HolePunchingMessageUnionReader::ConnectionRequest(item) => item.as_slice(), + HolePunchingMessageUnionReader::ConnectionRequestDelivered(item) => item.as_slice(), + HolePunchingMessageUnionReader::ConnectionSync(item) => item.as_slice(), + } + } + pub fn item_id(&self) -> molecule::Number { + match self { + HolePunchingMessageUnionReader::ConnectionRequest(_) => 0, + HolePunchingMessageUnionReader::ConnectionRequestDelivered(_) => 1, + HolePunchingMessageUnionReader::ConnectionSync(_) => 2, + } + } + pub fn item_name(&self) -> &str { + match self { + HolePunchingMessageUnionReader::ConnectionRequest(_) => "ConnectionRequest", + HolePunchingMessageUnionReader::ConnectionRequestDelivered(_) => { + "ConnectionRequestDelivered" + } + HolePunchingMessageUnionReader::ConnectionSync(_) => "ConnectionSync", + } + } +} +#[derive(Clone)] +pub struct ConnectionRequest(molecule::bytes::Bytes); +impl ::core::fmt::LowerHex for ConnectionRequest { + fn fmt(&self, f: &mut ::core::fmt::Formatter) -> ::core::fmt::Result { + use molecule::hex_string; + if f.alternate() { + write!(f, "0x")?; + } + write!(f, "{}", hex_string(self.as_slice())) + } +} +impl ::core::fmt::Debug for ConnectionRequest { + fn fmt(&self, f: &mut ::core::fmt::Formatter) -> ::core::fmt::Result { + write!(f, "{}({:#x})", Self::NAME, self) + } +} +impl ::core::fmt::Display for ConnectionRequest { + fn fmt(&self, f: &mut ::core::fmt::Formatter) -> ::core::fmt::Result { + write!(f, "{} {{ ", Self::NAME)?; + write!(f, "{}: {}", "from", self.from())?; + write!(f, ", {}: {}", "to", self.to())?; + write!(f, ", {}: {}", "max_hops", self.max_hops())?; + write!(f, ", {}: {}", "route", self.route())?; + write!(f, ", {}: {}", "listen_addrs", self.listen_addrs())?; + let extra_count = self.count_extra_fields(); + if extra_count != 0 { + write!(f, ", .. ({} fields)", extra_count)?; + } + write!(f, " }}") + } +} +impl ::core::default::Default for ConnectionRequest { + fn default() -> Self { + let v = molecule::bytes::Bytes::from_static(&Self::DEFAULT_VALUE); + ConnectionRequest::new_unchecked(v) + } +} +impl ConnectionRequest { + const DEFAULT_VALUE: [u8; 41] = [ + 41, 0, 0, 0, 24, 0, 0, 0, 28, 0, 0, 0, 32, 0, 0, 0, 33, 0, 0, 0, 37, 0, 0, 0, 0, 0, 0, 0, + 0, 0, 0, 0, 0, 4, 0, 0, 0, 4, 0, 0, 0, + ]; + pub const FIELD_COUNT: usize = 5; + pub fn total_size(&self) -> usize { + molecule::unpack_number(self.as_slice()) as usize + } + pub fn field_count(&self) -> usize { + if self.total_size() == molecule::NUMBER_SIZE { + 0 + } else { + (molecule::unpack_number(&self.as_slice()[molecule::NUMBER_SIZE..]) as usize / 4) - 1 + } + } + pub fn count_extra_fields(&self) -> usize { + self.field_count() - Self::FIELD_COUNT + } + pub fn has_extra_fields(&self) -> bool { + Self::FIELD_COUNT != self.field_count() + } + pub fn from(&self) -> Bytes { + let slice = self.as_slice(); + let start = molecule::unpack_number(&slice[4..]) as usize; + let end = molecule::unpack_number(&slice[8..]) as usize; + Bytes::new_unchecked(self.0.slice(start..end)) + } + pub fn to(&self) -> Bytes { + let slice = self.as_slice(); + let start = molecule::unpack_number(&slice[8..]) as usize; + let end = molecule::unpack_number(&slice[12..]) as usize; + Bytes::new_unchecked(self.0.slice(start..end)) + } + pub fn max_hops(&self) -> Byte { + let slice = self.as_slice(); + let start = molecule::unpack_number(&slice[12..]) as usize; + let end = molecule::unpack_number(&slice[16..]) as usize; + Byte::new_unchecked(self.0.slice(start..end)) + } + pub fn route(&self) -> BytesVec { + let slice = self.as_slice(); + let start = molecule::unpack_number(&slice[16..]) as usize; + let end = molecule::unpack_number(&slice[20..]) as usize; + BytesVec::new_unchecked(self.0.slice(start..end)) + } + pub fn listen_addrs(&self) -> AddressVec { + let slice = self.as_slice(); + let start = molecule::unpack_number(&slice[20..]) as usize; + if self.has_extra_fields() { + let end = molecule::unpack_number(&slice[24..]) as usize; + AddressVec::new_unchecked(self.0.slice(start..end)) + } else { + AddressVec::new_unchecked(self.0.slice(start..)) + } + } + pub fn as_reader<'r>(&'r self) -> ConnectionRequestReader<'r> { + ConnectionRequestReader::new_unchecked(self.as_slice()) + } +} +impl molecule::prelude::Entity for ConnectionRequest { + type Builder = ConnectionRequestBuilder; + const NAME: &'static str = "ConnectionRequest"; + fn new_unchecked(data: molecule::bytes::Bytes) -> Self { + ConnectionRequest(data) + } + fn as_bytes(&self) -> molecule::bytes::Bytes { + self.0.clone() + } + fn as_slice(&self) -> &[u8] { + &self.0[..] + } + fn from_slice(slice: &[u8]) -> molecule::error::VerificationResult { + ConnectionRequestReader::from_slice(slice).map(|reader| reader.to_entity()) + } + fn from_compatible_slice(slice: &[u8]) -> molecule::error::VerificationResult { + ConnectionRequestReader::from_compatible_slice(slice).map(|reader| reader.to_entity()) + } + fn new_builder() -> Self::Builder { + ::core::default::Default::default() + } + fn as_builder(self) -> Self::Builder { + Self::new_builder() + .from(self.from()) + .to(self.to()) + .max_hops(self.max_hops()) + .route(self.route()) + .listen_addrs(self.listen_addrs()) + } +} +#[derive(Clone, Copy)] +pub struct ConnectionRequestReader<'r>(&'r [u8]); +impl<'r> ::core::fmt::LowerHex for ConnectionRequestReader<'r> { + fn fmt(&self, f: &mut ::core::fmt::Formatter) -> ::core::fmt::Result { + use molecule::hex_string; + if f.alternate() { + write!(f, "0x")?; + } + write!(f, "{}", hex_string(self.as_slice())) + } +} +impl<'r> ::core::fmt::Debug for ConnectionRequestReader<'r> { + fn fmt(&self, f: &mut ::core::fmt::Formatter) -> ::core::fmt::Result { + write!(f, "{}({:#x})", Self::NAME, self) + } +} +impl<'r> ::core::fmt::Display for ConnectionRequestReader<'r> { + fn fmt(&self, f: &mut ::core::fmt::Formatter) -> ::core::fmt::Result { + write!(f, "{} {{ ", Self::NAME)?; + write!(f, "{}: {}", "from", self.from())?; + write!(f, ", {}: {}", "to", self.to())?; + write!(f, ", {}: {}", "max_hops", self.max_hops())?; + write!(f, ", {}: {}", "route", self.route())?; + write!(f, ", {}: {}", "listen_addrs", self.listen_addrs())?; + let extra_count = self.count_extra_fields(); + if extra_count != 0 { + write!(f, ", .. ({} fields)", extra_count)?; + } + write!(f, " }}") + } +} +impl<'r> ConnectionRequestReader<'r> { + pub const FIELD_COUNT: usize = 5; + pub fn total_size(&self) -> usize { + molecule::unpack_number(self.as_slice()) as usize + } + pub fn field_count(&self) -> usize { + if self.total_size() == molecule::NUMBER_SIZE { + 0 + } else { + (molecule::unpack_number(&self.as_slice()[molecule::NUMBER_SIZE..]) as usize / 4) - 1 + } + } + pub fn count_extra_fields(&self) -> usize { + self.field_count() - Self::FIELD_COUNT + } + pub fn has_extra_fields(&self) -> bool { + Self::FIELD_COUNT != self.field_count() + } + pub fn from(&self) -> BytesReader<'r> { + let slice = self.as_slice(); + let start = molecule::unpack_number(&slice[4..]) as usize; + let end = molecule::unpack_number(&slice[8..]) as usize; + BytesReader::new_unchecked(&self.as_slice()[start..end]) + } + pub fn to(&self) -> BytesReader<'r> { + let slice = self.as_slice(); + let start = molecule::unpack_number(&slice[8..]) as usize; + let end = molecule::unpack_number(&slice[12..]) as usize; + BytesReader::new_unchecked(&self.as_slice()[start..end]) + } + pub fn max_hops(&self) -> ByteReader<'r> { + let slice = self.as_slice(); + let start = molecule::unpack_number(&slice[12..]) as usize; + let end = molecule::unpack_number(&slice[16..]) as usize; + ByteReader::new_unchecked(&self.as_slice()[start..end]) + } + pub fn route(&self) -> BytesVecReader<'r> { + let slice = self.as_slice(); + let start = molecule::unpack_number(&slice[16..]) as usize; + let end = molecule::unpack_number(&slice[20..]) as usize; + BytesVecReader::new_unchecked(&self.as_slice()[start..end]) + } + pub fn listen_addrs(&self) -> AddressVecReader<'r> { + let slice = self.as_slice(); + let start = molecule::unpack_number(&slice[20..]) as usize; + if self.has_extra_fields() { + let end = molecule::unpack_number(&slice[24..]) as usize; + AddressVecReader::new_unchecked(&self.as_slice()[start..end]) + } else { + AddressVecReader::new_unchecked(&self.as_slice()[start..]) + } + } +} +impl<'r> molecule::prelude::Reader<'r> for ConnectionRequestReader<'r> { + type Entity = ConnectionRequest; + const NAME: &'static str = "ConnectionRequestReader"; + fn to_entity(&self) -> Self::Entity { + Self::Entity::new_unchecked(self.as_slice().to_owned().into()) + } + fn new_unchecked(slice: &'r [u8]) -> Self { + ConnectionRequestReader(slice) + } + fn as_slice(&self) -> &'r [u8] { + self.0 + } + fn verify(slice: &[u8], compatible: bool) -> molecule::error::VerificationResult<()> { + use molecule::verification_error as ve; + let slice_len = slice.len(); + if slice_len < molecule::NUMBER_SIZE { + return ve!(Self, HeaderIsBroken, molecule::NUMBER_SIZE, slice_len); + } + let total_size = molecule::unpack_number(slice) as usize; + if slice_len != total_size { + return ve!(Self, TotalSizeNotMatch, total_size, slice_len); + } + if slice_len < molecule::NUMBER_SIZE * 2 { + return ve!(Self, HeaderIsBroken, molecule::NUMBER_SIZE * 2, slice_len); + } + let offset_first = molecule::unpack_number(&slice[molecule::NUMBER_SIZE..]) as usize; + if offset_first % molecule::NUMBER_SIZE != 0 || offset_first < molecule::NUMBER_SIZE * 2 { + return ve!(Self, OffsetsNotMatch); + } + if slice_len < offset_first { + return ve!(Self, HeaderIsBroken, offset_first, slice_len); + } + let field_count = offset_first / molecule::NUMBER_SIZE - 1; + if field_count < Self::FIELD_COUNT { + return ve!(Self, FieldCountNotMatch, Self::FIELD_COUNT, field_count); + } else if !compatible && field_count > Self::FIELD_COUNT { + return ve!(Self, FieldCountNotMatch, Self::FIELD_COUNT, field_count); + }; + let mut offsets: Vec = slice[molecule::NUMBER_SIZE..offset_first] + .chunks_exact(molecule::NUMBER_SIZE) + .map(|x| molecule::unpack_number(x) as usize) + .collect(); + offsets.push(total_size); + if offsets.windows(2).any(|i| i[0] > i[1]) { + return ve!(Self, OffsetsNotMatch); + } + BytesReader::verify(&slice[offsets[0]..offsets[1]], compatible)?; + BytesReader::verify(&slice[offsets[1]..offsets[2]], compatible)?; + ByteReader::verify(&slice[offsets[2]..offsets[3]], compatible)?; + BytesVecReader::verify(&slice[offsets[3]..offsets[4]], compatible)?; + AddressVecReader::verify(&slice[offsets[4]..offsets[5]], compatible)?; + Ok(()) + } +} +#[derive(Debug, Default)] +pub struct ConnectionRequestBuilder { + pub(crate) from: Bytes, + pub(crate) to: Bytes, + pub(crate) max_hops: Byte, + pub(crate) route: BytesVec, + pub(crate) listen_addrs: AddressVec, +} +impl ConnectionRequestBuilder { + pub const FIELD_COUNT: usize = 5; + pub fn from(mut self, v: Bytes) -> Self { + self.from = v; + self + } + pub fn to(mut self, v: Bytes) -> Self { + self.to = v; + self + } + pub fn max_hops(mut self, v: Byte) -> Self { + self.max_hops = v; + self + } + pub fn route(mut self, v: BytesVec) -> Self { + self.route = v; + self + } + pub fn listen_addrs(mut self, v: AddressVec) -> Self { + self.listen_addrs = v; + self + } +} +impl molecule::prelude::Builder for ConnectionRequestBuilder { + type Entity = ConnectionRequest; + const NAME: &'static str = "ConnectionRequestBuilder"; + fn expected_length(&self) -> usize { + molecule::NUMBER_SIZE * (Self::FIELD_COUNT + 1) + + self.from.as_slice().len() + + self.to.as_slice().len() + + self.max_hops.as_slice().len() + + self.route.as_slice().len() + + self.listen_addrs.as_slice().len() + } + fn write(&self, writer: &mut W) -> molecule::io::Result<()> { + let mut total_size = molecule::NUMBER_SIZE * (Self::FIELD_COUNT + 1); + let mut offsets = Vec::with_capacity(Self::FIELD_COUNT); + offsets.push(total_size); + total_size += self.from.as_slice().len(); + offsets.push(total_size); + total_size += self.to.as_slice().len(); + offsets.push(total_size); + total_size += self.max_hops.as_slice().len(); + offsets.push(total_size); + total_size += self.route.as_slice().len(); + offsets.push(total_size); + total_size += self.listen_addrs.as_slice().len(); + writer.write_all(&molecule::pack_number(total_size as molecule::Number))?; + for offset in offsets.into_iter() { + writer.write_all(&molecule::pack_number(offset as molecule::Number))?; + } + writer.write_all(self.from.as_slice())?; + writer.write_all(self.to.as_slice())?; + writer.write_all(self.max_hops.as_slice())?; + writer.write_all(self.route.as_slice())?; + writer.write_all(self.listen_addrs.as_slice())?; + Ok(()) + } + fn build(&self) -> Self::Entity { + let mut inner = Vec::with_capacity(self.expected_length()); + self.write(&mut inner) + .unwrap_or_else(|_| panic!("{} build should be ok", Self::NAME)); + ConnectionRequest::new_unchecked(inner.into()) + } +} +#[derive(Clone)] +pub struct ConnectionRequestDelivered(molecule::bytes::Bytes); +impl ::core::fmt::LowerHex for ConnectionRequestDelivered { + fn fmt(&self, f: &mut ::core::fmt::Formatter) -> ::core::fmt::Result { + use molecule::hex_string; + if f.alternate() { + write!(f, "0x")?; + } + write!(f, "{}", hex_string(self.as_slice())) + } +} +impl ::core::fmt::Debug for ConnectionRequestDelivered { + fn fmt(&self, f: &mut ::core::fmt::Formatter) -> ::core::fmt::Result { + write!(f, "{}({:#x})", Self::NAME, self) + } +} +impl ::core::fmt::Display for ConnectionRequestDelivered { + fn fmt(&self, f: &mut ::core::fmt::Formatter) -> ::core::fmt::Result { + write!(f, "{} {{ ", Self::NAME)?; + write!(f, "{}: {}", "from", self.from())?; + write!(f, ", {}: {}", "to", self.to())?; + write!(f, ", {}: {}", "route", self.route())?; + write!(f, ", {}: {}", "sync_route", self.sync_route())?; + write!(f, ", {}: {}", "listen_addrs", self.listen_addrs())?; + let extra_count = self.count_extra_fields(); + if extra_count != 0 { + write!(f, ", .. ({} fields)", extra_count)?; + } + write!(f, " }}") + } +} +impl ::core::default::Default for ConnectionRequestDelivered { + fn default() -> Self { + let v = molecule::bytes::Bytes::from_static(&Self::DEFAULT_VALUE); + ConnectionRequestDelivered::new_unchecked(v) + } +} +impl ConnectionRequestDelivered { + const DEFAULT_VALUE: [u8; 44] = [ + 44, 0, 0, 0, 24, 0, 0, 0, 28, 0, 0, 0, 32, 0, 0, 0, 36, 0, 0, 0, 40, 0, 0, 0, 0, 0, 0, 0, + 0, 0, 0, 0, 4, 0, 0, 0, 4, 0, 0, 0, 4, 0, 0, 0, + ]; + pub const FIELD_COUNT: usize = 5; + pub fn total_size(&self) -> usize { + molecule::unpack_number(self.as_slice()) as usize + } + pub fn field_count(&self) -> usize { + if self.total_size() == molecule::NUMBER_SIZE { + 0 + } else { + (molecule::unpack_number(&self.as_slice()[molecule::NUMBER_SIZE..]) as usize / 4) - 1 + } + } + pub fn count_extra_fields(&self) -> usize { + self.field_count() - Self::FIELD_COUNT + } + pub fn has_extra_fields(&self) -> bool { + Self::FIELD_COUNT != self.field_count() + } + pub fn from(&self) -> Bytes { + let slice = self.as_slice(); + let start = molecule::unpack_number(&slice[4..]) as usize; + let end = molecule::unpack_number(&slice[8..]) as usize; + Bytes::new_unchecked(self.0.slice(start..end)) + } + pub fn to(&self) -> Bytes { + let slice = self.as_slice(); + let start = molecule::unpack_number(&slice[8..]) as usize; + let end = molecule::unpack_number(&slice[12..]) as usize; + Bytes::new_unchecked(self.0.slice(start..end)) + } + pub fn route(&self) -> BytesVec { + let slice = self.as_slice(); + let start = molecule::unpack_number(&slice[12..]) as usize; + let end = molecule::unpack_number(&slice[16..]) as usize; + BytesVec::new_unchecked(self.0.slice(start..end)) + } + pub fn sync_route(&self) -> BytesVec { + let slice = self.as_slice(); + let start = molecule::unpack_number(&slice[16..]) as usize; + let end = molecule::unpack_number(&slice[20..]) as usize; + BytesVec::new_unchecked(self.0.slice(start..end)) + } + pub fn listen_addrs(&self) -> AddressVec { + let slice = self.as_slice(); + let start = molecule::unpack_number(&slice[20..]) as usize; + if self.has_extra_fields() { + let end = molecule::unpack_number(&slice[24..]) as usize; + AddressVec::new_unchecked(self.0.slice(start..end)) + } else { + AddressVec::new_unchecked(self.0.slice(start..)) + } + } + pub fn as_reader<'r>(&'r self) -> ConnectionRequestDeliveredReader<'r> { + ConnectionRequestDeliveredReader::new_unchecked(self.as_slice()) + } +} +impl molecule::prelude::Entity for ConnectionRequestDelivered { + type Builder = ConnectionRequestDeliveredBuilder; + const NAME: &'static str = "ConnectionRequestDelivered"; + fn new_unchecked(data: molecule::bytes::Bytes) -> Self { + ConnectionRequestDelivered(data) + } + fn as_bytes(&self) -> molecule::bytes::Bytes { + self.0.clone() + } + fn as_slice(&self) -> &[u8] { + &self.0[..] + } + fn from_slice(slice: &[u8]) -> molecule::error::VerificationResult { + ConnectionRequestDeliveredReader::from_slice(slice).map(|reader| reader.to_entity()) + } + fn from_compatible_slice(slice: &[u8]) -> molecule::error::VerificationResult { + ConnectionRequestDeliveredReader::from_compatible_slice(slice) + .map(|reader| reader.to_entity()) + } + fn new_builder() -> Self::Builder { + ::core::default::Default::default() + } + fn as_builder(self) -> Self::Builder { + Self::new_builder() + .from(self.from()) + .to(self.to()) + .route(self.route()) + .sync_route(self.sync_route()) + .listen_addrs(self.listen_addrs()) + } +} +#[derive(Clone, Copy)] +pub struct ConnectionRequestDeliveredReader<'r>(&'r [u8]); +impl<'r> ::core::fmt::LowerHex for ConnectionRequestDeliveredReader<'r> { + fn fmt(&self, f: &mut ::core::fmt::Formatter) -> ::core::fmt::Result { + use molecule::hex_string; + if f.alternate() { + write!(f, "0x")?; + } + write!(f, "{}", hex_string(self.as_slice())) + } +} +impl<'r> ::core::fmt::Debug for ConnectionRequestDeliveredReader<'r> { + fn fmt(&self, f: &mut ::core::fmt::Formatter) -> ::core::fmt::Result { + write!(f, "{}({:#x})", Self::NAME, self) + } +} +impl<'r> ::core::fmt::Display for ConnectionRequestDeliveredReader<'r> { + fn fmt(&self, f: &mut ::core::fmt::Formatter) -> ::core::fmt::Result { + write!(f, "{} {{ ", Self::NAME)?; + write!(f, "{}: {}", "from", self.from())?; + write!(f, ", {}: {}", "to", self.to())?; + write!(f, ", {}: {}", "route", self.route())?; + write!(f, ", {}: {}", "sync_route", self.sync_route())?; + write!(f, ", {}: {}", "listen_addrs", self.listen_addrs())?; + let extra_count = self.count_extra_fields(); + if extra_count != 0 { + write!(f, ", .. ({} fields)", extra_count)?; + } + write!(f, " }}") + } +} +impl<'r> ConnectionRequestDeliveredReader<'r> { + pub const FIELD_COUNT: usize = 5; + pub fn total_size(&self) -> usize { + molecule::unpack_number(self.as_slice()) as usize + } + pub fn field_count(&self) -> usize { + if self.total_size() == molecule::NUMBER_SIZE { + 0 + } else { + (molecule::unpack_number(&self.as_slice()[molecule::NUMBER_SIZE..]) as usize / 4) - 1 + } + } + pub fn count_extra_fields(&self) -> usize { + self.field_count() - Self::FIELD_COUNT + } + pub fn has_extra_fields(&self) -> bool { + Self::FIELD_COUNT != self.field_count() + } + pub fn from(&self) -> BytesReader<'r> { + let slice = self.as_slice(); + let start = molecule::unpack_number(&slice[4..]) as usize; + let end = molecule::unpack_number(&slice[8..]) as usize; + BytesReader::new_unchecked(&self.as_slice()[start..end]) + } + pub fn to(&self) -> BytesReader<'r> { + let slice = self.as_slice(); + let start = molecule::unpack_number(&slice[8..]) as usize; + let end = molecule::unpack_number(&slice[12..]) as usize; + BytesReader::new_unchecked(&self.as_slice()[start..end]) + } + pub fn route(&self) -> BytesVecReader<'r> { + let slice = self.as_slice(); + let start = molecule::unpack_number(&slice[12..]) as usize; + let end = molecule::unpack_number(&slice[16..]) as usize; + BytesVecReader::new_unchecked(&self.as_slice()[start..end]) + } + pub fn sync_route(&self) -> BytesVecReader<'r> { + let slice = self.as_slice(); + let start = molecule::unpack_number(&slice[16..]) as usize; + let end = molecule::unpack_number(&slice[20..]) as usize; + BytesVecReader::new_unchecked(&self.as_slice()[start..end]) + } + pub fn listen_addrs(&self) -> AddressVecReader<'r> { + let slice = self.as_slice(); + let start = molecule::unpack_number(&slice[20..]) as usize; + if self.has_extra_fields() { + let end = molecule::unpack_number(&slice[24..]) as usize; + AddressVecReader::new_unchecked(&self.as_slice()[start..end]) + } else { + AddressVecReader::new_unchecked(&self.as_slice()[start..]) + } + } +} +impl<'r> molecule::prelude::Reader<'r> for ConnectionRequestDeliveredReader<'r> { + type Entity = ConnectionRequestDelivered; + const NAME: &'static str = "ConnectionRequestDeliveredReader"; + fn to_entity(&self) -> Self::Entity { + Self::Entity::new_unchecked(self.as_slice().to_owned().into()) + } + fn new_unchecked(slice: &'r [u8]) -> Self { + ConnectionRequestDeliveredReader(slice) + } + fn as_slice(&self) -> &'r [u8] { + self.0 + } + fn verify(slice: &[u8], compatible: bool) -> molecule::error::VerificationResult<()> { + use molecule::verification_error as ve; + let slice_len = slice.len(); + if slice_len < molecule::NUMBER_SIZE { + return ve!(Self, HeaderIsBroken, molecule::NUMBER_SIZE, slice_len); + } + let total_size = molecule::unpack_number(slice) as usize; + if slice_len != total_size { + return ve!(Self, TotalSizeNotMatch, total_size, slice_len); + } + if slice_len < molecule::NUMBER_SIZE * 2 { + return ve!(Self, HeaderIsBroken, molecule::NUMBER_SIZE * 2, slice_len); + } + let offset_first = molecule::unpack_number(&slice[molecule::NUMBER_SIZE..]) as usize; + if offset_first % molecule::NUMBER_SIZE != 0 || offset_first < molecule::NUMBER_SIZE * 2 { + return ve!(Self, OffsetsNotMatch); + } + if slice_len < offset_first { + return ve!(Self, HeaderIsBroken, offset_first, slice_len); + } + let field_count = offset_first / molecule::NUMBER_SIZE - 1; + if field_count < Self::FIELD_COUNT { + return ve!(Self, FieldCountNotMatch, Self::FIELD_COUNT, field_count); + } else if !compatible && field_count > Self::FIELD_COUNT { + return ve!(Self, FieldCountNotMatch, Self::FIELD_COUNT, field_count); + }; + let mut offsets: Vec = slice[molecule::NUMBER_SIZE..offset_first] + .chunks_exact(molecule::NUMBER_SIZE) + .map(|x| molecule::unpack_number(x) as usize) + .collect(); + offsets.push(total_size); + if offsets.windows(2).any(|i| i[0] > i[1]) { + return ve!(Self, OffsetsNotMatch); + } + BytesReader::verify(&slice[offsets[0]..offsets[1]], compatible)?; + BytesReader::verify(&slice[offsets[1]..offsets[2]], compatible)?; + BytesVecReader::verify(&slice[offsets[2]..offsets[3]], compatible)?; + BytesVecReader::verify(&slice[offsets[3]..offsets[4]], compatible)?; + AddressVecReader::verify(&slice[offsets[4]..offsets[5]], compatible)?; + Ok(()) + } +} +#[derive(Debug, Default)] +pub struct ConnectionRequestDeliveredBuilder { + pub(crate) from: Bytes, + pub(crate) to: Bytes, + pub(crate) route: BytesVec, + pub(crate) sync_route: BytesVec, + pub(crate) listen_addrs: AddressVec, +} +impl ConnectionRequestDeliveredBuilder { + pub const FIELD_COUNT: usize = 5; + pub fn from(mut self, v: Bytes) -> Self { + self.from = v; + self + } + pub fn to(mut self, v: Bytes) -> Self { + self.to = v; + self + } + pub fn route(mut self, v: BytesVec) -> Self { + self.route = v; + self + } + pub fn sync_route(mut self, v: BytesVec) -> Self { + self.sync_route = v; + self + } + pub fn listen_addrs(mut self, v: AddressVec) -> Self { + self.listen_addrs = v; + self + } +} +impl molecule::prelude::Builder for ConnectionRequestDeliveredBuilder { + type Entity = ConnectionRequestDelivered; + const NAME: &'static str = "ConnectionRequestDeliveredBuilder"; + fn expected_length(&self) -> usize { + molecule::NUMBER_SIZE * (Self::FIELD_COUNT + 1) + + self.from.as_slice().len() + + self.to.as_slice().len() + + self.route.as_slice().len() + + self.sync_route.as_slice().len() + + self.listen_addrs.as_slice().len() + } + fn write(&self, writer: &mut W) -> molecule::io::Result<()> { + let mut total_size = molecule::NUMBER_SIZE * (Self::FIELD_COUNT + 1); + let mut offsets = Vec::with_capacity(Self::FIELD_COUNT); + offsets.push(total_size); + total_size += self.from.as_slice().len(); + offsets.push(total_size); + total_size += self.to.as_slice().len(); + offsets.push(total_size); + total_size += self.route.as_slice().len(); + offsets.push(total_size); + total_size += self.sync_route.as_slice().len(); + offsets.push(total_size); + total_size += self.listen_addrs.as_slice().len(); + writer.write_all(&molecule::pack_number(total_size as molecule::Number))?; + for offset in offsets.into_iter() { + writer.write_all(&molecule::pack_number(offset as molecule::Number))?; + } + writer.write_all(self.from.as_slice())?; + writer.write_all(self.to.as_slice())?; + writer.write_all(self.route.as_slice())?; + writer.write_all(self.sync_route.as_slice())?; + writer.write_all(self.listen_addrs.as_slice())?; + Ok(()) + } + fn build(&self) -> Self::Entity { + let mut inner = Vec::with_capacity(self.expected_length()); + self.write(&mut inner) + .unwrap_or_else(|_| panic!("{} build should be ok", Self::NAME)); + ConnectionRequestDelivered::new_unchecked(inner.into()) + } +} +#[derive(Clone)] +pub struct ConnectionSync(molecule::bytes::Bytes); +impl ::core::fmt::LowerHex for ConnectionSync { + fn fmt(&self, f: &mut ::core::fmt::Formatter) -> ::core::fmt::Result { + use molecule::hex_string; + if f.alternate() { + write!(f, "0x")?; + } + write!(f, "{}", hex_string(self.as_slice())) + } +} +impl ::core::fmt::Debug for ConnectionSync { + fn fmt(&self, f: &mut ::core::fmt::Formatter) -> ::core::fmt::Result { + write!(f, "{}({:#x})", Self::NAME, self) + } +} +impl ::core::fmt::Display for ConnectionSync { + fn fmt(&self, f: &mut ::core::fmt::Formatter) -> ::core::fmt::Result { + write!(f, "{} {{ ", Self::NAME)?; + write!(f, "{}: {}", "from", self.from())?; + write!(f, ", {}: {}", "to", self.to())?; + write!(f, ", {}: {}", "route", self.route())?; + let extra_count = self.count_extra_fields(); + if extra_count != 0 { + write!(f, ", .. ({} fields)", extra_count)?; + } + write!(f, " }}") + } +} +impl ::core::default::Default for ConnectionSync { + fn default() -> Self { + let v = molecule::bytes::Bytes::from_static(&Self::DEFAULT_VALUE); + ConnectionSync::new_unchecked(v) + } +} +impl ConnectionSync { + const DEFAULT_VALUE: [u8; 28] = [ + 28, 0, 0, 0, 16, 0, 0, 0, 20, 0, 0, 0, 24, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 4, 0, 0, 0, + ]; + pub const FIELD_COUNT: usize = 3; + pub fn total_size(&self) -> usize { + molecule::unpack_number(self.as_slice()) as usize + } + pub fn field_count(&self) -> usize { + if self.total_size() == molecule::NUMBER_SIZE { + 0 + } else { + (molecule::unpack_number(&self.as_slice()[molecule::NUMBER_SIZE..]) as usize / 4) - 1 + } + } + pub fn count_extra_fields(&self) -> usize { + self.field_count() - Self::FIELD_COUNT + } + pub fn has_extra_fields(&self) -> bool { + Self::FIELD_COUNT != self.field_count() + } + pub fn from(&self) -> Bytes { + let slice = self.as_slice(); + let start = molecule::unpack_number(&slice[4..]) as usize; + let end = molecule::unpack_number(&slice[8..]) as usize; + Bytes::new_unchecked(self.0.slice(start..end)) + } + pub fn to(&self) -> Bytes { + let slice = self.as_slice(); + let start = molecule::unpack_number(&slice[8..]) as usize; + let end = molecule::unpack_number(&slice[12..]) as usize; + Bytes::new_unchecked(self.0.slice(start..end)) + } + pub fn route(&self) -> BytesVec { + let slice = self.as_slice(); + let start = molecule::unpack_number(&slice[12..]) as usize; + if self.has_extra_fields() { + let end = molecule::unpack_number(&slice[16..]) as usize; + BytesVec::new_unchecked(self.0.slice(start..end)) + } else { + BytesVec::new_unchecked(self.0.slice(start..)) + } + } + pub fn as_reader<'r>(&'r self) -> ConnectionSyncReader<'r> { + ConnectionSyncReader::new_unchecked(self.as_slice()) + } +} +impl molecule::prelude::Entity for ConnectionSync { + type Builder = ConnectionSyncBuilder; + const NAME: &'static str = "ConnectionSync"; + fn new_unchecked(data: molecule::bytes::Bytes) -> Self { + ConnectionSync(data) + } + fn as_bytes(&self) -> molecule::bytes::Bytes { + self.0.clone() + } + fn as_slice(&self) -> &[u8] { + &self.0[..] + } + fn from_slice(slice: &[u8]) -> molecule::error::VerificationResult { + ConnectionSyncReader::from_slice(slice).map(|reader| reader.to_entity()) + } + fn from_compatible_slice(slice: &[u8]) -> molecule::error::VerificationResult { + ConnectionSyncReader::from_compatible_slice(slice).map(|reader| reader.to_entity()) + } + fn new_builder() -> Self::Builder { + ::core::default::Default::default() + } + fn as_builder(self) -> Self::Builder { + Self::new_builder() + .from(self.from()) + .to(self.to()) + .route(self.route()) + } +} +#[derive(Clone, Copy)] +pub struct ConnectionSyncReader<'r>(&'r [u8]); +impl<'r> ::core::fmt::LowerHex for ConnectionSyncReader<'r> { + fn fmt(&self, f: &mut ::core::fmt::Formatter) -> ::core::fmt::Result { + use molecule::hex_string; + if f.alternate() { + write!(f, "0x")?; + } + write!(f, "{}", hex_string(self.as_slice())) + } +} +impl<'r> ::core::fmt::Debug for ConnectionSyncReader<'r> { + fn fmt(&self, f: &mut ::core::fmt::Formatter) -> ::core::fmt::Result { + write!(f, "{}({:#x})", Self::NAME, self) + } +} +impl<'r> ::core::fmt::Display for ConnectionSyncReader<'r> { + fn fmt(&self, f: &mut ::core::fmt::Formatter) -> ::core::fmt::Result { + write!(f, "{} {{ ", Self::NAME)?; + write!(f, "{}: {}", "from", self.from())?; + write!(f, ", {}: {}", "to", self.to())?; + write!(f, ", {}: {}", "route", self.route())?; + let extra_count = self.count_extra_fields(); + if extra_count != 0 { + write!(f, ", .. ({} fields)", extra_count)?; + } + write!(f, " }}") + } +} +impl<'r> ConnectionSyncReader<'r> { + pub const FIELD_COUNT: usize = 3; + pub fn total_size(&self) -> usize { + molecule::unpack_number(self.as_slice()) as usize + } + pub fn field_count(&self) -> usize { + if self.total_size() == molecule::NUMBER_SIZE { + 0 + } else { + (molecule::unpack_number(&self.as_slice()[molecule::NUMBER_SIZE..]) as usize / 4) - 1 + } + } + pub fn count_extra_fields(&self) -> usize { + self.field_count() - Self::FIELD_COUNT + } + pub fn has_extra_fields(&self) -> bool { + Self::FIELD_COUNT != self.field_count() + } + pub fn from(&self) -> BytesReader<'r> { + let slice = self.as_slice(); + let start = molecule::unpack_number(&slice[4..]) as usize; + let end = molecule::unpack_number(&slice[8..]) as usize; + BytesReader::new_unchecked(&self.as_slice()[start..end]) + } + pub fn to(&self) -> BytesReader<'r> { + let slice = self.as_slice(); + let start = molecule::unpack_number(&slice[8..]) as usize; + let end = molecule::unpack_number(&slice[12..]) as usize; + BytesReader::new_unchecked(&self.as_slice()[start..end]) + } + pub fn route(&self) -> BytesVecReader<'r> { + let slice = self.as_slice(); + let start = molecule::unpack_number(&slice[12..]) as usize; + if self.has_extra_fields() { + let end = molecule::unpack_number(&slice[16..]) as usize; + BytesVecReader::new_unchecked(&self.as_slice()[start..end]) + } else { + BytesVecReader::new_unchecked(&self.as_slice()[start..]) + } + } +} +impl<'r> molecule::prelude::Reader<'r> for ConnectionSyncReader<'r> { + type Entity = ConnectionSync; + const NAME: &'static str = "ConnectionSyncReader"; + fn to_entity(&self) -> Self::Entity { + Self::Entity::new_unchecked(self.as_slice().to_owned().into()) + } + fn new_unchecked(slice: &'r [u8]) -> Self { + ConnectionSyncReader(slice) + } + fn as_slice(&self) -> &'r [u8] { + self.0 + } + fn verify(slice: &[u8], compatible: bool) -> molecule::error::VerificationResult<()> { + use molecule::verification_error as ve; + let slice_len = slice.len(); + if slice_len < molecule::NUMBER_SIZE { + return ve!(Self, HeaderIsBroken, molecule::NUMBER_SIZE, slice_len); + } + let total_size = molecule::unpack_number(slice) as usize; + if slice_len != total_size { + return ve!(Self, TotalSizeNotMatch, total_size, slice_len); + } + if slice_len < molecule::NUMBER_SIZE * 2 { + return ve!(Self, HeaderIsBroken, molecule::NUMBER_SIZE * 2, slice_len); + } + let offset_first = molecule::unpack_number(&slice[molecule::NUMBER_SIZE..]) as usize; + if offset_first % molecule::NUMBER_SIZE != 0 || offset_first < molecule::NUMBER_SIZE * 2 { + return ve!(Self, OffsetsNotMatch); + } + if slice_len < offset_first { + return ve!(Self, HeaderIsBroken, offset_first, slice_len); + } + let field_count = offset_first / molecule::NUMBER_SIZE - 1; + if field_count < Self::FIELD_COUNT { + return ve!(Self, FieldCountNotMatch, Self::FIELD_COUNT, field_count); + } else if !compatible && field_count > Self::FIELD_COUNT { + return ve!(Self, FieldCountNotMatch, Self::FIELD_COUNT, field_count); + }; + let mut offsets: Vec = slice[molecule::NUMBER_SIZE..offset_first] + .chunks_exact(molecule::NUMBER_SIZE) + .map(|x| molecule::unpack_number(x) as usize) + .collect(); + offsets.push(total_size); + if offsets.windows(2).any(|i| i[0] > i[1]) { + return ve!(Self, OffsetsNotMatch); + } + BytesReader::verify(&slice[offsets[0]..offsets[1]], compatible)?; + BytesReader::verify(&slice[offsets[1]..offsets[2]], compatible)?; + BytesVecReader::verify(&slice[offsets[2]..offsets[3]], compatible)?; + Ok(()) + } +} +#[derive(Debug, Default)] +pub struct ConnectionSyncBuilder { + pub(crate) from: Bytes, + pub(crate) to: Bytes, + pub(crate) route: BytesVec, +} +impl ConnectionSyncBuilder { + pub const FIELD_COUNT: usize = 3; + pub fn from(mut self, v: Bytes) -> Self { + self.from = v; + self + } + pub fn to(mut self, v: Bytes) -> Self { + self.to = v; + self + } + pub fn route(mut self, v: BytesVec) -> Self { + self.route = v; + self + } +} +impl molecule::prelude::Builder for ConnectionSyncBuilder { + type Entity = ConnectionSync; + const NAME: &'static str = "ConnectionSyncBuilder"; + fn expected_length(&self) -> usize { + molecule::NUMBER_SIZE * (Self::FIELD_COUNT + 1) + + self.from.as_slice().len() + + self.to.as_slice().len() + + self.route.as_slice().len() + } + fn write(&self, writer: &mut W) -> molecule::io::Result<()> { + let mut total_size = molecule::NUMBER_SIZE * (Self::FIELD_COUNT + 1); + let mut offsets = Vec::with_capacity(Self::FIELD_COUNT); + offsets.push(total_size); + total_size += self.from.as_slice().len(); + offsets.push(total_size); + total_size += self.to.as_slice().len(); + offsets.push(total_size); + total_size += self.route.as_slice().len(); + writer.write_all(&molecule::pack_number(total_size as molecule::Number))?; + for offset in offsets.into_iter() { + writer.write_all(&molecule::pack_number(offset as molecule::Number))?; + } + writer.write_all(self.from.as_slice())?; + writer.write_all(self.to.as_slice())?; + writer.write_all(self.route.as_slice())?; + Ok(()) + } + fn build(&self) -> Self::Entity { + let mut inner = Vec::with_capacity(self.expected_length()); + self.write(&mut inner) + .unwrap_or_else(|_| panic!("{} build should be ok", Self::NAME)); + ConnectionSync::new_unchecked(inner.into()) + } +}