Skip to content

Commit 792e99f

Browse files
committed
transport-manager: remove disconnected peers
1 parent 991aa12 commit 792e99f

File tree

4 files changed

+130
-22
lines changed

4 files changed

+130
-22
lines changed

src/transport/manager/handle.rs

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -725,7 +725,10 @@ mod tests {
725725
peers.insert(
726726
peer,
727727
PeerContext {
728-
state: PeerState::Disconnected { dial_record: None },
728+
state: PeerState::Disconnected {
729+
dial_record: None,
730+
disconnected_at: std::time::Instant::now()
731+
},
729732
addresses: AddressStore::new(),
730733
},
731734
);
@@ -759,6 +762,7 @@ mod tests {
759762
.with(Protocol::P2p(Multihash::from(peer))),
760763
ConnectionId::from(0),
761764
)),
765+
disconnected_at: std::time::Instant::now()
762766
},
763767

764768
addresses: AddressStore::from_iter(

src/transport/manager/mod.rs

Lines changed: 44 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -73,6 +73,9 @@ pub(crate) mod handle;
7373
/// Logging target for the file.
7474
const LOG_TARGET: &str = "litep2p::transport-manager";
7575

76+
/// Interval duration after which we remove the peer entry from the transport manager.
77+
const INTERVAL_DURATION: Duration = Duration::from_secs(60 * 60); // 60 minutes
78+
7679
/// The connection established result.
7780
#[derive(Debug, Clone, Copy, Eq, PartialEq)]
7881
enum ConnectionEstablishedResult {
@@ -1026,7 +1029,10 @@ impl TransportManager {
10261029
?err,
10271030
"failed to negotiate connection",
10281031
);
1029-
context.state = PeerState::Disconnected { dial_record: None };
1032+
context.state = PeerState::Disconnected {
1033+
dial_record: None,
1034+
disconnected_at: std::time::Instant::now()
1035+
};
10301036
Err(Error::InvalidState)
10311037
}
10321038
}
@@ -1087,10 +1093,43 @@ impl TransportManager {
10871093
Ok(None)
10881094
}
10891095

1096+
fn cleanup_disconnected_peers(&mut self) {
1097+
let mut peers = self.peers.write();
1098+
let cutoff = std::time::Instant::now() - INTERVAL_DURATION;
1099+
1100+
let before_count = peers.len();
1101+
1102+
peers.retain(|_peer_id, peer_context| {
1103+
match &peer_context.state {
1104+
// keep if in `Disconnected` state that is more recent than the cutoff
1105+
PeerState::Disconnected { disconnected_at, .. } => {
1106+
*disconnected_at > cutoff
1107+
}
1108+
// keep all rest of states
1109+
_ => true,
1110+
}
1111+
});
1112+
1113+
let removed = before_count - peers.len();
1114+
if removed > 0 {
1115+
tracing::debug!(
1116+
target: LOG_TARGET,
1117+
removed,
1118+
"Cleaned up disconnected peers"
1119+
);
1120+
}
1121+
}
1122+
10901123
/// Poll next event from [`crate::transport::manager::TransportManager`].
10911124
pub async fn next(&mut self) -> Option<TransportEvent> {
1125+
let mut cleanup_interval = tokio::time::interval(INTERVAL_DURATION);
1126+
10921127
loop {
10931128
tokio::select! {
1129+
_ = cleanup_interval.tick() => {
1130+
self.cleanup_disconnected_peers();
1131+
}
1132+
10941133
event = self.event_rx.recv() => {
10951134
let Some(event) = event else {
10961135
tracing::error!(
@@ -1895,7 +1934,10 @@ mod tests {
18951934
manager.peers.write().insert(
18961935
peer,
18971936
PeerContext {
1898-
state: PeerState::Disconnected { dial_record: None },
1937+
state: PeerState::Disconnected {
1938+
dial_record: None,
1939+
disconnected_at: std::time::Instant::now()
1940+
},
18991941
addresses: AddressStore::new(),
19001942
},
19011943
);

src/transport/manager/peer_state.rs

Lines changed: 77 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -127,6 +127,8 @@ pub enum PeerState {
127127
/// [`crate::transport::manager::TransportManager`] must be prepared to handle the dial
128128
/// failure even after the connection has been closed.
129129
dial_record: Option<ConnectionRecord>,
130+
// timestamp when the peer was disconnected.
131+
disconnected_at: std::time::Instant,
130132
},
131133
}
132134

@@ -161,9 +163,10 @@ impl PeerState {
161163
| Self::Opening { .. }
162164
| Self::Disconnected {
163165
dial_record: Some(_),
166+
..
164167
} => StateDialResult::DialingInProgress,
165168

166-
Self::Disconnected { dial_record: None } => StateDialResult::Ok,
169+
Self::Disconnected { dial_record: None, .. } => StateDialResult::Ok,
167170
}
168171
}
169172

@@ -212,7 +215,10 @@ impl PeerState {
212215
// Clear the dial record if the connection ID matches.
213216
Self::Dialing { dial_record } =>
214217
if dial_record.connection_id == connection_id {
215-
*self = Self::Disconnected { dial_record: None };
218+
*self = Self::Disconnected {
219+
dial_record: None,
220+
disconnected_at: std::time::Instant::now()
221+
};
216222
return true;
217223
},
218224

@@ -230,9 +236,13 @@ impl PeerState {
230236

231237
Self::Disconnected {
232238
dial_record: Some(dial_record),
239+
..
233240
} =>
234241
if dial_record.connection_id == connection_id {
235-
*self = Self::Disconnected { dial_record: None };
242+
*self = Self::Disconnected {
243+
dial_record: None,
244+
disconnected_at: std::time::Instant::now()
245+
};
236246
return true;
237247
},
238248

@@ -277,6 +287,7 @@ impl PeerState {
277287
Self::Dialing { dial_record }
278288
| Self::Disconnected {
279289
dial_record: Some(dial_record),
290+
..
280291
} =>
281292
if dial_record.connection_id == connection.connection_id {
282293
*self = Self::Connected {
@@ -292,7 +303,10 @@ impl PeerState {
292303
return true;
293304
},
294305

295-
Self::Disconnected { dial_record: None } => {
306+
Self::Disconnected {
307+
dial_record: None,
308+
..
309+
} => {
296310
*self = Self::Connected {
297311
record: connection,
298312
secondary: None,
@@ -347,12 +361,16 @@ impl PeerState {
347361
Some(SecondaryOrDialing::Dialing(dial_record)) => {
348362
*self = Self::Disconnected {
349363
dial_record: Some(dial_record.clone()),
364+
disconnected_at: std::time::Instant::now()
350365
};
351366

352367
return true;
353368
}
354369
None => {
355-
*self = Self::Disconnected { dial_record: None };
370+
*self = Self::Disconnected {
371+
dial_record: None,
372+
disconnected_at: std::time::Instant::now(),
373+
};
356374

357375
return true;
358376
}
@@ -387,7 +405,10 @@ impl PeerState {
387405
transports.remove(&transport);
388406

389407
if transports.is_empty() {
390-
*self = Self::Disconnected { dial_record: None };
408+
*self = Self::Disconnected {
409+
dial_record: None,
410+
disconnected_at: std::time::Instant::now()
411+
};
391412
return true;
392413
}
393414

@@ -494,7 +515,10 @@ mod tests {
494515

495516
#[test]
496517
fn state_can_dial() {
497-
let state = PeerState::Disconnected { dial_record: None };
518+
let state = PeerState::Disconnected {
519+
dial_record: None,
520+
disconnected_at: std::time::Instant::now()
521+
};
498522
assert_eq!(state.can_dial(), StateDialResult::Ok);
499523

500524
let record = ConnectionRecord::new(
@@ -505,6 +529,7 @@ mod tests {
505529

506530
let state = PeerState::Disconnected {
507531
dial_record: Some(record.clone()),
532+
disconnected_at: std::time::Instant::now(),
508533
};
509534
assert_eq!(state.can_dial(), StateDialResult::DialingInProgress);
510535

@@ -535,7 +560,10 @@ mod tests {
535560
ConnectionId::from(0),
536561
);
537562

538-
let mut state = PeerState::Disconnected { dial_record: None };
563+
let mut state = PeerState::Disconnected {
564+
dial_record: None,
565+
disconnected_at: std::time::Instant::now()
566+
};
539567
assert_eq!(
540568
state.dial_single_address(record.clone()),
541569
StateDialResult::Ok
@@ -550,7 +578,10 @@ mod tests {
550578

551579
#[test]
552580
fn state_dial_addresses() {
553-
let mut state = PeerState::Disconnected { dial_record: None };
581+
let mut state = PeerState::Disconnected {
582+
dial_record: None,
583+
disconnected_at: std::time::Instant::now()
584+
};
554585
assert_eq!(
555586
state.dial_addresses(
556587
ConnectionId::from(0),
@@ -589,7 +620,10 @@ mod tests {
589620

590621
// Check with the same connection ID.
591622
state.on_dial_failure(ConnectionId::from(0));
592-
assert_eq!(state, PeerState::Disconnected { dial_record: None });
623+
assert_eq!(state, PeerState::Disconnected {
624+
dial_record: None,
625+
disconnected_at: std::time::Instant::now()
626+
});
593627
}
594628

595629
// Check from the connected state without dialing state.
@@ -636,6 +670,7 @@ mod tests {
636670
{
637671
let mut state = PeerState::Disconnected {
638672
dial_record: Some(record.clone()),
673+
disconnected_at: std::time::Instant::now()
639674
};
640675
let previous_state = state.clone();
641676
// Check with different connection ID.
@@ -644,7 +679,10 @@ mod tests {
644679

645680
// Check with the same connection ID.
646681
state.on_dial_failure(ConnectionId::from(0));
647-
assert_eq!(state, PeerState::Disconnected { dial_record: None });
682+
assert_eq!(state, PeerState::Disconnected {
683+
dial_record: None,
684+
disconnected_at: std::time::Instant::now()
685+
});
648686
}
649687
}
650688

@@ -726,6 +764,7 @@ mod tests {
726764
{
727765
let mut state = PeerState::Disconnected {
728766
dial_record: Some(record.clone()),
767+
disconnected_at: std::time::Instant::now()
729768
};
730769
assert!(state.on_connection_established(record.clone()));
731770
assert_eq!(
@@ -741,6 +780,7 @@ mod tests {
741780
{
742781
let mut state = PeerState::Disconnected {
743782
dial_record: Some(record.clone()),
783+
disconnected_at: std::time::Instant::now(),
744784
};
745785
assert!(state.on_connection_established(second_record.clone()));
746786
assert_eq!(
@@ -754,7 +794,10 @@ mod tests {
754794

755795
// Disconnected without dial record.
756796
{
757-
let mut state = PeerState::Disconnected { dial_record: None };
797+
let mut state = PeerState::Disconnected {
798+
dial_record: None,
799+
disconnected_at: std::time::Instant::now()
800+
};
758801
assert!(state.on_connection_established(record.clone()));
759802
assert_eq!(
760803
state,
@@ -816,7 +859,10 @@ mod tests {
816859
secondary: None,
817860
};
818861
assert!(state.on_connection_closed(ConnectionId::from(0)));
819-
assert_eq!(state, PeerState::Disconnected { dial_record: None });
862+
assert_eq!(state, PeerState::Disconnected {
863+
dial_record: None,
864+
disconnected_at: std::time::Instant::now(),
865+
});
820866
}
821867

822868
// Primary is closed with secondary promoted
@@ -846,7 +892,8 @@ mod tests {
846892
assert_eq!(
847893
state,
848894
PeerState::Disconnected {
849-
dial_record: Some(second_record.clone())
895+
dial_record: Some(second_record.clone()),
896+
disconnected_at: std::time::Instant::now()
850897
}
851898
);
852899
}
@@ -862,7 +909,10 @@ mod tests {
862909

863910
// This is the last protocol
864911
assert!(state.on_open_failure(SupportedTransport::Tcp));
865-
assert_eq!(state, PeerState::Disconnected { dial_record: None });
912+
assert_eq!(state, PeerState::Disconnected {
913+
dial_record: None,
914+
disconnected_at: std::time::Instant::now()
915+
});
866916
}
867917

868918
#[test]
@@ -890,7 +940,10 @@ mod tests {
890940
ConnectionId::from(0),
891941
);
892942

893-
let mut state = PeerState::Disconnected { dial_record: None };
943+
let mut state = PeerState::Disconnected {
944+
dial_record: None,
945+
disconnected_at: std::time::Instant::now()
946+
};
894947
// Dialing.
895948
assert_eq!(
896949
state.dial_single_address(record.clone()),
@@ -905,7 +958,10 @@ mod tests {
905958

906959
// Dialing failed.
907960
state.on_dial_failure(ConnectionId::from(0));
908-
assert_eq!(state, PeerState::Disconnected { dial_record: None });
961+
assert_eq!(state, PeerState::Disconnected {
962+
dial_record: None,
963+
disconnected_at: std::time::Instant::now()
964+
});
909965

910966
// Opening.
911967
assert_eq!(
@@ -919,7 +975,10 @@ mod tests {
919975

920976
// Open failure.
921977
assert!(state.on_open_failure(SupportedTransport::Tcp));
922-
assert_eq!(state, PeerState::Disconnected { dial_record: None });
978+
assert_eq!(state, PeerState::Disconnected {
979+
dial_record: None,
980+
disconnected_at: std::time::Instant::now()
981+
});
923982

924983
// Dial again.
925984
assert_eq!(

src/transport/manager/types.rs

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -52,7 +52,10 @@ pub struct PeerContext {
5252
impl Default for PeerContext {
5353
fn default() -> Self {
5454
Self {
55-
state: PeerState::Disconnected { dial_record: None },
55+
state: PeerState::Disconnected {
56+
dial_record: None,
57+
disconnected_at: std::time::Instant::now()
58+
},
5659
addresses: AddressStore::new(),
5760
}
5861
}

0 commit comments

Comments
 (0)