Skip to content

Commit 4d29110

Browse files
kixelatedclaude
andauthored
perf(noq-proto): lazily allocate remote stream slots (n0-computer#667)
Port of quinn-rs/quinn#2601. `StreamsState::new` used to eagerly insert `(id, None)` placeholder entries in both `send` and `recv` FxHashMaps for every remote stream id in `0..max_remote[dir]`. With `max_concurrent_*_streams = 10_000` (e.g. for MoQ relay nodes that burn through short-lived streams), hashbrown rounded each map's bucket array up to ~65K buckets, costing ~0.5-2 MB of bucket memory per Connection before any stream data was sent. Co-authored-by: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
1 parent 8188014 commit 4d29110

3 files changed

Lines changed: 228 additions & 50 deletions

File tree

noq-proto/src/connection/streams/mod.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -56,7 +56,7 @@ impl<'a> Streams<'a> {
5656

5757
self.state.next[dir as usize] += 1;
5858
let id = StreamId::new(self.state.side, dir, self.state.next[dir as usize] - 1);
59-
self.state.insert(false, id);
59+
self.state.insert_local(id);
6060
self.state.send_streams += 1;
6161
Some(id)
6262
}

noq-proto/src/connection/streams/state.rs

Lines changed: 224 additions & 46 deletions
Original file line numberDiff line numberDiff line change
@@ -149,7 +149,7 @@ impl StreamsState {
149149
receive_window: VarInt,
150150
stream_receive_window: VarInt,
151151
) -> Self {
152-
let mut this = Self {
152+
Self {
153153
side,
154154
send: FxHashMap::default(),
155155
recv: FxHashMap::default(),
@@ -182,15 +182,7 @@ impl StreamsState {
182182
initial_max_stream_data_bidi_remote: 0u32.into(),
183183
receive_window_shrink_debt: 0,
184184
streams_blocked: [false, false],
185-
};
186-
187-
for dir in Dir::iter() {
188-
for i in 0..this.max_remote[dir as usize] {
189-
this.insert(true, StreamId::new(!side, dir, i));
190-
}
191185
}
192-
193-
this
194186
}
195187

196188
pub(crate) fn set_params(&mut self, params: &TransportParameters) {
@@ -200,9 +192,11 @@ impl StreamsState {
200192
self.max[Dir::Bi as usize] = params.initial_max_streams_bidi.into();
201193
self.max[Dir::Uni as usize] = params.initial_max_streams_uni.into();
202194
self.received_max_data(params.initial_max_data);
203-
for i in 0..self.max_remote[Dir::Bi as usize] {
204-
let id = StreamId::new(!self.side, Dir::Bi, i);
205-
if let Some(s) = self.send.get_mut(&id).and_then(|s| s.as_mut()) {
195+
for (&id, slot) in self.send.iter_mut() {
196+
if id.initiator() != self.side
197+
&& id.dir() == Dir::Bi
198+
&& let Some(s) = slot.as_mut()
199+
{
206200
s.max_data = params.initial_max_stream_data_bidi_local.into();
207201
}
208202
}
@@ -213,10 +207,6 @@ impl StreamsState {
213207
fn ensure_remote_streams(&mut self, dir: Dir) {
214208
let new_count = self.max_concurrent_remote_count[dir as usize]
215209
.saturating_sub(self.allocated_remote_count[dir as usize]);
216-
for i in 0..new_count {
217-
let id = StreamId::new(!self.side, dir, self.max_remote[dir as usize] + i);
218-
self.insert(true, id);
219-
}
220210
self.allocated_remote_count[dir as usize] += new_count;
221211
self.max_remote[dir as usize] += new_count;
222212
}
@@ -261,6 +251,9 @@ impl StreamsState {
261251
debug!("received illegal STREAM frame");
262252
})?;
263253

254+
// Create state for this stream if the remote peer created it.
255+
let newly_created = self.ensure_remote(id);
256+
264257
let Some(rs) = self
265258
.recv
266259
.get_mut(&id)
@@ -280,7 +273,11 @@ impl StreamsState {
280273
self.data_recvd = self.data_recvd.saturating_add(new_bytes);
281274

282275
if !rs.stopped {
283-
self.on_stream_frame(true, id);
276+
// Newly opened streams are inherently readable; the app discovers them via
277+
// `StreamEvent::Opened` and a separate `Readable` would be redundant.
278+
if !newly_created {
279+
self.events.push_back(StreamEvent::Readable { id });
280+
}
284281
return Ok(ShouldTransmit(false));
285282
}
286283

@@ -311,6 +308,9 @@ impl StreamsState {
311308
debug!("received illegal RESET_STREAM frame");
312309
})?;
313310

311+
// Create state for this stream if the remote peer created it.
312+
let newly_created = self.ensure_remote(id);
313+
314314
let Some(rs) = self
315315
.recv
316316
.get_mut(&id)
@@ -337,8 +337,11 @@ impl StreamsState {
337337
// Stopped streams should be disposed immediately on reset
338338
let rs = self.recv.remove(&id).flatten().unwrap();
339339
self.stream_recv_freed(id, rs);
340+
} else if !newly_created {
341+
// Newly opened streams are inherently readable; the app discovers them via
342+
// `StreamEvent::Opened` and a separate `Readable` would be redundant.
343+
self.events.push_back(StreamEvent::Readable { id });
340344
}
341-
self.on_stream_frame(!stopped, id);
342345

343346
// Update connection-level flow control
344347
Ok(if bytes_read != final_offset.into_inner() {
@@ -355,6 +358,9 @@ impl StreamsState {
355358
/// Process incoming `STOP_SENDING` frame
356359
#[allow(unreachable_pub)] // fuzzing only
357360
pub fn received_stop_sending(&mut self, id: StreamId, error_code: VarInt) {
361+
// Create state for this stream if the remote peer created it.
362+
self.ensure_remote(id);
363+
358364
let max_send_data = self.max_send_data(id);
359365
let Some(stream) = self
360366
.send
@@ -367,7 +373,6 @@ impl StreamsState {
367373
if stream.try_stop(error_code) {
368374
self.events
369375
.push_back(StreamEvent::Stopped { id, error_code });
370-
self.on_stream_frame(false, id);
371376
}
372377
}
373378

@@ -580,24 +585,6 @@ impl StreamsState {
580585
builder.sent_frames().stream_frames.clone()
581586
}
582587

583-
/// Notify the application that new streams were opened or a stream became readable.
584-
fn on_stream_frame(&mut self, notify_readable: bool, stream: StreamId) {
585-
if stream.initiator() == self.side {
586-
// Notifying about the opening of locally-initiated streams would be redundant.
587-
if notify_readable {
588-
self.events.push_back(StreamEvent::Readable { id: stream });
589-
}
590-
return;
591-
}
592-
let next = &mut self.next_remote[stream.dir() as usize];
593-
if stream.index() >= *next {
594-
*next = stream.index() + 1;
595-
self.opened[stream.dir() as usize] = true;
596-
} else if notify_readable {
597-
self.events.push_back(StreamEvent::Readable { id: stream });
598-
}
599-
}
600-
601588
pub(crate) fn received_ack_of(&mut self, frame: frame::StreamMeta) {
602589
let mut entry = match self.send.entry(frame.id) {
603590
hash_map::Entry::Vacant(_) => return,
@@ -698,6 +685,9 @@ impl StreamsState {
698685
));
699686
}
700687

688+
// Create state for this stream if the remote peer created it.
689+
self.ensure_remote(id);
690+
701691
let write_limit = self.write_limit();
702692
let max_send_data = self.max_send_data(id);
703693
if let Some(ss) = self
@@ -723,7 +713,6 @@ impl StreamsState {
723713
));
724714
}
725715

726-
self.on_stream_frame(false, id);
727716
Ok(())
728717
}
729718

@@ -840,17 +829,49 @@ impl StreamsState {
840829
expanded
841830
}
842831

843-
pub(super) fn insert(&mut self, remote: bool, id: StreamId) {
844-
let bi = id.dir() == Dir::Bi;
845-
// bidirectional OR (unidirectional AND NOT remote)
846-
if bi || !remote {
847-
assert!(self.send.insert(id, None).is_none());
832+
/// Insert `(id, None)` placeholders for a locally-initiated stream into `send` (and `recv`
833+
/// for bidi). Called from `Streams::open`; the caller guarantees the id is fresh.
834+
pub(super) fn insert_local(&mut self, id: StreamId) {
835+
debug_assert_eq!(id.initiator(), self.side);
836+
assert!(self.send.insert(id, None).is_none());
837+
if id.dir() == Dir::Bi {
838+
let recv = self.free_recv.pop();
839+
assert!(self.recv.insert(id, recv).is_none());
840+
}
841+
}
842+
843+
/// Allocate any new remote streams when a packet arrives for a stream id.
844+
/// Any streams above `next_remote` are considered in the default state, avoiding allocations.
845+
/// Once we receive a packet for a new remote stream, we advance `next_remote` and allocate actual state.
846+
/// If there's a gap, we insert `None` placeholders for the missing streams.
847+
/// Returns `true` if `id` was a newly allocated remote stream.
848+
fn ensure_remote(&mut self, id: StreamId) -> bool {
849+
let dir = id.dir();
850+
let dir_idx = dir as usize;
851+
852+
// If we initiated this stream, nothing to do
853+
if id.initiator() == self.side
854+
// If this stream is larger than the max allowed, return.
855+
// NOTE: STREAM/RESET_STREAM already enforces this, however STOP_SENDING/MAX_STREAM_DATA do not
856+
|| id.index() >= self.max_remote[dir_idx]
857+
// If this stream has already been opened, nothing to do
858+
|| id.index() < self.next_remote[dir_idx]
859+
{
860+
return false;
848861
}
849-
// bidirectional OR (unidirectional AND remote)
850-
if bi || remote {
862+
863+
// Create all of the streams between the largest opened and this stream.
864+
for i in self.next_remote[dir_idx]..=id.index() {
865+
let id = StreamId::new(!self.side, dir, i);
851866
let recv = self.free_recv.pop();
852867
assert!(self.recv.insert(id, recv).is_none());
868+
if dir == Dir::Bi {
869+
assert!(self.send.insert(id, None).is_none());
870+
}
853871
}
872+
self.next_remote[dir_idx] = id.index() + 1;
873+
self.opened[dir_idx] = true;
874+
true
854875
}
855876

856877
/// Adds credits to the connection flow control window
@@ -1821,10 +1842,167 @@ mod tests {
18211842
for _ in 0..2 {
18221843
client.set_max_concurrent(Dir::Uni, 200u32.into());
18231844
client.set_max_concurrent(Dir::Bi, 201u32.into());
1824-
assert_eq!(client.recv.len(), 200 + 201);
18251845
assert_eq!(client.max_remote[Dir::Uni as usize], 200);
18261846
assert_eq!(client.max_remote[Dir::Bi as usize], 201);
1847+
assert_eq!(client.allocated_remote_count[Dir::Uni as usize], 200);
1848+
assert_eq!(client.allocated_remote_count[Dir::Bi as usize], 201);
1849+
// Slots are materialized lazily: no remote stream has been touched yet.
1850+
assert!(client.recv.is_empty());
1851+
assert!(client.send.is_empty());
1852+
}
1853+
}
1854+
1855+
#[test]
1856+
fn lazy_remote_allocation_starts_empty() {
1857+
// `StreamsState::new` must not pre-populate `send`/`recv` with placeholder slots.
1858+
let client = StreamsState::new(
1859+
Side::Client,
1860+
10_000u32.into(),
1861+
10_000u32.into(),
1862+
1024 * 1024,
1863+
(1024 * 1024u32).into(),
1864+
(1024 * 1024u32).into(),
1865+
);
1866+
// No slots allocated until a stream is actually received.
1867+
assert!(client.recv.is_empty());
1868+
assert!(client.send.is_empty());
1869+
assert_eq!(client.recv.capacity(), 0);
1870+
assert_eq!(client.send.capacity(), 0);
1871+
}
1872+
1873+
#[test]
1874+
fn out_of_order_implicit_open() {
1875+
// Receiving idx=5 implicitly opens idx 0..=4. A later frame for idx=3 must be
1876+
// processed normally, not dropped as "closed".
1877+
const STREAM_5_PAYLOAD: &[u8] = &[0xAA; 8];
1878+
const STREAM_3_PAYLOAD: &[u8] = &[0xBB; 4];
1879+
1880+
let mut client = make(Side::Client);
1881+
assert_eq!(
1882+
client.received(
1883+
frame::Stream {
1884+
id: StreamId::new(Side::Server, Dir::Uni, 5),
1885+
offset: 0,
1886+
fin: true,
1887+
data: Bytes::from_static(STREAM_5_PAYLOAD),
1888+
},
1889+
STREAM_5_PAYLOAD.len(),
1890+
),
1891+
Ok(ShouldTransmit(false))
1892+
);
1893+
assert_eq!(client.next_remote[Dir::Uni as usize], 6);
1894+
assert_eq!(
1895+
client.received(
1896+
frame::Stream {
1897+
id: StreamId::new(Side::Server, Dir::Uni, 3),
1898+
offset: 0,
1899+
fin: true,
1900+
data: Bytes::from_static(STREAM_3_PAYLOAD),
1901+
},
1902+
STREAM_3_PAYLOAD.len(),
1903+
),
1904+
Ok(ShouldTransmit(false))
1905+
);
1906+
1907+
let id = StreamId::new(Side::Server, Dir::Uni, 3);
1908+
let mut pending = Retransmits::default();
1909+
let mut recv = RecvStream {
1910+
id,
1911+
state: &mut client,
1912+
pending: &mut pending,
1913+
};
1914+
let mut chunks = recv.read(true).unwrap();
1915+
assert_eq!(
1916+
chunks.next(STREAM_3_PAYLOAD.len()).unwrap().unwrap().bytes,
1917+
STREAM_3_PAYLOAD
1918+
);
1919+
let _ = chunks.finalize();
1920+
}
1921+
1922+
#[test]
1923+
fn frame_for_closed_stream_is_dropped() {
1924+
// After a remote stream is fully freed, a subsequent frame for the same id must be
1925+
// dropped — absence from the map unambiguously means "closed" for ids below the
1926+
// frontier.
1927+
const PAYLOAD: &[u8] = &[0; 4];
1928+
1929+
let mut client = make(Side::Client);
1930+
let id = StreamId::new(Side::Server, Dir::Uni, 0);
1931+
assert_eq!(
1932+
client.received(
1933+
frame::Stream {
1934+
id,
1935+
offset: 0,
1936+
fin: true,
1937+
data: Bytes::from_static(PAYLOAD),
1938+
},
1939+
PAYLOAD.len(),
1940+
),
1941+
Ok(ShouldTransmit(false))
1942+
);
1943+
// Stop the stream so it's fully freed.
1944+
let mut pending = Retransmits::default();
1945+
RecvStream {
1946+
id,
1947+
state: &mut client,
1948+
pending: &mut pending,
1949+
}
1950+
.stop(0u32.into())
1951+
.unwrap();
1952+
assert!(!client.recv.contains_key(&id));
1953+
1954+
// A stray retransmit for the freed stream must be dropped without resurrecting state.
1955+
assert_eq!(
1956+
client.received(
1957+
frame::Stream {
1958+
id,
1959+
offset: 0,
1960+
fin: true,
1961+
data: Bytes::from_static(PAYLOAD),
1962+
},
1963+
PAYLOAD.len(),
1964+
),
1965+
Ok(ShouldTransmit(false))
1966+
);
1967+
assert!(!client.recv.contains_key(&id));
1968+
}
1969+
1970+
#[test]
1971+
fn churn_keeps_maps_bounded() {
1972+
// Rapidly open + fully close a long sequence of remote streams. The maps must stay
1973+
// bounded (only active streams are materialized) even though thousands of ids have
1974+
// been used over the connection's lifetime.
1975+
const N: u64 = 5_000;
1976+
1977+
let mut client = make(Side::Client);
1978+
for i in 0..N {
1979+
let id = StreamId::new(Side::Server, Dir::Uni, i);
1980+
assert_eq!(
1981+
client.received(
1982+
frame::Stream {
1983+
id,
1984+
offset: 0,
1985+
fin: true,
1986+
data: Bytes::from_static(&[0; 1]),
1987+
},
1988+
1,
1989+
),
1990+
Ok(ShouldTransmit(false))
1991+
);
1992+
let mut pending = Retransmits::default();
1993+
let mut recv = RecvStream {
1994+
id,
1995+
state: &mut client,
1996+
pending: &mut pending,
1997+
};
1998+
let mut chunks = recv.read(true).unwrap();
1999+
let _ = chunks.next(1).unwrap();
2000+
assert!(chunks.next(1).unwrap().is_none());
2001+
let _ = chunks.finalize();
18272002
}
2003+
// Every stream was fully drained; the map must be empty.
2004+
assert_eq!(client.recv.len(), 0);
2005+
assert_eq!(client.send.len(), 0);
18282006
}
18292007

18302008
#[test]

0 commit comments

Comments
 (0)