Skip to content
61 changes: 43 additions & 18 deletions benchmarks/rpc/WorkQueueBenchmarks.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -29,13 +29,18 @@

#include <benchmark/benchmark.h>
#include <boost/asio/steady_timer.hpp>
#include <boost/asio/thread_pool.hpp>
#include <boost/json/object.hpp>

#include <algorithm>
#include <atomic>
#include <cassert>
#include <chrono>
#include <cstddef>
#include <cstdint>
#include <mutex>
#include <thread>
#include <vector>

using namespace rpc;
using namespace util::config;
Expand Down Expand Up @@ -75,36 +80,56 @@ benchmarkWorkQueue(benchmark::State& state)
{
init();

auto const total = static_cast<size_t>(state.range(0));
auto const numThreads = static_cast<uint32_t>(state.range(1));
auto const maxSize = static_cast<uint32_t>(state.range(2));
auto const delayMs = static_cast<uint32_t>(state.range(3));
auto const wqThreads = static_cast<uint32_t>(state.range(0));
auto const maxQueueSize = static_cast<uint32_t>(state.range(1));
auto const clientThreads = static_cast<uint32_t>(state.range(2));
auto const itemsPerClient = static_cast<uint32_t>(state.range(3));
auto const clientProcessingMs = static_cast<uint32_t>(state.range(4));

for (auto _ : state) {
std::atomic_size_t totalExecuted = 0uz;
std::atomic_size_t totalQueued = 0uz;

state.PauseTiming();
WorkQueue queue(numThreads, maxSize);
WorkQueue queue(wqThreads, maxQueueSize);
state.ResumeTiming();

for (auto i = 0uz; i < total; ++i) {
totalQueued += static_cast<std::size_t>(queue.postCoro(
[&delayMs, &totalExecuted](auto yield) {
++totalExecuted;

boost::asio::steady_timer timer(yield.get_executor(), std::chrono::milliseconds{delayMs});
timer.async_wait(yield);
},
/* isWhiteListed = */ false
));
std::vector<std::thread> threads;
threads.reserve(clientThreads);

for (auto t = 0uz; t < clientThreads; ++t) {
threads.emplace_back([&] {
for (auto i = 0uz; i < itemsPerClient; ++i) {
totalQueued += static_cast<std::size_t>(queue.postCoro(
[&clientProcessingMs, &totalExecuted](auto yield) {
++totalExecuted;

boost::asio::steady_timer timer(
yield.get_executor(), std::chrono::milliseconds{clientProcessingMs}
);
timer.async_wait(yield);

std::this_thread::sleep_for(std::chrono::microseconds{10});
},
/* isWhiteListed = */ false
));
}
});
}

for (auto& t : threads)
t.join();

queue.stop();

ASSERT(totalExecuted == totalQueued, "Totals don't match");
ASSERT(totalQueued <= total, "Queued more than requested");
ASSERT(totalQueued >= maxSize, "Queued less than maxSize");
ASSERT(totalQueued <= itemsPerClient * clientThreads, "Queued more than requested");

if (maxQueueSize == 0) {
ASSERT(totalQueued == itemsPerClient * clientThreads, "Queued exactly the expected amount");
} else {
ASSERT(totalQueued >= std::min(maxQueueSize, itemsPerClient * clientThreads), "Queued less than expected");
}
}
}

Expand All @@ -118,5 +143,5 @@ benchmarkWorkQueue(benchmark::State& state)
*/
// TODO: figure out what happens on 1 thread
BENCHMARK(benchmarkWorkQueue)
->ArgsProduct({{1'000, 10'000, 100'000}, {2, 4, 8}, {0, 5'000}, {10, 100, 250}})
->ArgsProduct({{2, 4, 8, 16}, {0, 5'000}, {4, 8, 16}, {1'000, 10'000}, {10, 100, 250}})
->Unit(benchmark::kMillisecond);
147 changes: 66 additions & 81 deletions src/rpc/WorkQueue.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -25,9 +25,7 @@
#include "util/prometheus/Label.hpp"
#include "util/prometheus/Prometheus.hpp"

#include <boost/asio/post.hpp>
#include <boost/asio/spawn.hpp>
#include <boost/asio/strand.hpp>
#include <boost/json/object.hpp>

#include <chrono>
Expand All @@ -39,6 +37,27 @@

namespace rpc {

void
WorkQueue::OneTimeCallable::setCallable(std::function<void()> func)
{
func_ = std::move(func);
}

void
WorkQueue::OneTimeCallable::operator()()
{
if (not called_) {
func_();
called_ = true;
}
}

WorkQueue::OneTimeCallable::
operator bool() const
{
return func_.operator bool();
}

WorkQueue::WorkQueue(DontStartProcessingTag, std::uint32_t numWorkers, uint32_t maxSize)
: queued_{PrometheusService::counterInt(
"work_queue_queued_total_number",
Expand All @@ -56,8 +75,6 @@ WorkQueue::WorkQueue(DontStartProcessingTag, std::uint32_t numWorkers, uint32_t
"The current number of tasks in the queue"
)}
, ioc_{numWorkers}
, strand_{ioc_.get_executor()}
, waitTimer_(ioc_)
{
if (maxSize != 0)
maxSize_ = maxSize;
Expand All @@ -77,12 +94,14 @@ WorkQueue::~WorkQueue()
void
WorkQueue::startProcessing()
{
util::spawn(strand_, [this](auto yield) {
ASSERT(not hasDispatcher_, "Dispatcher already running");
ASSERT(not processingStarted_, "Attempt to start processing work queue more than once");
processingStarted_ = true;

hasDispatcher_ = true;
dispatcherLoop(yield);
});
// Spawn workers for all tasks that were queued before processing started
auto const numTasks = size();
for (auto i = 0uz; i < numTasks; ++i) {
util::spawn(ioc_, [this](auto yield) { executeTask(yield); });
}
}

bool
Expand All @@ -98,93 +117,28 @@ WorkQueue::postCoro(TaskType func, bool isWhiteListed, Priority priority)
return false;
}

++curSize_.get();
auto needsWakeup = false;

{
auto state = dispatcherState_.lock();

needsWakeup = std::exchange(state->isIdle, false);

auto state = queueState_.lock();
state->push(priority, std::move(func));
}

if (needsWakeup)
boost::asio::post(strand_, [this] { waitTimer_.cancel(); });

return true;
}

void
WorkQueue::dispatcherLoop(boost::asio::yield_context yield)
{
LOG(log_.info()) << "WorkQueue dispatcher starting";

// all ongoing tasks must be completed before stopping fully
while (not stopping_ or size() > 0) {
std::optional<TaskType> task;

{
auto state = dispatcherState_.lock();

if (state->empty()) {
state->isIdle = true;
} else {
task = state->popNext();
}
}

if (not stopping_ and not task.has_value()) {
waitTimer_.expires_at(std::chrono::steady_clock::time_point::max());
boost::system::error_code ec;
waitTimer_.async_wait(yield[ec]);
} else if (task.has_value()) {
util::spawn(
ioc_,
[this, spawnedAt = std::chrono::system_clock::now(), task = std::move(*task)](auto yield) mutable {
auto const takenAt = std::chrono::system_clock::now();
auto const waited =
std::chrono::duration_cast<std::chrono::microseconds>(takenAt - spawnedAt).count();

++queued_.get();
durationUs_.get() += waited;
LOG(log_.info()) << "WorkQueue wait time: " << waited << ", queue size: " << size();

task(yield);

--curSize_.get();
}
);
}
}
++curSize_.get();

LOG(log_.info()) << "WorkQueue dispatcher shutdown requested - time to execute onTasksComplete";
if (not processingStarted_)
return true;

{
auto onTasksComplete = onQueueEmpty_.lock();
ASSERT(onTasksComplete->operator bool(), "onTasksComplete must be set when stopping is true.");
onTasksComplete->operator()();
}
util::spawn(ioc_, [this](auto yield) { executeTask(yield); });

LOG(log_.info()) << "WorkQueue dispatcher finished";
return true;
}

void
WorkQueue::requestStop(std::function<void()> onQueueEmpty)
{
auto handler = onQueueEmpty_.lock();
*handler = std::move(onQueueEmpty);
handler->setCallable(std::move(onQueueEmpty));

stopping_ = true;
auto needsWakeup = false;

{
auto state = dispatcherState_.lock();
needsWakeup = std::exchange(state->isIdle, false);
}

if (needsWakeup)
boost::asio::post(strand_, [this] { waitTimer_.cancel(); });
}

void
Expand All @@ -194,6 +148,12 @@ WorkQueue::stop()
requestStop();

ioc_.join();

{
auto onTasksComplete = onQueueEmpty_.lock();
ASSERT(onTasksComplete->operator bool(), "onTasksComplete must be set when stopping is true.");
onTasksComplete->operator()();
}
}

WorkQueue
Expand Down Expand Up @@ -227,4 +187,29 @@ WorkQueue::size() const
return curSize_.get().value();
}

void
WorkQueue::executeTask(boost::asio::yield_context yield)
{
std::optional<TaskWithTimestamp> taskWithTimestamp;
{
auto state = queueState_.lock();
taskWithTimestamp = state->popNext();
}

ASSERT(
taskWithTimestamp.has_value(),
"Queue should not be empty as we spawn a coro with executeTask for each postCoro."
);
auto const takenAt = std::chrono::system_clock::now();
auto const waited =
std::chrono::duration_cast<std::chrono::microseconds>(takenAt - taskWithTimestamp->queuedAt).count();

++queued_.get();
durationUs_.get() += waited;
LOG(log_.info()) << "WorkQueue wait time: " << waited << ", queue size: " << size();

taskWithTimestamp->task(yield);
--curSize_.get();
}

} // namespace rpc
Loading
Loading