Skip to content

Commit dc45b6a

Browse files
authored
Converted curl/http to be thread-safe.
1 parent 6953358 commit dc45b6a

19 files changed

+662
-352
lines changed

src/manager.cc

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -31,7 +31,7 @@ namespace this_thread {
3131
void event_open(Event* event) { utils::ThreadInternal::poll()->open(event); }
3232
void event_open_and_count(Event* event) { utils::ThreadInternal::poll()->open(event); manager->connection_manager()->inc_socket_count(); }
3333
void event_close_and_count(Event* event) { utils::ThreadInternal::poll()->close(event); manager->connection_manager()->dec_socket_count(); }
34-
void event_closed_and_count(Event* event) { utils::ThreadInternal::poll()->closed(event); manager->connection_manager()->dec_socket_count(); }
34+
void event_closed_and_count(Event* event) { utils::ThreadInternal::poll()->cleanup_closed(event); manager->connection_manager()->dec_socket_count(); }
3535
void event_insert_read(Event* event) { utils::ThreadInternal::poll()->insert_read(event); }
3636
void event_insert_write(Event* event) { utils::ThreadInternal::poll()->insert_write(event); }
3737
void event_insert_error(Event* event) { utils::ThreadInternal::poll()->insert_error(event); }

src/net/curl_get.cc

Lines changed: 91 additions & 31 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@
22

33
#include "net/curl_get.h"
44

5+
#include <cassert>
56
#include <iostream>
67
#include <curl/easy.h>
78

@@ -11,6 +12,8 @@
1112

1213
namespace torrent::net {
1314

15+
// TODO: Separate the thread-owned and public variables into different cache lines.
16+
1417
static size_t
1518
curl_get_receive_write(void* data, size_t size, size_t nmemb, void* handle) {
1619
if (!((CurlGet*)handle)->stream()->write((const char*)data, size * nmemb).fail())
@@ -19,70 +22,124 @@ curl_get_receive_write(void* data, size_t size, size_t nmemb, void* handle) {
1922
return 0;
2023
}
2124

22-
CurlGet::CurlGet(CurlStack* s)
23-
: m_stack(s) {
24-
25+
CurlGet::CurlGet() {
2526
m_task_timeout.slot() = [this]() { receive_timeout(); };
2627
}
2728

2829
CurlGet::~CurlGet() {
29-
close();
30+
// CurlStack keeps a shared_ptr to this object, so it will only be destroyed once it is removed
31+
// from the stack.
32+
assert(!is_busy() && "CurlGet::~CurlGet called while still busy.");
33+
}
34+
35+
void
36+
CurlGet::set_url(const std::string& url) {
37+
auto guard = lock_guard();
38+
39+
if (m_handle != nullptr)
40+
throw torrent::internal_error("CurlGet::set_url(...) called on a busy object.");
41+
42+
m_url = url;
3043
}
3144

45+
// Make sure the output stream does not have any bad/failed bits set.
46+
//
47+
// TODO: Make the stream into something you pass to CurlGet, as a unique_ptr, and then have a way to
48+
// receive it in a thread-safe way on success.
3249
void
33-
CurlGet::start() {
34-
if (is_busy())
35-
throw torrent::internal_error("Tried to call CurlGet::start on a busy object.");
50+
CurlGet::set_stream(std::iostream* str) {
51+
auto guard = lock_guard();
52+
53+
if (m_handle != nullptr)
54+
throw torrent::internal_error("CurlGet::set_stream(...) called on a busy object.");
55+
56+
m_stream = str;
57+
}
58+
59+
void
60+
CurlGet::set_timeout(uint32_t seconds) {
61+
auto guard = lock_guard();
62+
63+
if (m_handle != nullptr)
64+
throw torrent::internal_error("CurlGet::set_timeout(...) called on a busy object.");
65+
66+
m_timeout = seconds;
67+
}
68+
69+
// TODO: When we add callbacks for start/close, add an atomic_bool to indicate we've queued the
70+
// action, and use that to tell the user that the http_get is busy or not.
3671

37-
if (m_stream == NULL)
38-
throw torrent::internal_error("Tried to call CurlGet::start without a valid output stream.");
72+
void
73+
CurlGet::prepare_start(CurlStack* stack) {
74+
if (m_handle != nullptr)
75+
throw torrent::internal_error("CurlGet::prepare_start(...) called on a busy object.");
3976

40-
if (!m_stack->is_running())
41-
return;
77+
if (m_stream == nullptr)
78+
throw torrent::internal_error("CurlGet::prepare_start(...) called with a null stream.");
4279

4380
m_handle = curl_easy_init();
81+
m_stack = stack;
4482

45-
if (m_handle == NULL)
83+
if (m_handle == nullptr)
4684
throw torrent::internal_error("Call to curl_easy_init() failed.");
4785

4886
curl_easy_setopt(m_handle, CURLOPT_URL, m_url.c_str());
4987
curl_easy_setopt(m_handle, CURLOPT_WRITEFUNCTION, &curl_get_receive_write);
5088
curl_easy_setopt(m_handle, CURLOPT_WRITEDATA, this);
5189

5290
if (m_timeout != 0) {
53-
curl_easy_setopt(m_handle, CURLOPT_CONNECTTIMEOUT, (long)60);
54-
curl_easy_setopt(m_handle, CURLOPT_TIMEOUT, (long)m_timeout);
55-
56-
// Normally libcurl should handle the timeout. But sometimes that doesn't
57-
// work right so we do a fallback timeout that just aborts the transfer.
58-
torrent::this_thread::scheduler()->update_wait_for_ceil_seconds(&m_task_timeout, 5s + 1s*m_timeout);
91+
curl_easy_setopt(m_handle, CURLOPT_CONNECTTIMEOUT, 60l);
92+
curl_easy_setopt(m_handle, CURLOPT_TIMEOUT, static_cast<long>(m_timeout));
5993
}
6094

61-
curl_easy_setopt(m_handle, CURLOPT_FORBID_REUSE, (long)1);
62-
curl_easy_setopt(m_handle, CURLOPT_NOSIGNAL, (long)1);
63-
curl_easy_setopt(m_handle, CURLOPT_FOLLOWLOCATION, (long)1);
64-
curl_easy_setopt(m_handle, CURLOPT_MAXREDIRS, (long)5);
95+
curl_easy_setopt(m_handle, CURLOPT_FORBID_REUSE, 1l);
96+
curl_easy_setopt(m_handle, CURLOPT_NOSIGNAL, 1l);
97+
curl_easy_setopt(m_handle, CURLOPT_FOLLOWLOCATION, 1l);
98+
curl_easy_setopt(m_handle, CURLOPT_MAXREDIRS, 5l);
6599

66100
curl_easy_setopt(m_handle, CURLOPT_IPRESOLVE, CURL_IPRESOLVE_WHATEVER);
67-
68101
curl_easy_setopt(m_handle, CURLOPT_ENCODING, "");
69102

70103
m_ipv6 = false;
104+
}
105+
106+
void
107+
CurlGet::activate() {
108+
CURLMcode code = curl_multi_add_handle(m_stack->handle(), m_handle);
109+
110+
if (code != CURLM_OK)
111+
throw torrent::internal_error("CurlGet::activate() error calling curl_multi_add_handle: " + std::string(curl_multi_strerror(code)));
71112

72-
m_stack->add_get(this);
113+
// Normally libcurl should handle the timeout. But sometimes that doesn't
114+
// work right so we do a fallback timeout that just aborts the transfer.
115+
//
116+
// TODO: Verify this is still needed, as it was added to work around issues in early libcurl
117+
// versions.
118+
if (m_timeout != 0)
119+
torrent::this_thread::scheduler()->update_wait_for_ceil_seconds(&m_task_timeout, 1min + 1s*m_timeout);
120+
121+
m_active = true;
73122
}
74123

75124
void
76-
CurlGet::close() {
77-
torrent::this_thread::scheduler()->erase(&m_task_timeout);
125+
CurlGet::cleanup() {
126+
if (m_handle == nullptr)
127+
throw torrent::internal_error("CurlGet::cleanup() called on a null m_handle.");
78128

79-
if (!is_busy())
80-
return;
129+
if (m_active) {
130+
CURLMcode code = curl_multi_remove_handle(m_stack->handle(), m_handle);
81131

82-
m_stack->remove_get(this);
132+
if (code != CURLM_OK)
133+
throw torrent::internal_error("CurlGet::cleanup() error calling curl_multi_remove_handle: " + std::string(curl_multi_strerror(code)));
134+
135+
torrent::this_thread::scheduler()->erase(&m_task_timeout);
136+
m_active = false;
137+
}
83138

84139
curl_easy_cleanup(m_handle);
85-
m_handle = NULL;
140+
141+
m_handle = nullptr;
142+
m_stack = nullptr;
86143
}
87144

88145
void
@@ -114,9 +171,12 @@ CurlGet::size_total() {
114171

115172
void
116173
CurlGet::receive_timeout() {
117-
return m_stack->transfer_done(m_handle, "Timed out");
174+
// return m_stack->transfer_done(m_handle, "Timed out");
175+
throw internal_error("CurlGet::receive_timeout() called, however this was a hack to work around libcurl not handling timeouts correctly.");
118176
}
119177

178+
// TODO: Verify slots are handled while CurlGet and CurlStack are unlocked.
179+
120180
void
121181
CurlGet::trigger_done() {
122182
::utils::slot_list_call(m_signal_done);

src/net/curl_get.h

Lines changed: 83 additions & 26 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@
33

44
#include <iosfwd>
55
#include <list>
6+
#include <mutex>
67
#include <string>
78
#include <curl/curl.h>
89

@@ -14,68 +15,124 @@ class CurlStack;
1415

1516
class CurlGet {
1617
public:
17-
CurlGet(CurlStack* s);
18-
virtual ~CurlGet();
18+
CurlGet();
19+
~CurlGet();
1920

20-
bool is_active() const { return m_active; }
21-
bool is_busy() const { return m_handle; }
22-
bool is_using_ipv6() { return m_ipv6; }
21+
bool is_active() const;
22+
bool is_busy() const;
23+
bool is_using_ipv6() const;
2324

24-
void start();
25-
void close();
26-
27-
const std::string& url() const { return m_url; }
28-
std::iostream* stream() { return m_stream; }
29-
uint32_t timeout() const { return m_timeout; }
25+
// TODO: Don't allow getting the stream.
26+
const std::string& url() const;
27+
std::iostream* stream();
28+
uint32_t timeout() const;
3029

3130
// Make sure the output stream does not have any bad/failed bits set.
32-
void set_active(bool a) { m_active = a; }
33-
void set_url(const std::string& url) { m_url = url; }
34-
void set_stream(std::iostream* str) { m_stream = str; }
35-
void set_timeout(uint32_t seconds) { m_timeout = seconds; }
31+
void set_url(const std::string& url);
32+
void set_stream(std::iostream* str);
33+
// TODO: Change to std::chrono.
34+
void set_timeout(uint32_t seconds);
3635

3736
void retry_ipv6();
3837

3938
int64_t size_done();
4039
int64_t size_total();
4140

41+
// TODO: This isn't thread-safe, and HttpGet needs to be fixed.
4242
CURL* handle() { return m_handle; }
43+
CurlStack* curl_stack() { return m_stack; }
4344

4445
// The owner of the Http object must close it as soon as possible
4546
// after receiving the signal, as the implementation may allocate
4647
// limited resources during its lifetime.
4748
auto& signal_done() { return m_signal_done; }
4849
auto& signal_failed() { return m_signal_failed; }
4950

50-
private:
51+
protected:
5152
friend class CurlStack;
5253

53-
CurlGet(const CurlGet&) = delete;
54-
void operator = (const CurlGet&) = delete;
54+
// CurlStack is responsible for locking.
55+
bool is_active_no_locking() const { return m_active; }
56+
57+
void prepare_start(CurlStack* stack);
58+
void activate();
59+
void cleanup();
60+
61+
// We need to lock when changing any of the values publicly accessible. This means we don't need
62+
// to lock when changing the underlying vector.
63+
void lock() const { m_mutex.lock(); }
64+
auto lock_guard() const { return std::scoped_lock(m_mutex); }
65+
void unlock() const { m_mutex.unlock(); }
66+
auto& mutex() const { return m_mutex; }
5567

5668
void receive_timeout();
5769

5870
void trigger_done();
5971
void trigger_failed(const std::string& message);
6072

61-
bool m_active{};
73+
private:
74+
CurlGet(const CurlGet&) = delete;
75+
void operator = (const CurlGet&) = delete;
76+
77+
mutable std::mutex m_mutex;
78+
79+
// TODO: Consider locking requirements.
80+
CURL* m_handle{};
81+
CurlStack* m_stack;
82+
6283
bool m_ipv6;
6384

64-
// TODO: Use shared_ptr, or replace with std::function.
65-
std::string m_url;
66-
std::iostream* m_stream{};
85+
torrent::utils::SchedulerEntry m_task_timeout;
86+
87+
// When you change timeout to a different type, update curl_get.cc where it multiplies 1s*m_timeout.
6788

68-
// When you change this to a different type, update curl_get.cc where it multiplies 1s*m_timeout.
89+
bool m_active{};
6990
uint32_t m_timeout{};
7091

71-
CURL* m_handle{};
72-
CurlStack* m_stack;
92+
std::string m_url;
93+
// TODO: Use shared_ptr, or replace with std::function.
94+
std::iostream* m_stream{};
7395

74-
torrent::utils::SchedulerEntry m_task_timeout;
7596
std::list<std::function<void()>> m_signal_done;
7697
std::list<std::function<void(const std::string&)>> m_signal_failed;
7798
};
7899

100+
inline bool
101+
CurlGet::is_active() const {
102+
auto guard = lock_guard();
103+
return m_active;
104+
}
105+
106+
inline bool
107+
CurlGet::is_busy() const {
108+
auto guard = lock_guard();
109+
return m_handle != nullptr;
110+
}
111+
112+
inline bool
113+
CurlGet::is_using_ipv6() const {
114+
auto guard = lock_guard();
115+
return m_ipv6;
116+
}
117+
118+
inline const std::string&
119+
CurlGet::url() const {
120+
auto guard = lock_guard();
121+
return m_url;
122+
}
123+
124+
inline std::iostream*
125+
CurlGet::stream() {
126+
auto guard = lock_guard();
127+
return m_stream;
128+
}
129+
130+
inline uint32_t
131+
CurlGet::timeout() const {
132+
auto guard = lock_guard();
133+
return m_timeout;
134+
}
135+
79136
}
80137

81138
#endif

0 commit comments

Comments
 (0)