Skip to content
59 changes: 41 additions & 18 deletions benchmarks/rpc/WorkQueueBenchmarks.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -29,13 +29,18 @@

#include <benchmark/benchmark.h>
#include <boost/asio/steady_timer.hpp>
#include <boost/asio/thread_pool.hpp>
#include <boost/json/object.hpp>

#include <algorithm>
#include <atomic>
#include <cassert>
#include <chrono>
#include <cstddef>
#include <cstdint>
#include <mutex>
#include <thread>
#include <vector>

using namespace rpc;
using namespace util::config;
Expand Down Expand Up @@ -75,36 +80,54 @@ benchmarkWorkQueue(benchmark::State& state)
{
init();

auto const total = static_cast<size_t>(state.range(0));
auto const numThreads = static_cast<uint32_t>(state.range(1));
auto const maxSize = static_cast<uint32_t>(state.range(2));
auto const delayMs = static_cast<uint32_t>(state.range(3));
auto const queueThreads = static_cast<uint32_t>(state.range(0));
auto const clientThreads = static_cast<uint32_t>(state.range(1));
auto const itemsPerThread = static_cast<uint32_t>(state.range(2));
auto const maxSize = static_cast<uint32_t>(state.range(3));
auto const delayMs = static_cast<uint32_t>(state.range(4));

for (auto _ : state) {
std::atomic_size_t totalExecuted = 0uz;
std::atomic_size_t totalQueued = 0uz;

state.PauseTiming();
WorkQueue queue(numThreads, maxSize);
WorkQueue queue(queueThreads, maxSize);
state.ResumeTiming();

for (auto i = 0uz; i < total; ++i) {
totalQueued += static_cast<std::size_t>(queue.postCoro(
[&delayMs, &totalExecuted](auto yield) {
++totalExecuted;

boost::asio::steady_timer timer(yield.get_executor(), std::chrono::milliseconds{delayMs});
timer.async_wait(yield);
},
/* isWhiteListed = */ false
));
std::vector<std::thread> threads;
threads.reserve(clientThreads);

for (auto t = 0uz; t < clientThreads; ++t) {
threads.emplace_back([&] {
for (auto i = 0uz; i < itemsPerThread; ++i) {
totalQueued += static_cast<std::size_t>(queue.postCoro(
[&delayMs, &totalExecuted](auto yield) {
++totalExecuted;

boost::asio::steady_timer timer(yield.get_executor(), std::chrono::milliseconds{delayMs});
timer.async_wait(yield);

std::this_thread::sleep_for(std::chrono::microseconds{10});
},
/* isWhiteListed = */ false
));
}
});
}

for (auto& t : threads)
t.join();

queue.stop();

ASSERT(totalExecuted == totalQueued, "Totals don't match");
ASSERT(totalQueued <= total, "Queued more than requested");
ASSERT(totalQueued >= maxSize, "Queued less than maxSize");
ASSERT(totalQueued <= itemsPerThread * clientThreads, "Queued more than requested");

if (maxSize == 0) {
ASSERT(totalQueued == itemsPerThread * clientThreads, "Queued exactly the expected amount");
} else {
ASSERT(totalQueued >= std::min(maxSize, itemsPerThread * clientThreads), "Queued less than expected");
}
}
}

Expand All @@ -118,5 +141,5 @@ benchmarkWorkQueue(benchmark::State& state)
*/
// TODO: figure out what happens on 1 thread
BENCHMARK(benchmarkWorkQueue)
->ArgsProduct({{1'000, 10'000, 100'000}, {2, 4, 8}, {0, 5'000}, {10, 100, 250}})
->ArgsProduct({{2, 4, 8, 16}, {4, 8, 16}, {1'000, 10'000}, {0, 5'000}, {10, 100, 250}})
->Unit(benchmark::kMillisecond);
140 changes: 60 additions & 80 deletions src/rpc/WorkQueue.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -25,9 +25,7 @@
#include "util/prometheus/Label.hpp"
#include "util/prometheus/Prometheus.hpp"

#include <boost/asio/post.hpp>
#include <boost/asio/spawn.hpp>
#include <boost/asio/strand.hpp>
#include <boost/json/object.hpp>

#include <chrono>
Expand All @@ -39,6 +37,27 @@

namespace rpc {

void
WorkQueue::OneTimeCallable::setCallable(std::function<void()> func)
{
func_ = std::move(func);
}

void
WorkQueue::OneTimeCallable::operator()()
{
if (not called_) {
func_();
called_ = true;
}
}

WorkQueue::OneTimeCallable::
operator bool() const
{
return func_.operator bool();
}

WorkQueue::WorkQueue(DontStartProcessingTag, std::uint32_t numWorkers, uint32_t maxSize)
: queued_{PrometheusService::counterInt(
"work_queue_queued_total_number",
Expand All @@ -56,8 +75,6 @@ WorkQueue::WorkQueue(DontStartProcessingTag, std::uint32_t numWorkers, uint32_t
"The current number of tasks in the queue"
)}
, ioc_{numWorkers}
, strand_{ioc_.get_executor()}
, waitTimer_(ioc_)
{
if (maxSize != 0)
maxSize_ = maxSize;
Expand All @@ -77,12 +94,14 @@ WorkQueue::~WorkQueue()
void
WorkQueue::startProcessing()
{
util::spawn(strand_, [this](auto yield) {
ASSERT(not hasDispatcher_, "Dispatcher already running");
ASSERT(not processingStarted_, "Attempt to start processing work queue more than once");
processingStarted_ = true;

hasDispatcher_ = true;
dispatcherLoop(yield);
});
// Spawn workers for all tasks that were queued before processing started
auto const numTasks = size();
for (auto i = 0uz; i < numTasks; ++i) {
util::spawn(ioc_, [this](auto yield) { executeTask(std::chrono::system_clock::now(), yield); });
}
}

bool
Expand All @@ -98,93 +117,30 @@ WorkQueue::postCoro(TaskType func, bool isWhiteListed, Priority priority)
return false;
}

++curSize_.get();
auto needsWakeup = false;
auto const queuedAt = std::chrono::system_clock::now();

{
auto state = dispatcherState_.lock();

needsWakeup = std::exchange(state->isIdle, false);

auto state = queueState_.lock();
state->push(priority, std::move(func));
}

if (needsWakeup)
boost::asio::post(strand_, [this] { waitTimer_.cancel(); });

return true;
}

void
WorkQueue::dispatcherLoop(boost::asio::yield_context yield)
{
LOG(log_.info()) << "WorkQueue dispatcher starting";

// all ongoing tasks must be completed before stopping fully
while (not stopping_ or size() > 0) {
std::optional<TaskType> task;

{
auto state = dispatcherState_.lock();

if (state->empty()) {
state->isIdle = true;
} else {
task = state->popNext();
}
}

if (not stopping_ and not task.has_value()) {
waitTimer_.expires_at(std::chrono::steady_clock::time_point::max());
boost::system::error_code ec;
waitTimer_.async_wait(yield[ec]);
} else if (task.has_value()) {
util::spawn(
ioc_,
[this, spawnedAt = std::chrono::system_clock::now(), task = std::move(*task)](auto yield) mutable {
auto const takenAt = std::chrono::system_clock::now();
auto const waited =
std::chrono::duration_cast<std::chrono::microseconds>(takenAt - spawnedAt).count();

++queued_.get();
durationUs_.get() += waited;
LOG(log_.info()) << "WorkQueue wait time: " << waited << ", queue size: " << size();

task(yield);

--curSize_.get();
}
);
}
}
++curSize_.get();

LOG(log_.info()) << "WorkQueue dispatcher shutdown requested - time to execute onTasksComplete";
if (not processingStarted_)
return true;

{
auto onTasksComplete = onQueueEmpty_.lock();
ASSERT(onTasksComplete->operator bool(), "onTasksComplete must be set when stopping is true.");
onTasksComplete->operator()();
}
util::spawn(ioc_, [this, queuedAt](auto yield) { executeTask(queuedAt, yield); });

LOG(log_.info()) << "WorkQueue dispatcher finished";
return true;
}

void
WorkQueue::requestStop(std::function<void()> onQueueEmpty)
{
auto handler = onQueueEmpty_.lock();
*handler = std::move(onQueueEmpty);
handler->setCallable(std::move(onQueueEmpty));

stopping_ = true;
auto needsWakeup = false;

{
auto state = dispatcherState_.lock();
needsWakeup = std::exchange(state->isIdle, false);
}

if (needsWakeup)
boost::asio::post(strand_, [this] { waitTimer_.cancel(); });
}

void
Expand All @@ -194,6 +150,10 @@ WorkQueue::stop()
requestStop();

ioc_.join();

auto onTasksComplete = onQueueEmpty_.lock();
ASSERT(onTasksComplete->operator bool(), "onTasksComplete must be set when stopping is true.");
onTasksComplete->operator()();
}

WorkQueue
Expand Down Expand Up @@ -227,4 +187,24 @@ WorkQueue::size() const
return curSize_.get().value();
}

void
WorkQueue::executeTask(std::chrono::system_clock::time_point opTime, boost::asio::yield_context yield)
{
std::optional<TaskType> task;
{
auto state = queueState_.lock();
task = state->popNext();
}

auto const takenAt = std::chrono::system_clock::now();
auto const waited = std::chrono::duration_cast<std::chrono::microseconds>(takenAt - opTime).count();

++queued_.get();
durationUs_.get() += waited;
LOG(log_.info()) << "WorkQueue wait time: " << waited << ", queue size: " << size();

task->operator()(yield);
--curSize_.get();
}

} // namespace rpc
32 changes: 20 additions & 12 deletions src/rpc/WorkQueue.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -25,15 +25,12 @@
#include "util/prometheus/Counter.hpp"
#include "util/prometheus/Gauge.hpp"

#include <boost/asio.hpp>
#include <boost/asio/spawn.hpp>
#include <boost/asio/steady_timer.hpp>
#include <boost/asio/strand.hpp>
#include <boost/asio/thread_pool.hpp>
#include <boost/json.hpp>
#include <boost/json/object.hpp>

#include <atomic>
#include <chrono>
#include <cstddef>
#include <cstdint>
#include <functional>
Expand Down Expand Up @@ -76,11 +73,10 @@ class WorkQueue : public Reportable {
};

private:
struct DispatcherState {
struct QueueState {
QueueType high;
QueueType normal;

bool isIdle = false;
size_t highPriorityCounter = 0;

void
Expand Down Expand Up @@ -133,14 +129,26 @@ class WorkQueue : public Reportable {

util::Logger log_{"RPC"};
boost::asio::thread_pool ioc_;
boost::asio::strand<boost::asio::thread_pool::executor_type> strand_;
bool hasDispatcher_ = false;

std::atomic_bool stopping_;
std::atomic_bool processingStarted_{false};

util::Mutex<std::function<void()>> onQueueEmpty_;
util::Mutex<DispatcherState> dispatcherState_;
boost::asio::steady_timer waitTimer_;
class OneTimeCallable {
std::function<void()> func_;
bool called_{false};

public:
void
setCallable(std::function<void()> func);

void
operator()();

explicit
operator bool() const;
};
util::Mutex<OneTimeCallable> onQueueEmpty_;
util::Mutex<QueueState> queueState_;

public:
struct DontStartProcessingTag {};
Expand Down Expand Up @@ -234,7 +242,7 @@ class WorkQueue : public Reportable {

private:
void
dispatcherLoop(boost::asio::yield_context yield);
executeTask(std::chrono::system_clock::time_point opTime, boost::asio::yield_context yield);
};

} // namespace rpc
Loading
Loading