Skip to content

Commit e0a8c33

Browse files
authored
backend/curl: fix stream read unlimited buffering (#22)
* curl fix stream read unlimited buffering * nsurlsession fix std::io
1 parent a1edabd commit e0a8c33

5 files changed

Lines changed: 23 additions & 10 deletions

File tree

backends/curl/src/async.rs

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -103,6 +103,9 @@ impl futures_io::AsyncRead for CurlAsyncResponse {
103103
Poll::Ready(match poll_res {
104104
Ok(None) => Ok(0),
105105
Ok(Some(read_len)) => Ok(read_len),
106+
Err(NyquestError::RequestTimeout) => {
107+
return Poll::Ready(Err(io::ErrorKind::TimedOut.into()))
108+
}
106109
Err(NyquestError::Io(e)) => return Poll::Ready(Err(e)),
107110
Err(e) => unreachable!("Unexpected error: {}", e),
108111
})

backends/curl/src/async/loop.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -128,7 +128,7 @@ impl RequestHandle {
128128
pub(super) fn poll_bytes<T>(
129129
&mut self,
130130
cx: &mut Context<'_>,
131-
mut cb: impl FnMut(&mut Vec<u8>) -> nyquest_interface::Result<T>,
131+
cb: impl FnOnce(&mut Vec<u8>) -> nyquest_interface::Result<T>,
132132
) -> Poll<nyquest_interface::Result<Option<T>>> {
133133
let mut state = self.shared_context.state.lock().unwrap();
134134
if !state.0.response_buffer.is_empty() {

backends/curl/src/blocking.rs

Lines changed: 7 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -93,18 +93,18 @@ impl CurlEasyClient {
9393
impl io::Read for CurlBlockingResponse {
9494
fn read(&mut self, buf: &mut [u8]) -> io::Result<usize> {
9595
let handle = self.handle.handle_mut();
96-
match handle.poll_until_partial_response() {
97-
Ok(()) => {}
98-
Err(NyquestError::Io(e)) => return Err(e),
99-
Err(e) => unreachable!("Unexpected error: {e:?}"),
100-
}
101-
let written = handle.with_response_buffer_mut(|response_buf| {
96+
let res = handle.poll_bytes(|response_buf| {
10297
let len = response_buf.len().min(buf.len());
10398
buf[..len].copy_from_slice(&response_buf[..len]);
10499
response_buf.drain(..len);
105100
len
106101
});
107-
Ok(written)
102+
match res {
103+
Ok(len) => Ok(len),
104+
Err(NyquestError::RequestTimeout) => Err(io::ErrorKind::TimedOut.into()),
105+
Err(NyquestError::Io(e)) => Err(e),
106+
Err(e) => unreachable!("Unexpected error: {}", e),
107+
}
108108
}
109109
}
110110

backends/curl/src/blocking/multi_easy.rs

Lines changed: 9 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -196,15 +196,22 @@ impl MultiEasy {
196196
.map(|_| ())
197197
}
198198

199-
pub fn poll_until_partial_response(&mut self) -> NyquestResult<()> {
199+
pub fn poll_bytes<T>(&mut self, cb: impl FnOnce(&mut Vec<u8>) -> T) -> NyquestResult<T> {
200+
{
201+
let mut state = self.state.lock().unwrap();
202+
if !state.response_buffer.is_empty() {
203+
return Ok(cb(&mut state.response_buffer));
204+
}
205+
}
200206
self.poll_until(|state| {
201207
let is_empty = state.lock().unwrap().response_buffer.is_empty();
202208
Ok(if is_empty {
203209
ControlFlow::Continue(())
204210
} else {
205211
ControlFlow::Break(())
206212
})
207-
})
213+
})?;
214+
Ok(self.with_response_buffer_mut(cb))
208215
}
209216

210217
pub fn take_response_buffer(&mut self) -> Vec<u8> {

backends/nsurlsession/src/blocking.rs

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -48,6 +48,9 @@ impl std::io::Read for NSUrlSessionBlockingResponse {
4848
Ok(read_len @ 1..) => {
4949
return Ok(read_len);
5050
}
51+
Err(NyquestError::RequestTimeout) => {
52+
return Err(std::io::ErrorKind::TimedOut.into())
53+
}
5154
Err(NyquestError::Io(e)) => return Err(e),
5255
Err(e) => unreachable!("Unexpected error: {e}"),
5356
Ok(0) if inner.shared.is_completed() => return Ok(0),

0 commit comments

Comments
 (0)