Skip to content

Commit f5061bb

Browse files
authored
backend/curl handle pause related errors (#46)
* backend/curl handle pause related errors * backend/curl wake from loop when unpause fails
1 parent f7759ad commit f5061bb

5 files changed

Lines changed: 53 additions & 27 deletions

File tree

backends/curl/src/async.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -84,7 +84,7 @@ impl AsyncResponse for CurlAsyncResponse {
8484
let mut buf = vec![];
8585
while let Some(()) = this
8686
.handle
87-
.poll_bytes_async(|data| {
87+
.wait_for_bytes(|data| {
8888
if let Some(max_response_buffer_size) = this.max_response_buffer_size {
8989
if buf.len() + data.len() > max_response_buffer_size as usize {
9090
return Err(NyquestError::ResponseTooLarge);

backends/curl/src/async/handler.rs

Lines changed: 10 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -32,16 +32,15 @@ impl EasyCallback for AsyncHandler {
3232
// ... signals an error condition to the library and returns CURLE_WRITE_ERROR.
3333
return Ok(0);
3434
};
35-
{
36-
let mut state = inner.ctx.state.lock().unwrap();
37-
let state = &mut state.state;
35+
let mut state = inner.ctx.state.lock().unwrap();
36+
let state = &mut state.state;
37+
inner.ctx.waker.wake();
38+
if state.data_available() {
39+
Err(WriteError::Pause)
40+
} else {
3841
state.write_data(data);
42+
Ok(data.len())
3943
}
40-
unsafe {
41-
inner.pause.pause_recv();
42-
}
43-
inner.ctx.waker.wake();
44-
Ok(data.len())
4544
}
4645

4746
fn header(&mut self, data: &[u8]) -> bool {
@@ -54,7 +53,9 @@ impl EasyCallback for AsyncHandler {
5453
let state = &mut state.state;
5554
if state.push_header_data(data) {
5655
unsafe {
57-
inner.pause.pause_recv();
56+
if inner.pause.pause_recv().is_err() {
57+
return false;
58+
}
5859
}
5960
}
6061
}

backends/curl/src/async/loop.rs

Lines changed: 32 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@ pub(super) struct RequestHandle {
1919
id: usize,
2020
shared_state: Arc<SharedRequestStates>,
2121
manager: LoopManagerShared,
22+
is_recv_unpause_sent: bool,
2223
}
2324

2425
enum LoopTask {
@@ -79,7 +80,7 @@ impl RequestHandle {
7980
.await
8081
}
8182

82-
pub(super) async fn poll_bytes_async<T>(
83+
pub(super) async fn wait_for_bytes<T>(
8384
&mut self,
8485
cb: impl FnOnce(&mut Vec<u8>) -> nyquest_interface::Result<T>,
8586
) -> nyquest_interface::Result<Option<T>> {
@@ -100,16 +101,20 @@ impl RequestHandle {
100101
cb: impl FnOnce(&mut Vec<u8>) -> nyquest_interface::Result<T>,
101102
) -> Poll<nyquest_interface::Result<Option<T>>> {
102103
let mut state = self.shared_state.state.lock().unwrap();
103-
if !state.state.response_buffer.is_empty() {
104+
if state.state.data_available() {
104105
let res = cb(&mut state.state.response_buffer);
106+
self.is_recv_unpause_sent = false;
105107
return Poll::Ready(res.map(Some));
106108
}
107109
if let RequestResult::Done { res, .. } = std::mem::take(&mut state.result) {
110+
self.is_recv_unpause_sent = false;
108111
return Poll::Ready(res.map(|_| None));
109112
};
110-
self.manager
111-
.dispatch_task(LoopTask::UnpauseRecvHandle(self.id))
112-
.ok();
113+
if !std::mem::replace(&mut self.is_recv_unpause_sent, true) {
114+
self.manager
115+
.dispatch_task(LoopTask::UnpauseRecvHandle(self.id))
116+
.ok();
117+
}
113118
self.shared_state.waker.register(cx.waker());
114119
Poll::Pending
115120
}
@@ -244,6 +249,7 @@ impl LoopManager {
244249
id: res,
245250
shared_state,
246251
manager: inner,
252+
is_recv_unpause_sent: false,
247253
})
248254
}
249255
Err(res) => res,
@@ -333,7 +339,6 @@ fn run_loop(multl_waker_tx: oneshot::Sender<LoopManagerShared>) {
333339
{
334340
return;
335341
}
336-
// TODO: store ctx in Easy2Handle
337342
let mut tasks = Default::default();
338343
let mut last_call = false;
339344
loop {
@@ -408,20 +413,32 @@ fn run_loop(multl_waker_tx: oneshot::Sender<LoopManagerShared>) {
408413
};
409414
}
410415
LoopTask::UnpauseRecvHandle(id) => {
411-
if let Some(easy) = multi.lookup(id) {
412-
// Ignore the error. Also see
413-
// https://github.com/sagebind/isahc/blob/9d1edd475231ad5cfd5842d939db1382dc3a88f5/src/agent/mod.rs#L432
414-
easy.as_raw_easy_mut().unpause_recv().ok();
416+
let Some(mut handle) = multi.lookup(id) else {
417+
continue;
418+
};
419+
let ctx = handle.as_callback_mut().ctx.clone();
420+
if let Err(e) =
421+
handle.with_error_message(|e| e.as_raw_easy_mut().unpause_recv())
422+
{
423+
let res = Err(e.into());
424+
ctx.state.lock().unwrap().result = RequestResult::Done { res, id };
425+
ctx.waker.wake();
415426
}
416427
}
417428
LoopTask::UnpauseSendHandle(id) => {
418-
if let Some(mut easy) = multi.lookup(id) {
429+
let Some(mut handle) = multi.lookup(id) else {
430+
continue;
431+
};
432+
let ctx = handle.as_callback_mut().ctx.clone();
433+
if let Err(e) = handle.with_error_message(|mut e| {
419434
// curl seems buggy with unsized upload multipart streams (i.e. chunked)
420435
// Pausing and unpausing again seems to make it work
421-
easy.as_mut().as_raw_easy_mut().pause_send().ok();
422-
// Ignore the error. Also see
423-
// https://github.com/sagebind/isahc/blob/9d1edd475231ad5cfd5842d939db1382dc3a88f5/src/agent/mod.rs#L432
424-
easy.as_raw_easy_mut().unpause_send().ok();
436+
e.as_mut().as_raw_easy_mut().pause_send()?;
437+
e.as_raw_easy_mut().unpause_send()
438+
}) {
439+
let res = Err(e.into());
440+
ctx.state.lock().unwrap().result = RequestResult::Done { res, id };
441+
ctx.waker.wake();
425442
}
426443
}
427444
LoopTask::DropHandle(id) => {

backends/curl/src/async/pause.rs

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,7 @@
11
use curl_sys::{CURLPAUSE_RECV, CURLPAUSE_SEND};
22

3+
use crate::curl_ng::{CurlCodeContext, WithCurlCodeContext};
4+
35
pub const _CURLPAUSE_ALL: i32 = CURLPAUSE_RECV | CURLPAUSE_SEND;
46

57
#[derive(Clone, Copy)]
@@ -14,8 +16,9 @@ impl EasyPause {
1416
/// The caller must ensure:
1517
/// 1. The handle is a valid CURL handle.
1618
/// 2. The handle is either within the same thread or we are in a callback.
17-
pub(super) unsafe fn pause_recv(&self) {
18-
curl_sys::curl_easy_pause(self.0, CURLPAUSE_RECV);
19+
pub(super) unsafe fn pause_recv(&self) -> Result<(), CurlCodeContext> {
20+
curl_sys::curl_easy_pause(self.0, CURLPAUSE_RECV)
21+
.with_easy_context("curl_easy_pause recv in callback")
1922
}
2023
}
2124

backends/curl/src/state.rs

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -42,4 +42,9 @@ impl RequestState {
4242
// TODO: handle max response buffer size
4343
self.response_buffer.extend_from_slice(data);
4444
}
45+
46+
#[cfg(feature = "async")]
47+
pub(crate) fn data_available(&self) -> bool {
48+
!self.response_buffer.is_empty()
49+
}
4550
}

0 commit comments

Comments
 (0)