This repository was archived by the owner on Jan 22, 2025. It is now read-only.

Commit 7a02ef7

lijunwangsyihau authored and committed

Backport connection stream counter to v1.17 (#991)

* Backport ConnectionStreamCounter
* Addressed feedback from Pankaj
1 parent: 3a66644 · commit: 7a02ef7

File tree

3 files changed: +109, -41 lines changed

3 files changed

+109
-41
lines changed

streamer/src/nonblocking/mod.rs

Lines changed: 1 addition & 0 deletions
@@ -1,3 +1,4 @@
 pub mod quic;
 pub mod recvmmsg;
 pub mod sendmmsg;
+mod stream_throttle;

streamer/src/nonblocking/quic.rs

Lines changed: 61 additions & 41 deletions
@@ -1,4 +1,5 @@
 use {
+    super::stream_throttle::ConnectionStreamCounter,
     crate::{
         quic::{configure_server, QuicServerError, StreamStats},
         streamer::StakedNodes,
@@ -49,7 +50,7 @@ use {
         // introduce any other awaits while holding the RwLock.
         sync::{Mutex, MutexGuard},
         task::JoinHandle,
-        time::timeout,
+        time::{sleep, timeout},
     },
 };

@@ -72,7 +73,6 @@ const CONNECTION_CLOSE_REASON_EXCEED_MAX_STREAM_COUNT: &[u8] = b"exceed_max_stre

 const CONNECTION_CLOSE_CODE_TOO_MANY: u32 = 4;
 const CONNECTION_CLOSE_REASON_TOO_MANY: &[u8] = b"too_many";
-const STREAM_STOP_CODE_THROTTLING: u32 = 15;

 /// Limit to 500K PPS
 pub const DEFAULT_MAX_STREAMS_PER_MS: u64 = 500;
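
A quick sanity check on the constant above (this arithmetic is ours, not part of the diff): 500 streams per millisecond is 500 × 1,000 ms/s = 500,000 streams per second, i.e. the "500K PPS" limit the doc comment refers to.
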
@@ -360,14 +360,16 @@ fn handle_and_cache_new_connection(
         remote_addr,
     );

-    if let Some((last_update, stream_exit)) = connection_table_l.try_add_connection(
-        ConnectionTableKey::new(remote_addr.ip(), params.remote_pubkey),
-        remote_addr.port(),
-        Some(connection.clone()),
-        params.stake,
-        timing::timestamp(),
-        params.max_connections_per_peer,
-    ) {
+    if let Some((last_update, stream_exit, stream_counter)) = connection_table_l
+        .try_add_connection(
+            ConnectionTableKey::new(remote_addr.ip(), params.remote_pubkey),
+            remote_addr.port(),
+            Some(connection.clone()),
+            params.stake,
+            timing::timestamp(),
+            params.max_connections_per_peer,
+        )
+    {
         let peer_type = connection_table_l.peer_type;
         drop(connection_table_l);

@@ -387,6 +389,7 @@
             wait_for_chunk_timeout,
             max_unstaked_connections,
             max_streams_per_ms,
+            stream_counter,
         ));
         Ok(())
     } else {
@@ -770,15 +773,6 @@ fn max_streams_for_connection_in_100ms(
     }
 }

-fn reset_throttling_params_if_needed(last_instant: &mut tokio::time::Instant) -> bool {
-    if tokio::time::Instant::now().duration_since(*last_instant) > STREAM_THROTTLING_INTERVAL {
-        *last_instant = tokio::time::Instant::now();
-        true
-    } else {
-        false
-    }
-}
-
 #[allow(clippy::too_many_arguments)]
 async fn handle_connection(
     connection: Connection,
@@ -791,6 +785,7 @@ async fn handle_connection(
     wait_for_chunk_timeout: Duration,
     max_unstaked_connections: usize,
     max_streams_per_ms: u64,
+    stream_counter: Arc<ConnectionStreamCounter>,
 ) {
     let stats = params.stats;
     debug!(
@@ -801,41 +796,54 @@ async fn handle_connection(
     );
     let stable_id = connection.stable_id();
     stats.total_connections.fetch_add(1, Ordering::Relaxed);
-    let max_streams_per_100ms = max_streams_for_connection_in_100ms(
+    let max_streams_per_throttling_interval = max_streams_for_connection_in_100ms(
         peer_type,
         params.stake,
         params.total_stake,
         max_unstaked_connections,
         max_streams_per_ms,
     );
-    let mut last_throttling_instant = tokio::time::Instant::now();
-    let mut streams_in_current_interval = 0;
+
     while !stream_exit.load(Ordering::Relaxed) {
         if let Ok(stream) =
             tokio::time::timeout(WAIT_FOR_STREAM_TIMEOUT, connection.accept_uni()).await
         {
             match stream {
                 Ok(mut stream) => {
-                    if reset_throttling_params_if_needed(&mut last_throttling_instant) {
-                        streams_in_current_interval = 0;
-                    } else if streams_in_current_interval >= max_streams_per_100ms {
-                        stats.throttled_streams.fetch_add(1, Ordering::Relaxed);
-                        match peer_type {
-                            ConnectionPeerType::Unstaked => {
-                                stats
-                                    .throttled_unstaked_streams
-                                    .fetch_add(1, Ordering::Relaxed);
-                            }
-                            ConnectionPeerType::Staked => {
-                                stats
-                                    .throttled_staked_streams
-                                    .fetch_add(1, Ordering::Relaxed);
+                    let throttle_interval_start: tokio::time::Instant =
+                        stream_counter.reset_throttling_params_if_needed();
+                    let streams_read_in_throttle_interval =
+                        stream_counter.stream_count.load(Ordering::Relaxed);
+
+                    if streams_read_in_throttle_interval >= max_streams_per_throttling_interval {
+                        // The peer is sending faster than we're willing to read. Sleep for what's
+                        // left of this read interval so the peer backs off.
+                        let throttle_duration = STREAM_THROTTLING_INTERVAL
+                            .saturating_sub(throttle_interval_start.elapsed());
+                        if !throttle_duration.is_zero() {
+                            debug!("Throttling stream from {remote_addr:?}, peer type: {:?}, total stake: {}, \
+                                max_streams_per_interval: {max_streams_per_throttling_interval}, read_interval_streams: {streams_read_in_throttle_interval} \
+                                throttle_duration: {throttle_duration:?}",
+                                peer_type, params.total_stake);
+                            stats.throttled_streams.fetch_add(1, Ordering::Relaxed);
+
+                            match peer_type {
+                                ConnectionPeerType::Unstaked => {
+                                    stats
+                                        .throttled_unstaked_streams
+                                        .fetch_add(1, Ordering::Relaxed);
+                                }
+                                ConnectionPeerType::Staked => {
+                                    stats
+                                        .throttled_staked_streams
+                                        .fetch_add(1, Ordering::Relaxed);
+                                }
                             }
+                            sleep(throttle_duration).await;
                         }
-                        let _ = stream.stop(VarInt::from_u32(STREAM_STOP_CODE_THROTTLING));
-                        continue;
                     }
-                    streams_in_current_interval = streams_in_current_interval.saturating_add(1);
+                    stream_counter.stream_count.fetch_add(1, Ordering::Relaxed);
+
                     stats.total_streams.fetch_add(1, Ordering::Relaxed);
                     stats.total_new_streams.fetch_add(1, Ordering::Relaxed);
                     let stream_exit = stream_exit.clone();
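
The hunk above is the behavioral core of the backport: when a connection exceeds its per-interval stream budget, the handler no longer stops the stream with STREAM_STOP_CODE_THROTTLING; it sleeps for whatever remains of the 100 ms interval and then still reads the stream. Below is a minimal, self-contained sketch of that pattern, not the commit's code: the constants and the simulated stream loop are made up for illustration, and it assumes tokio with the rt, macros, and time features.

// Hypothetical illustration of interval-based throttling that sleeps out the
// remainder of the interval (simplified from the diff above; not the commit's code).
use std::time::Duration;
use tokio::time::{sleep, Instant};

const STREAM_THROTTLING_INTERVAL: Duration = Duration::from_millis(100);
const MAX_STREAMS_PER_INTERVAL: u64 = 5; // made-up budget for the example

#[tokio::main]
async fn main() {
    let mut interval_start = Instant::now();
    let mut streams_read_in_interval: u64 = 0;

    // Pretend 20 streams arrive back to back on one connection.
    for stream_id in 0..20u64 {
        // Start a fresh accounting interval once the previous one has elapsed.
        if interval_start.elapsed() > STREAM_THROTTLING_INTERVAL {
            interval_start = Instant::now();
            streams_read_in_interval = 0;
        }

        // Over budget: sleep out the rest of the interval so the sender backs off,
        // then still read the stream instead of rejecting it.
        if streams_read_in_interval >= MAX_STREAMS_PER_INTERVAL {
            let throttle_duration =
                STREAM_THROTTLING_INTERVAL.saturating_sub(interval_start.elapsed());
            if !throttle_duration.is_zero() {
                println!("throttling stream {stream_id} for {throttle_duration:?}");
                sleep(throttle_duration).await;
            }
        }

        streams_read_in_interval += 1;
        println!("read stream {stream_id}");
    }
}
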
@@ -1041,6 +1049,7 @@ struct ConnectionEntry {
     last_update: Arc<AtomicU64>,
     port: u16,
     connection: Option<Connection>,
+    stream_counter: Arc<ConnectionStreamCounter>,
 }

 impl ConnectionEntry {
@@ -1050,13 +1059,15 @@ impl ConnectionEntry {
         last_update: Arc<AtomicU64>,
         port: u16,
         connection: Option<Connection>,
+        stream_counter: Arc<ConnectionStreamCounter>,
     ) -> Self {
         Self {
             exit,
             stake,
             last_update,
             port,
             connection,
+            stream_counter,
         }
     }

@@ -1167,7 +1178,11 @@ impl ConnectionTable {
         stake: u64,
         last_update: u64,
         max_connections_per_peer: usize,
-    ) -> Option<(Arc<AtomicU64>, Arc<AtomicBool>)> {
+    ) -> Option<(
+        Arc<AtomicU64>,
+        Arc<AtomicBool>,
+        Arc<ConnectionStreamCounter>,
+    )> {
         let connection_entry = self.table.entry(key).or_default();
         let has_connection_capacity = connection_entry
             .len()
@@ -1177,15 +1192,20 @@
         if has_connection_capacity {
             let exit = Arc::new(AtomicBool::new(false));
             let last_update = Arc::new(AtomicU64::new(last_update));
+            let stream_counter = connection_entry
+                .first()
+                .map(|entry| entry.stream_counter.clone())
+                .unwrap_or(Arc::new(ConnectionStreamCounter::new()));
             connection_entry.push(ConnectionEntry::new(
                 exit.clone(),
                 stake,
                 last_update.clone(),
                 port,
                 connection,
+                stream_counter.clone(),
             ));
             self.total_size += 1;
-            Some((last_update, exit))
+            Some((last_update, exit, stream_counter))
         } else {
             if let Some(connection) = connection {
                 connection.close(
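
The try_add_connection change above is what makes the budget per peer rather than per connection: the first connection for a given ConnectionTableKey creates the ConnectionStreamCounter, and every later connection from the same key clones the same Arc, so all of a peer's connections draw from one stream count. A rough, self-contained sketch of that sharing, with simplified types and made-up names (not the commit's code):

// Hypothetical sketch: entries under the same key share one Arc'd counter.
use std::{
    collections::HashMap,
    sync::{
        atomic::{AtomicU64, Ordering},
        Arc,
    },
};

#[derive(Default)]
struct StreamCounter {
    stream_count: AtomicU64,
}

struct Entry {
    stream_counter: Arc<StreamCounter>,
}

fn add_connection(
    table: &mut HashMap<&'static str, Vec<Entry>>,
    key: &'static str,
) -> Arc<StreamCounter> {
    let entries = table.entry(key).or_default();
    // Reuse the counter of an existing connection from the same peer, if any;
    // otherwise this is the peer's first connection and a fresh counter is created.
    let stream_counter = entries
        .first()
        .map(|entry| entry.stream_counter.clone())
        .unwrap_or_default();
    entries.push(Entry {
        stream_counter: stream_counter.clone(),
    });
    stream_counter
}

fn main() {
    let mut table: HashMap<&'static str, Vec<Entry>> = HashMap::new();
    let a1 = add_connection(&mut table, "peer_a");
    let a2 = add_connection(&mut table, "peer_a");
    let b1 = add_connection(&mut table, "peer_b");

    // Streams counted on one of peer_a's connections are visible on the other,
    // because both entries hold the same Arc<StreamCounter>.
    a1.stream_count.fetch_add(3, Ordering::Relaxed);
    assert_eq!(a2.stream_count.load(Ordering::Relaxed), 3);
    assert_eq!(b1.stream_count.load(Ordering::Relaxed), 0);
    println!("peer_a's two connections share one counter; peer_b has its own");
}
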
streamer/src/nonblocking/stream_throttle.rs

Lines changed: 47 additions & 0 deletions
@@ -0,0 +1,47 @@
+use std::{
+    sync::{
+        atomic::{AtomicU64, Ordering},
+        RwLock,
+    },
+    time::Duration,
+};
+
+pub const STREAM_THROTTLING_INTERVAL_MS: u64 = 100;
+pub const STREAM_THROTTLING_INTERVAL: Duration =
+    Duration::from_millis(STREAM_THROTTLING_INTERVAL_MS);
+
+#[derive(Debug)]
+pub(crate) struct ConnectionStreamCounter {
+    pub(crate) stream_count: AtomicU64,
+    last_throttling_instant: RwLock<tokio::time::Instant>,
+}
+
+impl ConnectionStreamCounter {
+    pub(crate) fn new() -> Self {
+        Self {
+            stream_count: AtomicU64::default(),
+            last_throttling_instant: RwLock::new(tokio::time::Instant::now()),
+        }
+    }
+
+    /// Reset the counter and the last throttling instant, and return
+    /// last_throttling_instant regardless of whether it was reset or not.
+    pub(crate) fn reset_throttling_params_if_needed(&self) -> tokio::time::Instant {
+        let last_throttling_instant = *self.last_throttling_instant.read().unwrap();
+        if tokio::time::Instant::now().duration_since(last_throttling_instant)
+            > STREAM_THROTTLING_INTERVAL
+        {
+            let mut last_throttling_instant = self.last_throttling_instant.write().unwrap();
+            // Recheck as some other thread might have done throttling since this thread tried to acquire the write lock.
+            if tokio::time::Instant::now().duration_since(*last_throttling_instant)
+                > STREAM_THROTTLING_INTERVAL
+            {
+                *last_throttling_instant = tokio::time::Instant::now();
+                self.stream_count.store(0, Ordering::Relaxed);
+            }
+            *last_throttling_instant
+        } else {
+            last_throttling_instant
+        }
+    }
+}
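
reset_throttling_params_if_needed above uses a double-checked pattern: a read lock on the fast path, then a re-check after taking the write lock so that, when several stream handlers race at an interval boundary, only one of them actually resets the counter. Here is a self-contained sketch of the same pattern using std::time::Instant and illustrative names (not the commit's code):

// Hypothetical sketch of the double-checked interval reset.
use std::{
    sync::{
        atomic::{AtomicU64, Ordering},
        RwLock,
    },
    time::{Duration, Instant},
};

const INTERVAL: Duration = Duration::from_millis(100);

struct Counter {
    count: AtomicU64,
    last_reset: RwLock<Instant>,
}

impl Counter {
    fn reset_if_needed(&self) -> Instant {
        // Fast path: a read lock is enough while the interval has not elapsed.
        let last_reset = *self.last_reset.read().unwrap();
        if last_reset.elapsed() <= INTERVAL {
            return last_reset;
        }
        // Slow path: take the write lock, then re-check, because another thread
        // may already have reset the interval while we waited for the lock.
        let mut last_reset = self.last_reset.write().unwrap();
        if last_reset.elapsed() > INTERVAL {
            *last_reset = Instant::now();
            self.count.store(0, Ordering::Relaxed);
        }
        *last_reset
    }
}

fn main() {
    let counter = Counter {
        count: AtomicU64::new(42),
        last_reset: RwLock::new(Instant::now() - Duration::from_millis(200)),
    };
    // The stale interval (200 ms old) is reset and the stream count drops back to 0.
    let start = counter.reset_if_needed();
    assert_eq!(counter.count.load(Ordering::Relaxed), 0);
    assert!(start.elapsed() < INTERVAL);
    println!("counter was reset; new interval started at {start:?}");
}
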
