Skip to content

Commit 8c434ca

Browse files
author
yannan.wyn
committed
fix(bthread): refactor sharded priority queue with per-ED shard
Each EventDispatcher gets its own WorkStealingQueue, making concurrent push from multiple EDs naturally SPMC-safe without spinlocks.
1 parent 0565d8d commit 8c434ca

9 files changed

Lines changed: 285 additions & 11 deletions

src/brpc/event_dispatcher.cpp

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,7 @@
2323
#include "butil/third_party/murmurhash3/murmurhash3.h"// fmix32
2424
#include "bvar/latency_recorder.h" // bvar::LatencyRecorder
2525
#include "bthread/bthread.h" // bthread_start_background
26+
#include "bthread/task_group.h" // TaskGroup::address_meta
2627
#include "brpc/event_dispatcher.h"
2728

2829
DECLARE_int32(task_group_ntags);
@@ -68,6 +69,7 @@ void InitializeGlobalDispatchers() {
6869
bthread_attr_t attr =
6970
FLAGS_usercode_in_pthread ? BTHREAD_ATTR_PTHREAD : BTHREAD_ATTR_NORMAL;
7071
attr.tag = (BTHREAD_TAG_DEFAULT + i) % FLAGS_task_group_ntags;
72+
g_edisp[i * FLAGS_event_dispatcher_num + j].set_priority_index(j);
7173
CHECK_EQ(0, g_edisp[i * FLAGS_event_dispatcher_num + j].Start(&attr));
7274
}
7375
}

src/brpc/event_dispatcher.h

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -114,6 +114,8 @@ template <typename T> friend class IOEvent;
114114
// Stop bthread of this dispatcher.
115115
void Stop();
116116

117+
void set_priority_index(int idx) { _priority_index = idx; }
118+
117119
// Suspend calling thread until bthread of this dispatcher stops.
118120
void Join();
119121

@@ -188,6 +190,8 @@ template <typename T> friend class IOEvent;
188190

189191
// Pipe fds to wakeup EventDispatcher from `epoll_wait' in order to quit
190192
int _wakeup_fds[2];
193+
194+
int _priority_index{-1};
191195
};
192196

193197
EventDispatcher& GetGlobalEventDispatcher(int fd, bthread_tag_t tag);

src/brpc/event_dispatcher_epoll.cpp

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -190,7 +190,13 @@ int EventDispatcher::RemoveConsumer(int fd) {
190190
}
191191

192192
void* EventDispatcher::RunThis(void* arg) {
193-
((EventDispatcher*)arg)->Run();
193+
EventDispatcher* ed = (EventDispatcher*)arg;
194+
if (ed->_priority_index >= 0) {
195+
bthread::TaskMeta* meta =
196+
bthread::TaskGroup::address_meta(bthread_self());
197+
meta->priority_index = ed->_priority_index;
198+
}
199+
ed->Run();
194200
return NULL;
195201
}
196202

src/bthread/task_control.cpp

Lines changed: 20 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -51,6 +51,8 @@ DEFINE_string(cpu_set, "",
5151
"Set of CPUs to which cores are bound. "
5252
"for example, 0-3,5,7; default: disable");
5353

54+
namespace brpc { DECLARE_int32(event_dispatcher_num); }
55+
5456
namespace bthread {
5557

5658
DEFINE_bool(parking_lot_no_signal_when_no_waiter, false,
@@ -205,7 +207,8 @@ TaskControl::TaskControl()
205207
, _status(print_rq_sizes_in_the_tc, this)
206208
, _nbthreads("bthread_count")
207209
, _enable_priority_queue(FLAGS_enable_bthread_priority_queue)
208-
, _priority_queues(FLAGS_task_group_ntags)
210+
, _pq_num_of_each_tag(brpc::FLAGS_event_dispatcher_num)
211+
, _priority_queues(FLAGS_task_group_ntags * brpc::FLAGS_event_dispatcher_num)
209212
, _pl_num_of_each_tag(FLAGS_bthread_parking_lot_of_each_tag)
210213
, _tagged_pl(FLAGS_task_group_ntags)
211214
{}
@@ -238,9 +241,14 @@ int TaskControl::init(int concurrency) {
238241
_tagged_worker_usage_second.push_back(new bvar::PerSecond<bvar::PassiveStatus<double>>(
239242
"bthread_worker_usage", tag_str, _tagged_cumulated_worker_time[i], 1));
240243
_tagged_nbthreads.push_back(new bvar::Adder<int64_t>("bthread_count", tag_str));
241-
if (_priority_queues[i].init(BTHREAD_MAX_CONCURRENCY) != 0) {
242-
LOG(ERROR) << "Fail to init _priority_q";
243-
return -1;
244+
if (_enable_priority_queue) {
245+
for (int j = 0; j < _pq_num_of_each_tag; ++j) {
246+
if (priority_queue(i, j).init(4096) != 0) {
247+
LOG(ERROR) << "Fail to init priority queue for tag=" << i
248+
<< " ed=" << j;
249+
return -1;
250+
}
251+
}
244252
}
245253
}
246254

@@ -445,7 +453,7 @@ TaskControl::~TaskControl() {
445453
_switch_per_second.hide();
446454
_signal_per_second.hide();
447455
_status.hide();
448-
456+
449457
stop_and_join();
450458
}
451459

@@ -528,8 +536,12 @@ int TaskControl::_destroy_group(TaskGroup* g) {
528536
bool TaskControl::steal_task(bthread_t* tid, size_t* seed, size_t offset) {
529537
auto tag = tls_task_group->tag();
530538

531-
if (_priority_queues[tag].steal(tid)) {
532-
return true;
539+
if (_enable_priority_queue) {
540+
for (int i = 0; i < _pq_num_of_each_tag; ++i) {
541+
if (priority_queue(tag, i).steal(tid)) {
542+
return true;
543+
}
544+
}
533545
}
534546

535547
// 1: Acquiring fence is paired with releasing fence in _add_group to
@@ -689,4 +701,5 @@ std::vector<bthread_t> TaskControl::get_living_bthreads() {
689701
return living_bthread_ids;
690702
}
691703

704+
692705
} // namespace bthread

src/bthread/task_control.h

Lines changed: 9 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -101,11 +101,12 @@ friend bthread_t init_for_pthread_stack_trace();
101101
std::string stack_trace(bthread_t tid);
102102
#endif // BRPC_BTHREAD_TRACER
103103

104-
void push_priority_queue(bthread_tag_t tag, bthread_t tid) {
105-
_priority_queues[tag].push(tid);
104+
void push_priority_queue(bthread_tag_t tag, int priority_index, bthread_t tid) {
105+
priority_queue(tag, priority_index).push(tid);
106106
}
107107

108108
std::vector<bthread_t> get_living_bthreads();
109+
109110
private:
110111
typedef std::array<TaskGroup*, BTHREAD_MAX_CONCURRENCY> TaggedGroups;
111112
typedef std::array<ParkingLot, BTHREAD_MAX_PARKINGLOT> TaggedParkingLot;
@@ -123,6 +124,11 @@ friend bthread_t init_for_pthread_stack_trace();
123124
// Tag parking slot
124125
TaggedParkingLot& tag_pl(bthread_tag_t tag) { return _tagged_pl[tag]; }
125126

127+
// Priority queue for a specific ED within a tag
128+
WorkStealingQueue<bthread_t>& priority_queue(bthread_tag_t tag, int index) {
129+
return _priority_queues[tag * _pq_num_of_each_tag + index];
130+
}
131+
126132
static void delete_task_group(void* arg);
127133

128134
static void* worker_thread(void* task_control);
@@ -164,6 +170,7 @@ friend bthread_t init_for_pthread_stack_trace();
164170
std::vector<bvar::Adder<int64_t>*> _tagged_nbthreads;
165171

166172
bool _enable_priority_queue;
173+
int _pq_num_of_each_tag;
167174
std::vector<WorkStealingQueue<bthread_t>> _priority_queues;
168175

169176
size_t _pl_num_of_each_tag;

src/bthread/task_group.cpp

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -640,6 +640,7 @@ void TaskGroup::ending_sched(TaskGroup** pg) {
640640
TaskGroup* g = *pg;
641641
bthread_t next_tid = 0;
642642
// Find next task to run, if none, switch to idle thread of the group.
643+
643644
#ifndef BTHREAD_FAIR_WSQ
644645
// When BTHREAD_FAIR_WSQ is defined, profiling shows that cpu cost of
645646
// WSQ::steal() in example/multi_threaded_echo_c++ changes from 1.9%
@@ -908,7 +909,8 @@ void TaskGroup::priority_to_run(void* args_in) {
908909
tls_task_group->_control->_task_tracer.set_status(
909910
TASK_STATUS_READY, args->meta);
910911
#endif // BRPC_BTHREAD_TRACER
911-
return tls_task_group->control()->push_priority_queue(args->tag, args->meta->tid);
912+
return tls_task_group->control()->push_priority_queue(
913+
args->tag, args->meta->priority_index, args->meta->tid);
912914
}
913915

914916
struct SleepArgs {

src/bthread/task_meta.h

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -88,6 +88,8 @@ struct TaskMeta {
8888
// simplified if they can get tid from TaskMeta.
8989
bthread_t tid{INVALID_BTHREAD};
9090

91+
int priority_index{-1};
92+
9193
// User function and argument
9294
void* (*fn)(void*){NULL};
9395
void* arg{NULL};

test/BUILD.bazel

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -240,6 +240,8 @@ cc_test(
240240
"bthread_butex_multi_tag_unittest.cpp",
241241
"bthread_rwlock_unittest.cpp",
242242
"bthread_semaphore_unittest.cpp",
243+
# Have custom main() that conflicts with gtest_main
244+
"bthread_priority_queue_unittest.cpp",
243245
],
244246
),
245247
copts = COPTS,
@@ -252,6 +254,17 @@ cc_test(
252254
],
253255
)
254256

257+
cc_test(
258+
name = "bthread_priority_queue_test",
259+
srcs = ["bthread_priority_queue_unittest.cpp"],
260+
copts = COPTS,
261+
deps = [
262+
":sstream_workaround",
263+
"//:brpc",
264+
"@com_google_googletest//:gtest",
265+
],
266+
)
267+
255268
cc_test(
256269
name = "brpc_prometheus_test",
257270
srcs = glob(

0 commit comments

Comments
 (0)