Skip to content

Commit f23efc9

Browse files
committed
alignment of token ids.
system streams go now before the user session streams. Todo: - respect track max number of connections for generating new peer_id - use new lookup strategy instead of iterating over nodes.
1 parent 585e16a commit f23efc9

File tree

1 file changed

+105
-83
lines changed
  • crates/net/network-devp2p/src

1 file changed

+105
-83
lines changed

crates/net/network-devp2p/src/host.rs

Lines changed: 105 additions & 83 deletions
Original file line numberDiff line numberDiff line change
@@ -63,18 +63,18 @@ const MAX_HANDSHAKES: usize = 1024;
6363

6464
const DEFAULT_PORT: u16 = 30303;
6565

66+
const SYS_TIMER: TimerToken = 0;
6667
// StreamToken/TimerToken
67-
const TCP_ACCEPT: StreamToken = SYS_TIMER + 1;
68-
const IDLE: TimerToken = SYS_TIMER + 2;
69-
const DISCOVERY: StreamToken = SYS_TIMER + 3;
70-
const DISCOVERY_REFRESH: TimerToken = SYS_TIMER + 4;
71-
const FAST_DISCOVERY_REFRESH: TimerToken = SYS_TIMER + 5;
72-
const DISCOVERY_ROUND: TimerToken = SYS_TIMER + 6;
73-
const NODE_TABLE: TimerToken = SYS_TIMER + 7;
74-
const FIRST_SESSION: StreamToken = 0;
75-
const LAST_SESSION: StreamToken = FIRST_SESSION + MAX_SESSIONS - 1;
76-
const USER_TIMER: TimerToken = LAST_SESSION + 256;
77-
const SYS_TIMER: TimerToken = LAST_SESSION + 1;
68+
const TCP_ACCEPT: StreamToken = 1;
69+
const IDLE: TimerToken = 2;
70+
const DISCOVERY: StreamToken = 3;
71+
const DISCOVERY_REFRESH: TimerToken = 4;
72+
const FAST_DISCOVERY_REFRESH: TimerToken = 5;
73+
const DISCOVERY_ROUND: TimerToken = 6;
74+
const NODE_TABLE: TimerToken = 7;
75+
const USER_TIMER: TimerToken = 8;
76+
77+
const FIRST_SESSION: StreamToken = 9;
7878

7979
// Timeouts
8080
// for IDLE TimerToken
@@ -366,6 +366,8 @@ struct ProtocolTimer {
366366

367367
struct SessionContainer {
368368
sessions: Arc<RwLock<std::collections::BTreeMap<usize, SharedSession>>>,
369+
expired_sessions: Arc<RwLock<Vec<SharedSession>>>,
370+
discovery_session: Arc<RwLock<Option<SharedSession>>>,
369371
node_id_to_session: Mutex<BTreeMap<ethereum_types::H512, usize>>, // used to map Node IDs to last used session tokens.
370372
sessions_token_max: Mutex<usize>, // Used to generate new session tokens
371373
}
@@ -374,8 +376,10 @@ impl SessionContainer {
374376
pub fn new() -> Self {
375377
SessionContainer {
376378
sessions: Arc::new(RwLock::new(std::collections::BTreeMap::new())),
379+
expired_sessions: Arc::new(RwLock::new(Vec::new())),
377380
node_id_to_session: Mutex::new(BTreeMap::new()),
378381
sessions_token_max: Mutex::new(0),
382+
discovery_session: Arc::new(RwLock::new(None)),
379383
}
380384
}
381385

@@ -415,65 +419,77 @@ impl SessionContainer {
415419
// creating a connection is a very rare event.
416420

417421
// always lock the node_id_to_session first, then the sessions.
422+
let mut expired_session = self.expired_sessions.write();
418423
let mut node_ids = self.node_id_to_session.lock();
419424
let mut sessions = self.sessions.write();
425+
420426

421427
if let Some(node_id) = id {
422428
// check if there is already a connection for the given node id.
423429
if let Some(existing_peer_id) = node_ids.get(node_id) {
424-
if let Some(existing_session_mutex) = sessions.get(existing_peer_id) {
425-
let session = existing_session_mutex.lock();
426-
if let Some(id_from_session) = &session.info.id {
427-
if session.info.id == Some(*node_id) {
428-
// we got already got a session for the specified node.
429-
// maybe the old session is already scheduled for getting deleted.
430-
if session.expired() {
431-
// if the session is expired, we will just create n new session for this node.
432-
let new_session = Session::new(
433-
io,
434-
socket,
435-
existing_peer_id.clone(),
436-
id,
437-
nonce,
438-
host,
439-
);
440-
match new_session {
441-
Ok(session) => {
442-
//let mut session_write = RwLockUpgradableReadGuard::upgrade(sessions);
443-
let _old_session = sessions.insert(
444-
*existing_peer_id,
445-
Arc::new(Mutex::new(session)),
446-
);
447-
// node_ids does not need to get updated, since it uses the same value.
448-
449-
// in this context, the stream might already be unregistered ?!
450-
// we can just register the stream again.
451-
if let Err(err) = io.register_stream(*existing_peer_id) {
452-
// todo: research this topic, watch out for this message,
453-
// maybe we can keep track of stream registrations as well somehow.
454-
debug!(target: "network", "Failed to register stream for token: {} : {}", existing_peer_id, err);
455-
}
456-
457-
debug!(target: "network", "Created new session for node id: {}", node_id);
458-
return Ok(*existing_peer_id);
459-
}
460-
Err(e) => {
461-
error!(target: "network", "Failed to create session for node id: {}", node_id);
462-
return Err(e);
463-
}
464-
}
430+
let existing_session_mutex_o = sessions.get(existing_peer_id).clone();
431+
432+
if let Some(existing_session_mutex) = existing_session_mutex_o {
433+
let session_expired = {
434+
let session = existing_session_mutex.lock();
435+
if let Some(id_from_session) = &session.info.id {
436+
if session.info.id == Some(*node_id) {
437+
// we got already got a session for the specified node.
438+
// maybe the old session is already scheduled for getting deleted.
439+
session.expired()
465440
} else {
466-
// this might happen if 2 nodes try to connect to each other at the same time.
467-
debug!(target: "network", "Session already exists for node id: {}", node_id);
468-
return Err(ErrorKind::AlreadyExists.into());
441+
error!(target: "network", "host cache inconsistency: Session node id missmatch. expected: {} is {}.", existing_peer_id, id_from_session);
442+
return Err(ErrorKind::HostCacheInconsistency.into());
469443
}
470444
} else {
471-
error!(target: "network", "host cache inconsistency: Session node id missmatch. expected: {} is {}.", existing_peer_id, id_from_session);
445+
error!(target: "network", "host cache inconsistency: Session has no Node_id defined where it should for {}", existing_peer_id);
472446
return Err(ErrorKind::HostCacheInconsistency.into());
473447
}
448+
}; // session guard is dropped here
449+
450+
if session_expired {
451+
// if the session is expired, we will just create n new session for this node.
452+
let new_session =
453+
Session::new(io, socket, existing_peer_id.clone(), id, nonce, host);
454+
match new_session {
455+
Ok(session) => {
456+
let new_session_arc = Arc::new(Mutex::new(session));
457+
let old_session_o =
458+
sessions.insert(*existing_peer_id, new_session_arc);
459+
460+
match old_session_o {
461+
Some(old) => {
462+
// we remember the expired session, so it can get closed and cleaned up in a nice way later.
463+
expired_session.push(old);
464+
},
465+
None => {
466+
// we have a cache missmatch.
467+
// but the only thing is missing is a clean ending of the old session.
468+
// nothing mission critical.
469+
error!(target: "network", "host cache inconsistency: Session for node id {} was not found in sessions map, but it should be there.", node_id);
470+
},
471+
}
472+
473+
// in this context, the stream might already be unregistered ?!
474+
// we can just register the stream again.
475+
if let Err(err) = io.register_stream(*existing_peer_id) {
476+
// todo: research this topic, watch out for this message,
477+
// maybe we can keep track of stream registrations as well somehow.
478+
debug!(target: "network", "Failed to register stream for token: {} : {}", existing_peer_id, err);
479+
}
480+
481+
debug!(target: "network", "Created new session for node id: {}", node_id);
482+
return Ok(*existing_peer_id);
483+
}
484+
Err(e) => {
485+
error!(target: "network", "Failed to create session for node id: {}", node_id);
486+
return Err(e);
487+
}
488+
}
474489
} else {
475-
error!(target: "network", "host cache inconsistency: Session has no Node_id defined where it should for {}", existing_peer_id);
476-
return Err(ErrorKind::HostCacheInconsistency.into());
490+
// this might happen if 2 nodes try to connect to each other at the same time.
491+
debug!(target: "network", "Session already exists for node id: {}", node_id);
492+
return Err(ErrorKind::AlreadyExists.into());
477493
}
478494
} else {
479495
// we have a node id, but there is no session for it (anymore)
@@ -524,6 +540,15 @@ impl SessionContainer {
524540
// if we dont know a NodeID
525541
// debug!(target: "network", "Session create error: {:?}", e);
526542
}
543+
544+
fn get_session_for(&self, id: &NodeId) -> Option<SharedSession> {
545+
546+
self.node_id_to_session.lock().get(id).map_or(None, |peer_id| {
547+
let sessions = self.sessions.read();
548+
sessions.get(peer_id).cloned()
549+
})
550+
551+
}
527552
}
528553

529554
/// Root IO handler. Manages protocol handlers, IO timers and network connections.
@@ -822,11 +847,8 @@ impl Host {
822847
}
823848

824849
fn have_session(&self, id: &NodeId) -> bool {
825-
self.sessions
826-
.sessions
827-
.read()
828-
.iter()
829-
.any(|e| e.1.lock().info.id == Some(*id))
850+
851+
self.sessions.get_session_for(id).is_some()
830852
}
831853

832854
// returns (handshakes, egress, ingress)
@@ -1331,7 +1353,7 @@ impl Host {
13311353
let mut failure_id = None;
13321354
let mut deregister = false;
13331355
let mut expired_session = None;
1334-
if let FIRST_SESSION..=LAST_SESSION = token {
1356+
if token >= FIRST_SESSION {
13351357
let sessions = self.sessions.sessions.read();
13361358
if let Some(session) = sessions.get(&token).cloned() {
13371359
expired_session = Some(session.clone());
@@ -1449,18 +1471,19 @@ impl IoHandler<NetworkIoMessage> for Host {
14491471

14501472
fn stream_hup(&self, io: &IoContext<NetworkIoMessage>, stream: StreamToken) {
14511473
trace!(target: "network", "Hup: {}", stream);
1452-
match stream {
1453-
FIRST_SESSION..=LAST_SESSION => self.connection_closed(stream, io),
1454-
_ => warn!(target: "network", "Unexpected hup"),
1455-
};
1474+
if stream >= FIRST_SESSION {
1475+
self.connection_closed(stream, io)
1476+
} else {
1477+
warn!(target: "network", "Unexpected hup for session {}", stream);
1478+
}
14561479
}
14571480

14581481
fn stream_readable(&self, io: &IoContext<NetworkIoMessage>, stream: StreamToken) {
14591482
if self.stopping.load(AtomicOrdering::SeqCst) {
14601483
return;
14611484
}
14621485
match stream {
1463-
FIRST_SESSION..=LAST_SESSION => self.session_readable(stream, io),
1486+
FIRST_SESSION.. => self.session_readable(stream, io),
14641487
DISCOVERY => self.discovery_readable(io),
14651488
TCP_ACCEPT => self.accept(io),
14661489
_ => panic!("Received unknown readable token"),
@@ -1472,7 +1495,7 @@ impl IoHandler<NetworkIoMessage> for Host {
14721495
return;
14731496
}
14741497
match stream {
1475-
FIRST_SESSION..=LAST_SESSION => self.session_writable(stream, io),
1498+
FIRST_SESSION.. => self.session_writable(stream, io),
14761499
DISCOVERY => self.discovery_writable(io),
14771500
_ => panic!("Received unknown writable token"),
14781501
}
@@ -1484,7 +1507,7 @@ impl IoHandler<NetworkIoMessage> for Host {
14841507
}
14851508
match token {
14861509
IDLE => self.maintain_network(io),
1487-
FIRST_SESSION..=LAST_SESSION => {
1510+
FIRST_SESSION.. => {
14881511
trace!(target: "network", "Timeout from Host impl: {}", token);
14891512
self.connection_timeout(token, io);
14901513
}
@@ -1635,7 +1658,7 @@ impl IoHandler<NetworkIoMessage> for Host {
16351658
event_loop: &mut EventLoop<IoManager<NetworkIoMessage>>,
16361659
) {
16371660
match stream {
1638-
FIRST_SESSION..=LAST_SESSION => {
1661+
FIRST_SESSION.. => {
16391662
let session = { self.sessions.sessions.read().get(&stream).cloned() };
16401663
if let Some(session) = session {
16411664
session
@@ -1670,7 +1693,7 @@ impl IoHandler<NetworkIoMessage> for Host {
16701693
event_loop: &mut EventLoop<IoManager<NetworkIoMessage>>,
16711694
) {
16721695
match stream {
1673-
FIRST_SESSION..=LAST_SESSION => {
1696+
FIRST_SESSION.. => {
16741697
let mut connections = self.sessions.sessions.write();
16751698
if let Some(connection) = connections.get(&stream).cloned() {
16761699
let c = connection.lock();
@@ -1694,15 +1717,6 @@ impl IoHandler<NetworkIoMessage> for Host {
16941717
event_loop: &mut EventLoop<IoManager<NetworkIoMessage>>,
16951718
) {
16961719
match stream {
1697-
FIRST_SESSION..=LAST_SESSION => {
1698-
let connection = { self.sessions.sessions.read().get(&&stream).cloned() };
1699-
if let Some(connection) = connection {
1700-
connection
1701-
.lock()
1702-
.update_socket(reg, event_loop)
1703-
.expect("Error updating socket");
1704-
}
1705-
}
17061720
DISCOVERY => match (
17071721
self.udp_socket.lock().as_ref(),
17081722
self.discovery.lock().as_ref(),
@@ -1727,7 +1741,15 @@ impl IoHandler<NetworkIoMessage> for Host {
17271741
PollOpt::edge(),
17281742
)
17291743
.expect("Error reregistering stream"),
1730-
_ => warn!("Unexpected stream update"),
1744+
_ => {
1745+
let connection = { self.sessions.sessions.read().get(&&stream).cloned() };
1746+
if let Some(connection) = connection {
1747+
connection
1748+
.lock()
1749+
.update_socket(reg, event_loop)
1750+
.expect("Error updating socket");
1751+
}
1752+
},
17311753
}
17321754
}
17331755
}

0 commit comments

Comments
 (0)