Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion quinn-proto/src/connection/streams/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,7 @@ impl<'a> Streams<'a> {

self.state.next[dir as usize] += 1;
let id = StreamId::new(self.state.side, dir, self.state.next[dir as usize] - 1);
self.state.insert(false, id);
self.state.insert_local(id);
self.state.send_streams += 1;
Some(id)
}
Expand Down
271 changes: 224 additions & 47 deletions quinn-proto/src/connection/streams/state.rs
Original file line number Diff line number Diff line change
Expand Up @@ -151,7 +151,7 @@ impl StreamsState {
receive_window: VarInt,
stream_receive_window: VarInt,
) -> Self {
let mut this = Self {
Self {
side,
send: FxHashMap::default(),
recv: FxHashMap::default(),
Expand Down Expand Up @@ -184,15 +184,7 @@ impl StreamsState {
initial_max_stream_data_bidi_remote: 0u32.into(),
receive_window_shrink_debt: 0,
streams_blocked: [false, false],
};

for dir in Dir::iter() {
for i in 0..this.max_remote[dir as usize] {
this.insert(true, StreamId::new(!side, dir, i));
Comment thread
Ralith marked this conversation as resolved.
}
}

this
}

pub(crate) fn set_params(&mut self, params: &TransportParameters) {
Expand All @@ -202,10 +194,11 @@ impl StreamsState {
self.max[Dir::Bi as usize] = params.initial_max_streams_bidi.into();
self.max[Dir::Uni as usize] = params.initial_max_streams_uni.into();
self.received_max_data(params.initial_max_data);
for i in 0..self.max_remote[Dir::Bi as usize] {
let id = StreamId::new(!self.side, Dir::Bi, i);
if let Some(s) = self.send.get_mut(&id).and_then(|s| s.as_mut()) {
s.max_data = params.initial_max_stream_data_bidi_local.into();
for (&id, slot) in self.send.iter_mut() {
if id.initiator() != self.side && id.dir() == Dir::Bi {
if let Some(s) = slot.as_mut() {
s.max_data = params.initial_max_stream_data_bidi_local.into();
}
}
}
}
Expand All @@ -215,10 +208,6 @@ impl StreamsState {
fn ensure_remote_streams(&mut self, dir: Dir) {
let new_count = self.max_concurrent_remote_count[dir as usize]
.saturating_sub(self.allocated_remote_count[dir as usize]);
for i in 0..new_count {
let id = StreamId::new(!self.side, dir, self.max_remote[dir as usize] + i);
self.insert(true, id);
}
self.allocated_remote_count[dir as usize] += new_count;
self.max_remote[dir as usize] += new_count;
}
Expand Down Expand Up @@ -263,6 +252,9 @@ impl StreamsState {
debug!("received illegal STREAM frame");
})?;

// Create state for this stream if the remote peer created it.
let newly_created = self.ensure_remote(id);

let Some(rs) = self
.recv
.get_mut(&id)
Expand All @@ -282,7 +274,11 @@ impl StreamsState {
self.data_recvd = self.data_recvd.saturating_add(new_bytes);

if !rs.stopped {
self.on_stream_frame(true, id);
// Newly opened streams are inherently readable; the app discovers them via
// `StreamEvent::Opened` and a separate `Readable` would be redundant.
if !newly_created {
self.events.push_back(StreamEvent::Readable { id });
}
return Ok(ShouldTransmit(false));
}

Expand Down Expand Up @@ -313,6 +309,9 @@ impl StreamsState {
debug!("received illegal RESET_STREAM frame");
})?;

// Create state for this stream if the remote peer created it.
let newly_created = self.ensure_remote(id);

let Some(rs) = self
.recv
.get_mut(&id)
Expand All @@ -339,8 +338,11 @@ impl StreamsState {
// Stopped streams should be disposed immediately on reset
let rs = self.recv.remove(&id).flatten().unwrap();
self.stream_recv_freed(id, rs);
} else if !newly_created {
// Newly opened streams are inherently readable; the app discovers them via
// `StreamEvent::Opened` and a separate `Readable` would be redundant.
self.events.push_back(StreamEvent::Readable { id });
}
self.on_stream_frame(!stopped, id);

// Update connection-level flow control
Ok(if bytes_read != final_offset.into_inner() {
Expand All @@ -357,6 +359,9 @@ impl StreamsState {
/// Process incoming `STOP_SENDING` frame
#[allow(unreachable_pub)] // fuzzing only
pub fn received_stop_sending(&mut self, id: StreamId, error_code: VarInt) {
// Create state for this stream if the remote peer created it.
self.ensure_remote(id);

let max_send_data = self.max_send_data(id);
let Some(stream) = self
.send
Expand All @@ -369,7 +374,6 @@ impl StreamsState {
if stream.try_stop(error_code) {
self.events
.push_back(StreamEvent::Stopped { id, error_code });
self.on_stream_frame(false, id);
}
}

Expand Down Expand Up @@ -632,24 +636,6 @@ impl StreamsState {
stream_frames
}

/// Notify the application that new streams were opened or a stream became readable.
fn on_stream_frame(&mut self, notify_readable: bool, stream: StreamId) {
if stream.initiator() == self.side {
// Notifying about the opening of locally-initiated streams would be redundant.
if notify_readable {
self.events.push_back(StreamEvent::Readable { id: stream });
}
return;
}
let next = &mut self.next_remote[stream.dir() as usize];
Comment thread
Ralith marked this conversation as resolved.
if stream.index() >= *next {
*next = stream.index() + 1;
self.opened[stream.dir() as usize] = true;
} else if notify_readable {
Comment thread
Ralith marked this conversation as resolved.
self.events.push_back(StreamEvent::Readable { id: stream });
}
}

pub(crate) fn received_ack_of(&mut self, frame: frame::StreamMeta) {
let mut entry = match self.send.entry(frame.id) {
hash_map::Entry::Vacant(_) => return,
Expand Down Expand Up @@ -750,6 +736,9 @@ impl StreamsState {
));
}

// Create state for this stream if the remote peer created it.
self.ensure_remote(id);

let write_limit = self.write_limit();
let max_send_data = self.max_send_data(id);
if let Some(ss) = self
Expand All @@ -775,7 +764,6 @@ impl StreamsState {
));
}

self.on_stream_frame(false, id);
Ok(())
}

Expand Down Expand Up @@ -892,17 +880,49 @@ impl StreamsState {
expanded
}

pub(super) fn insert(&mut self, remote: bool, id: StreamId) {
let bi = id.dir() == Dir::Bi;
// bidirectional OR (unidirectional AND NOT remote)
if bi || !remote {
assert!(self.send.insert(id, None).is_none());
/// Insert `(id, None)` placeholders for a locally-initiated stream into `send` (and `recv`
/// for bidi). Called from `Streams::open`; the caller guarantees the id is fresh.
pub(super) fn insert_local(&mut self, id: StreamId) {
debug_assert_eq!(id.initiator(), self.side);
assert!(self.send.insert(id, None).is_none());
if id.dir() == Dir::Bi {
let recv = self.free_recv.pop();
assert!(self.recv.insert(id, recv).is_none());
}
// bidirectional OR (unidirectional AND remote)
if bi || remote {
}

/// Allocate any new remote streams when a packet arrives for a stream id.
/// Any streams above `next_remote` are considered in the default state, avoiding allocations.
/// Once we receive a packet for a new remote stream, we advance `next_remote` and allocate actual state.
/// If there's a gap, we insert `None` placeholders for the missing streams.
/// Returns `true` if `id` was a newly allocated remote stream.
fn ensure_remote(&mut self, id: StreamId) -> bool {
let dir = id.dir();
let dir_idx = dir as usize;

// If we initiated this stream, nothing to do
if id.initiator() == self.side
// If this stream is larger than the max allowed, return.
// NOTE: STREAM/RESET_STREAM already enforces this, however STOP_SENDING/MAX_STREAM_DATA do not
Comment thread
Ralith marked this conversation as resolved.
|| id.index() >= self.max_remote[dir_idx]
// If this stream has already been opened, nothing to do
|| id.index() < self.next_remote[dir_idx]
{
return false;
}

// Create all of the streams between the largest opened and this stream.
for i in self.next_remote[dir_idx]..=id.index() {
let id = StreamId::new(!self.side, dir, i);
let recv = self.free_recv.pop();
assert!(self.recv.insert(id, recv).is_none());
if dir == Dir::Bi {
assert!(self.send.insert(id, None).is_none());
}
}
self.next_remote[dir_idx] = id.index() + 1;
self.opened[dir_idx] = true;
true
}

/// Adds credits to the connection flow control window
Expand Down Expand Up @@ -1880,10 +1900,167 @@ mod tests {
for _ in 0..2 {
client.set_max_concurrent(Dir::Uni, 200u32.into());
client.set_max_concurrent(Dir::Bi, 201u32.into());
assert_eq!(client.recv.len(), 200 + 201);
assert_eq!(client.max_remote[Dir::Uni as usize], 200);
assert_eq!(client.max_remote[Dir::Bi as usize], 201);
assert_eq!(client.allocated_remote_count[Dir::Uni as usize], 200);
assert_eq!(client.allocated_remote_count[Dir::Bi as usize], 201);
// Slots are materialized lazily: no remote stream has been touched yet.
assert!(client.recv.is_empty());
assert!(client.send.is_empty());
}
}

#[test]
fn lazy_remote_allocation_starts_empty() {
// `StreamsState::new` must not pre-populate `send`/`recv` with placeholder slots.
let client = StreamsState::new(
Side::Client,
10_000u32.into(),
10_000u32.into(),
1024 * 1024,
(1024 * 1024u32).into(),
(1024 * 1024u32).into(),
);
// No slots allocated until a stream is actually received.
assert!(client.recv.is_empty());
assert!(client.send.is_empty());
assert_eq!(client.recv.capacity(), 0);
assert_eq!(client.send.capacity(), 0);
Comment thread
Ralith marked this conversation as resolved.
}

#[test]
fn out_of_order_implicit_open() {
Comment thread
Ralith marked this conversation as resolved.
// Receiving idx=5 implicitly opens idx 0..=4. A later frame for idx=3 must be
// processed normally, not dropped as "closed".
const STREAM_5_PAYLOAD: &[u8] = &[0xAA; 8];
const STREAM_3_PAYLOAD: &[u8] = &[0xBB; 4];

let mut client = make(Side::Client);
assert_eq!(
client.received(
frame::Stream {
id: StreamId::new(Side::Server, Dir::Uni, 5),
offset: 0,
fin: true,
data: Bytes::from_static(STREAM_5_PAYLOAD),
},
STREAM_5_PAYLOAD.len(),
),
Ok(ShouldTransmit(false))
);
assert_eq!(client.next_remote[Dir::Uni as usize], 6);
assert_eq!(
client.received(
frame::Stream {
id: StreamId::new(Side::Server, Dir::Uni, 3),
offset: 0,
fin: true,
data: Bytes::from_static(STREAM_3_PAYLOAD),
},
STREAM_3_PAYLOAD.len(),
),
Ok(ShouldTransmit(false))
);

let id = StreamId::new(Side::Server, Dir::Uni, 3);
let mut pending = Retransmits::default();
let mut recv = RecvStream {
id,
state: &mut client,
pending: &mut pending,
};
let mut chunks = recv.read(true).unwrap();
assert_eq!(
chunks.next(STREAM_3_PAYLOAD.len()).unwrap().unwrap().bytes,
STREAM_3_PAYLOAD
);
let _ = chunks.finalize();
}

#[test]
fn frame_for_closed_stream_is_dropped() {
// After a remote stream is fully freed, a subsequent frame for the same id must be
// dropped — absence from the map unambiguously means "closed" for ids below the
// frontier.
const PAYLOAD: &[u8] = &[0; 4];

let mut client = make(Side::Client);
let id = StreamId::new(Side::Server, Dir::Uni, 0);
assert_eq!(
client.received(
frame::Stream {
id,
offset: 0,
fin: true,
data: Bytes::from_static(PAYLOAD),
},
PAYLOAD.len(),
),
Ok(ShouldTransmit(false))
);
// Stop the stream so it's fully freed.
let mut pending = Retransmits::default();
RecvStream {
id,
state: &mut client,
pending: &mut pending,
}
.stop(0u32.into())
.unwrap();
assert!(!client.recv.contains_key(&id));

// A stray retransmit for the freed stream must be dropped without resurrecting state.
assert_eq!(
client.received(
frame::Stream {
id,
offset: 0,
fin: true,
data: Bytes::from_static(PAYLOAD),
},
PAYLOAD.len(),
),
Ok(ShouldTransmit(false))
);
assert!(!client.recv.contains_key(&id));
}

#[test]
fn churn_keeps_maps_bounded() {
// Rapidly open + fully close a long sequence of remote streams. The maps must stay
// bounded (only active streams are materialized) even though thousands of ids have
// been used over the connection's lifetime.
const N: u64 = 5_000;

let mut client = make(Side::Client);
for i in 0..N {
let id = StreamId::new(Side::Server, Dir::Uni, i);
assert_eq!(
client.received(
frame::Stream {
id,
offset: 0,
fin: true,
data: Bytes::from_static(&[0; 1]),
},
1,
),
Ok(ShouldTransmit(false))
);
let mut pending = Retransmits::default();
let mut recv = RecvStream {
id,
state: &mut client,
pending: &mut pending,
};
let mut chunks = recv.read(true).unwrap();
let _ = chunks.next(1).unwrap();
assert!(chunks.next(1).unwrap().is_none());
let _ = chunks.finalize();
}
// Every stream was fully drained; the map must be empty.
assert_eq!(client.recv.len(), 0);
assert_eq!(client.send.len(), 0);
}

#[test]
Expand Down
Loading
Loading