Skip to content

Commit 839afe6

Browse files
committed
Add simulated Read/Write fragmentation test
1 parent 5b9e6ca commit 839afe6

File tree

2 files changed

+74
-3
lines changed

2 files changed

+74
-3
lines changed

tests/common/mod.rs

+47-3
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,7 @@ use async_std::path::PathBuf;
44
use async_std::sync::Arc;
55
use async_std::task::{Context, Poll};
66
use std::pin::Pin;
7-
use std::sync::Mutex;
7+
use std::sync::{atomic::AtomicBool, Mutex};
88

99
#[derive(Debug, Copy, Clone)]
1010
#[allow(dead_code)]
@@ -19,6 +19,12 @@ pub struct TestCase {
1919
source_fixture: Arc<File>,
2020
expected_fixture: Arc<Mutex<File>>,
2121
result: Arc<Mutex<File>>,
22+
throttle: Arc<Throttle>,
23+
}
24+
25+
enum Throttle {
26+
NoThrottle,
27+
YieldPending(AtomicBool, AtomicBool),
2228
}
2329

2430
impl TestCase {
@@ -68,9 +74,14 @@ impl TestCase {
6874
source_fixture: Arc::new(source_fixture),
6975
expected_fixture: Arc::new(Mutex::new(expected_fixture)),
7076
result,
77+
throttle: Arc::new(Throttle::NoThrottle),
7178
}
7279
}
7380

81+
pub fn throttle(&mut self) {
82+
self.throttle = Arc::new(Throttle::YieldPending(AtomicBool::new(false), AtomicBool::new(false)));
83+
}
84+
7485
pub async fn read_result(&self) -> String {
7586
use async_std::prelude::*;
7687
let mut result = String::new();
@@ -128,13 +139,46 @@ impl Read for TestCase {
128139
cx: &mut Context,
129140
buf: &mut [u8],
130141
) -> Poll<io::Result<usize>> {
131-
Pin::new(&mut &*self.source_fixture).poll_read(cx, buf)
142+
match &*self.throttle {
143+
Throttle::NoThrottle => {
144+
Pin::new(&mut &*self.source_fixture).poll_read(cx, buf)
145+
},
146+
Throttle::YieldPending(read_flag, _) => {
147+
if read_flag.fetch_xor(true, std::sync::atomic::Ordering::SeqCst) {
148+
println!("read yield");
149+
Poll::Pending
150+
} else {
151+
// read partial
152+
let throttle_len = std::cmp::min(buf.len(), 10);
153+
let buf = &mut buf[..throttle_len];
154+
let ret = Pin::new(&mut &*self.source_fixture).poll_read(cx, buf);
155+
println!("read partial 10 {:?} {:?}", ret, buf);
156+
ret
157+
}
158+
},
159+
}
132160
}
133161
}
134162

135163
impl Write for TestCase {
136164
fn poll_write(self: Pin<&mut Self>, cx: &mut Context, buf: &[u8]) -> Poll<io::Result<usize>> {
137-
Pin::new(&mut &*self.result.lock().unwrap()).poll_write(cx, buf)
165+
match &*self.throttle {
166+
Throttle::NoThrottle => {
167+
Pin::new(&mut &*self.result.lock().unwrap()).poll_write(cx, buf)
168+
},
169+
Throttle::YieldPending(_, write_flag) => {
170+
if write_flag.fetch_xor(true, std::sync::atomic::Ordering::SeqCst) {
171+
println!("write yield");
172+
Poll::Pending
173+
} else {
174+
// write partial
175+
let throttle_len = std::cmp::min(buf.len(), 10);
176+
let buf = &buf[..throttle_len];
177+
println!("write partial 10 {:?}", buf);
178+
Pin::new(&mut &*self.result.lock().unwrap()).poll_write(cx, buf)
179+
}
180+
},
181+
}
138182
}
139183

140184
fn poll_flush(self: Pin<&mut Self>, cx: &mut Context) -> Poll<io::Result<()>> {

tests/server.rs

+27
Original file line numberDiff line numberDiff line change
@@ -77,6 +77,33 @@ async fn test_chunked_echo() {
7777
case.assert().await;
7878
}
7979

80+
#[async_std::test]
81+
async fn test_chunked_echo_throttled() {
82+
let mut case = TestCase::new_server(
83+
"fixtures/request-chunked-echo.txt",
84+
"fixtures/response-chunked-echo.txt",
85+
)
86+
.await;
87+
case.throttle();
88+
let addr = "http://example.com";
89+
90+
async_h1::accept(addr, case.clone(), |req| async {
91+
let mut resp = Response::new(StatusCode::Ok);
92+
let ct = req.content_type();
93+
let body: Body = req.into();
94+
resp.set_body(body);
95+
if let Some(ct) = ct {
96+
resp.set_content_type(ct);
97+
}
98+
99+
Ok(resp)
100+
})
101+
.await
102+
.unwrap();
103+
104+
case.assert().await;
105+
}
106+
80107
#[async_std::test]
81108
async fn test_unexpected_eof() {
82109
// We can't predict unexpected EOF, so the response content-length is still 11

0 commit comments

Comments
 (0)