From 80c0674c4380be89004f56ca81985f9abf3e402c Mon Sep 17 00:00:00 2001 From: Alex Kremer Date: Wed, 7 Jan 2026 15:00:18 +0000 Subject: [PATCH 1/9] Faster implementation of work queue --- benchmarks/rpc/WorkQueueBenchmarks.cpp | 59 +++++++--- src/rpc/WorkQueue.cpp | 147 +++++++++++++------------ src/rpc/WorkQueue.hpp | 33 +++--- tests/unit/rpc/WorkQueueTests.cpp | 13 +-- 4 files changed, 140 insertions(+), 112 deletions(-) diff --git a/benchmarks/rpc/WorkQueueBenchmarks.cpp b/benchmarks/rpc/WorkQueueBenchmarks.cpp index 51578c448..4e16b20c6 100644 --- a/benchmarks/rpc/WorkQueueBenchmarks.cpp +++ b/benchmarks/rpc/WorkQueueBenchmarks.cpp @@ -29,13 +29,18 @@ #include #include +#include +#include +#include #include #include #include #include #include #include +#include +#include using namespace rpc; using namespace util::config; @@ -75,36 +80,56 @@ benchmarkWorkQueue(benchmark::State& state) { init(); - auto const total = static_cast(state.range(0)); - auto const numThreads = static_cast(state.range(1)); - auto const maxSize = static_cast(state.range(2)); - auto const delayMs = static_cast(state.range(3)); + auto const queueThreads = static_cast(state.range(0)); + auto const clientThreads = static_cast(state.range(1)); + auto const itemsPerThread = static_cast(state.range(2)); + auto const maxSize = static_cast(state.range(3)); + auto const delayMs = static_cast(state.range(4)); for (auto _ : state) { std::atomic_size_t totalExecuted = 0uz; std::atomic_size_t totalQueued = 0uz; state.PauseTiming(); - WorkQueue queue(numThreads, maxSize); + WorkQueue queue(queueThreads, maxSize); state.ResumeTiming(); - for (auto i = 0uz; i < total; ++i) { - totalQueued += static_cast(queue.postCoro( - [&delayMs, &totalExecuted](auto yield) { - ++totalExecuted; + std::vector threads; + threads.reserve(clientThreads); + + for (auto t = 0uz; t < clientThreads; ++t) { + threads.emplace_back([&] { + for (auto i = 0uz; i < itemsPerThread; ++i) { + totalQueued += static_cast(queue.postCoro( + [&delayMs, &totalExecuted](auto yield) { + ++totalExecuted; + + boost::asio::steady_timer timer(yield.get_executor(), std::chrono::milliseconds{delayMs}); + timer.async_wait(yield); + + std::this_thread::sleep_for(std::chrono::microseconds{10}); + }, + /* isWhiteListed = */ false + )); + } + }); + } - boost::asio::steady_timer timer(yield.get_executor(), std::chrono::milliseconds{delayMs}); - timer.async_wait(yield); - }, - /* isWhiteListed = */ false - )); + for (auto& t : threads) { + if (t.joinable()) + t.join(); } queue.stop(); ASSERT(totalExecuted == totalQueued, "Totals don't match"); - ASSERT(totalQueued <= total, "Queued more than requested"); - ASSERT(totalQueued >= maxSize, "Queued less than maxSize"); + ASSERT(totalQueued <= itemsPerThread * clientThreads, "Queued more than requested"); + + if (maxSize == 0) { + ASSERT(totalQueued == itemsPerThread * clientThreads, "Queued exactly the expected amount"); + } else { + ASSERT(totalQueued >= std::min(maxSize, itemsPerThread * clientThreads), "Queued less than expected"); + } } } @@ -118,5 +143,5 @@ benchmarkWorkQueue(benchmark::State& state) */ // TODO: figure out what happens on 1 thread BENCHMARK(benchmarkWorkQueue) - ->ArgsProduct({{1'000, 10'000, 100'000}, {2, 4, 8}, {0, 5'000}, {10, 100, 250}}) + ->ArgsProduct({{2, 4, 8, 16}, {4, 8, 16}, {1'000, 10'000}, {0, 5'000}, {10, 100, 250}}) ->Unit(benchmark::kMillisecond); diff --git a/src/rpc/WorkQueue.cpp b/src/rpc/WorkQueue.cpp index b676fc64b..35c2d5f8f 100644 --- a/src/rpc/WorkQueue.cpp +++ b/src/rpc/WorkQueue.cpp @@ -25,9 +25,7 @@ #include "util/prometheus/Label.hpp" #include "util/prometheus/Prometheus.hpp" -#include #include -#include #include #include @@ -39,6 +37,27 @@ namespace rpc { +void +WorkQueue::OneTimeCallable::setCallable(std::function func) +{ + func_ = func; +} + +void +WorkQueue::OneTimeCallable::operator()() +{ + if (not called_) { + func_(); + called_ = true; + } +} + +WorkQueue::OneTimeCallable:: +operator bool() const +{ + return func_.operator bool(); +} + WorkQueue::WorkQueue(DontStartProcessingTag, std::uint32_t numWorkers, uint32_t maxSize) : queued_{PrometheusService::counterInt( "work_queue_queued_total_number", @@ -56,8 +75,6 @@ WorkQueue::WorkQueue(DontStartProcessingTag, std::uint32_t numWorkers, uint32_t "The current number of tasks in the queue" )} , ioc_{numWorkers} - , strand_{ioc_.get_executor()} - , waitTimer_(ioc_) { if (maxSize != 0) maxSize_ = maxSize; @@ -77,12 +94,32 @@ WorkQueue::~WorkQueue() void WorkQueue::startProcessing() { - util::spawn(strand_, [this](auto yield) { - ASSERT(not hasDispatcher_, "Dispatcher already running"); + ASSERT(not processingStarted_, "Attempt to start processing work queue more than once"); + processingStarted_ = true; + + // Spawn workers for all tasks that were queued before processing started + auto const numTasks = size(); + for (auto i = 0uz; i < numTasks; ++i) { + util::spawn(ioc_, [this](auto yield) { + std::optional task; + auto const spawnedAt = std::chrono::system_clock::now(); + + { + auto state = queueState_.lock(); + task = state->popNext(); + } - hasDispatcher_ = true; - dispatcherLoop(yield); - }); + auto const takenAt = std::chrono::system_clock::now(); + auto const waited = std::chrono::duration_cast(takenAt - spawnedAt).count(); + + ++queued_.get(); + durationUs_.get() += waited; + LOG(log_.info()) << "WorkQueue wait time: " << waited << ", queue size: " << size(); + + task->operator()(yield); + --curSize_.get(); + }); + } } bool @@ -98,93 +135,53 @@ WorkQueue::postCoro(TaskType func, bool isWhiteListed, Priority priority) return false; } - ++curSize_.get(); - auto needsWakeup = false; + auto const queuedAt = std::chrono::system_clock::now(); { - auto state = dispatcherState_.lock(); - - needsWakeup = std::exchange(state->isIdle, false); - + auto state = queueState_.lock(); state->push(priority, std::move(func)); } - if (needsWakeup) - boost::asio::post(strand_, [this] { waitTimer_.cancel(); }); - - return true; -} + ++curSize_.get(); -void -WorkQueue::dispatcherLoop(boost::asio::yield_context yield) -{ - LOG(log_.info()) << "WorkQueue dispatcher starting"; + if (not processingStarted_) + return true; - // all ongoing tasks must be completed before stopping fully - while (not stopping_ or size() > 0) { + util::spawn(ioc_, [this, queuedAt](auto yield) { std::optional task; { - auto state = dispatcherState_.lock(); - - if (state->empty()) { - state->isIdle = true; - } else { - task = state->popNext(); - } + auto state = queueState_.lock(); + task = state->popNext(); } - if (not stopping_ and not task.has_value()) { - waitTimer_.expires_at(std::chrono::steady_clock::time_point::max()); - boost::system::error_code ec; - waitTimer_.async_wait(yield[ec]); - } else if (task.has_value()) { - util::spawn( - ioc_, - [this, spawnedAt = std::chrono::system_clock::now(), task = std::move(*task)](auto yield) mutable { - auto const takenAt = std::chrono::system_clock::now(); - auto const waited = - std::chrono::duration_cast(takenAt - spawnedAt).count(); - - ++queued_.get(); - durationUs_.get() += waited; - LOG(log_.info()) << "WorkQueue wait time: " << waited << ", queue size: " << size(); - - task(yield); - - --curSize_.get(); - } - ); - } - } + auto const takenAt = std::chrono::system_clock::now(); + auto const waited = std::chrono::duration_cast(takenAt - queuedAt).count(); - LOG(log_.info()) << "WorkQueue dispatcher shutdown requested - time to execute onTasksComplete"; + ++queued_.get(); + durationUs_.get() += waited; + LOG(log_.info()) << "WorkQueue wait time: " << waited << ", queue size: " << size(); - { - auto onTasksComplete = onQueueEmpty_.lock(); - ASSERT(onTasksComplete->operator bool(), "onTasksComplete must be set when stopping is true."); - onTasksComplete->operator()(); - } + task->operator()(yield); + --curSize_.get(); - LOG(log_.info()) << "WorkQueue dispatcher finished"; + if (curSize_.get().value() == 0 && stopping_) { + auto onTasksComplete = onQueueEmpty_.lock(); + ASSERT(onTasksComplete->operator bool(), "onTasksComplete must be set when stopping is true."); + onTasksComplete->operator()(); + } + }); + + return true; } void WorkQueue::requestStop(std::function onQueueEmpty) { auto handler = onQueueEmpty_.lock(); - *handler = std::move(onQueueEmpty); + handler->setCallable(std::move(onQueueEmpty)); stopping_ = true; - auto needsWakeup = false; - - { - auto state = dispatcherState_.lock(); - needsWakeup = std::exchange(state->isIdle, false); - } - - if (needsWakeup) - boost::asio::post(strand_, [this] { waitTimer_.cancel(); }); } void @@ -194,6 +191,10 @@ WorkQueue::stop() requestStop(); ioc_.join(); + + auto onTasksComplete = onQueueEmpty_.lock(); + ASSERT(onTasksComplete->operator bool(), "onTasksComplete must be set when stopping is true."); + onTasksComplete->operator()(); } WorkQueue diff --git a/src/rpc/WorkQueue.hpp b/src/rpc/WorkQueue.hpp index 8fa466501..76b989a4f 100644 --- a/src/rpc/WorkQueue.hpp +++ b/src/rpc/WorkQueue.hpp @@ -25,12 +25,8 @@ #include "util/prometheus/Counter.hpp" #include "util/prometheus/Gauge.hpp" -#include #include -#include -#include #include -#include #include #include @@ -76,11 +72,10 @@ class WorkQueue : public Reportable { }; private: - struct DispatcherState { + struct QueueState { QueueType high; QueueType normal; - bool isIdle = false; size_t highPriorityCounter = 0; void @@ -133,14 +128,26 @@ class WorkQueue : public Reportable { util::Logger log_{"RPC"}; boost::asio::thread_pool ioc_; - boost::asio::strand strand_; - bool hasDispatcher_ = false; std::atomic_bool stopping_; + std::atomic_bool processingStarted_{false}; - util::Mutex> onQueueEmpty_; - util::Mutex dispatcherState_; - boost::asio::steady_timer waitTimer_; + class OneTimeCallable { + std::function func_; + bool called_{false}; + + public: + void + setCallable(std::function func); + + void + operator()(); + + explicit + operator bool() const; + }; + util::Mutex onQueueEmpty_; + util::Mutex queueState_; public: struct DontStartProcessingTag {}; @@ -231,10 +238,6 @@ class WorkQueue : public Reportable { */ [[nodiscard]] size_t size() const; - -private: - void - dispatcherLoop(boost::asio::yield_context yield); }; } // namespace rpc diff --git a/tests/unit/rpc/WorkQueueTests.cpp b/tests/unit/rpc/WorkQueueTests.cpp index d32422e6c..c55f04734 100644 --- a/tests/unit/rpc/WorkQueueTests.cpp +++ b/tests/unit/rpc/WorkQueueTests.cpp @@ -207,11 +207,7 @@ TEST_F(WorkQueueStopTest, CallsOnTasksCompleteWhenStoppingOnLastTask) queue.stop(); } -struct WorkQueueMockPrometheusTest : WithMockPrometheus, RPCWorkQueueTestBase { - WorkQueueMockPrometheusTest() : RPCWorkQueueTestBase(/* workers = */ 1, /*maxQueueSize = */ 2) - { - } -}; +struct WorkQueueMockPrometheusTest : WithMockPrometheus {}; TEST_F(WorkQueueMockPrometheusTest, postCoroCounters) { @@ -221,7 +217,9 @@ TEST_F(WorkQueueMockPrometheusTest, postCoroCounters) std::binary_semaphore semaphore{0}; - EXPECT_CALL(curSizeMock, value()).WillOnce(::testing::Return(0)).WillRepeatedly(::testing::Return(1)); + EXPECT_CALL(curSizeMock, value()) + .WillOnce(::testing::Return(0)) // in startProcessing + .WillOnce(::testing::Return(0)); // first check in postCoro EXPECT_CALL(curSizeMock, add(1)); EXPECT_CALL(queuedMock, add(1)); EXPECT_CALL(durationMock, add(::testing::Ge(0))).WillOnce([&](auto) { @@ -230,8 +228,9 @@ TEST_F(WorkQueueMockPrometheusTest, postCoroCounters) semaphore.release(); }); + // Note: the queue is not in the fixture because above expectations must be setup before startProcessing runs + WorkQueue queue(/* numWorkers = */ 4, /* maxSize = */ 2); auto const res = queue.postCoro([&](auto /* yield */) { semaphore.acquire(); }, /* isWhiteListed = */ false); ASSERT_TRUE(res); - queue.stop(); } From cc7bffa83a406f197adc7320c4ff920d917ecd4a Mon Sep 17 00:00:00 2001 From: Alex Kremer Date: Wed, 7 Jan 2026 15:05:29 +0000 Subject: [PATCH 2/9] Remove unnecessary joinable check --- benchmarks/rpc/WorkQueueBenchmarks.cpp | 6 ++---- 1 file changed, 2 insertions(+), 4 deletions(-) diff --git a/benchmarks/rpc/WorkQueueBenchmarks.cpp b/benchmarks/rpc/WorkQueueBenchmarks.cpp index 4e16b20c6..b733cbea7 100644 --- a/benchmarks/rpc/WorkQueueBenchmarks.cpp +++ b/benchmarks/rpc/WorkQueueBenchmarks.cpp @@ -115,10 +115,8 @@ benchmarkWorkQueue(benchmark::State& state) }); } - for (auto& t : threads) { - if (t.joinable()) - t.join(); - } + for (auto& t : threads) + t.join(); queue.stop(); From ccd29c8396d7231e390f1a659d0a82ab501a262b Mon Sep 17 00:00:00 2001 From: Alex Kremer Date: Thu, 8 Jan 2026 14:37:24 +0000 Subject: [PATCH 3/9] Fix race condition --- src/rpc/WorkQueue.cpp | 67 +++++++++++-------------------- src/rpc/WorkQueue.hpp | 5 +++ tests/unit/rpc/WorkQueueTests.cpp | 4 +- 3 files changed, 31 insertions(+), 45 deletions(-) diff --git a/src/rpc/WorkQueue.cpp b/src/rpc/WorkQueue.cpp index 35c2d5f8f..7756167f3 100644 --- a/src/rpc/WorkQueue.cpp +++ b/src/rpc/WorkQueue.cpp @@ -40,7 +40,7 @@ namespace rpc { void WorkQueue::OneTimeCallable::setCallable(std::function func) { - func_ = func; + func_ = std::move(func); } void @@ -100,25 +100,7 @@ WorkQueue::startProcessing() // Spawn workers for all tasks that were queued before processing started auto const numTasks = size(); for (auto i = 0uz; i < numTasks; ++i) { - util::spawn(ioc_, [this](auto yield) { - std::optional task; - auto const spawnedAt = std::chrono::system_clock::now(); - - { - auto state = queueState_.lock(); - task = state->popNext(); - } - - auto const takenAt = std::chrono::system_clock::now(); - auto const waited = std::chrono::duration_cast(takenAt - spawnedAt).count(); - - ++queued_.get(); - durationUs_.get() += waited; - LOG(log_.info()) << "WorkQueue wait time: " << waited << ", queue size: " << size(); - - task->operator()(yield); - --curSize_.get(); - }); + util::spawn(ioc_, [this](auto yield) { executeTask(std::chrono::system_clock::now(), yield); }); } } @@ -147,30 +129,7 @@ WorkQueue::postCoro(TaskType func, bool isWhiteListed, Priority priority) if (not processingStarted_) return true; - util::spawn(ioc_, [this, queuedAt](auto yield) { - std::optional task; - - { - auto state = queueState_.lock(); - task = state->popNext(); - } - - auto const takenAt = std::chrono::system_clock::now(); - auto const waited = std::chrono::duration_cast(takenAt - queuedAt).count(); - - ++queued_.get(); - durationUs_.get() += waited; - LOG(log_.info()) << "WorkQueue wait time: " << waited << ", queue size: " << size(); - - task->operator()(yield); - --curSize_.get(); - - if (curSize_.get().value() == 0 && stopping_) { - auto onTasksComplete = onQueueEmpty_.lock(); - ASSERT(onTasksComplete->operator bool(), "onTasksComplete must be set when stopping is true."); - onTasksComplete->operator()(); - } - }); + util::spawn(ioc_, [this, queuedAt](auto yield) { executeTask(queuedAt, yield); }); return true; } @@ -228,4 +187,24 @@ WorkQueue::size() const return curSize_.get().value(); } +void +WorkQueue::executeTask(std::chrono::system_clock::time_point opTime, boost::asio::yield_context yield) +{ + std::optional task; + { + auto state = queueState_.lock(); + task = state->popNext(); + } + + auto const takenAt = std::chrono::system_clock::now(); + auto const waited = std::chrono::duration_cast(takenAt - opTime).count(); + + ++queued_.get(); + durationUs_.get() += waited; + LOG(log_.info()) << "WorkQueue wait time: " << waited << ", queue size: " << size(); + + task->operator()(yield); + --curSize_.get(); +} + } // namespace rpc diff --git a/src/rpc/WorkQueue.hpp b/src/rpc/WorkQueue.hpp index 76b989a4f..21841aa75 100644 --- a/src/rpc/WorkQueue.hpp +++ b/src/rpc/WorkQueue.hpp @@ -30,6 +30,7 @@ #include #include +#include #include #include #include @@ -238,6 +239,10 @@ class WorkQueue : public Reportable { */ [[nodiscard]] size_t size() const; + +private: + void + executeTask(std::chrono::system_clock::time_point opTime, boost::asio::yield_context yield); }; } // namespace rpc diff --git a/tests/unit/rpc/WorkQueueTests.cpp b/tests/unit/rpc/WorkQueueTests.cpp index c55f04734..4c77794e8 100644 --- a/tests/unit/rpc/WorkQueueTests.cpp +++ b/tests/unit/rpc/WorkQueueTests.cpp @@ -18,6 +18,7 @@ //============================================================================== #include "rpc/WorkQueue.hpp" +#include "util/LoggerFixtures.hpp" #include "util/MockPrometheus.hpp" #include "util/config/ConfigDefinition.hpp" #include "util/config/ConfigValue.hpp" @@ -54,7 +55,7 @@ struct RPCWorkQueueTestBase : public virtual ::testing::Test { } }; -struct WorkQueueTest : WithPrometheus, RPCWorkQueueTestBase { +struct WorkQueueTest : WithPrometheus, RPCWorkQueueTestBase, LoggerFixture { WorkQueueTest() : RPCWorkQueueTestBase(/* workers = */ 4, /* maxQueueSize = */ 2) { } @@ -233,4 +234,5 @@ TEST_F(WorkQueueMockPrometheusTest, postCoroCounters) auto const res = queue.postCoro([&](auto /* yield */) { semaphore.acquire(); }, /* isWhiteListed = */ false); ASSERT_TRUE(res); + queue.stop(); } From b91a2c47ce1817b41b0bed106df125fe8be9146f Mon Sep 17 00:00:00 2001 From: Alex Kremer Date: Thu, 8 Jan 2026 15:49:29 +0000 Subject: [PATCH 4/9] Fix test --- tests/unit/rpc/WorkQueueTests.cpp | 1 - 1 file changed, 1 deletion(-) diff --git a/tests/unit/rpc/WorkQueueTests.cpp b/tests/unit/rpc/WorkQueueTests.cpp index 4c77794e8..cef14faa4 100644 --- a/tests/unit/rpc/WorkQueueTests.cpp +++ b/tests/unit/rpc/WorkQueueTests.cpp @@ -225,7 +225,6 @@ TEST_F(WorkQueueMockPrometheusTest, postCoroCounters) EXPECT_CALL(queuedMock, add(1)); EXPECT_CALL(durationMock, add(::testing::Ge(0))).WillOnce([&](auto) { EXPECT_CALL(curSizeMock, add(-1)); - EXPECT_CALL(curSizeMock, value()).WillOnce(::testing::Return(0)); semaphore.release(); }); From 16add1b02028b47982337dee914ac5c92c44f8ae Mon Sep 17 00:00:00 2001 From: Alex Kremer Date: Mon, 12 Jan 2026 14:27:43 +0000 Subject: [PATCH 5/9] Add death test and fix some review comments --- benchmarks/rpc/WorkQueueBenchmarks.cpp | 30 ++++++++++++++------------ src/rpc/WorkQueue.cpp | 10 ++++++--- tests/unit/rpc/WorkQueueTests.cpp | 20 +++++++++++++++++ 3 files changed, 43 insertions(+), 17 deletions(-) diff --git a/benchmarks/rpc/WorkQueueBenchmarks.cpp b/benchmarks/rpc/WorkQueueBenchmarks.cpp index b733cbea7..a5128abcc 100644 --- a/benchmarks/rpc/WorkQueueBenchmarks.cpp +++ b/benchmarks/rpc/WorkQueueBenchmarks.cpp @@ -80,18 +80,18 @@ benchmarkWorkQueue(benchmark::State& state) { init(); - auto const queueThreads = static_cast(state.range(0)); - auto const clientThreads = static_cast(state.range(1)); - auto const itemsPerThread = static_cast(state.range(2)); - auto const maxSize = static_cast(state.range(3)); - auto const delayMs = static_cast(state.range(4)); + auto const wqThreads = static_cast(state.range(0)); + auto const maxQueueSize = static_cast(state.range(1)); + auto const clientThreads = static_cast(state.range(2)); + auto const itemsPerClient = static_cast(state.range(3)); + auto const clientProcessingMs = static_cast(state.range(4)); for (auto _ : state) { std::atomic_size_t totalExecuted = 0uz; std::atomic_size_t totalQueued = 0uz; state.PauseTiming(); - WorkQueue queue(queueThreads, maxSize); + WorkQueue queue(wqThreads, maxQueueSize); state.ResumeTiming(); std::vector threads; @@ -99,12 +99,14 @@ benchmarkWorkQueue(benchmark::State& state) for (auto t = 0uz; t < clientThreads; ++t) { threads.emplace_back([&] { - for (auto i = 0uz; i < itemsPerThread; ++i) { + for (auto i = 0uz; i < itemsPerClient; ++i) { totalQueued += static_cast(queue.postCoro( - [&delayMs, &totalExecuted](auto yield) { + [&clientProcessingMs, &totalExecuted](auto yield) { ++totalExecuted; - boost::asio::steady_timer timer(yield.get_executor(), std::chrono::milliseconds{delayMs}); + boost::asio::steady_timer timer( + yield.get_executor(), std::chrono::milliseconds{clientProcessingMs} + ); timer.async_wait(yield); std::this_thread::sleep_for(std::chrono::microseconds{10}); @@ -121,12 +123,12 @@ benchmarkWorkQueue(benchmark::State& state) queue.stop(); ASSERT(totalExecuted == totalQueued, "Totals don't match"); - ASSERT(totalQueued <= itemsPerThread * clientThreads, "Queued more than requested"); + ASSERT(totalQueued <= itemsPerClient * clientThreads, "Queued more than requested"); - if (maxSize == 0) { - ASSERT(totalQueued == itemsPerThread * clientThreads, "Queued exactly the expected amount"); + if (maxQueueSize == 0) { + ASSERT(totalQueued == itemsPerClient * clientThreads, "Queued exactly the expected amount"); } else { - ASSERT(totalQueued >= std::min(maxSize, itemsPerThread * clientThreads), "Queued less than expected"); + ASSERT(totalQueued >= std::min(maxQueueSize, itemsPerClient * clientThreads), "Queued less than expected"); } } } @@ -141,5 +143,5 @@ benchmarkWorkQueue(benchmark::State& state) */ // TODO: figure out what happens on 1 thread BENCHMARK(benchmarkWorkQueue) - ->ArgsProduct({{2, 4, 8, 16}, {4, 8, 16}, {1'000, 10'000}, {0, 5'000}, {10, 100, 250}}) + ->ArgsProduct({{2, 4, 8, 16}, {0, 5'000}, {4, 8, 16}, {1'000, 10'000}, {10, 100, 250}}) ->Unit(benchmark::kMillisecond); diff --git a/src/rpc/WorkQueue.cpp b/src/rpc/WorkQueue.cpp index 7756167f3..d51455477 100644 --- a/src/rpc/WorkQueue.cpp +++ b/src/rpc/WorkQueue.cpp @@ -32,6 +32,7 @@ #include #include #include +#include #include #include @@ -151,9 +152,11 @@ WorkQueue::stop() ioc_.join(); - auto onTasksComplete = onQueueEmpty_.lock(); - ASSERT(onTasksComplete->operator bool(), "onTasksComplete must be set when stopping is true."); - onTasksComplete->operator()(); + { + auto onTasksComplete = onQueueEmpty_.lock(); + ASSERT(onTasksComplete->operator bool(), "onTasksComplete must be set when stopping is true."); + onTasksComplete->operator()(); + } } WorkQueue @@ -196,6 +199,7 @@ WorkQueue::executeTask(std::chrono::system_clock::time_point opTime, boost::asio task = state->popNext(); } + ASSERT(task.has_value(), "Queue should not be empty as we spawn a coro with executeTask for each postCoro."); auto const takenAt = std::chrono::system_clock::now(); auto const waited = std::chrono::duration_cast(takenAt - opTime).count(); diff --git a/tests/unit/rpc/WorkQueueTests.cpp b/tests/unit/rpc/WorkQueueTests.cpp index cef14faa4..71b371b7a 100644 --- a/tests/unit/rpc/WorkQueueTests.cpp +++ b/tests/unit/rpc/WorkQueueTests.cpp @@ -19,6 +19,7 @@ #include "rpc/WorkQueue.hpp" #include "util/LoggerFixtures.hpp" +#include "util/MockAssert.hpp" #include "util/MockPrometheus.hpp" #include "util/config/ConfigDefinition.hpp" #include "util/config/ConfigValue.hpp" @@ -235,3 +236,22 @@ TEST_F(WorkQueueMockPrometheusTest, postCoroCounters) ASSERT_TRUE(res); queue.stop(); } + +// Note: not using EXPECT_CLIO_ASSERT_FAIL because exception is swallowed by the WQ context +struct WorkQueueDeathTest : WorkQueueMockPrometheusTest, common::util::WithMockAssert {}; + +TEST_F(WorkQueueDeathTest, ExecuteTaskAssertsWhenQueueIsEmpty) +{ + [[maybe_unused]] auto& queuedMock = makeMock("work_queue_queued_total_number", ""); + [[maybe_unused]] auto& durationMock = makeMock("work_queue_cumulative_tasks_duration_us", ""); + auto& curSizeMock = makeMock("work_queue_current_size", ""); + + EXPECT_CALL(curSizeMock, value()).WillRepeatedly(::testing::Return(1)); // lie about the size + EXPECT_DEATH( + { + WorkQueue queue(WorkQueue::kDONT_START_PROCESSING_TAG, /* numWorkers = */ 1, /* maxSize = */ 2); + queue.startProcessing(); // the actual queue is empty which will lead to assertion failure + }, + ".*" + ); +} From 00475da13a4b7a23dc369c0ddd4af55d1988780a Mon Sep 17 00:00:00 2001 From: Alex Kremer Date: Mon, 12 Jan 2026 14:48:55 +0000 Subject: [PATCH 6/9] Save queue time and use it --- src/rpc/WorkQueue.cpp | 22 ++++++++++++---------- src/rpc/WorkQueue.hpp | 24 +++++++++++++++--------- tests/unit/rpc/WorkQueueTests.cpp | 27 +++++++++++++++++++++++++++ 3 files changed, 54 insertions(+), 19 deletions(-) diff --git a/src/rpc/WorkQueue.cpp b/src/rpc/WorkQueue.cpp index d51455477..e83db78d0 100644 --- a/src/rpc/WorkQueue.cpp +++ b/src/rpc/WorkQueue.cpp @@ -101,7 +101,7 @@ WorkQueue::startProcessing() // Spawn workers for all tasks that were queued before processing started auto const numTasks = size(); for (auto i = 0uz; i < numTasks; ++i) { - util::spawn(ioc_, [this](auto yield) { executeTask(std::chrono::system_clock::now(), yield); }); + util::spawn(ioc_, [this](auto yield) { executeTask(yield); }); } } @@ -118,8 +118,6 @@ WorkQueue::postCoro(TaskType func, bool isWhiteListed, Priority priority) return false; } - auto const queuedAt = std::chrono::system_clock::now(); - { auto state = queueState_.lock(); state->push(priority, std::move(func)); @@ -130,7 +128,7 @@ WorkQueue::postCoro(TaskType func, bool isWhiteListed, Priority priority) if (not processingStarted_) return true; - util::spawn(ioc_, [this, queuedAt](auto yield) { executeTask(queuedAt, yield); }); + util::spawn(ioc_, [this](auto yield) { executeTask(yield); }); return true; } @@ -191,23 +189,27 @@ WorkQueue::size() const } void -WorkQueue::executeTask(std::chrono::system_clock::time_point opTime, boost::asio::yield_context yield) +WorkQueue::executeTask(boost::asio::yield_context yield) { - std::optional task; + std::optional taskWithTimestamp; { auto state = queueState_.lock(); - task = state->popNext(); + taskWithTimestamp = state->popNext(); } - ASSERT(task.has_value(), "Queue should not be empty as we spawn a coro with executeTask for each postCoro."); + ASSERT( + taskWithTimestamp.has_value(), + "Queue should not be empty as we spawn a coro with executeTask for each postCoro." + ); auto const takenAt = std::chrono::system_clock::now(); - auto const waited = std::chrono::duration_cast(takenAt - opTime).count(); + auto const waited = + std::chrono::duration_cast(takenAt - taskWithTimestamp->queuedAt).count(); ++queued_.get(); durationUs_.get() += waited; LOG(log_.info()) << "WorkQueue wait time: " << waited << ", queue size: " << size(); - task->operator()(yield); + taskWithTimestamp->task(yield); --curSize_.get(); } diff --git a/src/rpc/WorkQueue.hpp b/src/rpc/WorkQueue.hpp index 21841aa75..30ea4d7b3 100644 --- a/src/rpc/WorkQueue.hpp +++ b/src/rpc/WorkQueue.hpp @@ -61,7 +61,13 @@ struct Reportable { */ class WorkQueue : public Reportable { using TaskType = std::function; - using QueueType = std::queue; + + struct TaskWithTimestamp { + TaskType task; + std::chrono::system_clock::time_point queuedAt; + }; + + using QueueType = std::queue; public: /** @@ -80,14 +86,14 @@ class WorkQueue : public Reportable { size_t highPriorityCounter = 0; void - push(Priority priority, auto&& task) + push(Priority priority, TaskType&& task) { auto& queue = [this, priority] -> QueueType& { if (priority == Priority::High) return high; return normal; }(); - queue.push(std::forward(task)); + queue.push(TaskWithTimestamp{.task = std::move(task), .queuedAt = std::chrono::system_clock::now()}); } [[nodiscard]] bool @@ -96,21 +102,21 @@ class WorkQueue : public Reportable { return high.empty() and normal.empty(); } - [[nodiscard]] std::optional + [[nodiscard]] std::optional popNext() { if (not high.empty() and (highPriorityCounter < kTAKE_HIGH_PRIO or normal.empty())) { - auto task = std::move(high.front()); + auto taskWithTimestamp = std::move(high.front()); high.pop(); ++highPriorityCounter; - return task; + return taskWithTimestamp; } if (not normal.empty()) { - auto task = std::move(normal.front()); + auto taskWithTimestamp = std::move(normal.front()); normal.pop(); highPriorityCounter = 0; - return task; + return taskWithTimestamp; } return std::nullopt; @@ -242,7 +248,7 @@ class WorkQueue : public Reportable { private: void - executeTask(std::chrono::system_clock::time_point opTime, boost::asio::yield_context yield); + executeTask(boost::asio::yield_context yield); }; } // namespace rpc diff --git a/tests/unit/rpc/WorkQueueTests.cpp b/tests/unit/rpc/WorkQueueTests.cpp index 71b371b7a..c918181a1 100644 --- a/tests/unit/rpc/WorkQueueTests.cpp +++ b/tests/unit/rpc/WorkQueueTests.cpp @@ -31,10 +31,12 @@ #include #include +#include #include #include #include #include +#include #include using namespace util; @@ -113,6 +115,31 @@ TEST_F(WorkQueueTest, NonWhitelistedPreventSchedulingAtQueueLimitExceeded) EXPECT_TRUE(unblocked); } +struct WorkQueueDelayedStartTest : WithPrometheus, LoggerFixture { + WorkQueue queue{WorkQueue::kDONT_START_PROCESSING_TAG, /* numWorkers = */ 1, /* maxSize = */ 100}; +}; + +TEST_F(WorkQueueDelayedStartTest, WaitTimeIncludesDelayBeforeStartProcessing) +{ + std::atomic_bool taskExecuted = false; + + ASSERT_TRUE(queue.postCoro( + [&taskExecuted](auto /* yield */) { taskExecuted = true; }, + /* isWhiteListed = */ true + )); + + std::this_thread::sleep_for(std::chrono::milliseconds(50)); + queue.startProcessing(); + queue.stop(); + + EXPECT_TRUE(taskExecuted); + + auto const report = queue.report(); + auto const durationUs = report.at("queued_duration_us").as_uint64(); + + EXPECT_GE(durationUs, 50000u) << "Wait time should include the delay before startProcessing"; +} + struct WorkQueuePriorityTest : WithPrometheus, virtual ::testing::Test { WorkQueue queue{WorkQueue::kDONT_START_PROCESSING_TAG, /* numWorkers = */ 1, /* maxSize = */ 100}; }; From 50328fab539eae3c1f0d3a6fd37b1b5fb30767e6 Mon Sep 17 00:00:00 2001 From: Alex Kremer Date: Mon, 12 Jan 2026 14:49:59 +0000 Subject: [PATCH 7/9] Small fix --- tests/unit/rpc/WorkQueueTests.cpp | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/tests/unit/rpc/WorkQueueTests.cpp b/tests/unit/rpc/WorkQueueTests.cpp index c918181a1..e6d7a5404 100644 --- a/tests/unit/rpc/WorkQueueTests.cpp +++ b/tests/unit/rpc/WorkQueueTests.cpp @@ -140,7 +140,7 @@ TEST_F(WorkQueueDelayedStartTest, WaitTimeIncludesDelayBeforeStartProcessing) EXPECT_GE(durationUs, 50000u) << "Wait time should include the delay before startProcessing"; } -struct WorkQueuePriorityTest : WithPrometheus, virtual ::testing::Test { +struct WorkQueuePriorityTest : WithPrometheus { WorkQueue queue{WorkQueue::kDONT_START_PROCESSING_TAG, /* numWorkers = */ 1, /* maxSize = */ 100}; }; From 75c904f3abe33b3b294d17da70e3e67b77961e58 Mon Sep 17 00:00:00 2001 From: Alex Kremer Date: Mon, 12 Jan 2026 14:52:30 +0000 Subject: [PATCH 8/9] Remove iostream --- src/rpc/WorkQueue.cpp | 1 - 1 file changed, 1 deletion(-) diff --git a/src/rpc/WorkQueue.cpp b/src/rpc/WorkQueue.cpp index e83db78d0..a0a968823 100644 --- a/src/rpc/WorkQueue.cpp +++ b/src/rpc/WorkQueue.cpp @@ -32,7 +32,6 @@ #include #include #include -#include #include #include From e653c515f007423c164ffa9821208e2a0dc89dda Mon Sep 17 00:00:00 2001 From: Alex Kremer Date: Mon, 12 Jan 2026 16:57:36 +0000 Subject: [PATCH 9/9] Disable test --- tests/unit/rpc/WorkQueueTests.cpp | 10 +++++----- 1 file changed, 5 insertions(+), 5 deletions(-) diff --git a/tests/unit/rpc/WorkQueueTests.cpp b/tests/unit/rpc/WorkQueueTests.cpp index e6d7a5404..c716c808d 100644 --- a/tests/unit/rpc/WorkQueueTests.cpp +++ b/tests/unit/rpc/WorkQueueTests.cpp @@ -18,7 +18,6 @@ //============================================================================== #include "rpc/WorkQueue.hpp" -#include "util/LoggerFixtures.hpp" #include "util/MockAssert.hpp" #include "util/MockPrometheus.hpp" #include "util/config/ConfigDefinition.hpp" @@ -58,7 +57,7 @@ struct RPCWorkQueueTestBase : public virtual ::testing::Test { } }; -struct WorkQueueTest : WithPrometheus, RPCWorkQueueTestBase, LoggerFixture { +struct WorkQueueTest : WithPrometheus, RPCWorkQueueTestBase { WorkQueueTest() : RPCWorkQueueTestBase(/* workers = */ 4, /* maxQueueSize = */ 2) { } @@ -115,7 +114,7 @@ TEST_F(WorkQueueTest, NonWhitelistedPreventSchedulingAtQueueLimitExceeded) EXPECT_TRUE(unblocked); } -struct WorkQueueDelayedStartTest : WithPrometheus, LoggerFixture { +struct WorkQueueDelayedStartTest : WithPrometheus { WorkQueue queue{WorkQueue::kDONT_START_PROCESSING_TAG, /* numWorkers = */ 1, /* maxSize = */ 100}; }; @@ -265,9 +264,10 @@ TEST_F(WorkQueueMockPrometheusTest, postCoroCounters) } // Note: not using EXPECT_CLIO_ASSERT_FAIL because exception is swallowed by the WQ context +// TODO [https://github.com/XRPLF/clio/issues/2906]: Enable the test once we figure out a better way to do it without +// using up >2 minutes of CI time struct WorkQueueDeathTest : WorkQueueMockPrometheusTest, common::util::WithMockAssert {}; - -TEST_F(WorkQueueDeathTest, ExecuteTaskAssertsWhenQueueIsEmpty) +TEST_F(WorkQueueDeathTest, DISABLED_ExecuteTaskAssertsWhenQueueIsEmpty) { [[maybe_unused]] auto& queuedMock = makeMock("work_queue_queued_total_number", ""); [[maybe_unused]] auto& durationMock = makeMock("work_queue_cumulative_tasks_duration_us", "");