diff --git a/quinn-proto/src/connection/streams/mod.rs b/quinn-proto/src/connection/streams/mod.rs index 6e7710764a..edfee21447 100644 --- a/quinn-proto/src/connection/streams/mod.rs +++ b/quinn-proto/src/connection/streams/mod.rs @@ -55,7 +55,7 @@ impl<'a> Streams<'a> { self.state.next[dir as usize] += 1; let id = StreamId::new(self.state.side, dir, self.state.next[dir as usize] - 1); - self.state.insert(false, id); + self.state.insert_local(id); self.state.send_streams += 1; Some(id) } diff --git a/quinn-proto/src/connection/streams/state.rs b/quinn-proto/src/connection/streams/state.rs index ee063afd2a..7f2e676dab 100644 --- a/quinn-proto/src/connection/streams/state.rs +++ b/quinn-proto/src/connection/streams/state.rs @@ -151,7 +151,7 @@ impl StreamsState { receive_window: VarInt, stream_receive_window: VarInt, ) -> Self { - let mut this = Self { + Self { side, send: FxHashMap::default(), recv: FxHashMap::default(), @@ -184,15 +184,7 @@ impl StreamsState { initial_max_stream_data_bidi_remote: 0u32.into(), receive_window_shrink_debt: 0, streams_blocked: [false, false], - }; - - for dir in Dir::iter() { - for i in 0..this.max_remote[dir as usize] { - this.insert(true, StreamId::new(!side, dir, i)); - } } - - this } pub(crate) fn set_params(&mut self, params: &TransportParameters) { @@ -202,10 +194,11 @@ impl StreamsState { self.max[Dir::Bi as usize] = params.initial_max_streams_bidi.into(); self.max[Dir::Uni as usize] = params.initial_max_streams_uni.into(); self.received_max_data(params.initial_max_data); - for i in 0..self.max_remote[Dir::Bi as usize] { - let id = StreamId::new(!self.side, Dir::Bi, i); - if let Some(s) = self.send.get_mut(&id).and_then(|s| s.as_mut()) { - s.max_data = params.initial_max_stream_data_bidi_local.into(); + for (&id, slot) in self.send.iter_mut() { + if id.initiator() != self.side && id.dir() == Dir::Bi { + if let Some(s) = slot.as_mut() { + s.max_data = params.initial_max_stream_data_bidi_local.into(); + } } } } @@ -215,10 +208,6 @@ impl StreamsState { fn ensure_remote_streams(&mut self, dir: Dir) { let new_count = self.max_concurrent_remote_count[dir as usize] .saturating_sub(self.allocated_remote_count[dir as usize]); - for i in 0..new_count { - let id = StreamId::new(!self.side, dir, self.max_remote[dir as usize] + i); - self.insert(true, id); - } self.allocated_remote_count[dir as usize] += new_count; self.max_remote[dir as usize] += new_count; } @@ -263,6 +252,9 @@ impl StreamsState { debug!("received illegal STREAM frame"); })?; + // Create state for this stream if the remote peer created it. + let newly_created = self.ensure_remote(id); + let Some(rs) = self .recv .get_mut(&id) @@ -282,7 +274,11 @@ impl StreamsState { self.data_recvd = self.data_recvd.saturating_add(new_bytes); if !rs.stopped { - self.on_stream_frame(true, id); + // Newly opened streams are inherently readable; the app discovers them via + // `StreamEvent::Opened` and a separate `Readable` would be redundant. + if !newly_created { + self.events.push_back(StreamEvent::Readable { id }); + } return Ok(ShouldTransmit(false)); } @@ -313,6 +309,9 @@ impl StreamsState { debug!("received illegal RESET_STREAM frame"); })?; + // Create state for this stream if the remote peer created it. + let newly_created = self.ensure_remote(id); + let Some(rs) = self .recv .get_mut(&id) @@ -339,8 +338,11 @@ impl StreamsState { // Stopped streams should be disposed immediately on reset let rs = self.recv.remove(&id).flatten().unwrap(); self.stream_recv_freed(id, rs); + } else if !newly_created { + // Newly opened streams are inherently readable; the app discovers them via + // `StreamEvent::Opened` and a separate `Readable` would be redundant. + self.events.push_back(StreamEvent::Readable { id }); } - self.on_stream_frame(!stopped, id); // Update connection-level flow control Ok(if bytes_read != final_offset.into_inner() { @@ -357,6 +359,9 @@ impl StreamsState { /// Process incoming `STOP_SENDING` frame #[allow(unreachable_pub)] // fuzzing only pub fn received_stop_sending(&mut self, id: StreamId, error_code: VarInt) { + // Create state for this stream if the remote peer created it. + self.ensure_remote(id); + let max_send_data = self.max_send_data(id); let Some(stream) = self .send @@ -369,7 +374,6 @@ impl StreamsState { if stream.try_stop(error_code) { self.events .push_back(StreamEvent::Stopped { id, error_code }); - self.on_stream_frame(false, id); } } @@ -632,24 +636,6 @@ impl StreamsState { stream_frames } - /// Notify the application that new streams were opened or a stream became readable. - fn on_stream_frame(&mut self, notify_readable: bool, stream: StreamId) { - if stream.initiator() == self.side { - // Notifying about the opening of locally-initiated streams would be redundant. - if notify_readable { - self.events.push_back(StreamEvent::Readable { id: stream }); - } - return; - } - let next = &mut self.next_remote[stream.dir() as usize]; - if stream.index() >= *next { - *next = stream.index() + 1; - self.opened[stream.dir() as usize] = true; - } else if notify_readable { - self.events.push_back(StreamEvent::Readable { id: stream }); - } - } - pub(crate) fn received_ack_of(&mut self, frame: frame::StreamMeta) { let mut entry = match self.send.entry(frame.id) { hash_map::Entry::Vacant(_) => return, @@ -750,6 +736,9 @@ impl StreamsState { )); } + // Create state for this stream if the remote peer created it. + self.ensure_remote(id); + let write_limit = self.write_limit(); let max_send_data = self.max_send_data(id); if let Some(ss) = self @@ -775,7 +764,6 @@ impl StreamsState { )); } - self.on_stream_frame(false, id); Ok(()) } @@ -892,17 +880,49 @@ impl StreamsState { expanded } - pub(super) fn insert(&mut self, remote: bool, id: StreamId) { - let bi = id.dir() == Dir::Bi; - // bidirectional OR (unidirectional AND NOT remote) - if bi || !remote { - assert!(self.send.insert(id, None).is_none()); + /// Insert `(id, None)` placeholders for a locally-initiated stream into `send` (and `recv` + /// for bidi). Called from `Streams::open`; the caller guarantees the id is fresh. + pub(super) fn insert_local(&mut self, id: StreamId) { + debug_assert_eq!(id.initiator(), self.side); + assert!(self.send.insert(id, None).is_none()); + if id.dir() == Dir::Bi { + let recv = self.free_recv.pop(); + assert!(self.recv.insert(id, recv).is_none()); } - // bidirectional OR (unidirectional AND remote) - if bi || remote { + } + + /// Allocate any new remote streams when a packet arrives for a stream id. + /// Any streams above `next_remote` are considered in the default state, avoiding allocations. + /// Once we receive a packet for a new remote stream, we advance `next_remote` and allocate actual state. + /// If there's a gap, we insert `None` placeholders for the missing streams. + /// Returns `true` if `id` was a newly allocated remote stream. + fn ensure_remote(&mut self, id: StreamId) -> bool { + let dir = id.dir(); + let dir_idx = dir as usize; + + // If we initiated this stream, nothing to do + if id.initiator() == self.side + // If this stream is larger than the max allowed, return. + // NOTE: STREAM/RESET_STREAM already enforces this, however STOP_SENDING/MAX_STREAM_DATA do not + || id.index() >= self.max_remote[dir_idx] + // If this stream has already been opened, nothing to do + || id.index() < self.next_remote[dir_idx] + { + return false; + } + + // Create all of the streams between the largest opened and this stream. + for i in self.next_remote[dir_idx]..=id.index() { + let id = StreamId::new(!self.side, dir, i); let recv = self.free_recv.pop(); assert!(self.recv.insert(id, recv).is_none()); + if dir == Dir::Bi { + assert!(self.send.insert(id, None).is_none()); + } } + self.next_remote[dir_idx] = id.index() + 1; + self.opened[dir_idx] = true; + true } /// Adds credits to the connection flow control window @@ -1880,10 +1900,167 @@ mod tests { for _ in 0..2 { client.set_max_concurrent(Dir::Uni, 200u32.into()); client.set_max_concurrent(Dir::Bi, 201u32.into()); - assert_eq!(client.recv.len(), 200 + 201); assert_eq!(client.max_remote[Dir::Uni as usize], 200); assert_eq!(client.max_remote[Dir::Bi as usize], 201); + assert_eq!(client.allocated_remote_count[Dir::Uni as usize], 200); + assert_eq!(client.allocated_remote_count[Dir::Bi as usize], 201); + // Slots are materialized lazily: no remote stream has been touched yet. + assert!(client.recv.is_empty()); + assert!(client.send.is_empty()); + } + } + + #[test] + fn lazy_remote_allocation_starts_empty() { + // `StreamsState::new` must not pre-populate `send`/`recv` with placeholder slots. + let client = StreamsState::new( + Side::Client, + 10_000u32.into(), + 10_000u32.into(), + 1024 * 1024, + (1024 * 1024u32).into(), + (1024 * 1024u32).into(), + ); + // No slots allocated until a stream is actually received. + assert!(client.recv.is_empty()); + assert!(client.send.is_empty()); + assert_eq!(client.recv.capacity(), 0); + assert_eq!(client.send.capacity(), 0); + } + + #[test] + fn out_of_order_implicit_open() { + // Receiving idx=5 implicitly opens idx 0..=4. A later frame for idx=3 must be + // processed normally, not dropped as "closed". + const STREAM_5_PAYLOAD: &[u8] = &[0xAA; 8]; + const STREAM_3_PAYLOAD: &[u8] = &[0xBB; 4]; + + let mut client = make(Side::Client); + assert_eq!( + client.received( + frame::Stream { + id: StreamId::new(Side::Server, Dir::Uni, 5), + offset: 0, + fin: true, + data: Bytes::from_static(STREAM_5_PAYLOAD), + }, + STREAM_5_PAYLOAD.len(), + ), + Ok(ShouldTransmit(false)) + ); + assert_eq!(client.next_remote[Dir::Uni as usize], 6); + assert_eq!( + client.received( + frame::Stream { + id: StreamId::new(Side::Server, Dir::Uni, 3), + offset: 0, + fin: true, + data: Bytes::from_static(STREAM_3_PAYLOAD), + }, + STREAM_3_PAYLOAD.len(), + ), + Ok(ShouldTransmit(false)) + ); + + let id = StreamId::new(Side::Server, Dir::Uni, 3); + let mut pending = Retransmits::default(); + let mut recv = RecvStream { + id, + state: &mut client, + pending: &mut pending, + }; + let mut chunks = recv.read(true).unwrap(); + assert_eq!( + chunks.next(STREAM_3_PAYLOAD.len()).unwrap().unwrap().bytes, + STREAM_3_PAYLOAD + ); + let _ = chunks.finalize(); + } + + #[test] + fn frame_for_closed_stream_is_dropped() { + // After a remote stream is fully freed, a subsequent frame for the same id must be + // dropped — absence from the map unambiguously means "closed" for ids below the + // frontier. + const PAYLOAD: &[u8] = &[0; 4]; + + let mut client = make(Side::Client); + let id = StreamId::new(Side::Server, Dir::Uni, 0); + assert_eq!( + client.received( + frame::Stream { + id, + offset: 0, + fin: true, + data: Bytes::from_static(PAYLOAD), + }, + PAYLOAD.len(), + ), + Ok(ShouldTransmit(false)) + ); + // Stop the stream so it's fully freed. + let mut pending = Retransmits::default(); + RecvStream { + id, + state: &mut client, + pending: &mut pending, + } + .stop(0u32.into()) + .unwrap(); + assert!(!client.recv.contains_key(&id)); + + // A stray retransmit for the freed stream must be dropped without resurrecting state. + assert_eq!( + client.received( + frame::Stream { + id, + offset: 0, + fin: true, + data: Bytes::from_static(PAYLOAD), + }, + PAYLOAD.len(), + ), + Ok(ShouldTransmit(false)) + ); + assert!(!client.recv.contains_key(&id)); + } + + #[test] + fn churn_keeps_maps_bounded() { + // Rapidly open + fully close a long sequence of remote streams. The maps must stay + // bounded (only active streams are materialized) even though thousands of ids have + // been used over the connection's lifetime. + const N: u64 = 5_000; + + let mut client = make(Side::Client); + for i in 0..N { + let id = StreamId::new(Side::Server, Dir::Uni, i); + assert_eq!( + client.received( + frame::Stream { + id, + offset: 0, + fin: true, + data: Bytes::from_static(&[0; 1]), + }, + 1, + ), + Ok(ShouldTransmit(false)) + ); + let mut pending = Retransmits::default(); + let mut recv = RecvStream { + id, + state: &mut client, + pending: &mut pending, + }; + let mut chunks = recv.read(true).unwrap(); + let _ = chunks.next(1).unwrap(); + assert!(chunks.next(1).unwrap().is_none()); + let _ = chunks.finalize(); } + // Every stream was fully drained; the map must be empty. + assert_eq!(client.recv.len(), 0); + assert_eq!(client.send.len(), 0); } #[test] diff --git a/quinn-proto/src/tests/mod.rs b/quinn-proto/src/tests/mod.rs index 4dc1f89def..9f92bcb5cb 100644 --- a/quinn-proto/src/tests/mod.rs +++ b/quinn-proto/src/tests/mod.rs @@ -670,10 +670,10 @@ fn zero_rtt_rejection() { let s2 = pair.client_streams(client_ch).open(Dir::Uni).unwrap(); assert_eq!(s, s2); + // `s2` was never successfully received by the server, so from its perspective the + // stream has never been opened. We currently surface that as `ClosedStream`. let mut recv = pair.server_recv(server_ch, s2); - let mut chunks = recv.read(false).unwrap(); - assert_eq!(chunks.next(usize::MAX), Err(ReadError::Blocked)); - let _ = chunks.finalize(); + assert_eq!(recv.read(false).err(), Some(ReadableError::ClosedStream)); assert_eq!(pair.client_conn_mut(client_ch).stats().path.lost_packets, 0); }