Skip to content

Commit 75a1a8d

Browse files
committed
New Timer & Peer IDs for Host:
Id: 1 - 7: Host Systems. id: 8 - 99: User Timers (Hope thats enough) Id: 100+ : DevP2P Session Peers
1 parent ee436fd commit 75a1a8d

File tree

2 files changed

+51
-18
lines changed

2 files changed

+51
-18
lines changed

crates/net/network-devp2p/src/host.rs

Lines changed: 43 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -70,9 +70,23 @@ const DISCOVERY_REFRESH: TimerToken = 4;
7070
const FAST_DISCOVERY_REFRESH: TimerToken = 5;
7171
const DISCOVERY_ROUND: TimerToken = 6;
7272
const NODE_TABLE: TimerToken = 7;
73-
const USER_TIMER: TimerToken = 8;
7473

75-
const FIRST_SESSION: StreamToken = 9;
74+
// Maximum count of peer mappings we remember. each node ID takes 64 bytes for nodeID, 8 bytes for peer id, and probabyl 3 * 8 bytes for internals. = about 100 bytes per entry.)
75+
// 10000 elements should take about 1 MB of memory.
76+
const MAX_NODE_TO_PEER_MAPPINGS: usize = 10000;
77+
78+
// the user timers are a collection of timers that are registered by the protocol handlers.
79+
// therefore comming from other modules.
80+
// to be not in conflict with the token ID system, we choose a very
81+
// high number for the user timers,
82+
// that is realisticly unreachable by the peer stream tokens,
83+
// but still have enough number space for user timers.
84+
const FIRST_USER_TIMER: TimerToken = 8;
85+
const MAX_USER_TIMERS: TimerToken = 91;
86+
const LAST_USER_TIMER: TimerToken = FIRST_USER_TIMER + MAX_USER_TIMERS;
87+
88+
const FIRST_SESSION: StreamToken = LAST_USER_TIMER + 1;
89+
const LAST_SESSION: StreamToken = StreamToken::MAX;
7690

7791
// Timeouts
7892
// for IDLE TimerToken
@@ -423,11 +437,11 @@ impl Host {
423437
discovery: Mutex::new(None),
424438
udp_socket: Mutex::new(None),
425439
tcp_listener: Mutex::new(tcp_listener),
426-
sessions: SessionContainer::new(FIRST_SESSION, MAX_SESSIONS),
440+
sessions: SessionContainer::new(FIRST_SESSION, MAX_SESSIONS, MAX_NODE_TO_PEER_MAPPINGS),
427441
nodes: RwLock::new(NodeTable::new(path)),
428442
handlers: RwLock::new(HashMap::new()),
429443
timers: RwLock::new(HashMap::new()),
430-
timer_counter: RwLock::new(USER_TIMER),
444+
timer_counter: RwLock::new(FIRST_USER_TIMER),
431445
reserved_nodes: RwLock::new(HashSet::new()),
432446
stopping: AtomicBool::new(false),
433447
filter,
@@ -1299,7 +1313,7 @@ impl IoHandler<NetworkIoMessage> for Host {
12991313
return;
13001314
}
13011315
match stream {
1302-
FIRST_SESSION.. => self.session_readable(stream, io),
1316+
FIRST_SESSION..LAST_SESSION => self.session_readable(stream, io),
13031317
DISCOVERY => self.discovery_readable(io),
13041318
TCP_ACCEPT => self.accept(io),
13051319
_ => panic!("Received unknown readable token"),
@@ -1311,22 +1325,20 @@ impl IoHandler<NetworkIoMessage> for Host {
13111325
return;
13121326
}
13131327
match stream {
1314-
FIRST_SESSION.. => self.session_writable(stream, io),
1328+
FIRST_SESSION..=LAST_SESSION => self.session_writable(stream, io),
13151329
DISCOVERY => self.discovery_writable(io),
13161330
_ => panic!("Received unknown writable token"),
13171331
}
13181332
}
13191333

13201334
fn timeout(&self, io: &IoContext<NetworkIoMessage>, token: TimerToken) {
1335+
trace!(target: "network", "HOST timout: {}", token);
13211336
if self.stopping.load(AtomicOrdering::SeqCst) {
13221337
return;
13231338
}
13241339
match token {
13251340
IDLE => self.maintain_network(io),
1326-
FIRST_SESSION.. => {
1327-
trace!(target: "network", "Timeout from Host impl: {}", token);
1328-
self.connection_timeout(token, io);
1329-
}
1341+
13301342
DISCOVERY_REFRESH => {
13311343
// Run the _slow_ discovery if enough peers are connected
13321344
if !self.has_enough_peers() {
@@ -1356,7 +1368,7 @@ impl IoHandler<NetworkIoMessage> for Host {
13561368
nodes.clear_useless();
13571369
nodes.save();
13581370
}
1359-
_ => match self.timers.read().get(&token).cloned() {
1371+
FIRST_USER_TIMER..=LAST_USER_TIMER => match self.timers.read().get(&token).cloned() {
13601372
Some(timer) => match self.handlers.read().get(&timer.protocol).cloned() {
13611373
None => {
13621374
warn!(target: "network", "No handler found for protocol: {:?}", timer.protocol)
@@ -1380,6 +1392,13 @@ impl IoHandler<NetworkIoMessage> for Host {
13801392
warn!("Unknown timer token: {}", token);
13811393
} // timer is not registerd through us
13821394
},
1395+
FIRST_SESSION..=LAST_SESSION => {
1396+
trace!(target: "network", "Timeout from Host impl: {}", token);
1397+
self.connection_timeout(token, io);
1398+
}
1399+
_ => {
1400+
warn!(target: "network", "HOST: Unknown timer token tick: {}", token);
1401+
}
13831402
}
13841403
}
13851404

@@ -1419,20 +1438,33 @@ impl IoHandler<NetworkIoMessage> for Host {
14191438
ref token,
14201439
} => {
14211440
trace!(target: "network", "Adding timer for protocol: {:?}, delay: {:?}, token: {}", protocol, delay.as_millis(), token);
1441+
1442+
if token >= &MAX_USER_TIMERS {
1443+
warn!(target: "network", "Tried to register timer with token {} which is larger than MAX_USER_TIMERS {}", token, MAX_USER_TIMERS);
1444+
return;
1445+
}
1446+
14221447
let handler_token = {
14231448
let mut timer_counter = self.timer_counter.write();
14241449
let counter = &mut *timer_counter;
14251450
let handler_token = *counter;
14261451
*counter += 1;
14271452
handler_token
14281453
};
1454+
1455+
if handler_token > LAST_USER_TIMER {
1456+
warn!(target: "network", "Tried to register timer witch Index {token} what would be over the boundaries of usertimers: {LAST_USER_TIMER}");
1457+
return;
1458+
}
14291459
self.timers.write().insert(
14301460
handler_token,
14311461
ProtocolTimer {
14321462
protocol: *protocol,
14331463
token: *token,
14341464
},
14351465
);
1466+
1467+
trace!(target: "network", "Registering handler_token {} token {} for protocol {:?} with delay {:?}", handler_token, token, protocol, delay);
14361468
io.register_timer(handler_token, *delay)
14371469
.unwrap_or_else(|e| debug!("Error registering timer {}: {:?}", token, e));
14381470
}

crates/net/network-devp2p/src/session_container.rs

Lines changed: 8 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,6 @@
11
use crate::host::HostInfo;
22
use ethereum_types::H256;
3+
use lru_cache::LruCache;
34
use mio::net::TcpStream;
45
use std::{collections::BTreeMap, sync::Arc, time::Duration};
56

@@ -19,16 +20,16 @@ pub struct SessionContainer {
1920
max_sessions: usize,
2021
sessions: Arc<RwLock<std::collections::BTreeMap<usize, SharedSession>>>,
2122
expired_sessions: Arc<RwLock<Vec<SharedSession>>>,
22-
node_id_to_session: Mutex<BTreeMap<ethereum_types::H512, usize>>, // used to map Node IDs to last used session tokens.
23+
node_id_to_session: Mutex<LruCache<ethereum_types::H512, usize>>, // used to map Node IDs to last used session tokens.
2324
sessions_token_max: Mutex<usize>, // Used to generate new session tokens
2425
}
2526

2627
impl SessionContainer {
27-
pub fn new(first_session_token: usize, max_sessions: usize) -> Self {
28+
pub fn new(first_session_token: usize, max_sessions: usize, max_node_mappings: usize) -> Self {
2829
SessionContainer {
2930
sessions: Arc::new(RwLock::new(std::collections::BTreeMap::new())),
3031
expired_sessions: Arc::new(RwLock::new(Vec::new())),
31-
node_id_to_session: Mutex::new(BTreeMap::new()),
32+
node_id_to_session: Mutex::new(LruCache::new(max_node_mappings)),
3233
sessions_token_max: Mutex::new(first_session_token),
3334
max_sessions,
3435
}
@@ -47,7 +48,7 @@ impl SessionContainer {
4748
fn create_token_id(
4849
&self,
4950
node_id: &NodeId,
50-
tokens: &mut BTreeMap<ethereum_types::H512, usize>,
51+
tokens: &mut LruCache<ethereum_types::H512, usize>,
5152
) -> usize {
5253
let mut session_token_max = self.sessions_token_max.lock();
5354
let next_id = session_token_max.clone();
@@ -132,7 +133,7 @@ impl SessionContainer {
132133

133134
if let Some(node_id) = id {
134135
// check if there is already a connection for the given node id.
135-
if let Some(existing_peer_id) = node_ids.get(node_id) {
136+
if let Some(existing_peer_id) = node_ids.get_mut(node_id) {
136137
let existing_session_mutex_o = sessions.get(existing_peer_id).clone();
137138

138139
if let Some(existing_session_mutex) = existing_session_mutex_o {
@@ -279,7 +280,7 @@ impl SessionContainer {
279280
pub fn get_session_for(&self, id: &NodeId) -> Option<SharedSession> {
280281
self.node_id_to_session
281282
.lock()
282-
.get(id)
283+
.get_mut(id)
283284
.cloned()
284285
.map_or(None, |peer_id| {
285286
let sessions = self.sessions.read();
@@ -294,7 +295,7 @@ impl SessionContainer {
294295
) -> Option<usize> {
295296
self.node_id_to_session
296297
.lock()
297-
.get(node_id)
298+
.get_mut(node_id)
298299
.map_or(None, |peer_id| {
299300
if !only_available_sessions {
300301
return Some(*peer_id);

0 commit comments

Comments
 (0)