Skip to content

Commit 5819d05

Browse files
committed
WIP: new session management.
1 parent 97ac7b1 commit 5819d05

File tree

1 file changed

+91
-46
lines changed
  • crates/net/network-devp2p/src

1 file changed

+91
-46
lines changed

crates/net/network-devp2p/src/host.rs

Lines changed: 91 additions & 46 deletions
Original file line numberDiff line numberDiff line change
@@ -19,17 +19,18 @@ use crypto::publickey::{Generator, KeyPair, Random, Secret};
1919
use ethereum_types::H256;
2020
use hash::keccak;
2121
use rlp::{Encodable, RlpStream};
22+
use slab::Index;
2223
use std::{
23-
cmp::{max, min},
24-
collections::{BTreeSet, HashMap, HashSet},
24+
cmp::{max, min, Ordering},
25+
collections::{BTreeMap, HashMap, HashSet},
2526
fs,
2627
io::{self, Read, Write},
2728
net::{Ipv4Addr, SocketAddr, SocketAddrV4},
2829
ops::*,
2930
path::{Path, PathBuf},
3031
str::FromStr,
3132
sync::{
32-
atomic::{AtomicBool, AtomicU64, Ordering as AtomicOrdering}, Arc
33+
atomic::{AtomicBool, AtomicU64, AtomicUsize, Ordering as AtomicOrdering}, Arc
3334
},
3435
time::Duration,
3536
};
@@ -49,7 +50,7 @@ use network::{
4950
client_version::ClientVersion,
5051
};
5152
use parity_path::restrict_permissions_owner;
52-
use parking_lot::{Mutex, RwLock};
53+
use parking_lot::{lock_api::RwLockReadGuard, Mutex, RwLock};
5354
use stats::{PrometheusMetrics, PrometheusRegistry};
5455

5556
type Slab<T> = ::slab::Slab<T, usize>;
@@ -360,14 +361,53 @@ struct ProtocolTimer {
360361
pub token: TimerToken, // Handler level token
361362
}
362363

364+
365+
366+
struct SessionContainer {
367+
sessions: Arc<RwLock<std::collections::BTreeMap<usize, SharedSession>>>,
368+
node_id_to_session: BTreeMap<H256, usize>,
369+
sessions_tokens: Mutex<usize>, // Used to generate new session tokens
370+
}
371+
372+
impl SessionContainer {
373+
374+
pub fn new() -> Self {
375+
SessionContainer {
376+
sessions: Arc::new(RwLock::new(std::collections::BTreeMap::new())),
377+
node_id_to_session: BTreeMap::new(),
378+
sessions_tokens: Mutex::new(0),
379+
}
380+
}
381+
382+
/// Returns a reference to the sessions map.
383+
pub fn sessions(&self) -> &Arc<RwLock<std::collections::BTreeMap<usize, SharedSession>>> {
384+
&self.sessions
385+
}
386+
387+
/// gets the next token ID and store this information
388+
fn next_token_id(&self) -> usize {
389+
let lock = self.sessions_tokens.lock();
390+
391+
let next_id = 0;
392+
// if (lock.as_usize() > usize::MAX - 1) {
393+
394+
// }
395+
//let sessions = self.sessions.read();
396+
397+
398+
todo!("Implement next_token_id logic");
399+
400+
}
401+
}
402+
363403
/// Root IO handler. Manages protocol handlers, IO timers and network connections.
364404
///
365405
/// NOTE: must keep the lock in order of: reserved_nodes (rwlock) -> session (mutex, from sessions)
366406
pub struct Host {
367407
pub info: RwLock<HostInfo>,
368408
udp_socket: Mutex<Option<UdpSocket>>,
369409
tcp_listener: Mutex<TcpListener>,
370-
sessions: Arc<RwLock<std::collections::BTreeMap<usize, SharedSession>>>,
410+
sessions: SessionContainer,
371411
discovery: Mutex<Option<Discovery<'static>>>,
372412
nodes: RwLock<NodeTable>,
373413
handlers: RwLock<HashMap<ProtocolId, Arc<dyn NetworkProtocolHandler + Sync>>>,
@@ -437,10 +477,7 @@ impl Host {
437477
discovery: Mutex::new(None),
438478
udp_socket: Mutex::new(None),
439479
tcp_listener: Mutex::new(tcp_listener),
440-
sessions: Arc::new(RwLock::new(Slab::new_starting_at(
441-
FIRST_SESSION,
442-
MAX_SESSIONS,
443-
))),
480+
sessions: SessionContainer::new(),
444481
nodes: RwLock::new(NodeTable::new(path)),
445482
handlers: RwLock::new(HashMap::new()),
446483
timers: RwLock::new(HashMap::new()),
@@ -515,7 +552,7 @@ impl Host {
515552
// disconnect all non-reserved peers here.
516553
let reserved: HashSet<NodeId> = self.reserved_nodes.read().clone();
517554
let mut to_kill = Vec::new();
518-
for e in self.sessions.read().iter() {
555+
for e in self.sessions.sessions.read().iter() {
519556
let mut s = e.1.lock();
520557
{
521558
let id = s.id();
@@ -557,7 +594,7 @@ impl Host {
557594
pub fn stop(&self, io: &IoContext<NetworkIoMessage>) {
558595
self.stopping.store(true, AtomicOrdering::SeqCst);
559596
let mut to_kill = Vec::new();
560-
for e in self.sessions.read().iter() {
597+
for e in self.sessions.sessions.read().iter() {
561598
let mut s = e.1.lock();
562599
s.disconnect(io, DisconnectReason::ClientQuit);
563600
to_kill.push(s.token());
@@ -571,10 +608,10 @@ impl Host {
571608

572609
/// Get all connected peers.
573610
pub fn connected_peers(&self) -> Vec<PeerId> {
574-
let sessions = self.sessions.read();
611+
let sessions = self.sessions.sessions.read();
575612
let sessions = &*sessions;
576613

577-
let mut peers = Vec::with_capacity(sessions.count());
614+
let mut peers = Vec::with_capacity(sessions.len());
578615
for i in (0..MAX_SESSIONS).map(|x| x + FIRST_SESSION) {
579616
if sessions.get(&i).is_some() {
580617
peers.push(i);
@@ -659,7 +696,7 @@ impl Host {
659696
}
660697

661698
fn have_session(&self, id: &NodeId) -> bool {
662-
self.sessions
699+
self.sessions.sessions
663700
.read()
664701
.iter()
665702
.any(|e| e.1.lock().info.id == Some(*id))
@@ -670,8 +707,8 @@ impl Host {
670707
let mut handshakes = 0;
671708
let mut egress = 0;
672709
let mut ingress = 0;
673-
for s in self.sessions.read().iter() {
674-
match s.try_lock() {
710+
for s in self.sessions.sessions.read().iter() {
711+
match s.1.try_lock() {
675712
Some(ref s) if s.is_ready() && s.info.originated => egress += 1,
676713
Some(ref s) if s.is_ready() && !s.info.originated => ingress += 1,
677714
_ => handshakes += 1,
@@ -686,9 +723,9 @@ impl Host {
686723
let mut egress = 0;
687724
let mut ingress = 0;
688725

689-
if let Some(lock) = self.sessions.try_read_for(lock_duration) {
726+
if let Some(lock) = self.sessions.sessions.try_read_for(lock_duration) {
690727
for s in lock.iter() {
691-
match s.try_lock() {
728+
match s.1.try_lock() {
692729
Some(ref s) if s.is_ready() && s.info.originated => egress += 1,
693730
Some(ref s) if s.is_ready() && !s.info.originated => ingress += 1,
694731
_ => handshakes += 1,
@@ -701,15 +738,16 @@ impl Host {
701738
}
702739

703740
fn connecting_to(&self, id: &NodeId) -> bool {
704-
self.sessions
741+
// todo: we can use the mapping here for faster access.
742+
self.sessions.sessions
705743
.read()
706744
.iter()
707745
.any(|e| e.1.lock().id() == Some(id))
708746
}
709747

710748
fn keep_alive(&self, io: &IoContext<NetworkIoMessage>) {
711749
let mut to_kill = Vec::new();
712-
for e in self.sessions.read().iter() {
750+
for e in self.sessions.sessions.read().iter() {
713751
let mut s = e.1.lock();
714752
if !s.keep_alive(io) {
715753
s.disconnect(io, DisconnectReason::PingTimeout);
@@ -846,21 +884,26 @@ impl Host {
846884
io: &IoContext<NetworkIoMessage>,
847885
) -> Result<(), Error> {
848886
let nonce = self.info.write().next_nonce();
849-
let mut sessions = self.sessions.write();
887+
let mut sessions = self.sessions.sessions.write();
888+
850889

890+
// even a try without success will give a new token.
891+
let token = self.session.next_token_id();
851892
// we can add now the new session.
852893
// if no NodeID is provided, we use the smalles used number.
853894

854-
let token = sessions.insert_with_opt(|token| {
855-
trace!(target: "network", "{}: Initiating session {:?}", token, id);
856-
match Session::new(io, socket, token, id, &nonce, &self.info.read()) {
857-
Ok(s) => Some(Arc::new(Mutex::new(s))),
858-
Err(e) => {
859-
debug!(target: "network", "Session create error: {:?}", e);
860-
None
861-
}
895+
let existing = sessions.get(&token);
896+
897+
898+
899+
trace!(target: "network", "{}: Initiating session {:?}", token, id);
900+
let session = match Session::new(io, socket, token, id, &nonce, &self.info.read()) {
901+
Ok(s) => Some(Arc::new(Mutex::new(s))),
902+
Err(e) => {
903+
debug!(target: "network", "Session create error: {:?}", e);
904+
None
862905
}
863-
});
906+
};
864907

865908
match token {
866909
Some(t) => io.register_stream(t).map(|_| ()).map_err(Into::into),
@@ -890,7 +933,7 @@ impl Host {
890933
}
891934

892935
fn session_writable(&self, token: StreamToken, io: &IoContext<NetworkIoMessage>) {
893-
let session = { self.sessions.read().get(&token).cloned() };
936+
let session = { self.sessions.sessions.read().get(&token).cloned() };
894937

895938
if let Some(session) = session {
896939
let mut s = session.lock();
@@ -913,7 +956,7 @@ impl Host {
913956
let mut ready_data: Vec<ProtocolId> = Vec::new();
914957
let mut packet_data: Vec<(ProtocolId, PacketId, Vec<u8>)> = Vec::new();
915958
let mut kill = false;
916-
let session = { self.sessions.read().get(&token).cloned() };
959+
let session = { self.sessions.sessions.read().get(&token).cloned() };
917960
let mut ready_id = None;
918961
if let Some(session) = session.clone() {
919962
{
@@ -1047,6 +1090,7 @@ impl Host {
10471090
let handlers = self.handlers.read();
10481091
if !ready_data.is_empty() {
10491092
let duplicates: Vec<usize> = self
1093+
.sessions
10501094
.sessions
10511095
.read()
10521096
.iter()
@@ -1077,7 +1121,7 @@ impl Host {
10771121
io,
10781122
p,
10791123
Some(session.clone()),
1080-
self.sessions.clone(),
1124+
self.sessions.sessions.clone(),
10811125
&reserved,
10821126
&self.statistics,
10831127
),
@@ -1098,7 +1142,7 @@ impl Host {
10981142
io,
10991143
p,
11001144
Some(session.clone()),
1101-
self.sessions.clone(),
1145+
self.sessions.sessions.clone(),
11021146
&reserved,
11031147
&self.statistics,
11041148
),
@@ -1186,7 +1230,7 @@ impl Host {
11861230
let mut deregister = false;
11871231
let mut expired_session = None;
11881232
if let FIRST_SESSION..=LAST_SESSION = token {
1189-
let sessions = self.sessions.read();
1233+
let sessions = self.sessions.sessions.read();
11901234
if let Some(session) = sessions.get(&token).cloned() {
11911235
expired_session = Some(session.clone());
11921236
let mut s = session.lock();
@@ -1217,7 +1261,7 @@ impl Host {
12171261
io,
12181262
p,
12191263
expired_session.clone(),
1220-
self.sessions.clone(),
1264+
self.sessions.sessions.clone(),
12211265
&reserved,
12221266
&self.statistics,
12231267
),
@@ -1234,7 +1278,7 @@ impl Host {
12341278
fn update_nodes(&self, _io: &IoContext<NetworkIoMessage>, node_changes: TableUpdates) {
12351279
let mut to_remove: Vec<PeerId> = Vec::new();
12361280
{
1237-
let sessions = self.sessions.read();
1281+
let sessions = self.sessions.sessions.read();
12381282
for c in sessions.iter() {
12391283
let s = c.1.lock();
12401284
if let Some(id) = s.id() {
@@ -1261,7 +1305,7 @@ impl Host {
12611305
io,
12621306
protocol,
12631307
None,
1264-
self.sessions.clone(),
1308+
self.sessions.sessions.clone(),
12651309
&reserved,
12661310
&self.statistics,
12671311
);
@@ -1283,12 +1327,13 @@ impl Host {
12831327
io,
12841328
protocol,
12851329
None,
1286-
self.sessions.clone(),
1330+
self.sessions.sessions.clone(),
12871331
&reserved,
12881332
&self.statistics,
12891333
);
12901334
action(&context)
12911335
}
1336+
12921337
}
12931338

12941339
impl IoHandler<NetworkIoMessage> for Host {
@@ -1383,7 +1428,7 @@ impl IoHandler<NetworkIoMessage> for Host {
13831428
io,
13841429
timer.protocol,
13851430
None,
1386-
self.sessions.clone(),
1431+
self.sessions.sessions.clone(),
13871432
&reserved,
13881433
&self.statistics,
13891434
),
@@ -1414,7 +1459,7 @@ impl IoHandler<NetworkIoMessage> for Host {
14141459
io,
14151460
*protocol,
14161461
None,
1417-
self.sessions.clone(),
1462+
self.sessions.sessions.clone(),
14181463
&reserved,
14191464
&self.statistics,
14201465
));
@@ -1451,7 +1496,7 @@ impl IoHandler<NetworkIoMessage> for Host {
14511496
.unwrap_or_else(|e| debug!("Error registering timer {}: {:?}", token, e));
14521497
}
14531498
NetworkIoMessage::Disconnect(ref peer) => {
1454-
let session = { self.sessions.read().get(&*peer).cloned() };
1499+
let session = { self.sessions.sessions.read().get(&*peer).cloned() };
14551500
if let Some(session) = session {
14561501
session
14571502
.lock()
@@ -1461,7 +1506,7 @@ impl IoHandler<NetworkIoMessage> for Host {
14611506
self.kill_connection(*peer, io, false);
14621507
}
14631508
NetworkIoMessage::DisablePeer(ref peer) => {
1464-
let session = { self.sessions.read().get(&*peer).cloned() };
1509+
let session = { self.sessions.sessions.read().get(&*peer).cloned() };
14651510
if let Some(session) = session {
14661511
session
14671512
.lock()
@@ -1490,7 +1535,7 @@ impl IoHandler<NetworkIoMessage> for Host {
14901535
) {
14911536
match stream {
14921537
FIRST_SESSION..=LAST_SESSION => {
1493-
let session = { self.sessions.read().get(&stream).cloned() };
1538+
let session = { self.sessions.sessions.read().get(&stream).cloned() };
14941539
if let Some(session) = session {
14951540
session
14961541
.lock()
@@ -1525,7 +1570,7 @@ impl IoHandler<NetworkIoMessage> for Host {
15251570
) {
15261571
match stream {
15271572
FIRST_SESSION..=LAST_SESSION => {
1528-
let mut connections = self.sessions.write();
1573+
let mut connections = self.sessions.sessions.write();
15291574
if let Some(connection) = connections.get(&stream).cloned() {
15301575
let c = connection.lock();
15311576
if c.expired() {
@@ -1549,7 +1594,7 @@ impl IoHandler<NetworkIoMessage> for Host {
15491594
) {
15501595
match stream {
15511596
FIRST_SESSION..=LAST_SESSION => {
1552-
let connection = { self.sessions.read().get(&&stream).cloned() };
1597+
let connection = { self.sessions.sessions.read().get(&&stream).cloned() };
15531598
if let Some(connection) = connection {
15541599
connection
15551600
.lock()

0 commit comments

Comments
 (0)