Skip to content

Commit b2acba5

Browse files
committed
test chunked encoding
1 parent 169cdda commit b2acba5

5 files changed

Lines changed: 100 additions & 9 deletions

File tree

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,2 +1,3 @@
11
mod requests;
22
mod responses;
3+
mod scenarios;

nyquest-backend-tests/src/fixtures/responses.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -87,7 +87,7 @@ mod tests {
8787
const PATH: &str = "responses/status_codes";
8888
const STATUS_CODES: [u16; 4] = [400, 404, 500, 502];
8989
let _handle = crate::add_hyper_fixture(PATH, |mut req| async move {
90-
let mut res = Response::default();
90+
let mut res = Response::<Full<Bytes>>::default();
9191
let body = req.body_mut().collect().await.ok().and_then(|bytes| {
9292
let status = String::from_utf8_lossy(&bytes.to_bytes()).parse().ok()?;
9393
StatusCode::from_u16(status).ok()
@@ -140,7 +140,7 @@ mod tests {
140140
const HEADER_NAME: &str = "X-Test-Header";
141141
const HEADER_VALUE: &str = "test-value";
142142
let _handle = crate::add_hyper_fixture(PATH, |_req| async move {
143-
let mut res = Response::default();
143+
let mut res = Response::<Full<Bytes>>::default();
144144
res.headers_mut()
145145
.insert(HEADER_NAME, HEADER_VALUE.parse().unwrap());
146146
(res, Ok(()))
Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1 @@
1+
mod chunked_encoding;
Lines changed: 58 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,58 @@
1+
#[cfg(test)]
2+
mod tests {
3+
use futures::stream::{self};
4+
use http_body_util::BodyExt;
5+
use hyper::{Method, Response};
6+
use nyquest::Request as NyquestRequest;
7+
8+
use crate::*;
9+
10+
#[test]
11+
fn test_chunked_encoding() {
12+
const PATH: &str = "scenarios/chunked_encoding";
13+
const CHUNKS: [&str; 4] = ["Hello", ", ", "chunked ", "world!"];
14+
15+
let _handle = crate::add_hyper_fixture(PATH, {
16+
move |req| {
17+
async move {
18+
// Create a streaming response body by yielding each chunk with a small delay
19+
let stream = stream::iter(CHUNKS.iter().map(|chunk| {
20+
let chunk = Bytes::copy_from_slice(chunk.as_bytes());
21+
Ok::<_, hyper::Error>(hyper::body::Frame::data(chunk))
22+
}));
23+
24+
// Convert the stream to a boxed body
25+
let body = http_body_util::StreamBody::new(stream).boxed();
26+
let res = Response::new(body);
27+
28+
(res, (req.method() == Method::GET).then_some(()).ok_or(req))
29+
}
30+
}
31+
});
32+
33+
let expected_content = CHUNKS.concat();
34+
let builder = crate::init_builder_blocking().unwrap();
35+
36+
let assertions = |content: &str| {
37+
assert_eq!(content, expected_content);
38+
};
39+
40+
#[cfg(feature = "blocking")]
41+
{
42+
let client = builder.clone().build_blocking().unwrap();
43+
let res = client.request(NyquestRequest::get(PATH)).unwrap();
44+
let content = res.text().unwrap();
45+
assertions(&content);
46+
}
47+
48+
#[cfg(feature = "async")]
49+
{
50+
let content = TOKIO_RT.block_on(async {
51+
let client = builder.build_async().await.unwrap();
52+
let res = client.request(NyquestRequest::get(PATH)).await.unwrap();
53+
res.text().await.unwrap()
54+
});
55+
assertions(&content);
56+
}
57+
}
58+
}

nyquest-backend-tests/src/lib.rs

Lines changed: 38 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -10,7 +10,7 @@ use std::{
1010
sync::{LazyLock, Mutex, Once},
1111
};
1212

13-
use http_body_util::Full;
13+
use http_body_util::{BodyExt, Full};
1414
use hyper::{
1515
body::{self, Bytes},
1616
server::conn::http1,
@@ -45,7 +45,28 @@ impl Drop for HyperFixtureHandle {
4545
}
4646
}
4747

48-
type FixtureAssertionResult = (Response<Full<Bytes>>, Result<(), Request<body::Incoming>>);
48+
// BoxedBody for supporting streaming/chunked responses
49+
type BoxedBody = http_body_util::combinators::BoxBody<Bytes, hyper::Error>;
50+
51+
// New type allowing both Full<Bytes> and BoxedBody response types
52+
// This makes existing tests compatible with the new changes
53+
type FixtureAssertionResult = (ResponseWrapper, Result<(), Request<body::Incoming>>);
54+
55+
struct ResponseWrapper(Response<BoxedBody>);
56+
57+
impl From<Response<Full<Bytes>>> for ResponseWrapper {
58+
fn from(resp: Response<Full<Bytes>>) -> Self {
59+
let resp = resp.map(|body| body.map_err(|_| unreachable!()).boxed());
60+
ResponseWrapper(resp)
61+
}
62+
}
63+
64+
impl From<Response<BoxedBody>> for ResponseWrapper {
65+
fn from(resp: Response<BoxedBody>) -> Self {
66+
ResponseWrapper(resp)
67+
}
68+
}
69+
4970
struct HyperServiceFixture {
5071
svc: Box<
5172
dyn Fn(
@@ -56,20 +77,28 @@ struct HyperServiceFixture {
5677
>,
5778
assertion_failed_request: Option<Request<body::Incoming>>,
5879
}
80+
5981
static HYPER_SERVICE_FIXTURES: Mutex<BTreeMap<String, HyperServiceFixture>> =
6082
Mutex::new(BTreeMap::new());
6183

62-
fn add_hyper_fixture<Fut: Future<Output = FixtureAssertionResult> + Send + 'static>(
84+
fn add_hyper_fixture<Fut, Resp>(
6385
url: impl Into<String>,
6486
svc_fn: impl Fn(Request<body::Incoming>) -> Fut + Send + Sync + 'static,
65-
) -> HyperFixtureHandle {
87+
) -> HyperFixtureHandle
88+
where
89+
Fut: Future<Output = (Resp, Result<(), Request<body::Incoming>>)> + Send + 'static,
90+
Resp: Into<ResponseWrapper>,
91+
{
6692
let mut url: String = url.into();
6793
if !url.starts_with('/') {
6894
url.insert(0, '/');
6995
}
7096
let svc = Box::new(move |req| {
7197
let fut = svc_fn(req);
72-
Box::pin(async move { fut.await }) as _
98+
Box::pin(async move {
99+
let (resp, result) = fut.await;
100+
(resp.into(), result)
101+
}) as _
73102
});
74103
let fixture = HyperServiceFixture {
75104
svc,
@@ -83,20 +112,22 @@ fn add_hyper_fixture<Fut: Future<Output = FixtureAssertionResult> + Send + 'stat
83112
HyperFixtureHandle(url)
84113
}
85114

86-
async fn handle_service(req: Request<body::Incoming>) -> Result<Response<Full<Bytes>>, Infallible> {
115+
async fn handle_service(req: Request<body::Incoming>) -> Result<Response<BoxedBody>, Infallible> {
87116
let path = req.uri().path().to_owned();
88117
let fut = {
89118
let services = HYPER_SERVICE_FIXTURES.lock().unwrap();
90119
let fixture = services.get(&*path).unwrap();
91120
(fixture.svc)(req)
92121
};
93122
let (response, result) = fut.await;
123+
94124
if let Err(req) = result {
95125
let mut services = HYPER_SERVICE_FIXTURES.lock().unwrap();
96126
let fixture = services.get_mut(&*path).unwrap();
97127
fixture.assertion_failed_request = Some(req);
98128
}
99-
Ok(response)
129+
130+
Ok(response.0)
100131
}
101132

102133
async fn setup_hyper_impl() -> Result<String, io::Error> {

0 commit comments

Comments
 (0)