diff --git a/benchmark/source/matrix_multiplication.cpp b/benchmark/source/matrix_multiplication.cpp
index bcfa906..b1ba464 100644
--- a/benchmark/source/matrix_multiplication.cpp
+++ b/benchmark/source/matrix_multiplication.cpp
@@ -97,11 +97,12 @@ TEST_CASE("matrix_multiplication") {
     }
 #endif
     {
-        dp::thread_pool> pool{};
+        dp::thread_pool> pool{};
         run_benchmark(&bench, array_size, iterations, "dp::thread_pool - fu2::unique_function",
-                      [&](const std::vector& a, const std::vector& b) -> void {
-                          pool.enqueue_detach(thread_task, a, b);
+                      [&pool, task = thread_task](const std::vector& a,
+                                                  const std::vector& b) -> void {
+                          pool.enqueue_detach(std::move(task), a, b);
                       });
     }
diff --git a/cmake/CPM.cmake b/cmake/CPM.cmake
index 176748d..c961ea6 100644
--- a/cmake/CPM.cmake
+++ b/cmake/CPM.cmake
@@ -1,4 +1,4 @@
-set(CPM_DOWNLOAD_VERSION 0.38.1)
+set(CPM_DOWNLOAD_VERSION 0.42.0)
 
 if(CPM_SOURCE_CACHE)
   # Expand relative path. This is important if the provided path contains a tilde (~)
diff --git a/examples/mandelbrot/source/main.cpp b/examples/mandelbrot/source/main.cpp
index 38147ad..8d5c417 100644
--- a/examples/mandelbrot/source/main.cpp
+++ b/examples/mandelbrot/source/main.cpp
@@ -20,7 +20,7 @@ void mandelbrot_threadpool(int image_width, int image_height, int max_iterations
     std::cout << "calculating mandelbrot" << std::endl;
 
-    dp::thread_pool pool;
+    dp::thread_pool pool{};
     std::vector>> futures;
     futures.reserve(source.height());
     const auto start = std::chrono::steady_clock::now();
diff --git a/include/thread_pool/thread_pool.h b/include/thread_pool/thread_pool.h
index 48b00e6..216046a 100644
--- a/include/thread_pool/thread_pool.h
+++ b/include/thread_pool/thread_pool.h
@@ -9,6 +9,7 @@
 #include 
 #include 
 #include 
+#include <unordered_map>
 #ifdef __has_include
 #    if __has_include()
 #        include 
@@ -16,11 +17,11 @@
 #endif
 
 #include "thread_pool/thread_safe_queue.h"
+#include "thread_pool/work_stealing_deque.h"
 
 namespace dp {
    namespace details {
-
-#ifdef __cpp_lib_move_only_function
+#if __cpp_lib_move_only_function
        using default_function_type = std::move_only_function<void()>;
 #else
        using default_function_type = std::function<void()>;
@@ -40,12 +41,16 @@
            const unsigned int &number_of_threads = std::thread::hardware_concurrency(),
            InitializationFunction init = [](std::size_t) {})
            : tasks_(number_of_threads) {
+            producer_id_ = std::this_thread::get_id();
            std::size_t current_id = 0;
            for (std::size_t i = 0; i < number_of_threads; ++i) {
                priority_queue_.push_back(size_t(current_id));
                try {
                    threads_.emplace_back([&, id = current_id,
                                           init](const std::stop_token &stop_tok) {
+                        tasks_[id].thread_id = std::this_thread::get_id();
+                        add_thread_id_to_map(tasks_[id].thread_id, id);
+
                        // invoke the init function on the thread
                        try {
                            std::invoke(init, id);
@@ -58,8 +63,17 @@
                        tasks_[id].signal.acquire();
 
                        do {
-                            // invoke the task
-                            while (auto task = tasks_[id].tasks.pop_front()) {
+                            // execute work from the global queue
+                            // all threads can pull from the top, but the producer thread owns
+                            // the bottom
+                            while (auto task = global_tasks_.pop_top()) {
+                                unassigned_tasks_.fetch_sub(1, std::memory_order_release);
+                                std::invoke(std::move(task.value()));
+                                in_flight_tasks_.fetch_sub(1, std::memory_order_release);
+                            }
+
+                            // invoke any tasks from the queue that this thread owns
+                            while (auto task = tasks_[id].tasks.pop_top()) {
                                // decrement the unassigned tasks as the task is now going
                                // to be executed
                                unassigned_tasks_.fetch_sub(1, std::memory_order_release);
@@ -71,10 +85,10 @@
                                in_flight_tasks_.fetch_sub(1, std::memory_order_release);
                            }
 
-                            // try to steal a task
+                            // try to steal a task from other threads
                            for (std::size_t j = 1; j < tasks_.size(); ++j) {
                                const std::size_t index = (id + j) % tasks_.size();
-                                if (auto task = tasks_[index].tasks.steal()) {
+                                if (auto task = tasks_[index].tasks.pop_top()) {
                                    // steal a task
                                    unassigned_tasks_.fetch_sub(1, std::memory_order_release);
                                    std::invoke(std::move(task.value()));
@@ -87,6 +101,8 @@
                                // front and waiting for more work
                        } while (unassigned_tasks_.load(std::memory_order_acquire) > 0);
 
+                        // the thread finished all its work, so we "notify" by putting this
+                        // thread at the front of the priority queue
                        priority_queue_.rotate_to_front(id);
                        // check if all tasks are completed and release the barrier (binary
                        // semaphore)
@@ -141,7 +157,7 @@
                  typename ReturnType = std::invoke_result_t<Function &&, Args &&...>>
            requires std::invocable<Function, Args...>
        [[nodiscard]] std::future<ReturnType> enqueue(Function f, Args... args) {
-#ifdef __cpp_lib_move_only_function
+#if __cpp_lib_move_only_function
            // we can do this in C++23 because we now have support for move only functions
            std::promise<ReturnType> promise;
            auto future = promise.get_future();
@@ -244,13 +260,45 @@
      private:
        template <typename Function>
        void enqueue_task(Function &&f) {
-            auto i_opt = priority_queue_.copy_front_and_rotate_to_back();
-            if (!i_opt.has_value()) {
-                // would only be a problem if there are zero threads
-                return;
+            // are we enqueuing from the producer thread, or is a worker thread
+            // enqueuing to the pool?
+            auto current_id = std::this_thread::get_id();
+            auto is_producer = current_id == producer_id_;
+            // assign the work
+            if (is_producer) {
+                // we push to the global task queue
+                global_tasks_.emplace(std::forward<Function>(f));
+            } else {
+                // Accepting work from an arbitrary thread that is neither the root producer nor a
+                // worker in the pool is a violation of the pre-condition.
+                assert(thread_id_to_index_.contains(current_id));
+                // assign the task
+                tasks_[thread_id_to_index_.at(current_id)].tasks.emplace(
+                    std::forward<Function>(f));
            }
-            // get the index
-            auto i = *(i_opt);
+
+            /**
+             * Now we need to wake up the correct thread. If the thread that is enqueuing the task
+             * is a worker from the pool, then that thread needs to execute the work. Otherwise we
+             * need to use the priority queue to pick the next available thread.
+             */
+
+            // immediately invoked lambda
+            auto thread_wakeup_index = [&]() -> std::size_t {
+                if (is_producer) {
+                    auto i_opt = priority_queue_.copy_front_and_rotate_to_back();
+                    if (!i_opt.has_value()) {
+                        // would only be a problem if there are zero threads
+                        return std::size_t{0};
+                    }
+                    // get the index
+                    return *(i_opt);
+                } else {
+                    // get the worker thread id index
+                    return thread_id_to_index_.at(current_id);
+                }
+            }();
 
            // increment the unassigned tasks and in flight tasks
            unassigned_tasks_.fetch_add(1, std::memory_order_release);
@@ -262,13 +310,18 @@
            }
 
            // assign work
-            tasks_[i].tasks.push_back(std::forward<Function>(f));
-            tasks_[i].signal.release();
+            tasks_[thread_wakeup_index].signal.release();
+        }
+
+        void add_thread_id_to_map(std::thread::id thread_id, std::size_t index) {
+            std::lock_guard lock(thread_id_map_mutex_);
+            thread_id_to_index_.insert_or_assign(thread_id, index);
        }
 
        struct task_item {
-            dp::thread_safe_queue tasks{};
+            dp::work_stealing_deque tasks{};
            std::binary_semaphore signal{0};
+            std::thread::id thread_id;
        };
 
        std::vector threads_;
@@ -277,6 +330,11 @@
        // guarantee these get zero-initialized
        std::atomic_int_fast64_t unassigned_tasks_{0}, in_flight_tasks_{0};
        std::atomic_bool threads_complete_signal_{false};
+
+        std::thread::id producer_id_;
+        dp::work_stealing_deque global_tasks_{};
+        std::mutex thread_id_map_mutex_{};
+        std::unordered_map<std::thread::id, std::size_t> thread_id_to_index_{};
    };
 
    /**
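Enqueue routing in the hunks above: the thread that constructed the pool (the producer) pushes new work onto the shared global_tasks_ deque and wakes the next worker via the priority queue, while a worker that enqueues from inside a running task pushes onto its own per-thread deque and signals itself. A minimal usage sketch of that behaviour through the existing public API (enqueue/enqueue_detach as they appear elsewhere in this patch); the example itself is illustrative and not part of the patch:

    #include <thread_pool/thread_pool.h>
    #include <iostream>

    int main() {
        // defaults to std::thread::hardware_concurrency() workers
        dp::thread_pool pool{};

        // called from the producer thread -> lands on the global deque
        pool.enqueue_detach([] { std::cout << "detached task\n"; });

        // enqueue() returns a std::future; the nested enqueue below is issued from
        // inside the task on a worker thread, so it lands on that worker's own deque
        auto result = pool.enqueue([&pool] {
            pool.enqueue_detach([] { std::cout << "nested task\n"; });
            return 42;
        });

        std::cout << result.get() << '\n';
    }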
diff --git a/include/thread_pool/work_stealing_deque.h b/include/thread_pool/work_stealing_deque.h
new file mode 100644
index 0000000..067ab09
--- /dev/null
+++ b/include/thread_pool/work_stealing_deque.h
@@ -0,0 +1,209 @@
+#pragma once
+
+#include <atomic>
+#include <bit>
+#include <cassert>
+#include <cstdint>
+#include <memory>
+#include <new>
+#include <optional>
+#include <type_traits>
+#include <utility>
+#include <vector>
+
+namespace dp {
+
+#ifdef __cpp_lib_hardware_interference_size
+    using std::hardware_destructive_interference_size;
+#else
+    // 64 bytes on x86-64 │ L1_CACHE_BYTES │ L1_CACHE_SHIFT │ __cacheline_aligned │ ...
+    inline constexpr std::size_t hardware_destructive_interference_size =
+        2 * sizeof(std::max_align_t);
+#endif
+
+    /**
+     * @brief Chase-Lev work stealing queue
+     * @details Supports a single producer and multiple consumers. The producer owns the bottom of
+     * the queue and pushes/takes there; consumers steal from the top. The queue is "lock-free" in
+     * that it does not directly use mutexes or locks.
+     *
+     * This is an implementation of the deque described in "Correct and Efficient Work-Stealing for
+     * Weak Memory Models" and "Dynamic Circular Work-Stealing Deque" by Chase and Lev.
+     *
+     * This implementation is taken from the following implementations
+     * - https://github.com/ConorWilliams/ConcurrentDeque/blob/main/include/riften/deque.hpp
+     * - https://github.com/taskflow/work-stealing-queue/blob/master/wsq.hpp
+     *
+     * I've made some minor edits and changes based on new C++23 features and other personal coding
+     * style choices/preferences.
+     */
+    template <typename T>
+        requires std::is_destructible_v<T>
+    class work_stealing_deque final {
+        /**
+         * @brief Simple circular array buffer that can regrow
+         */
+        class circular_buffer final {
+            std::size_t size_;
+            std::size_t mask_;
+            std::unique_ptr<T[]> buffer_ = std::make_unique_for_overwrite<T[]>(size_);
+
+          public:
+            explicit circular_buffer(const std::size_t size) : size_(size), mask_(size - 1) {
+                // size must be a power of 2
+                assert(std::has_single_bit(size));
+            }
+
+            [[nodiscard]] std::int64_t capacity() const noexcept { return size_; }
+
+            void store(const std::size_t index, T&& value) noexcept
+                requires std::is_nothrow_move_assignable_v<T>
+            {
+                buffer_[index & mask_] = std::move(value);
+            }
+
+            T&& load(const std::size_t index) noexcept {
+                if constexpr (std::is_move_constructible_v<T> ||
+                              std::is_nothrow_move_constructible_v<T>) {
+                    return std::move(buffer_[index & mask_]);
+                } else {
+                    return buffer_[index & mask_];
+                }
+            }
+
+            /**
+             * @brief Resize the internal buffer. Copies [start, end) to the new buffer.
+             * @param start The start index
+             * @param end The end index
+             */
+            circular_buffer* resize(const std::size_t start, const std::size_t end) {
+                auto temp = new circular_buffer(size_ * 2);
+                for (std::size_t i = start; i != end; ++i) {
+                    temp->store(i, load(i));
+                }
+                return temp;
+            }
+        };
+
+        constexpr static std::size_t default_count = 1024;
+        alignas(hardware_destructive_interference_size) std::atomic_int64_t top_;
+        alignas(hardware_destructive_interference_size) std::atomic_int64_t bottom_;
+        alignas(hardware_destructive_interference_size) std::atomic<circular_buffer*> buffer_;
+
+        std::vector<std::unique_ptr<circular_buffer>> garbage_{32};
+
+        static constexpr std::memory_order relaxed = std::memory_order_relaxed;
+        static constexpr std::memory_order acquire = std::memory_order_acquire;
+        static constexpr std::memory_order consume = std::memory_order_consume;
+        static constexpr std::memory_order release = std::memory_order_release;
+        static constexpr std::memory_order seq_cst = std::memory_order_seq_cst;
+
+      public:
+        explicit work_stealing_deque(const std::size_t& capacity = default_count)
+            : top_(0), bottom_(0), buffer_(new circular_buffer(capacity)) {}
+
+        // queue is non-copyable
+        work_stealing_deque(work_stealing_deque&) = delete;
+        work_stealing_deque& operator=(work_stealing_deque&) = delete;
+
+        [[nodiscard]] std::size_t capacity() const { return buffer_.load(relaxed)->capacity(); }
+        [[nodiscard]] std::size_t size() const {
+            const auto bottom = bottom_.load(relaxed);
+            const auto top = top_.load(relaxed);
+            return static_cast<std::size_t>(bottom >= top ? bottom - top : 0);
+        }
+
+        [[nodiscard]] bool empty() const { return !size(); }
+
+        template <typename... Args>
+        void emplace(Args&&... args) {
+            // construct first in case it throws
+            T value(std::forward<Args>(args)...);
+            push_bottom(std::move(value));
+        }
+
+        /**
+         * @brief Push data to the bottom of the queue.
+         * @details Only the producer thread can push data to the bottom. Consumers should take
+         * data from the top. See #pop_top.
+         */
+        void push_bottom(T&& value) {
+            auto bottom = bottom_.load(relaxed);
+            auto top = top_.load(acquire);
+            auto* buffer = buffer_.load(relaxed);
+
+            // check if the buffer is full
+            if (buffer->capacity() < (bottom - top) + 1) {
+                garbage_.emplace_back(std::exchange(buffer, buffer->resize(top, bottom)));
+                buffer_.store(buffer, relaxed);
+            }
+
+            buffer->store(bottom, std::forward<T>(value));
+
+            // this synchronizes with other acquire fences
+            // memory operations above this line cannot be reordered below it
+            std::atomic_thread_fence(release);
+
+            bottom_.store(bottom + 1, relaxed);
+        }
+
+        /**
+         * @brief Steal from the top of the queue
+         *
+         * @return std::optional<T>
+         */
+        std::optional<T> pop_top() {
+            auto top = top_.load(acquire);
+            // this synchronizes with other release fences
+            // memory ops below this line cannot be reordered with ops above this line
+            std::atomic_thread_fence(seq_cst);
+            auto bottom = bottom_.load(acquire);
+
+            if (top < bottom) {
+                // non-empty queue
+                auto item = buffer_.load(consume)->load(top);
+
+                if (!top_.compare_exchange_strong(top, top + 1, seq_cst, relaxed)) {
+                    // failed the race
+                    return std::nullopt;
+                }
+
+                return item;
+            }
+            // empty queue
+            return std::nullopt;
+        }
+
+        std::optional<T> take_bottom() {
+            auto bottom = bottom_.load(relaxed) - 1;
+            auto* buffer = buffer_.load(relaxed);
+
+            // prevent stealing
+            bottom_.store(bottom, relaxed);
+
+            // this synchronizes with other release fences
+            // memory ops below this line cannot be reordered
+            std::atomic_thread_fence(seq_cst);
+
+            auto top = top_.load(relaxed);
+            if (top <= bottom) {
+                // queue isn't empty
+                if (top == bottom) {
+                    // there is only 1 item left in the queue, we need the CAS to succeed
+                    // since another thread may be trying to steal and could steal before we're
+                    // able to take the bottom
+                    if (!top_.compare_exchange_strong(top, top + 1, seq_cst, relaxed)) {
+                        // failed race
+                        bottom_.store(bottom + 1, relaxed);
+                        return std::nullopt;
+                    }
+                    bottom_.store(bottom + 1, relaxed);
+                }
+                return buffer->load(bottom);
+            } else {
+                bottom_.store(bottom + 1, relaxed);
+                return std::nullopt;
+            }
+        }
+    };
+}  // namespace dp
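The deque above follows the usual Chase-Lev ownership rules: exactly one owner thread may call push_bottom()/take_bottom(), while any number of other threads may call pop_top() concurrently. A small standalone sketch of that split (illustrative only, not part of the patch; the element type, values, and thread counts are arbitrary):

    #include <thread_pool/work_stealing_deque.h>

    #include <atomic>
    #include <cstdint>
    #include <iostream>
    #include <thread>

    int main() {
        dp::work_stealing_deque<std::uint64_t> deque{};
        std::atomic_uint64_t sum{0};

        {
            // owner: the only thread that pushes to and takes from the bottom
            std::jthread owner{[&] {
                for (std::uint64_t i = 1; i <= 1000; ++i) deque.push_bottom(std::uint64_t{i});
                // drain whatever the thieves did not get
                while (auto item = deque.take_bottom()) sum += *item;
            }};

            // thieves: only ever steal from the top; they stop on the first miss
            std::jthread thief1{[&] { while (auto item = deque.pop_top()) sum += *item; }};
            std::jthread thief2{[&] { while (auto item = deque.pop_top()) sum += *item; }};
        }  // jthreads join here

        // every pushed item is consumed exactly once, by the owner or by a thief
        std::cout << sum.load() << '\n';  // 500500
    }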
diff --git a/test/CMakeLists.txt b/test/CMakeLists.txt
index 9e21faa..6c9a7a3 100644
--- a/test/CMakeLists.txt
+++ b/test/CMakeLists.txt
@@ -64,7 +64,7 @@ endif()
 if(NOT TEST_INSTALLED_VERSION)
   target_compile_options(
     ${PROJECT_NAME} PRIVATE $<$:-Wall -Wpedantic -Wextra
-                            -Werror>
+                            -Werror -Wno-deprecated -Wno-interference-size>
   )
   target_compile_options(${PROJECT_NAME} PRIVATE $<$:/W4 /WX /wd4324>)
   target_compile_definitions(
diff --git a/test/source/work_stealing_deque.cpp b/test/source/work_stealing_deque.cpp
new file mode 100644
index 0000000..3d9b28a
--- /dev/null
+++ b/test/source/work_stealing_deque.cpp
@@ -0,0 +1,208 @@
+#include <doctest/doctest.h>
+#include <thread_pool/work_stealing_deque.h>
+
+#include <deque>
+#include <functional>
+#include <semaphore>
+#include <thread>
+
+TEST_CASE("work_stealing_deque: construct queue") {
+    dp::work_stealing_deque<int> queue{};
+}
+
+TEST_CASE("work_stealing_deque: construct and grow queue") {
+    dp::work_stealing_deque<int> queue{2};
+
+    queue.push_bottom(1);
+    queue.push_bottom(2);
+    queue.push_bottom(3);
+
+    REQUIRE_EQ(queue.capacity(), 4);
+}
+
+TEST_CASE("work_stealing_deque: take bottom while queue is empty") {
+    dp::work_stealing_deque<int> queue{};
+
+    REQUIRE_EQ(queue.take_bottom(), std::nullopt);
+}
+
+TEST_CASE("work_stealing_deque: take bottom while queue is not empty") {
+    dp::work_stealing_deque<int> queue{};
+
+    queue.push_bottom(1);
+    queue.push_bottom(2);
+    queue.push_bottom(3);
+
+    REQUIRE_EQ(queue.take_bottom(), 3);
+    REQUIRE_EQ(queue.take_bottom(), 2);
+    REQUIRE_EQ(queue.take_bottom(), 1);
+    REQUIRE_EQ(queue.take_bottom(), std::nullopt);
+}
+
+TEST_CASE("work_stealing_deque: multiple thread steal single item") {
+    dp::work_stealing_deque<std::uint64_t> queue{};
+
+    queue.push_bottom(23567);
+    std::uint64_t value = 0;
+
+    {
+        auto thread_task = [&queue, &value]() {
+            if (const auto temp = queue.pop_top()) {
+                value = temp.value();
+            }
+        };
+        std::jthread t1{thread_task};
+        std::jthread t2{thread_task};
+        std::jthread t3{thread_task};
+        std::jthread t4{thread_task};
+    }
+
+    REQUIRE_EQ(value, 23567);
+}
+
+TEST_CASE("work_stealing_deque: steal std::function while pushing") {
+    dp::work_stealing_deque<std::function<void()>> deque{};
+    std::atomic_uint64_t count{0};
+    constexpr auto max = 64'000;
+    auto expected_sum = 0;
+    std::atomic_uint64_t pending_tasks{0};
+    std::deque<std::binary_semaphore> signals;
+    signals.emplace_back(0);
+    signals.emplace_back(0);
+    signals.emplace_back(0);
+    signals.emplace_back(0);
+
+    auto supply_task = [&] {
+        for (auto i = 0; i < max; i++) {
+            deque.push_bottom([&count, i]() { count += i; });
+            expected_sum += i;
+            pending_tasks.fetch_add(1, std::memory_order_release);
+            // wake all threads
+            if ((i + 1) % 8000 == 0) {
+                for (auto& signal : signals) signal.release();
+            }
+        }
+    };
+
+    auto task = [&](int id) {
+        signals[id].acquire();
+        while (pending_tasks.load(std::memory_order_acquire) > 0) {
+            auto value = deque.pop_top();
+            if (value.has_value()) {
+                auto temp = std::move(value.value());
+                std::invoke(temp);
+                pending_tasks.fetch_sub(1, std::memory_order_release);
+            }
+        }
+    };
+
+    {
+        std::jthread supplier(supply_task);
+        std::jthread t1(task, 0);
+        std::jthread t2(task, 1);
+        std::jthread t3(task, 2);
+        std::jthread t4(task, 3);
+    }
+
+    REQUIRE_EQ(count.load(), expected_sum);
+}
+
+class move_only {
+    int private_value_ = 2;
+
+  public:
+    move_only() = default;
+    ~move_only() = default;
+    move_only(move_only&) = delete;
+    move_only(move_only&& other) noexcept { private_value_ = std::move(other.private_value_); }
+    move_only& operator=(move_only&) = delete;
+    move_only& operator=(move_only&& other) noexcept {
+        private_value_ = std::move(other.private_value_);
+        return *this;
+    }
+    [[nodiscard]] int secret() const { return private_value_; }
+};
+
+TEST_CASE("work_stealing_deque: store move only types") {
+    move_only mv_only{};
+    dp::work_stealing_deque<move_only> deque{};
+    deque.push_bottom(std::move(mv_only));
+
+    const auto value = deque.take_bottom();
+    REQUIRE(value.has_value());
+    REQUIRE_EQ(value->secret(), 2);
+}
+
+TEST_CASE("work_stealing_deque: steal move only type") {
+    move_only mv_only{};
+    dp::work_stealing_deque<move_only> queue{};
+    queue.push_bottom(std::move(mv_only));
+    std::optional<move_only> value = std::nullopt;
+    {
+        auto thread_task = [&queue, &value]() {
+            if (auto temp = queue.pop_top()) {
+                value.emplace(std::move(temp.value()));
+            }
+        };
+
+        std::jthread t1{thread_task};
+        std::jthread t2{thread_task};
+        std::jthread t3{thread_task};
+        std::jthread t4{thread_task};
+    }
+
+    REQUIRE(value.has_value());
+    REQUIRE_EQ(value->secret(), 2);
+}
+
+#if __cpp_lib_move_only_function
+
+TEST_CASE("work_stealing_deque: steal std::move_only_function while pushing") {
+    dp::work_stealing_deque<std::move_only_function<void()>> deque{};
+    std::atomic_uint64_t count{0};
+    constexpr auto max = 64'000;
+    auto expected_sum = 0;
+    std::atomic_uint64_t pending_tasks{0};
+    std::deque<std::binary_semaphore> signals;
+    signals.emplace_back(0);
+    signals.emplace_back(0);
+    signals.emplace_back(0);
+    signals.emplace_back(0);
+
+    auto supply_task = [&] {
+        for (auto i = 0; i < max; i++) {
+            deque.push_bottom([&count, i]() { count += i; });
+            expected_sum += i;
+            pending_tasks.fetch_add(1, std::memory_order_release);
+            // wake all threads
+            if (i % 1000 == 0) {
+                for (auto& signal : signals) signal.release();
+            }
+        }
+    };
+
+    auto task = [&](int id) {
+        signals[id].acquire();
+        while (pending_tasks.load(std::memory_order_acquire) > 0) {
+            auto value = deque.pop_top();
+            if (value.has_value()) {
+                auto temp = std::move(value.value());
+                if (temp) {
+                    std::invoke(temp);
+                    pending_tasks.fetch_sub(1, std::memory_order_release);
+                }
+            }
+        }
+    };
+
+    {
+        std::jthread supplier(supply_task);
+        std::jthread t1(task, 0);
+        std::jthread t2(task, 1);
+        std::jthread t3(task, 2);
+        std::jthread t4(task, 3);
+    }
+
+    REQUIRE_EQ(count.load(), expected_sum);
+}
+#endif
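For reference, the reworked worker loop in thread_pool.h scans three sources of work per pass: the shared global deque, the worker's own deque, and then the other workers' deques. A condensed, single-threaded model of that scan order (find_work is a hypothetical helper for illustration, not the pool's actual code):

    #include <thread_pool/work_stealing_deque.h>

    #include <cstddef>
    #include <functional>
    #include <iostream>
    #include <optional>
    #include <vector>

    using task_t = std::function<void()>;

    // one scheduling pass: global queue first, then our own deque, then round-robin stealing
    std::optional<task_t> find_work(dp::work_stealing_deque<task_t>& global,
                                    std::vector<dp::work_stealing_deque<task_t>>& locals,
                                    const std::size_t id) {
        if (auto task = global.pop_top()) return task;
        if (auto task = locals[id].pop_top()) return task;
        for (std::size_t j = 1; j < locals.size(); ++j) {
            const auto index = (id + j) % locals.size();
            if (auto task = locals[index].pop_top()) return task;
        }
        return std::nullopt;
    }

    int main() {
        dp::work_stealing_deque<task_t> global{};
        std::vector<dp::work_stealing_deque<task_t>> locals(4);

        global.push_bottom([] { std::cout << "task from the global deque\n"; });
        locals[2].push_bottom([] { std::cout << "task stolen from worker 2\n"; });

        // worker 0 drains everything it can find
        while (auto task = find_work(global, locals, 0)) std::invoke(*task);
    }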