Skip to content

Commit a2e00ae

Browse files
committed
Add test for buffer io
1 parent 7690dea commit a2e00ae

1 file changed

Lines changed: 360 additions & 0 deletions

File tree

core/tests/buf_stream.rs

Lines changed: 360 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,360 @@
1+
use occams_rpc_core::io::*;
2+
use rand::{Rng, RngCore};
3+
use std::future::Future;
4+
use std::io;
5+
use std::sync::{Arc, Mutex};
6+
7+
#[derive(Debug)]
8+
enum MockReadBehavior {
9+
Chunked(Vec<Vec<u8>>),
10+
Randomized { data: Vec<u8>, pos: usize },
11+
}
12+
13+
// A mock stream for read operations only
14+
#[derive(Debug)]
15+
struct MockReadStream {
16+
read_behavior: MockReadBehavior,
17+
}
18+
19+
impl MockReadStream {
20+
fn new_chunked_reader(chunks: Vec<Vec<u8>>) -> Self {
21+
Self { read_behavior: MockReadBehavior::Chunked(chunks) }
22+
}
23+
24+
fn new_chunked_reader_deterministic(chunks: Vec<Vec<u8>>) -> Self {
25+
// For deterministic reads, we just use the same chunked reader
26+
Self { read_behavior: MockReadBehavior::Chunked(chunks) }
27+
}
28+
29+
fn new_randomized_reader(data: Vec<u8>) -> Self {
30+
Self { read_behavior: MockReadBehavior::Randomized { data, pos: 0 } }
31+
}
32+
}
33+
34+
impl AsyncRead for MockReadStream {
35+
fn read(&mut self, buf: &mut [u8]) -> impl Future<Output = io::Result<usize>> + Send {
36+
async move {
37+
match &mut self.read_behavior {
38+
MockReadBehavior::Chunked(chunks) => {
39+
if chunks.is_empty() {
40+
return Ok(0);
41+
}
42+
let chunk = chunks.remove(0);
43+
let n = std::cmp::min(buf.len(), chunk.len());
44+
buf[..n].copy_from_slice(&chunk[..n]);
45+
Ok(n)
46+
}
47+
MockReadBehavior::Randomized { data, pos } => {
48+
if *pos >= data.len() {
49+
return Ok(0); // True EOF
50+
}
51+
let mut rng = rand::thread_rng();
52+
let remaining = data.len() - *pos;
53+
let max_read = std::cmp::min(buf.len(), remaining);
54+
if max_read == 0 {
55+
return Ok(0);
56+
}
57+
let read_size = rng.gen_range(1..=max_read);
58+
59+
buf[..read_size].copy_from_slice(&data[*pos..*pos + read_size]);
60+
*pos += read_size;
61+
Ok(read_size)
62+
}
63+
}
64+
}
65+
}
66+
}
67+
68+
// A mock stream for write operations with buffering support
69+
#[derive(Debug)]
70+
struct MockWriteStream {
71+
write_buffer: Arc<Mutex<Vec<u8>>>,
72+
deterministic: bool, // Flag to control deterministic behavior for writes
73+
}
74+
75+
impl MockWriteStream {
76+
fn new(write_buffer: Arc<Mutex<Vec<u8>>>, deterministic: bool) -> Self {
77+
Self { write_buffer, deterministic }
78+
}
79+
}
80+
81+
impl AsyncRead for MockWriteStream {
82+
fn read(&mut self, _buf: &mut [u8]) -> impl Future<Output = io::Result<usize>> + Send {
83+
async move {
84+
// For write-only stream, always return EOF
85+
Ok(0)
86+
}
87+
}
88+
}
89+
90+
impl AsyncWrite for MockWriteStream {
91+
fn write(&mut self, buf: &[u8]) -> impl Future<Output = io::Result<usize>> + Send {
92+
async move {
93+
if buf.is_empty() {
94+
return Ok(0);
95+
}
96+
97+
let n = if self.deterministic {
98+
// In deterministic mode, always write the full buffer
99+
buf.len()
100+
} else {
101+
// In random mode, sometimes do short writes
102+
let mut rng = rand::thread_rng();
103+
// Using a 50% chance for short writes to make it more likely to occur in tests.
104+
if rng.gen_bool(0.5) { rng.gen_range(1..=buf.len()) } else { buf.len() }
105+
};
106+
107+
self.write_buffer.lock().unwrap().extend_from_slice(&buf[..n]);
108+
Ok(n)
109+
}
110+
}
111+
}
112+
113+
// ==================== DETERMINISTIC TESTS ====================
114+
115+
#[tokio::test]
116+
async fn test_async_read_exact_fixed_chunks() {
117+
// Use fixed, deterministic values
118+
let data_size = 2048;
119+
let mut source_data = vec![0u8; data_size];
120+
// Fill with deterministic pattern
121+
for i in 0..data_size {
122+
source_data[i] = (i % 256) as u8;
123+
}
124+
125+
// Create fixed chunks
126+
let chunks = vec![
127+
source_data[0..512].to_vec(),
128+
source_data[512..1024].to_vec(),
129+
source_data[1024..1536].to_vec(),
130+
source_data[1536..2048].to_vec(),
131+
];
132+
133+
let mut read_stream = MockReadStream::new_chunked_reader_deterministic(chunks);
134+
135+
let mut out = vec![0u8; data_size];
136+
read_stream.read_exact(&mut out).await.unwrap();
137+
assert_eq!(out, source_data);
138+
}
139+
140+
#[tokio::test]
141+
async fn test_async_read_bypass_fixed() {
142+
// Use fixed, deterministic values
143+
let data_size = 300; // > 256 to test bypass
144+
let mut source_data = vec![0u8; data_size];
145+
// Fill with deterministic pattern
146+
for i in 0..data_size {
147+
source_data[i] = (i % 256) as u8;
148+
}
149+
150+
let mut read_stream =
151+
MockReadStream::new_chunked_reader_deterministic(vec![source_data.clone()]);
152+
153+
let mut out = vec![0u8; data_size];
154+
read_stream.read_exact(&mut out).await.unwrap();
155+
assert_eq!(out, source_data);
156+
}
157+
158+
#[tokio::test]
159+
async fn test_async_read_multiple_reads_fixed() {
160+
// Use fixed, deterministic values
161+
let chunk1_data = vec![1u8; 100];
162+
let chunk2_data = vec![2u8; 100];
163+
164+
let chunks = vec![chunk1_data.clone(), chunk2_data.clone()];
165+
let mut read_stream = MockReadStream::new_chunked_reader_deterministic(chunks);
166+
167+
let mut out1 = vec![0u8; 100];
168+
read_stream.read_exact(&mut out1).await.unwrap();
169+
assert_eq!(out1, chunk1_data);
170+
171+
let mut out2 = vec![0u8; 100];
172+
read_stream.read_exact(&mut out2).await.unwrap();
173+
assert_eq!(out2, chunk2_data);
174+
}
175+
176+
#[tokio::test]
177+
async fn test_async_write_all_buffering_deterministic() {
178+
let data_handle = Arc::new(Mutex::new(Vec::new()));
179+
let mock_stream = MockWriteStream::new(data_handle.clone(), true); // deterministic = true
180+
let mut writer = AsyncBufStream::new(mock_stream, 8);
181+
182+
writer.write_all(b"hello").await.unwrap();
183+
{
184+
assert!(data_handle.lock().unwrap().is_empty()); // buffered
185+
}
186+
writer.write_all(b" wo").await.unwrap(); // total 5+3=8, should not flush yet
187+
{
188+
assert!(data_handle.lock().unwrap().is_empty()); // still buffered, pos is 8
189+
}
190+
191+
writer.write_all(b"rld").await.unwrap(); // overflows buffer
192+
// "hello wo" should be flushed.
193+
{
194+
assert_eq!(*data_handle.lock().unwrap(), b"hello wo");
195+
}
196+
// "rld" is in the buffer
197+
198+
writer.flush().await.unwrap();
199+
{
200+
assert_eq!(*data_handle.lock().unwrap(), b"hello world");
201+
}
202+
}
203+
204+
#[tokio::test]
205+
async fn test_async_write_bypass_deterministic() {
206+
let data_handle = Arc::new(Mutex::new(Vec::new()));
207+
let mock_stream = MockWriteStream::new(data_handle.clone(), true); // deterministic = true
208+
let mut writer = AsyncBufStream::new(mock_stream, 8);
209+
210+
writer.write_all(b"abc").await.unwrap();
211+
{
212+
assert!(data_handle.lock().unwrap().is_empty()); // buffered
213+
}
214+
// This write is larger than the buffer, it should bypass it.
215+
writer.write_all(b"this is a long line").await.unwrap();
216+
// The buffer "abc" should be flushed first.
217+
// Then "this is a long line" is written directly.
218+
{
219+
assert_eq!(*data_handle.lock().unwrap(), b"abcthis is a long line");
220+
}
221+
222+
writer.flush().await.unwrap();
223+
{
224+
assert_eq!(*data_handle.lock().unwrap(), b"abcthis is a long line");
225+
}
226+
}
227+
228+
// ==================== RANDOMIZED TESTS ====================
229+
230+
#[tokio::test]
231+
async fn test_async_read_exact_random_chunks() {
232+
let mut rng = rand::thread_rng();
233+
let data_size = rng.gen_range(1024..4096);
234+
let mut source_data = vec![0u8; data_size];
235+
rng.fill_bytes(&mut source_data);
236+
237+
let mut chunks = Vec::new();
238+
let mut remaining_data = &source_data[..];
239+
while !remaining_data.is_empty() {
240+
let chunk_size = rng.gen_range(1..128).min(remaining_data.len());
241+
chunks.push(remaining_data[..chunk_size].to_vec());
242+
remaining_data = &remaining_data[chunk_size..];
243+
}
244+
245+
let mut read_stream = MockReadStream::new_chunked_reader(chunks);
246+
247+
let mut out = vec![0u8; data_size];
248+
read_stream.read_exact(&mut out).await.unwrap();
249+
assert_eq!(out, source_data);
250+
}
251+
252+
#[tokio::test]
253+
async fn test_async_read_bypass_random() {
254+
let mut rng = rand::thread_rng();
255+
let data_size = rng.gen_range(257..512);
256+
let mut source_data = vec![0u8; data_size];
257+
rng.fill_bytes(&mut source_data);
258+
259+
let mut read_stream = MockReadStream::new_chunked_reader(vec![source_data.clone()]);
260+
261+
let mut out = vec![0u8; data_size];
262+
read_stream.read_exact(&mut out).await.unwrap();
263+
assert_eq!(out, source_data);
264+
}
265+
266+
#[tokio::test]
267+
async fn test_async_read_multiple_reads_random() {
268+
let mut rng = rand::thread_rng();
269+
let chunk1_size = rng.gen_range(64..128);
270+
let mut chunk1_data = vec![0u8; chunk1_size];
271+
rng.fill_bytes(&mut chunk1_data);
272+
273+
let chunk2_size = rng.gen_range(64..128);
274+
let mut chunk2_data = vec![0u8; chunk2_size];
275+
rng.fill_bytes(&mut chunk2_data);
276+
277+
let chunks = vec![chunk1_data.clone(), chunk2_data.clone()];
278+
let mut read_stream = MockReadStream::new_chunked_reader(chunks);
279+
280+
let mut out1 = vec![0u8; chunk1_size];
281+
read_stream.read_exact(&mut out1).await.unwrap();
282+
assert_eq!(out1, chunk1_data);
283+
284+
let mut out2 = vec![0u8; chunk2_size];
285+
read_stream.read_exact(&mut out2).await.unwrap();
286+
assert_eq!(out2, chunk2_data);
287+
}
288+
289+
#[tokio::test]
290+
async fn test_random_read_sizes_and_returns() {
291+
let mut rng = rand::thread_rng();
292+
let data_size = rng.gen_range(8192..16384);
293+
let mut source_data = vec![0u8; data_size];
294+
rng.fill_bytes(&mut source_data);
295+
296+
let mut read_stream = MockReadStream::new_randomized_reader(source_data.clone());
297+
298+
let mut result_data = Vec::with_capacity(data_size);
299+
while result_data.len() < data_size {
300+
let read_size = rng.gen_range(1..=512);
301+
let mut temp_buf = vec![0u8; read_size];
302+
match read_stream.read(&mut temp_buf).await {
303+
Ok(0) => break, // EOF
304+
Ok(n) => {
305+
result_data.extend_from_slice(&temp_buf[..n]);
306+
}
307+
Err(e) if e.kind() == io::ErrorKind::Interrupted => continue,
308+
Err(e) => panic!("Read failed: {}", e),
309+
}
310+
}
311+
312+
assert_eq!(result_data.len(), data_size);
313+
assert_eq!(result_data, source_data);
314+
}
315+
316+
#[tokio::test]
317+
async fn test_async_write_all_buffering_random() {
318+
let data_handle = Arc::new(Mutex::new(Vec::new()));
319+
let mock_stream = MockWriteStream::new(data_handle.clone(), false); // deterministic = false
320+
let mut writer = AsyncBufStream::new(mock_stream, 8);
321+
322+
writer.write_all(b"hello").await.unwrap();
323+
{
324+
assert!(data_handle.lock().unwrap().is_empty()); // buffered
325+
}
326+
writer.write_all(b" wo").await.unwrap(); // total 5+3=8, should not flush yet
327+
{
328+
assert!(data_handle.lock().unwrap().is_empty()); // still buffered, pos is 8
329+
}
330+
331+
writer.write_all(b"rld").await.unwrap(); // overflows buffer
332+
// "hello wo" should be flushed.
333+
{
334+
assert_eq!(*data_handle.lock().unwrap(), b"hello wo");
335+
// "rld" is in the buffer
336+
}
337+
writer.flush().await.unwrap();
338+
{
339+
assert_eq!(*data_handle.lock().unwrap(), b"hello world");
340+
}
341+
}
342+
343+
#[tokio::test]
344+
async fn test_async_write_bypass_random() {
345+
let data_handle = Arc::new(Mutex::new(Vec::new()));
346+
let mock_stream = MockWriteStream::new(data_handle.clone(), false); // deterministic = false
347+
let mut writer = AsyncBufStream::new(mock_stream, 8);
348+
349+
writer.write_all(b"abc").await.unwrap();
350+
{
351+
assert!(data_handle.lock().unwrap().is_empty()); // buffered
352+
}
353+
// This write is larger than the buffer, it should bypass it.
354+
writer.write_all(b"this is a long line").await.unwrap();
355+
// due to short writes, the behavior cannot be assert
356+
writer.flush().await.unwrap();
357+
{
358+
assert_eq!(*data_handle.lock().unwrap(), b"abcthis is a long line");
359+
}
360+
}

0 commit comments

Comments
 (0)