Skip to content

Commit 2327e81

Browse files
fix: WorkQueue contention (#2866)
Co-authored-by: Ayaz Salikhov <mathbunnyru@users.noreply.github.com>
1 parent 5269ea0 commit 2327e81

File tree

4 files changed

+173
-33
lines changed

4 files changed

+173
-33
lines changed

benchmarks/CMakeLists.txt

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -9,10 +9,12 @@ target_sources(
99
util/async/ExecutionContextBenchmarks.cpp
1010
# Logger
1111
util/log/LoggerBenchmark.cpp
12+
# WorkQueue
13+
rpc/WorkQueueBenchmarks.cpp
1214
)
1315

1416
include(deps/gbench)
1517

1618
target_include_directories(clio_benchmark PRIVATE .)
17-
target_link_libraries(clio_benchmark PUBLIC clio_util benchmark::benchmark_main spdlog::spdlog)
19+
target_link_libraries(clio_benchmark PUBLIC clio_util clio_rpc benchmark::benchmark_main spdlog::spdlog)
1820
set_target_properties(clio_benchmark PROPERTIES RUNTIME_OUTPUT_DIRECTORY ${CMAKE_BINARY_DIR})
Lines changed: 127 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,127 @@
1+
//------------------------------------------------------------------------------
2+
/*
3+
This file is part of clio: https://github.com/XRPLF/clio
4+
Copyright (c) 2025, the clio developers.
5+
6+
Permission to use, copy, modify, and distribute this software for any
7+
purpose with or without fee is hereby granted, provided that the above
8+
copyright notice and this permission notice appear in all copies.
9+
10+
THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES
11+
WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF
12+
MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR
13+
ANY SPECIAL, DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES
14+
WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN
15+
ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF
16+
OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
17+
*/
18+
//==============================================================================
19+
20+
#include "rpc/WorkQueue.hpp"
21+
#include "util/Assert.hpp"
22+
#include "util/config/Array.hpp"
23+
#include "util/config/ConfigConstraints.hpp"
24+
#include "util/config/ConfigDefinition.hpp"
25+
#include "util/config/ConfigValue.hpp"
26+
#include "util/config/Types.hpp"
27+
#include "util/log/Logger.hpp"
28+
#include "util/prometheus/Prometheus.hpp"
29+
30+
#include <benchmark/benchmark.h>
31+
#include <boost/asio.hpp>
32+
#include <boost/asio/spawn.hpp>
33+
#include <boost/asio/steady_timer.hpp>
34+
#include <boost/asio/thread_pool.hpp>
35+
#include <boost/json.hpp>
36+
#include <boost/json/object.hpp>
37+
38+
#include <atomic>
39+
#include <cassert>
40+
#include <chrono>
41+
#include <cstddef>
42+
#include <cstdint>
43+
#include <mutex>
44+
45+
using namespace rpc;
46+
using namespace util::config;
47+
48+
namespace {
49+
50+
// Configuration handed to PrometheusService::init and util::LogService::init by init() in this file.
auto const kCONFIG = ClioConfigDefinition{
51+
{"prometheus.compress_reply", ConfigValue{ConfigType::Boolean}.defaultValue(true)},
52+
{"prometheus.enabled", ConfigValue{ConfigType::Boolean}.defaultValue(true)},
53+
{"log.channels.[].channel", Array{ConfigValue{ConfigType::String}}},
54+
{"log.channels.[].level", Array{ConfigValue{ConfigType::String}}},
55+
{"log.level", ConfigValue{ConfigType::String}.defaultValue("info")},
56+
{"log.format", ConfigValue{ConfigType::String}.defaultValue(R"(%Y-%m-%d %H:%M:%S.%f %^%3!l:%n%$ - %v)")},
57+
{"log.is_async", ConfigValue{ConfigType::Boolean}.defaultValue(false)},
58+
{"log.enable_console", ConfigValue{ConfigType::Boolean}.defaultValue(false)},
59+
{"log.directory", ConfigValue{ConfigType::String}.optional()},
60+
{"log.rotation_size", ConfigValue{ConfigType::Integer}.defaultValue(2048).withConstraint(gValidateUint32)},
61+
{"log.directory_max_files", ConfigValue{ConfigType::Integer}.defaultValue(25).withConstraint(gValidateUint32)},
62+
{"log.tag_style", ConfigValue{ConfigType::String}.defaultValue("none")},
63+
};
64+
65+
// this should be a fixture but it did not work with Args very well
66+
void
67+
init()
68+
{
69+
static std::once_flag kONCE;
70+
std::call_once(kONCE, [] {
71+
PrometheusService::init(kCONFIG);
72+
(void)util::LogService::init(kCONFIG);
73+
});
74+
}
75+
76+
} // namespace
77+
78+
static void
79+
benchmarkWorkQueue(benchmark::State& state)
80+
{
81+
init();
82+
83+
auto const total = static_cast<size_t>(state.range(0));
84+
auto const numThreads = static_cast<uint32_t>(state.range(1));
85+
auto const maxSize = static_cast<uint32_t>(state.range(2));
86+
auto const delayMs = static_cast<uint32_t>(state.range(3));
87+
88+
for (auto _ : state) {
89+
std::atomic_size_t totalExecuted = 0uz;
90+
std::atomic_size_t totalQueued = 0uz;
91+
92+
state.PauseTiming();
93+
WorkQueue queue(numThreads, maxSize);
94+
state.ResumeTiming();
95+
96+
for (auto i = 0uz; i < total; ++i) {
97+
totalQueued += static_cast<std::size_t>(queue.postCoro(
98+
[&delayMs, &totalExecuted](auto yield) {
99+
++totalExecuted;
100+
101+
boost::asio::steady_timer timer(yield.get_executor(), std::chrono::milliseconds{delayMs});
102+
timer.async_wait(yield);
103+
},
104+
/* isWhiteListed = */ false
105+
));
106+
}
107+
108+
queue.stop();
109+
110+
ASSERT(totalExecuted == totalQueued, "Totals don't match");
111+
ASSERT(totalQueued <= total, "Queued more than requested");
112+
ASSERT(totalQueued >= maxSize, "Queued less than maxSize");
113+
}
114+
}
115+
116+
// Usage example:
117+
/*
118+
./clio_benchmark \
119+
--benchmark_repetitions=10 \
120+
--benchmark_display_aggregates_only=true \
121+
--benchmark_min_time=1x \
122+
--benchmark_filter="WorkQueue"
123+
*/
124+
// TODO: figure out what happens on 1 thread
125+
// Argument lists, in the order benchmarkWorkQueue reads them via state.range(i):
// total tasks to post, number of threads, maximum queue size, per-task delay in milliseconds.
BENCHMARK(benchmarkWorkQueue)
126+
->ArgsProduct({{1'000, 10'000, 100'000}, {2, 4, 8}, {0, 5'000}, {10, 100, 250}})
127+
->Unit(benchmark::kMillisecond);

src/rpc/WorkQueue.cpp

Lines changed: 20 additions & 32 deletions
Original file line numberDiff line numberDiff line change
@@ -34,8 +34,8 @@
3434
#include <cstddef>
3535
#include <cstdint>
3636
#include <functional>
37+
#include <optional>
3738
#include <utility>
38-
#include <vector>
3939

4040
namespace rpc {
4141

@@ -122,51 +122,39 @@ WorkQueue::dispatcherLoop(boost::asio::yield_context yield)
122122

123123
// all ongoing tasks must be completed before stopping fully
124124
while (not stopping_ or size() > 0) {
125-
std::vector<TaskType> batch;
125+
std::optional<TaskType> task;
126126

127127
{
128128
auto state = dispatcherState_.lock();
129129

130130
if (state->empty()) {
131131
state->isIdle = true;
132132
} else {
133-
for (auto count = 0uz; count < kTAKE_HIGH_PRIO and not state->high.empty(); ++count) {
134-
batch.push_back(std::move(state->high.front()));
135-
state->high.pop();
136-
}
137-
138-
if (not state->normal.empty()) {
139-
batch.push_back(std::move(state->normal.front()));
140-
state->normal.pop();
141-
}
133+
task = state->popNext();
142134
}
143135
}
144136

145-
if (not stopping_ and batch.empty()) {
137+
if (not stopping_ and not task.has_value()) {
146138
waitTimer_.expires_at(std::chrono::steady_clock::time_point::max());
147139
boost::system::error_code ec;
148140
waitTimer_.async_wait(yield[ec]);
149-
} else {
150-
for (auto task : std::move(batch)) {
151-
util::spawn(
152-
ioc_,
153-
[this, spawnedAt = std::chrono::system_clock::now(), task = std::move(task)](auto yield) mutable {
154-
auto const takenAt = std::chrono::system_clock::now();
155-
auto const waited =
156-
std::chrono::duration_cast<std::chrono::microseconds>(takenAt - spawnedAt).count();
157-
158-
++queued_.get();
159-
durationUs_.get() += waited;
160-
LOG(log_.info()) << "WorkQueue wait time: " << waited << ", queue size: " << size();
161-
162-
task(yield);
163-
164-
--curSize_.get();
165-
}
166-
);
167-
}
141+
} else if (task.has_value()) {
142+
util::spawn(
143+
ioc_,
144+
[this, spawnedAt = std::chrono::system_clock::now(), task = std::move(*task)](auto yield) mutable {
145+
auto const takenAt = std::chrono::system_clock::now();
146+
auto const waited =
147+
std::chrono::duration_cast<std::chrono::microseconds>(takenAt - spawnedAt).count();
168148

169-
boost::asio::post(ioc_.get_executor(), yield); // yield back to avoid hijacking the thread
149+
++queued_.get();
150+
durationUs_.get() += waited;
151+
LOG(log_.info()) << "WorkQueue wait time: " << waited << ", queue size: " << size();
152+
153+
task(yield);
154+
155+
--curSize_.get();
156+
}
157+
);
170158
}
171159
}
172160

src/rpc/WorkQueue.hpp

Lines changed: 23 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -38,7 +38,9 @@
3838
#include <cstdint>
3939
#include <functional>
4040
#include <limits>
41+
#include <optional>
4142
#include <queue>
43+
#include <utility>
4244

4345
namespace rpc {
4446

@@ -79,6 +81,7 @@ class WorkQueue : public Reportable {
7981
QueueType normal;
8082

8183
bool isIdle = false;
84+
size_t highPriorityCounter = 0;
8285

8386
void
8487
push(Priority priority, auto&& task)
@@ -96,6 +99,26 @@ class WorkQueue : public Reportable {
9699
{
97100
return high.empty() and normal.empty();
98101
}
102+
103+
[[nodiscard]] std::optional<TaskType>
104+
popNext()
105+
{
106+
if (not high.empty() and (highPriorityCounter < kTAKE_HIGH_PRIO or normal.empty())) {
107+
auto task = std::move(high.front());
108+
high.pop();
109+
++highPriorityCounter;
110+
return task;
111+
}
112+
113+
if (not normal.empty()) {
114+
auto task = std::move(normal.front());
115+
normal.pop();
116+
highPriorityCounter = 0;
117+
return task;
118+
}
119+
120+
return std::nullopt;
121+
}
99122
};
100123

101124
private:

0 commit comments

Comments
 (0)