Skip to content

Commit 0cb882f

Browse files
committed
WIP: changing to BTree instead of Slab for session management.
1 parent ee56b0a commit 0cb882f

File tree

1 file changed

+23
-21
lines changed
  • crates/net/network-devp2p/src

1 file changed

+23
-21
lines changed

crates/net/network-devp2p/src/host.rs

Lines changed: 23 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -21,16 +21,15 @@ use hash::keccak;
2121
use rlp::{Encodable, RlpStream};
2222
use std::{
2323
cmp::{max, min},
24-
collections::{HashMap, HashSet},
24+
collections::{BTreeSet, HashMap, HashSet},
2525
fs,
2626
io::{self, Read, Write},
2727
net::{Ipv4Addr, SocketAddr, SocketAddrV4},
2828
ops::*,
2929
path::{Path, PathBuf},
3030
str::FromStr,
3131
sync::{
32-
Arc,
33-
atomic::{AtomicBool, AtomicU64, Ordering as AtomicOrdering},
32+
atomic::{AtomicBool, AtomicU64, Ordering as AtomicOrdering}, Arc
3433
},
3534
time::Duration,
3635
};
@@ -108,7 +107,7 @@ impl Encodable for CapabilityInfo {
108107
pub struct NetworkContext<'s> {
109108
io: &'s IoContext<NetworkIoMessage>,
110109
protocol: ProtocolId,
111-
sessions: Arc<RwLock<Slab<SharedSession>>>,
110+
sessions: Arc<RwLock<std::collections::BTreeSet<usize, SharedSession>>>,
112111
session: Option<SharedSession>,
113112
session_id: Option<StreamToken>,
114113
reserved_peers: &'s HashSet<NodeId>,
@@ -161,7 +160,7 @@ impl<'s> NetworkContext<'s> {
161160
io: &'s IoContext<NetworkIoMessage>,
162161
protocol: ProtocolId,
163162
session: Option<SharedSession>,
164-
sessions: Arc<RwLock<Slab<SharedSession>>>,
163+
sessions: Arc<RwLock<std::collections::BTreeMap<usize, SharedSession>>>,
165164
reserved_peers: &'s HashSet<NodeId>,
166165
statistics: &'s NetworkingStatistics,
167166
) -> NetworkContext<'s> {
@@ -180,7 +179,7 @@ impl<'s> NetworkContext<'s> {
180179
fn resolve_session(&self, peer: PeerId) -> Option<SharedSession> {
181180
match self.session_id {
182181
Some(id) if id == peer => self.session.clone(),
183-
_ => self.sessions.read().get(peer).cloned(),
182+
_ => self.sessions.read().get(&peer).cloned(),
184183
}
185184
}
186185
}
@@ -366,7 +365,7 @@ pub struct Host {
366365
pub info: RwLock<HostInfo>,
367366
udp_socket: Mutex<Option<UdpSocket>>,
368367
tcp_listener: Mutex<TcpListener>,
369-
sessions: Arc<RwLock<Slab<SharedSession>>>,
368+
sessions: Arc<RwLock<std::collections::BTreeMap<usize, SharedSession>>>,
370369
discovery: Mutex<Option<Discovery<'static>>>,
371370
nodes: RwLock<NodeTable>,
372371
handlers: RwLock<HashMap<ProtocolId, Arc<dyn NetworkProtocolHandler + Sync>>>,
@@ -575,7 +574,7 @@ impl Host {
575574

576575
let mut peers = Vec::with_capacity(sessions.count());
577576
for i in (0..MAX_SESSIONS).map(|x| x + FIRST_SESSION) {
578-
if sessions.get(i).is_some() {
577+
if sessions.get(&i).is_some() {
579578
peers.push(i);
580579
}
581580
}
@@ -703,13 +702,13 @@ impl Host {
703702
self.sessions
704703
.read()
705704
.iter()
706-
.any(|e| e.lock().id() == Some(id))
705+
.any(|e| e.1.lock().id() == Some(id))
707706
}
708707

709708
fn keep_alive(&self, io: &IoContext<NetworkIoMessage>) {
710709
let mut to_kill = Vec::new();
711710
for e in self.sessions.read().iter() {
712-
let mut s = e.lock();
711+
let mut s = e.1.lock();
713712
if !s.keep_alive(io) {
714713
s.disconnect(io, DisconnectReason::PingTimeout);
715714
to_kill.push(s.token());
@@ -847,6 +846,9 @@ impl Host {
847846
let nonce = self.info.write().next_nonce();
848847
let mut sessions = self.sessions.write();
849848

849+
// we can add now the new session.
850+
// if no NodeID is provided, we use the smalles used number.
851+
850852
let token = sessions.insert_with_opt(|token| {
851853
trace!(target: "network", "{}: Initiating session {:?}", token, id);
852854
match Session::new(io, socket, token, id, &nonce, &self.info.read()) {
@@ -886,7 +888,7 @@ impl Host {
886888
}
887889

888890
fn session_writable(&self, token: StreamToken, io: &IoContext<NetworkIoMessage>) {
889-
let session = { self.sessions.read().get(token).cloned() };
891+
let session = { self.sessions.read().get(&token).cloned() };
890892

891893
if let Some(session) = session {
892894
let mut s = session.lock();
@@ -909,7 +911,7 @@ impl Host {
909911
let mut ready_data: Vec<ProtocolId> = Vec::new();
910912
let mut packet_data: Vec<(ProtocolId, PacketId, Vec<u8>)> = Vec::new();
911913
let mut kill = false;
912-
let session = { self.sessions.read().get(token).cloned() };
914+
let session = { self.sessions.read().get(&token).cloned() };
913915
let mut ready_id = None;
914916
if let Some(session) = session.clone() {
915917
{
@@ -1047,7 +1049,7 @@ impl Host {
10471049
.read()
10481050
.iter()
10491051
.filter_map(|e| {
1050-
let session = e.lock();
1052+
let session = e.1.lock();
10511053
if session.token() != token && session.info.id == ready_id {
10521054
return Some(session.token());
10531055
} else {
@@ -1183,7 +1185,7 @@ impl Host {
11831185
let mut expired_session = None;
11841186
if let FIRST_SESSION..=LAST_SESSION = token {
11851187
let sessions = self.sessions.read();
1186-
if let Some(session) = sessions.get(token).cloned() {
1188+
if let Some(session) = sessions.get(&token).cloned() {
11871189
expired_session = Some(session.clone());
11881190
let mut s = session.lock();
11891191
if !s.expired() {
@@ -1232,7 +1234,7 @@ impl Host {
12321234
{
12331235
let sessions = self.sessions.read();
12341236
for c in sessions.iter() {
1235-
let s = c.lock();
1237+
let s = c.1.lock();
12361238
if let Some(id) = s.id() {
12371239
if node_changes.removed.contains(id) {
12381240
to_remove.push(s.token());
@@ -1447,7 +1449,7 @@ impl IoHandler<NetworkIoMessage> for Host {
14471449
.unwrap_or_else(|e| debug!("Error registering timer {}: {:?}", token, e));
14481450
}
14491451
NetworkIoMessage::Disconnect(ref peer) => {
1450-
let session = { self.sessions.read().get(*peer).cloned() };
1452+
let session = { self.sessions.read().get(&*peer).cloned() };
14511453
if let Some(session) = session {
14521454
session
14531455
.lock()
@@ -1457,7 +1459,7 @@ impl IoHandler<NetworkIoMessage> for Host {
14571459
self.kill_connection(*peer, io, false);
14581460
}
14591461
NetworkIoMessage::DisablePeer(ref peer) => {
1460-
let session = { self.sessions.read().get(*peer).cloned() };
1462+
let session = { self.sessions.read().get(&*peer).cloned() };
14611463
if let Some(session) = session {
14621464
session
14631465
.lock()
@@ -1486,7 +1488,7 @@ impl IoHandler<NetworkIoMessage> for Host {
14861488
) {
14871489
match stream {
14881490
FIRST_SESSION..=LAST_SESSION => {
1489-
let session = { self.sessions.read().get(stream).cloned() };
1491+
let session = { self.sessions.read().get(&stream).cloned() };
14901492
if let Some(session) = session {
14911493
session
14921494
.lock()
@@ -1522,13 +1524,13 @@ impl IoHandler<NetworkIoMessage> for Host {
15221524
match stream {
15231525
FIRST_SESSION..=LAST_SESSION => {
15241526
let mut connections = self.sessions.write();
1525-
if let Some(connection) = connections.get(stream).cloned() {
1527+
if let Some(connection) = connections.get(&stream).cloned() {
15261528
let c = connection.lock();
15271529
if c.expired() {
15281530
// make sure it is the same connection that the event was generated for
15291531
c.deregister_socket(event_loop)
15301532
.expect("Error deregistering socket");
1531-
connections.remove(stream);
1533+
connections.remove(&stream);
15321534
}
15331535
}
15341536
}
@@ -1545,7 +1547,7 @@ impl IoHandler<NetworkIoMessage> for Host {
15451547
) {
15461548
match stream {
15471549
FIRST_SESSION..=LAST_SESSION => {
1548-
let connection = { self.sessions.read().get(stream).cloned() };
1550+
let connection = { self.sessions.read().get(&&stream).cloned() };
15491551
if let Some(connection) = connection {
15501552
connection
15511553
.lock()

0 commit comments

Comments
 (0)