Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
50 changes: 46 additions & 4 deletions streamer/src/nonblocking/swqos.rs
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,12 @@ pub const QUIC_MAX_STAKED_CONCURRENT_STREAMS: usize = 512;

pub const QUIC_TOTAL_STAKED_CONCURRENT_STREAMS: usize = 100_000;

/// RTT after which we start BDP scaling
const REFERENCE_RTT_MS: u64 = 50;

/// Above this RTT we stop scaling for BDP
const MAX_RTT_MS: u64 = 350;

#[derive(Clone)]
pub struct SwQosConfig {
pub max_streams_per_ms: u64,
Expand Down Expand Up @@ -140,8 +146,12 @@ impl SwQos {
}
}

fn compute_max_allowed_uni_streams(peer_type: ConnectionPeerType, total_stake: u64) -> usize {
match peer_type {
fn compute_max_allowed_uni_streams_with_rtt(
rtt_millis: u64,
peer_type: ConnectionPeerType,
total_stake: u64,
) -> usize {
let streams = match peer_type {
ConnectionPeerType::Staked(peer_stake) => {
// No checked math for f64 type. So let's explicitly check for 0 here
if total_stake == 0 || peer_stake > total_stake {
Expand All @@ -164,7 +174,10 @@ fn compute_max_allowed_uni_streams(peer_type: ConnectionPeerType, total_stake: u
}
}
ConnectionPeerType::Unstaked => QUIC_MAX_UNSTAKED_CONCURRENT_STREAMS,
}
};
// scale amount of streams based on RTT if RTT is larger than REFERENCE_RTT_MS
// multiply first then divide to avoid rounding errors.
(streams * rtt_millis.clamp(REFERENCE_RTT_MS, MAX_RTT_MS) as usize) / REFERENCE_RTT_MS as usize
}

impl SwQos {
Expand All @@ -182,7 +195,10 @@ impl SwQos {
),
ConnectionHandlerError,
> {
if let Ok(max_uni_streams) = VarInt::from_u64(compute_max_allowed_uni_streams(
// get current RTT and limit it to MAX_RTT_MS
let rtt_millis = connection.rtt().as_millis() as u64;
if let Ok(max_uni_streams) = VarInt::from_u64(compute_max_allowed_uni_streams_with_rtt(
rtt_millis,
conn_context.peer_type(),
conn_context.total_stake,
) as u64)
Expand Down Expand Up @@ -525,6 +541,10 @@ impl QosController<SwQosConnectionContext> for SwQos {
pub mod test {
use super::*;

fn compute_max_allowed_uni_streams(peer_type: ConnectionPeerType, total_stake: u64) -> usize {
compute_max_allowed_uni_streams_with_rtt(REFERENCE_RTT_MS, peer_type, total_stake)
}

#[test]
fn test_max_allowed_uni_streams() {
assert_eq!(
Expand All @@ -551,4 +571,26 @@ pub mod test {
QUIC_MAX_UNSTAKED_CONCURRENT_STREAMS
);
}

#[test]
fn test_max_allowed_uni_streams_with_rtt() {
assert_eq!(
compute_max_allowed_uni_streams_with_rtt(
REFERENCE_RTT_MS / 2,
ConnectionPeerType::Unstaked,
10000
),
QUIC_MAX_UNSTAKED_CONCURRENT_STREAMS,
"Max streams should not be less than normal for low RTT"
);
assert_eq!(
compute_max_allowed_uni_streams_with_rtt(
REFERENCE_RTT_MS + REFERENCE_RTT_MS / 2,
ConnectionPeerType::Unstaked,
10000
),
QUIC_MAX_UNSTAKED_CONCURRENT_STREAMS + QUIC_MAX_UNSTAKED_CONCURRENT_STREAMS / 2,
"Max streams should scale with BDP in high-RTT connections"
);
}
}
Loading