Skip to content

Commit 4c4c7dd

Browse files
authored
[stream] encode length as varint (#2555)
1 parent e89e651 commit 4c4c7dd

2 files changed

Lines changed: 67 additions & 24 deletions

File tree

stream/src/lib.rs

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -98,6 +98,8 @@ pub enum Error {
9898
RecvFailed(RuntimeError),
9999
#[error("recv too large: {0} bytes")]
100100
RecvTooLarge(usize),
101+
#[error("invalid varint length prefix")]
102+
InvalidVarint,
101103
#[error("send failed")]
102104
SendFailed(RuntimeError),
103105
#[error("send zero size")]

stream/src/utils/codec.rs

Lines changed: 65 additions & 24 deletions
Original file line numberDiff line numberDiff line change
@@ -1,8 +1,13 @@
11
use crate::Error;
2-
use bytes::{BufMut as _, Bytes, BytesMut};
2+
use bytes::{Bytes, BytesMut};
3+
use commonware_codec::{
4+
varint::{Decoder, UInt},
5+
EncodeSize as _, Write as _,
6+
};
37
use commonware_runtime::{Sink, Stream};
8+
use commonware_utils::StableBuf;
49

5-
/// Sends data to the sink with a 4-byte length prefix.
10+
/// Sends data to the sink with a varint length prefix.
611
/// Returns an error if the message is too large or the stream is closed.
712
pub async fn send_frame<S: Sink>(
813
sink: &mut S,
@@ -15,22 +20,31 @@ pub async fn send_frame<S: Sink>(
1520
return Err(Error::SendTooLarge(n));
1621
}
1722

18-
// Prefix `buf` with its length and send it
19-
let mut prefixed_buf = BytesMut::with_capacity(4 + buf.len());
20-
let len: u32 = n.try_into().map_err(|_| Error::SendTooLarge(n))?;
21-
prefixed_buf.put_u32(len);
23+
// Prefix `buf` with its varint-encoded length and send it
24+
let len = UInt(n as u32);
25+
let mut prefixed_buf = BytesMut::with_capacity(len.encode_size() + buf.len());
26+
len.write(&mut prefixed_buf);
2227
prefixed_buf.extend_from_slice(buf);
2328
sink.send(prefixed_buf).await.map_err(Error::SendFailed)
2429
}
2530

26-
/// Receives data from the stream with a 4-byte length prefix.
27-
/// Returns an error if the message is too large or the stream is closed.
31+
/// Receives data from the stream with a varint length prefix.
32+
/// Returns an error if the message is too large, the varint is invalid, or the
33+
/// stream is closed.
2834
pub async fn recv_frame<T: Stream>(stream: &mut T, max_message_size: u32) -> Result<Bytes, Error> {
29-
// Read the first 4 bytes to get the length of the message
30-
let len_buf = stream.recv(vec![0; 4]).await.map_err(Error::RecvFailed)?;
35+
// Read and decode the varint length prefix byte-by-byte
36+
let mut decoder = Decoder::<u32>::new();
37+
let mut buf = StableBuf::from(vec![0u8; 1]);
38+
let len = loop {
39+
buf = stream.recv(buf).await.map_err(Error::RecvFailed)?;
40+
match decoder.feed(buf[0]) {
41+
Ok(Some(len)) => break len as usize,
42+
Ok(None) => continue,
43+
Err(_) => return Err(Error::InvalidVarint),
44+
}
45+
};
3146

3247
// Validate frame size
33-
let len = u32::from_be_bytes(len_buf.as_ref()[..4].try_into().unwrap()) as usize;
3448
if len > max_message_size as usize {
3549
return Err(Error::RecvTooLarge(len));
3650
}
@@ -43,6 +57,7 @@ pub async fn recv_frame<T: Stream>(stream: &mut T, max_message_size: u32) -> Res
4357
#[cfg(test)]
4458
mod tests {
4559
use super::*;
60+
use bytes::BufMut;
4661
use commonware_runtime::{deterministic, mocks, Runner};
4762
use rand::Rng;
4863

@@ -106,8 +121,9 @@ mod tests {
106121
assert!(result.is_ok());
107122

108123
// Do the reading manually without using recv_frame
109-
let read = stream.recv(vec![0; 4]).await.unwrap();
110-
assert_eq!(read.as_ref(), (buf.len() as u32).to_be_bytes());
124+
// 1024 (MAX_MESSAGE_SIZE) encodes as varint: [0x80, 0x08] (2 bytes)
125+
let read = stream.recv(vec![0; 2]).await.unwrap();
126+
assert_eq!(read.as_ref(), &[0x80, 0x08]); // 1024 as varint
111127
let read = stream
112128
.recv(vec![0; MAX_MESSAGE_SIZE as usize])
113129
.await
@@ -142,8 +158,10 @@ mod tests {
142158
let mut msg = [0u8; MAX_MESSAGE_SIZE as usize];
143159
context.fill(&mut msg);
144160

145-
let mut buf = BytesMut::with_capacity(4 + msg.len());
146-
buf.put_u32(MAX_MESSAGE_SIZE);
161+
// 1024 (MAX_MESSAGE_SIZE) encodes as varint: [0x80, 0x08]
162+
let mut buf = BytesMut::with_capacity(2 + msg.len());
163+
buf.put_u8(0x80);
164+
buf.put_u8(0x08);
147165
buf.extend_from_slice(&msg);
148166
sink.send(buf).await.unwrap();
149167

@@ -160,8 +178,10 @@ mod tests {
160178
let executor = deterministic::Runner::default();
161179
executor.start(|_| async move {
162180
// Manually insert a frame that gives MAX_MESSAGE_SIZE as the size
163-
let mut buf = BytesMut::with_capacity(4);
164-
buf.put_u32(MAX_MESSAGE_SIZE);
181+
// 1024 (MAX_MESSAGE_SIZE) encodes as varint: [0x80, 0x08]
182+
let mut buf = BytesMut::with_capacity(2);
183+
buf.put_u8(0x80);
184+
buf.put_u8(0x08);
165185
sink.send(buf).await.unwrap();
166186

167187
let result = recv_frame(&mut stream, MAX_MESSAGE_SIZE - 1).await;
@@ -172,23 +192,44 @@ mod tests {
172192
}
173193

174194
#[test]
175-
fn test_recv_frame_short_length_prefix() {
195+
fn test_recv_frame_incomplete_varint() {
176196
let (mut sink, mut stream) = mocks::Channel::init();
177197

178198
let executor = deterministic::Runner::default();
179199
executor.start(|_| async move {
180-
// Manually insert a frame with a short length prefix
181-
let mut buf = BytesMut::with_capacity(3);
182-
buf.put_u8(0x00);
183-
buf.put_u8(0x00);
184-
buf.put_u8(0x00);
200+
// Send incomplete varint (continuation bit set but no following byte)
201+
let mut buf = BytesMut::with_capacity(1);
202+
buf.put_u8(0x80); // Continuation bit set, expects more bytes
185203

186204
sink.send(buf).await.unwrap();
187205
drop(sink); // Close the sink to simulate a closed stream
188206

189-
// Expect an error rather than a panic
207+
// Expect an error because varint is incomplete
190208
let result = recv_frame(&mut stream, MAX_MESSAGE_SIZE).await;
191209
assert!(matches!(&result, Err(Error::RecvFailed(_))));
192210
});
193211
}
212+
213+
#[test]
214+
fn test_recv_frame_invalid_varint_overflow() {
215+
let (mut sink, mut stream) = mocks::Channel::init();
216+
217+
let executor = deterministic::Runner::default();
218+
executor.start(|_| async move {
219+
// Send a varint that overflows u32 (more than 5 bytes with continuation bits)
220+
let mut buf = BytesMut::with_capacity(6);
221+
buf.put_u8(0xFF); // 7 bits + continue
222+
buf.put_u8(0xFF); // 7 bits + continue
223+
buf.put_u8(0xFF); // 7 bits + continue
224+
buf.put_u8(0xFF); // 7 bits + continue
225+
buf.put_u8(0xFF); // 5th byte with overflow bits set + continue
226+
buf.put_u8(0x01); // 6th byte
227+
228+
sink.send(buf).await.unwrap();
229+
230+
// Expect an error because varint overflows u32
231+
let result = recv_frame(&mut stream, MAX_MESSAGE_SIZE).await;
232+
assert!(matches!(&result, Err(Error::InvalidVarint)));
233+
});
234+
}
194235
}

0 commit comments

Comments
 (0)