Skip to content

Commit 6f68297

Browse files
committed
fix: eliminate dataraces in thread_pool, queues and kqueue
1 parent d07c78d commit 6f68297

4 files changed

Lines changed: 15 additions & 8 deletions

File tree

include/dpp/queues.h

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -465,6 +465,11 @@ class DPP_EXPORT request_concurrency_queue {
465465
*/
466466
std::shared_mutex in_mutex;
467467

468+
/**
469+
* @brief Removals queue mutex thread safety.
470+
*/
471+
std::mutex rem_mutex;
472+
468473
/**
469474
* @brief Inbound queue timer. The timer is called every second,
470475
* and when it wakes up it checks for requests pending to be sent in the queue.

src/dpp/queues.cpp

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -153,6 +153,8 @@ void populate_result(const std::string &url, cluster* owner, http_request_comple
153153
/* Returns true if the request has been made */
154154
bool http_request::is_completed()
155155
{
156+
// For some odd reason, BSD requires this to have a lock to prevent data races even though `completed` is std::atomic.
157+
std::unique_lock<std::mutex> lock(this_captured_mutex);
156158
return completed;
157159
}
158160

@@ -281,8 +283,8 @@ http_request_completion_t http_request::run(request_concurrency_queue* processor
281283
{
282284
std::lock_guard<std::mutex> lock(this_captured_mutex);
283285
this_captured = false;
286+
this_captured_signal.notify_all();
284287
}
285-
this_captured_signal.notify_all();
286288
});
287289
}
288290
);
@@ -317,7 +319,7 @@ request_concurrency_queue::request_concurrency_queue(class cluster* owner, class
317319
tick_and_deliver_requests(in_index);
318320
/* Clear pending removals in the removals queue */
319321
if (time(nullptr) % 90 == 0) {
320-
std::scoped_lock lock1{in_mutex};
322+
std::scoped_lock lock1{rem_mutex};
321323
for (auto it = removals.cbegin(); it != removals.cend();) {
322324
if ((*it)->is_completed()) {
323325
it = removals.erase(it);
@@ -409,6 +411,7 @@ void request_concurrency_queue::tick_and_deliver_requests(uint32_t index)
409411
{
410412
/* Find the owned pointer in requests_in */
411413
std::scoped_lock lock1{in_mutex};
414+
std::scoped_lock lock2{rem_mutex};
412415

413416
const std::string &key = request_view->endpoint;
414417
auto [begin, end] = std::equal_range(requests_in.begin(), requests_in.end(), key, compare_request{});

src/dpp/socketengines/kqueue.cpp

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -108,6 +108,7 @@ struct DPP_EXPORT socket_engine_kqueue : public socket_engine_base {
108108

109109
if ((eh->flags & WANT_DELETION) != 0L) {
110110
remove_socket(kev.ident);
111+
std::lock_guard<std::shared_mutex> lg(this->fds_mutex);
111112
fds.erase(kev.ident);
112113
}
113114
}

src/dpp/thread_pool.cpp

Lines changed: 4 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -66,20 +66,18 @@ thread_pool::~thread_pool() {
6666
{
6767
std::unique_lock<std::mutex> lock(queue_mutex);
6868
stop = true;
69+
cv.notify_all();
6970
}
7071

71-
cv.notify_all();
7272
for (auto &thread: threads) {
7373
thread.join();
7474
}
7575
}
7676

7777
void thread_pool::enqueue(thread_pool_task task) {
78-
{
79-
std::unique_lock<std::mutex> lock(queue_mutex);
80-
tasks.emplace(std::move(task));
81-
}
78+
std::unique_lock<std::mutex> lock(queue_mutex);
79+
tasks.emplace(std::move(task));
8280
cv.notify_one();
8381
}
8482

85-
}
83+
}

0 commit comments

Comments
 (0)