Skip to content

Commit 99c0e23

Browse files
committed
Add simulated Read/Write fragmentation test
1 parent 5b9e6ca commit 99c0e23

File tree

3 files changed

+94
-3
lines changed

3 files changed

+94
-3
lines changed

tests/common/mod.rs

+50-3
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,7 @@ use async_std::path::PathBuf;
44
use async_std::sync::Arc;
55
use async_std::task::{Context, Poll};
66
use std::pin::Pin;
7-
use std::sync::Mutex;
7+
use std::sync::{atomic::AtomicBool, Mutex};
88

99
#[derive(Debug, Copy, Clone)]
1010
#[allow(dead_code)]
@@ -19,6 +19,12 @@ pub struct TestCase {
1919
source_fixture: Arc<File>,
2020
expected_fixture: Arc<Mutex<File>>,
2121
result: Arc<Mutex<File>>,
22+
throttle: Arc<Throttle>,
23+
}
24+
25+
enum Throttle {
26+
NoThrottle,
27+
YieldPending(AtomicBool, AtomicBool),
2228
}
2329

2430
impl TestCase {
@@ -68,9 +74,15 @@ impl TestCase {
6874
source_fixture: Arc::new(source_fixture),
6975
expected_fixture: Arc::new(Mutex::new(expected_fixture)),
7076
result,
77+
throttle: Arc::new(Throttle::NoThrottle),
7178
}
7279
}
7380

81+
#[allow(dead_code)]
82+
pub fn throttle(&mut self) {
83+
self.throttle = Arc::new(Throttle::YieldPending(AtomicBool::new(false), AtomicBool::new(false)));
84+
}
85+
7486
pub async fn read_result(&self) -> String {
7587
use async_std::prelude::*;
7688
let mut result = String::new();
@@ -128,13 +140,48 @@ impl Read for TestCase {
128140
cx: &mut Context,
129141
buf: &mut [u8],
130142
) -> Poll<io::Result<usize>> {
131-
Pin::new(&mut &*self.source_fixture).poll_read(cx, buf)
143+
match &*self.throttle {
144+
Throttle::NoThrottle => {
145+
Pin::new(&mut &*self.source_fixture).poll_read(cx, buf)
146+
},
147+
Throttle::YieldPending(read_flag, _) => {
148+
if read_flag.fetch_xor(true, std::sync::atomic::Ordering::SeqCst) {
149+
println!("read yield");
150+
cx.waker().wake_by_ref();
151+
Poll::Pending
152+
} else {
153+
// read partial
154+
let throttle_len = std::cmp::min(buf.len(), 10);
155+
let buf = &mut buf[..throttle_len];
156+
let ret = Pin::new(&mut &*self.source_fixture).poll_read(cx, buf);
157+
println!("read partial 10 {:?} {:?}", ret, buf);
158+
ret
159+
}
160+
},
161+
}
132162
}
133163
}
134164

135165
impl Write for TestCase {
136166
fn poll_write(self: Pin<&mut Self>, cx: &mut Context, buf: &[u8]) -> Poll<io::Result<usize>> {
137-
Pin::new(&mut &*self.result.lock().unwrap()).poll_write(cx, buf)
167+
match &*self.throttle {
168+
Throttle::NoThrottle => {
169+
Pin::new(&mut &*self.result.lock().unwrap()).poll_write(cx, buf)
170+
},
171+
Throttle::YieldPending(_, write_flag) => {
172+
if write_flag.fetch_xor(true, std::sync::atomic::Ordering::SeqCst) {
173+
println!("write yield");
174+
cx.waker().wake_by_ref();
175+
Poll::Pending
176+
} else {
177+
// write partial
178+
let throttle_len = std::cmp::min(buf.len(), 10);
179+
let buf = &buf[..throttle_len];
180+
println!("write partial 10 {:?}", buf);
181+
Pin::new(&mut &*self.result.lock().unwrap()).poll_write(cx, buf)
182+
}
183+
},
184+
}
138185
}
139186

140187
fn poll_flush(self: Pin<&mut Self>, cx: &mut Context) -> Poll<io::Result<()>> {
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,17 @@
1+
HTTP/1.1 200 OK
2+
transfer-encoding: chunked
3+
date: {DATE}
4+
content-type: text/plain
5+
6+
1
7+
M
8+
6
9+
ozilla
10+
9
11+
Developer
12+
5
13+
Netwo
14+
2
15+
rk
16+
0
17+

tests/server.rs

+27
Original file line numberDiff line numberDiff line change
@@ -77,6 +77,33 @@ async fn test_chunked_echo() {
7777
case.assert().await;
7878
}
7979

80+
#[async_std::test]
81+
async fn test_chunked_echo_throttled() {
82+
let mut case = TestCase::new_server(
83+
"fixtures/request-chunked-echo.txt",
84+
"fixtures/response-chunked-echo-throttled.txt",
85+
)
86+
.await;
87+
case.throttle();
88+
let addr = "http://example.com";
89+
90+
async_h1::accept(addr, case.clone(), |req| async {
91+
let mut resp = Response::new(StatusCode::Ok);
92+
let ct = req.content_type();
93+
let body: Body = req.into();
94+
resp.set_body(body);
95+
if let Some(ct) = ct {
96+
resp.set_content_type(ct);
97+
}
98+
99+
Ok(resp)
100+
})
101+
.await
102+
.unwrap();
103+
104+
case.assert().await;
105+
}
106+
80107
#[async_std::test]
81108
async fn test_unexpected_eof() {
82109
// We can't predict unexpected EOF, so the response content-length is still 11

0 commit comments

Comments
 (0)