Skip to content

Commit 0817715

Browse files
authored
Merge pull request #864 from RuleOfThrees/multiasync
MultiAsync
2 parents 47438c7 + 3e0570e commit 0817715

File tree

16 files changed

+774
-30
lines changed

16 files changed

+774
-30
lines changed

.gitignore

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -44,6 +44,7 @@ _site/
4444

4545
# Vim
4646
.ycm_extra_conf.py*
47+
*.swp
4748

4849
# VSCode
4950
.vscode/
@@ -52,5 +53,9 @@ _site/
5253
# clangd
5354
.cache/
5455

56+
# compilation database
57+
# used in various editor configurations, such as vim & YcM
58+
compile_commands.json
59+
5560
# macOS
5661
.DS_Store

cpr/CMakeLists.txt

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@ add_library(cpr
55
async.cpp
66
auth.cpp
77
bearer.cpp
8+
callback.cpp
89
cert_info.cpp
910
cookies.cpp
1011
cprtypes.cpp

cpr/callback.cpp

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,14 @@
1+
#include <cpr/callback.h>
2+
#include <curl/curl.h>
3+
#include <functional>
4+
5+
namespace cpr {
6+
7+
void CancellationCallback::SetProgressCallback(ProgressCallback& u_cb) {
8+
user_cb.emplace(std::reference_wrapper{u_cb});
9+
}
10+
bool CancellationCallback::operator()(cpr_pf_arg_t dltotal, cpr_pf_arg_t dlnow, cpr_pf_arg_t ultotal, cpr_pf_arg_t ulnow) const {
11+
const bool cont_operation{!cancellation_state->load()};
12+
return user_cb ? (cont_operation && (*user_cb)(dltotal, dlnow, ultotal, ulnow)) : cont_operation;
13+
}
14+
} // namespace cpr

cpr/session.cpp

Lines changed: 19 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -246,11 +246,15 @@ void Session::SetWriteCallback(const WriteCallback& write) {
246246

247247
void Session::SetProgressCallback(const ProgressCallback& progress) {
248248
progresscb_ = progress;
249+
if (isCancellable) {
250+
cancellationcb_.SetProgressCallback(progresscb_);
251+
return;
252+
}
249253
#if LIBCURL_VERSION_NUM < 0x072000
250-
curl_easy_setopt(curl_->handle, CURLOPT_PROGRESSFUNCTION, cpr::util::progressUserFunction);
254+
curl_easy_setopt(curl_->handle, CURLOPT_PROGRESSFUNCTION, cpr::util::progressUserFunction<ProgressCallback>);
251255
curl_easy_setopt(curl_->handle, CURLOPT_PROGRESSDATA, &progresscb_);
252256
#else
253-
curl_easy_setopt(curl_->handle, CURLOPT_XFERINFOFUNCTION, cpr::util::progressUserFunction);
257+
curl_easy_setopt(curl_->handle, CURLOPT_XFERINFOFUNCTION, cpr::util::progressUserFunction<ProgressCallback>);
254258
curl_easy_setopt(curl_->handle, CURLOPT_XFERINFODATA, &progresscb_);
255259
#endif
256260
curl_easy_setopt(curl_->handle, CURLOPT_NOPROGRESS, 0L);
@@ -968,4 +972,17 @@ void Session::SetOption(const ReserveSize& reserve_size) { SetReserveSize(reserv
968972
void Session::SetOption(const AcceptEncoding& accept_encoding) { SetAcceptEncoding(accept_encoding); }
969973
void Session::SetOption(AcceptEncoding&& accept_encoding) { SetAcceptEncoding(accept_encoding); }
970974
// clang-format on
975+
976+
void Session::SetCancellationParam(std::shared_ptr<std::atomic_bool> param) {
977+
cancellationcb_ = CancellationCallback{std::move(param)};
978+
isCancellable = true;
979+
#if LIBCURL_VERSION_NUM < 0x072000
980+
curl_easy_setopt(curl_->handle, CURLOPT_PROGRESSFUNCTION, cpr::util::progressUserFunction<CancellationCallback>);
981+
curl_easy_setopt(curl_->handle, CURLOPT_PROGRESSDATA, &cancellationcb_);
982+
#else
983+
curl_easy_setopt(curl_->handle, CURLOPT_XFERINFOFUNCTION, cpr::util::progressUserFunction<CancellationCallback>);
984+
curl_easy_setopt(curl_->handle, CURLOPT_XFERINFODATA, &cancellationcb_);
985+
#endif
986+
curl_easy_setopt(curl_->handle, CURLOPT_NOPROGRESS, 0L);
987+
}
971988
} // namespace cpr

cpr/util.cpp

Lines changed: 0 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -144,14 +144,6 @@ size_t writeUserFunction(char* ptr, size_t size, size_t nmemb, const WriteCallba
144144
return (*write)({ptr, size}) ? size : 0;
145145
}
146146

147-
#if LIBCURL_VERSION_NUM < 0x072000
148-
int progressUserFunction(const ProgressCallback* progress, double dltotal, double dlnow, double ultotal, double ulnow) {
149-
#else
150-
int progressUserFunction(const ProgressCallback* progress, curl_off_t dltotal, curl_off_t dlnow, curl_off_t ultotal, curl_off_t ulnow) {
151-
#endif
152-
return (*progress)(dltotal, dlnow, ultotal, ulnow) ? 0 : 1;
153-
} // namespace cpr::util
154-
155147
int debugUserFunction(CURL* /*handle*/, curl_infotype type, char* data, size_t size, const DebugCallback* debug) {
156148
(*debug)(static_cast<DebugCallback::InfoType>(type), std::string(data, size));
157149
return 0;

include/CMakeLists.txt

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,7 @@ target_sources(cpr PRIVATE
1010
cpr/accept_encoding.h
1111
cpr/api.h
1212
cpr/async.h
13+
cpr/async_wrapper.h
1314
cpr/auth.h
1415
cpr/bearer.h
1516
cpr/body.h

include/cpr/api.h

Lines changed: 79 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,7 @@
88
#include <utility>
99

1010
#include "cpr/async.h"
11+
#include "cpr/async_wrapper.h"
1112
#include "cpr/auth.h"
1213
#include "cpr/bearer.h"
1314
#include "cpr/cprtypes.h"
@@ -17,11 +18,10 @@
1718
#include "cpr/response.h"
1819
#include "cpr/session.h"
1920
#include <cpr/filesystem.h>
20-
#include <utility>
2121

2222
namespace cpr {
2323

24-
using AsyncResponse = std::future<Response>;
24+
using AsyncResponse = AsyncWrapper<Response>;
2525

2626
namespace priv {
2727

@@ -85,6 +85,32 @@ void setup_multiperform(MultiPerform& multiperform, Ts&&... ts) {
8585
setup_multiperform_internal<Ts...>(multiperform, std::forward<Ts>(ts)...);
8686
}
8787

88+
using session_action_t = cpr::Response (cpr::Session::*)();
89+
90+
template <session_action_t SessionAction, typename T>
91+
void setup_multiasync(std::vector<AsyncWrapper<Response, true>>& responses, T&& parameters) {
92+
std::shared_ptr<std::atomic_bool> cancellation_state = std::make_shared<std::atomic_bool>(false);
93+
94+
std::function<Response(T)> execFn{[cancellation_state](T params) {
95+
if (cancellation_state->load()) {
96+
return Response{};
97+
}
98+
cpr::Session s{};
99+
s.SetCancellationParam(cancellation_state);
100+
apply_set_option(s, std::forward<T>(params));
101+
return std::invoke(SessionAction, s);
102+
}};
103+
responses.emplace_back(GlobalThreadPool::GetInstance()->Submit(std::move(execFn), std::forward<T>(parameters)), std::move(cancellation_state));
104+
}
105+
106+
template <session_action_t SessionAction, typename T, typename... Ts>
107+
void setup_multiasync(std::vector<AsyncWrapper<Response, true>>& responses, T&& head, Ts&&... tail) {
108+
setup_multiasync<SessionAction>(responses, std::forward<T>(head));
109+
if constexpr (sizeof...(Ts) > 0) {
110+
setup_multiasync<SessionAction>(responses, std::forward<Ts>(tail)...);
111+
}
112+
}
113+
88114
} // namespace priv
89115

90116
// Get methods
@@ -245,7 +271,7 @@ Response Download(std::ofstream& file, Ts&&... ts) {
245271
// Download async method
246272
template <typename... Ts>
247273
AsyncResponse DownloadAsync(fs::path local_path, Ts... ts) {
248-
return std::async(
274+
return AsyncWrapper{std::async(
249275
std::launch::async,
250276
[](fs::path local_path_, Ts... ts_) {
251277
#ifdef CPR_USE_BOOST_FILESYSTEM
@@ -255,7 +281,7 @@ AsyncResponse DownloadAsync(fs::path local_path, Ts... ts) {
255281
#endif
256282
return Download(f, std::move(ts_)...);
257283
},
258-
std::move(local_path), std::move(ts)...);
284+
std::move(local_path), std::move(ts)...)};
259285
}
260286

261287
// Download with user callback
@@ -316,6 +342,55 @@ std::vector<Response> MultiPost(Ts&&... ts) {
316342
return multiperform.Post();
317343
}
318344

345+
template <typename... Ts>
346+
std::vector<AsyncWrapper<Response, true>> MultiGetAsync(Ts&&... ts) {
347+
std::vector<AsyncWrapper<Response, true>> ret{};
348+
priv::setup_multiasync<&cpr::Session::Get>(ret, std::forward<Ts>(ts)...);
349+
return ret;
350+
}
351+
352+
template <typename... Ts>
353+
std::vector<AsyncWrapper<Response, true>> MultiDeleteAsync(Ts&&... ts) {
354+
std::vector<AsyncWrapper<Response, true>> ret{};
355+
priv::setup_multiasync<&cpr::Session::Delete>(ret, std::forward<Ts>(ts)...);
356+
return ret;
357+
}
358+
359+
template <typename... Ts>
360+
std::vector<AsyncWrapper<Response, true>> MultiHeadAsync(Ts&&... ts) {
361+
std::vector<AsyncWrapper<Response, true>> ret{};
362+
priv::setup_multiasync<&cpr::Session::Head>(ret, std::forward<Ts>(ts)...);
363+
return ret;
364+
}
365+
template <typename... Ts>
366+
std::vector<AsyncWrapper<Response, true>> MultiOptionsAsync(Ts&&... ts) {
367+
std::vector<AsyncWrapper<Response, true>> ret{};
368+
priv::setup_multiasync<&cpr::Session::Options>(ret, std::forward<Ts>(ts)...);
369+
return ret;
370+
}
371+
372+
template <typename... Ts>
373+
std::vector<AsyncWrapper<Response, true>> MultiPatchAsync(Ts&&... ts) {
374+
std::vector<AsyncWrapper<Response, true>> ret{};
375+
priv::setup_multiasync<&cpr::Session::Patch>(ret, std::forward<Ts>(ts)...);
376+
return ret;
377+
}
378+
379+
template <typename... Ts>
380+
std::vector<AsyncWrapper<Response, true>> MultiPostAsync(Ts&&... ts) {
381+
std::vector<AsyncWrapper<Response, true>> ret{};
382+
priv::setup_multiasync<&cpr::Session::Post>(ret, std::forward<Ts>(ts)...);
383+
return ret;
384+
}
385+
386+
template <typename... Ts>
387+
std::vector<AsyncWrapper<Response, true>> MultiPutAsync(Ts&&... ts) {
388+
std::vector<AsyncWrapper<Response, true>> ret{};
389+
priv::setup_multiasync<&cpr::Session::Put>(ret, std::forward<Ts>(ts)...);
390+
return ret;
391+
}
392+
393+
319394
} // namespace cpr
320395

321396
#endif

include/cpr/async.h

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
#ifndef CPR_ASYNC_H
22
#define CPR_ASYNC_H
33

4+
#include "async_wrapper.h"
45
#include "singleton.h"
56
#include "threadpool.h"
67

@@ -16,14 +17,14 @@ class GlobalThreadPool : public ThreadPool {
1617
};
1718

1819
/**
19-
* Return a future, calling future.get() will wait task done and return RetType.
20+
* Return a wrapper for a future, calling future.get() will wait until the task is done and return RetType.
2021
* async(fn, args...)
2122
* async(std::bind(&Class::mem_fn, &obj))
2223
* async(std::mem_fn(&Class::mem_fn, &obj))
2324
**/
2425
template <class Fn, class... Args>
2526
auto async(Fn&& fn, Args&&... args) {
26-
return GlobalThreadPool::GetInstance()->Submit(std::forward<Fn>(fn), std::forward<Args>(args)...);
27+
return AsyncWrapper{GlobalThreadPool::GetInstance()->Submit(std::forward<Fn>(fn), std::forward<Args>(args)...)};
2728
}
2829

2930
class async {

include/cpr/async_wrapper.h

Lines changed: 140 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,140 @@
1+
#ifndef CPR_ASYNC_WRAPPER_H
2+
#define CPR_ASYNC_WRAPPER_H
3+
4+
#include <atomic>
5+
#include <future>
6+
#include <memory>
7+
8+
#include "cpr/response.h"
9+
10+
namespace cpr {
11+
enum class [[nodiscard]] CancellationResult { failure, success, invalid_operation };
12+
13+
/**
14+
* A class template intended to wrap results of async operations (instances of std::future<T>)
15+
* and also provide extended capablilities relaed to these requests, for example cancellation.
16+
*
17+
* The RAII semantics are the same as std::future<T> - moveable, not copyable.
18+
*/
19+
template <typename T, bool isCancellable = false>
20+
class AsyncWrapper {
21+
private:
22+
std::future<T> future;
23+
std::shared_ptr<std::atomic_bool> is_cancelled;
24+
25+
public:
26+
// Constructors
27+
explicit AsyncWrapper(std::future<T>&& f) : future{std::move(f)} {}
28+
AsyncWrapper(std::future<T>&& f, std::shared_ptr<std::atomic_bool>&& cancelledState) : future{std::move(f)}, is_cancelled{std::move(cancelledState)} {}
29+
30+
// Copy Semantics
31+
AsyncWrapper(const AsyncWrapper&) = delete;
32+
AsyncWrapper& operator=(const AsyncWrapper&) = delete;
33+
34+
// Move Semantics
35+
AsyncWrapper(AsyncWrapper&&) noexcept = default;
36+
AsyncWrapper& operator=(AsyncWrapper&&) noexcept = default;
37+
38+
// Destructor
39+
~AsyncWrapper() {
40+
if constexpr (isCancellable) {
41+
if(is_cancelled) {
42+
is_cancelled->store(true);
43+
}
44+
}
45+
}
46+
// These methods replicate the behaviour of std::future<T>
47+
[[nodiscard]] T get() {
48+
if constexpr (isCancellable) {
49+
if (IsCancelled()) {
50+
throw std::logic_error{"Calling AsyncWrapper::get on a cancelled request!"};
51+
}
52+
}
53+
if (!future.valid()) {
54+
throw std::logic_error{"Calling AsyncWrapper::get when the associated future instance is invalid!"};
55+
}
56+
return future.get();
57+
}
58+
59+
[[nodiscard]] bool valid() const noexcept {
60+
if constexpr (isCancellable) {
61+
return !is_cancelled->load() && future.valid();
62+
} else {
63+
return future.valid();
64+
}
65+
}
66+
67+
void wait() const {
68+
if constexpr (isCancellable) {
69+
if (is_cancelled->load()) {
70+
throw std::logic_error{"Calling AsyncWrapper::wait when the associated future is invalid or cancelled!"};
71+
}
72+
}
73+
if (!future.valid()) {
74+
throw std::logic_error{"Calling AsyncWrapper::wait_until when the associated future is invalid!"};
75+
}
76+
future.wait();
77+
}
78+
79+
template <class Rep, class Period>
80+
std::future_status wait_for(const std::chrono::duration<Rep, Period>& timeout_duration) const {
81+
if constexpr (isCancellable) {
82+
if (IsCancelled()) {
83+
throw std::logic_error{"Calling AsyncWrapper::wait_for when the associated future is cancelled!"};
84+
}
85+
}
86+
if (!future.valid()) {
87+
throw std::logic_error{"Calling AsyncWrapper::wait_until when the associated future is invalid!"};
88+
}
89+
return future.wait_for(timeout_duration);
90+
}
91+
92+
template <class Clock, class Duration>
93+
std::future_status wait_until(const std::chrono::time_point<Clock, Duration>& timeout_time) const {
94+
if constexpr (isCancellable) {
95+
if (IsCancelled()) {
96+
throw std::logic_error{"Calling AsyncWrapper::wait_until when the associated future is cancelled!"};
97+
}
98+
}
99+
if (!future.valid()) {
100+
throw std::logic_error{"Calling AsyncWrapper::wait_until when the associated future is invalid!"};
101+
}
102+
return future.wait_until(timeout_time);
103+
}
104+
105+
std::shared_future<T> share() noexcept {
106+
return future.share();
107+
}
108+
109+
// Cancellation-related methods
110+
CancellationResult Cancel() {
111+
if constexpr (!isCancellable) {
112+
return CancellationResult::invalid_operation;
113+
}
114+
if (!future.valid() || is_cancelled->load()) {
115+
return CancellationResult::invalid_operation;
116+
}
117+
is_cancelled->store(true);
118+
return CancellationResult::success;
119+
}
120+
121+
[[nodiscard]] bool IsCancelled() const {
122+
if constexpr (isCancellable) {
123+
return is_cancelled->load();
124+
} else {
125+
return false;
126+
}
127+
}
128+
};
129+
130+
// Deduction guides
131+
template <typename T>
132+
AsyncWrapper(std::future<T>&&) -> AsyncWrapper<T, false>;
133+
134+
template <typename T>
135+
AsyncWrapper(std::future<T>&&, std::shared_ptr<std::atomic_bool>&&) -> AsyncWrapper<T, true>;
136+
137+
} // namespace cpr
138+
139+
140+
#endif

0 commit comments

Comments
 (0)