From c8531be81d790493d550bd796df0834844a49466 Mon Sep 17 00:00:00 2001 From: "jiazheng.jia" Date: Wed, 6 Nov 2024 11:12:49 +0800 Subject: [PATCH 1/8] add global priority queue --- src/brpc/event_dispatcher.cpp | 1 + src/brpc/event_dispatcher.h | 2 ++ src/bthread/bthread.cpp | 6 ++++++ src/bthread/bthread.h | 3 +++ src/bthread/parking_lot.cpp | 7 +++++++ src/bthread/parking_lot.h | 2 ++ src/bthread/task_control.cpp | 20 ++++++++++++++++++++ src/bthread/task_control.h | 9 +++++++++ src/bthread/task_group.cpp | 34 ++++++++++++++++++++++++++-------- src/bthread/task_group.h | 8 ++++++++ src/bthread/task_group_inl.h | 2 +- 11 files changed, 85 insertions(+), 9 deletions(-) create mode 100644 src/bthread/parking_lot.cpp diff --git a/src/brpc/event_dispatcher.cpp b/src/brpc/event_dispatcher.cpp index a8f1b9dc0e..76e889bb97 100644 --- a/src/brpc/event_dispatcher.cpp +++ b/src/brpc/event_dispatcher.cpp @@ -62,6 +62,7 @@ void InitializeGlobalDispatchers() { FLAGS_usercode_in_pthread ? BTHREAD_ATTR_PTHREAD : BTHREAD_ATTR_NORMAL; attr.tag = (BTHREAD_TAG_DEFAULT + i) % FLAGS_task_group_ntags; CHECK_EQ(0, g_edisp[i * FLAGS_event_dispatcher_num + j].Start(&attr)); + bthread_epoll_tid_set(i, g_edisp[i].Tid()); } } // This atexit is will be run before g_task_control.stop() because above diff --git a/src/brpc/event_dispatcher.h b/src/brpc/event_dispatcher.h index 51c404e2a0..61a0710b58 100644 --- a/src/brpc/event_dispatcher.h +++ b/src/brpc/event_dispatcher.h @@ -133,6 +133,8 @@ template friend class IOEvent; // Returns 0 on success, -1 otherwise and errno is set int UnregisterEvent(IOEventDataId event_data_id, int fd, bool pollin); + bthread_t Tid() const { return _tid; } + private: DISALLOW_COPY_AND_ASSIGN(EventDispatcher); diff --git a/src/bthread/bthread.cpp b/src/bthread/bthread.cpp index e124f4bc6f..f127011038 100644 --- a/src/bthread/bthread.cpp +++ b/src/bthread/bthread.cpp @@ -625,4 +625,10 @@ uint64_t bthread_cpu_clock_ns(void) { return 0; } +void bthread_epoll_tid_set(bthread_tag_t tag, bthread_t tid) { + CHECK(tag >= BTHREAD_TAG_DEFAULT && tag < FLAGS_task_group_ntags); + auto c = bthread::get_task_control(); + return c->set_group_epoll_tid(tag, tid); +} + } // extern "C" diff --git a/src/bthread/bthread.h b/src/bthread/bthread.h index 7e42c96c9f..b05df28c89 100644 --- a/src/bthread/bthread.h +++ b/src/bthread/bthread.h @@ -401,6 +401,9 @@ extern void* bthread_getspecific(bthread_key_t key); // Return current bthread tag extern bthread_tag_t bthread_self_tag(void); +// set task_groups epoll tid by tag +extern void bthread_epoll_tid_set(bthread_tag_t tag, bthread_t tid); + // The first call to bthread_once() by any thread in a process, with a given // once_control, will call the init_routine() with no arguments. Subsequent // calls of bthread_once() with the same once_control will not call the diff --git a/src/bthread/parking_lot.cpp b/src/bthread/parking_lot.cpp new file mode 100644 index 0000000000..3796d0755a --- /dev/null +++ b/src/bthread/parking_lot.cpp @@ -0,0 +1,7 @@ +#include "parking_lot.h" + +namespace bthread { + +butil::atomic ParkingLot::_waiting_count{0}; + +} // namespace bthread \ No newline at end of file diff --git a/src/bthread/parking_lot.h b/src/bthread/parking_lot.h index d597c409f4..d095155a0b 100644 --- a/src/bthread/parking_lot.h +++ b/src/bthread/parking_lot.h @@ -30,6 +30,7 @@ namespace bthread { // Park idle workers. class BAIDU_CACHELINE_ALIGNMENT ParkingLot { public: + static butil::atomic _waiting_count; class State { public: State(): val(0) {} @@ -70,6 +71,7 @@ class BAIDU_CACHELINE_ALIGNMENT ParkingLot { _pending_signal.fetch_or(1); futex_wake_private(&_pending_signal, 10000); } + private: // higher 31 bits for signalling, LSB for stopping. butil::atomic _pending_signal; diff --git a/src/bthread/task_control.cpp b/src/bthread/task_control.cpp index 55ed1f2e42..bb31d73825 100644 --- a/src/bthread/task_control.cpp +++ b/src/bthread/task_control.cpp @@ -184,6 +184,7 @@ TaskControl::TaskControl() , _status(print_rq_sizes_in_the_tc, this) , _nbthreads("bthread_count") , _pl(FLAGS_task_group_ntags) + , _epoll_tid_states(FLAGS_task_group_ntags) {} int TaskControl::init(int concurrency) { @@ -430,6 +431,17 @@ int TaskControl::_destroy_group(TaskGroup* g) { bool TaskControl::steal_task(bthread_t* tid, size_t* seed, size_t offset) { auto tag = tls_task_group->tag(); + bool except_state = true; + // epoll tid should be stolen first. + for (auto &epoll_state : _epoll_tid_states[tag]) { + if (epoll_state.second.compare_exchange_strong( + except_state, false, butil::memory_order_seq_cst, + butil::memory_order_relaxed)) { + *tid = epoll_state.first; + return true; + } + } + // 1: Acquiring fence is paired with releasing fence in _add_group to // avoid accessing uninitialized slot of _groups. const size_t ngroup = tag_ngroup(tag).load(butil::memory_order_acquire/*1*/); @@ -575,4 +587,12 @@ bvar::LatencyRecorder* TaskControl::create_exposed_pending_time() { return pt; } +void TaskControl::set_group_epoll_tid(bthread_tag_t tag, bthread_t tid) { + _epoll_tid_states[tag][tid] = false; + auto groups = tag_group(tag); + const size_t ngroup = tag_ngroup(tag).load(butil::memory_order_acquire); + for (size_t i = 0; i < ngroup; i++) { + groups[i]->add_epoll_tid(tid); + } +} } // namespace bthread diff --git a/src/bthread/task_control.h b/src/bthread/task_control.h index 95820a86f9..75fb32995d 100644 --- a/src/bthread/task_control.h +++ b/src/bthread/task_control.h @@ -30,6 +30,7 @@ #include #include #include +#include #include "butil/atomicops.h" // butil::atomic #include "bvar/bvar.h" // bvar::PassiveStatus #include "bthread/task_tracer.h" @@ -96,11 +97,18 @@ friend bthread_t init_for_pthread_stack_trace(); void stack_trace(std::ostream& os, bthread_t tid); std::string stack_trace(bthread_t tid); #endif // BRPC_BTHREAD_TRACER + // Only deal once when init epoll bthread. + void set_group_epoll_tid(bthread_tag_t tag, bthread_t tid); + + void epoll_waiting(bthread_tag_t tag, bthread_t tid) { + _epoll_tid_states[tag][tid].store(true, butil::memory_order_release); + } private: typedef std::array TaggedGroups; static const int PARKING_LOT_NUM = 4; typedef std::array TaggedParkingLot; + typedef std::unordered_map> EpollTidState; // Add/Remove a TaskGroup. // Returns 0 on success, -1 otherwise. int _add_group(TaskGroup*, bthread_tag_t tag); @@ -160,6 +168,7 @@ friend bthread_t init_for_pthread_stack_trace(); TaskTracer _task_tracer; #endif // BRPC_BTHREAD_TRACER + std::vector _epoll_tid_states; }; inline bvar::LatencyRecorder& TaskControl::exposed_pending_time() { diff --git a/src/bthread/task_group.cpp b/src/bthread/task_group.cpp index 361efa5936..705dc0810c 100644 --- a/src/bthread/task_group.cpp +++ b/src/bthread/task_group.cpp @@ -429,12 +429,18 @@ int TaskGroup::start_foreground(TaskGroup** pg, } else { // NOSIGNAL affects current task, not the new task. RemainedFn fn = NULL; - if (g->current_task()->about_to_quit) { + if (g->cur_epoll_tid()) { + fn = ready_to_run_epoll; + } else if (g->current_task()->about_to_quit) { fn = ready_to_run_in_worker_ignoresignal; } else { fn = ready_to_run_in_worker; } - ReadyToRunArgs args = { g->_cur_meta, (bool)(using_attr.flags & BTHREAD_NOSIGNAL) }; + ReadyToRunArgs args = { + g->tag(), + g->_cur_meta, + (bool)(using_attr.flags & BTHREAD_NOSIGNAL) + }; g->set_remained(fn, &args); TaskGroup::sched_to(pg, m->tid); } @@ -559,7 +565,6 @@ void TaskGroup::ending_sched(TaskGroup** pg) { // Jump to main task if there's no task to run. next_tid = g->_main_tid; } - TaskMeta* const cur_meta = g->_cur_meta; TaskMeta* next_meta = address_meta(next_tid); if (next_meta->stack == NULL) { @@ -721,7 +726,9 @@ void TaskGroup::ready_to_run(TaskMeta* meta, bool nosignal) { const int additional_signal = _num_nosignal; _num_nosignal = 0; _nsignaled += 1 + additional_signal; - _control->signal_task(1 + additional_signal, _tag); + if (ParkingLot::_waiting_count.load(std::memory_order_acquire) > 0) { + _control->signal_task(1 + additional_signal, _tag); + } } } @@ -730,7 +737,9 @@ void TaskGroup::flush_nosignal_tasks() { if (val) { _num_nosignal = 0; _nsignaled += val; - _control->signal_task(val, _tag); + if (ParkingLot::_waiting_count.load(std::memory_order_acquire) > 0) { + _control->signal_task(val, _tag); + } } } @@ -754,7 +763,9 @@ void TaskGroup::ready_to_run_remote(TaskMeta* meta, bool nosignal) { _remote_num_nosignal = 0; _remote_nsignaled += 1 + additional_signal; _remote_rq._mutex.unlock(); - _control->signal_task(1 + additional_signal, _tag); + if (ParkingLot::_waiting_count.load(std::memory_order_acquire) > 0) { + _control->signal_task(1 + additional_signal, _tag); + } } } @@ -767,7 +778,9 @@ void TaskGroup::flush_nosignal_tasks_remote_locked(butil::Mutex& locked_mutex) { _remote_num_nosignal = 0; _remote_nsignaled += val; locked_mutex.unlock(); - _control->signal_task(val, _tag); + if (ParkingLot::_waiting_count.load(std::memory_order_acquire) > 0) { + _control->signal_task(val, _tag); + } } void TaskGroup::ready_to_run_general(TaskMeta* meta, bool nosignal) { @@ -798,6 +811,11 @@ void TaskGroup::ready_to_run_in_worker_ignoresignal(void* args_in) { return tls_task_group->push_rq(args->meta->tid); } +void TaskGroup::ready_to_run_epoll(void* args_in) { + ReadyToRunArgs* args = static_cast(args_in); + return tls_task_group->control()->epoll_waiting(args->tag, args->meta->tid); +} + struct SleepArgs { uint64_t timeout_us; bthread_t tid; @@ -972,7 +990,7 @@ int TaskGroup::interrupt(bthread_t tid, TaskControl* c, bthread_tag_t tag) { void TaskGroup::yield(TaskGroup** pg) { TaskGroup* g = *pg; - ReadyToRunArgs args = { g->_cur_meta, false }; + ReadyToRunArgs args = { g->tag(), g->_cur_meta, false }; g->set_remained(ready_to_run_in_worker, &args); sched(pg); } diff --git a/src/bthread/task_group.h b/src/bthread/task_group.h index c3d2ae463e..0de0692ce4 100644 --- a/src/bthread/task_group.h +++ b/src/bthread/task_group.h @@ -22,6 +22,7 @@ #ifndef BTHREAD_TASK_GROUP_H #define BTHREAD_TASK_GROUP_H +#include #include "butil/time.h" // cpuwide_time_ns #include "bthread/task_control.h" #include "bthread/task_meta.h" // bthread_t, TaskMeta @@ -199,6 +200,10 @@ class TaskGroup { total_ns += butil::cputhread_time_ns() - _last_cpu_clock_ns; return total_ns; } + // Thread Unsafe + void add_epoll_tid(bthread_t tid) { _epoll_tids.emplace(tid); } + + bool cur_epoll_tid() { return _epoll_tids.count(current_tid()) > 0; } private: friend class TaskControl; @@ -218,11 +223,13 @@ friend class TaskControl; static void _release_last_context(void*); static void _add_sleep_event(void*); struct ReadyToRunArgs { + bthread_tag_t tag; TaskMeta* meta; bool nosignal; }; static void ready_to_run_in_worker(void*); static void ready_to_run_in_worker_ignoresignal(void*); + static void ready_to_run_epoll(void*); // Wait for a task to run. // Returns true on success, false is treated as permanent error and the @@ -278,6 +285,7 @@ friend class TaskControl; // Worker thread id. pid_t _tid; + std::unordered_set _epoll_tids; }; } // namespace bthread diff --git a/src/bthread/task_group_inl.h b/src/bthread/task_group_inl.h index 75c377e12f..8ca29fde3a 100644 --- a/src/bthread/task_group_inl.h +++ b/src/bthread/task_group_inl.h @@ -51,7 +51,7 @@ inline void TaskGroup::exchange(TaskGroup** pg, TaskMeta* next_meta) { if (g->is_current_pthread_task()) { return g->ready_to_run(next_meta); } - ReadyToRunArgs args = { g->_cur_meta, false }; + ReadyToRunArgs args = { g->tag(), g->_cur_meta, false }; g->set_remained((g->current_task()->about_to_quit ? ready_to_run_in_worker_ignoresignal : ready_to_run_in_worker), From 0b18defd24146e7dd9653866f531833b5ef64c2a Mon Sep 17 00:00:00 2001 From: "jiazheng.jia" Date: Thu, 14 Nov 2024 10:58:38 +0800 Subject: [PATCH 2/8] add parking lot license --- src/bthread/parking_lot.cpp | 20 ++++++++++++++++++++ 1 file changed, 20 insertions(+) diff --git a/src/bthread/parking_lot.cpp b/src/bthread/parking_lot.cpp index 3796d0755a..8f4b8b46d2 100644 --- a/src/bthread/parking_lot.cpp +++ b/src/bthread/parking_lot.cpp @@ -1,3 +1,23 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +// bthread - An M:N threading library to make applications more concurrent. + +// Date: Thu Nov 14 13:40:57 CST 2024 #include "parking_lot.h" namespace bthread { From 95e5b8453bd5fefce568628163c4d932e99ce9a4 Mon Sep 17 00:00:00 2001 From: "jiazheng.jia" Date: Tue, 19 Nov 2024 16:58:59 +0800 Subject: [PATCH 3/8] fix scale workers --- src/bthread/task_control.cpp | 12 ++++++++++++ src/bthread/task_group.cpp | 16 ++++------------ 2 files changed, 16 insertions(+), 12 deletions(-) diff --git a/src/bthread/task_control.cpp b/src/bthread/task_control.cpp index bb31d73825..b2ff1448fc 100644 --- a/src/bthread/task_control.cpp +++ b/src/bthread/task_control.cpp @@ -482,6 +482,18 @@ void TaskControl::signal_task(int num_task, bthread_tag_t tag) { if (num_task > 2) { num_task = 2; } + if (ParkingLot::_waiting_count.load(std::memory_order_acquire) == 0) { + if (FLAGS_bthread_min_concurrency > 0 && + _concurrency.load(butil::memory_order_relaxed) < FLAGS_bthread_concurrency) { + // TODO: Reduce this lock + BAIDU_SCOPED_LOCK(g_task_control_mutex); + if (_concurrency.load(butil::memory_order_acquire) < FLAGS_bthread_concurrency) { + add_workers(1, tag); + } + } else { + return; + } + } auto& pl = tag_pl(tag); int start_index = butil::fmix64(pthread_numeric_id()) % PARKING_LOT_NUM; num_task -= pl[start_index].signal(1); diff --git a/src/bthread/task_group.cpp b/src/bthread/task_group.cpp index 705dc0810c..0a82f214f3 100644 --- a/src/bthread/task_group.cpp +++ b/src/bthread/task_group.cpp @@ -726,9 +726,7 @@ void TaskGroup::ready_to_run(TaskMeta* meta, bool nosignal) { const int additional_signal = _num_nosignal; _num_nosignal = 0; _nsignaled += 1 + additional_signal; - if (ParkingLot::_waiting_count.load(std::memory_order_acquire) > 0) { - _control->signal_task(1 + additional_signal, _tag); - } + _control->signal_task(1 + additional_signal, _tag); } } @@ -737,9 +735,7 @@ void TaskGroup::flush_nosignal_tasks() { if (val) { _num_nosignal = 0; _nsignaled += val; - if (ParkingLot::_waiting_count.load(std::memory_order_acquire) > 0) { - _control->signal_task(val, _tag); - } + _control->signal_task(val, _tag); } } @@ -763,9 +759,7 @@ void TaskGroup::ready_to_run_remote(TaskMeta* meta, bool nosignal) { _remote_num_nosignal = 0; _remote_nsignaled += 1 + additional_signal; _remote_rq._mutex.unlock(); - if (ParkingLot::_waiting_count.load(std::memory_order_acquire) > 0) { - _control->signal_task(1 + additional_signal, _tag); - } + _control->signal_task(1 + additional_signal, _tag); } } @@ -778,9 +772,7 @@ void TaskGroup::flush_nosignal_tasks_remote_locked(butil::Mutex& locked_mutex) { _remote_num_nosignal = 0; _remote_nsignaled += val; locked_mutex.unlock(); - if (ParkingLot::_waiting_count.load(std::memory_order_acquire) > 0) { - _control->signal_task(val, _tag); - } + _control->signal_task(val, _tag); } void TaskGroup::ready_to_run_general(TaskMeta* meta, bool nosignal) { From 76cbfa36ee1c2b0b8d2ba3eeaad5c27399d2a1aa Mon Sep 17 00:00:00 2001 From: "jiazheng.jia" Date: Tue, 19 Nov 2024 17:04:31 +0800 Subject: [PATCH 4/8] fix expected state --- src/bthread/task_control.cpp | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/src/bthread/task_control.cpp b/src/bthread/task_control.cpp index b2ff1448fc..1b8a83cac2 100644 --- a/src/bthread/task_control.cpp +++ b/src/bthread/task_control.cpp @@ -431,11 +431,11 @@ int TaskControl::_destroy_group(TaskGroup* g) { bool TaskControl::steal_task(bthread_t* tid, size_t* seed, size_t offset) { auto tag = tls_task_group->tag(); - bool except_state = true; // epoll tid should be stolen first. for (auto &epoll_state : _epoll_tid_states[tag]) { + bool expected_state = true; if (epoll_state.second.compare_exchange_strong( - except_state, false, butil::memory_order_seq_cst, + expected_state, false, butil::memory_order_seq_cst, butil::memory_order_relaxed)) { *tid = epoll_state.first; return true; From 56276a2103c83fe656de14a4f8c9ad2b1d41cfbc Mon Sep 17 00:00:00 2001 From: "jiazheng.jia" Date: Tue, 7 Jan 2025 14:04:29 +0800 Subject: [PATCH 5/8] add global priority queue --- src/bthread/task_control.cpp | 41 +++++++++++++----------------------- src/bthread/task_control.h | 8 +++---- src/bthread/task_group.cpp | 6 +++--- src/bthread/task_group.h | 2 +- 4 files changed, 22 insertions(+), 35 deletions(-) diff --git a/src/bthread/task_control.cpp b/src/bthread/task_control.cpp index 1b8a83cac2..7ef58a0a69 100644 --- a/src/bthread/task_control.cpp +++ b/src/bthread/task_control.cpp @@ -183,8 +183,8 @@ TaskControl::TaskControl() , _signal_per_second(&_cumulated_signal_count) , _status(print_rq_sizes_in_the_tc, this) , _nbthreads("bthread_count") + , _priority_qs(FLAGS_task_group_ntags) , _pl(FLAGS_task_group_ntags) - , _epoll_tid_states(FLAGS_task_group_ntags) {} int TaskControl::init(int concurrency) { @@ -208,6 +208,10 @@ int TaskControl::init(int concurrency) { _tagged_worker_usage_second.push_back(new bvar::PerSecond>( "bthread_worker_usage", tag_str, _tagged_cumulated_worker_time[i], 1)); _tagged_nbthreads.push_back(new bvar::Adder("bthread_count", tag_str)); + if (_priority_qs[i].init(BTHREAD_MAX_CONCURRENCY) != 0) { + LOG(FATAL) << "Fail to init _priority_q"; + return -1; + } } // Make sure TimerThread is ready. @@ -431,15 +435,9 @@ int TaskControl::_destroy_group(TaskGroup* g) { bool TaskControl::steal_task(bthread_t* tid, size_t* seed, size_t offset) { auto tag = tls_task_group->tag(); - // epoll tid should be stolen first. - for (auto &epoll_state : _epoll_tid_states[tag]) { - bool expected_state = true; - if (epoll_state.second.compare_exchange_strong( - expected_state, false, butil::memory_order_seq_cst, - butil::memory_order_relaxed)) { - *tid = epoll_state.first; - return true; - } + + if (_priority_qs[tag].steal(tid)) { + return true; } // 1: Acquiring fence is paired with releasing fence in _add_group to @@ -482,22 +480,14 @@ void TaskControl::signal_task(int num_task, bthread_tag_t tag) { if (num_task > 2) { num_task = 2; } - if (ParkingLot::_waiting_count.load(std::memory_order_acquire) == 0) { - if (FLAGS_bthread_min_concurrency > 0 && - _concurrency.load(butil::memory_order_relaxed) < FLAGS_bthread_concurrency) { - // TODO: Reduce this lock - BAIDU_SCOPED_LOCK(g_task_control_mutex); - if (_concurrency.load(butil::memory_order_acquire) < FLAGS_bthread_concurrency) { - add_workers(1, tag); - } - } else { - return; - } - } auto& pl = tag_pl(tag); int start_index = butil::fmix64(pthread_numeric_id()) % PARKING_LOT_NUM; - num_task -= pl[start_index].signal(1); - if (num_task > 0) { + // WARNING: This allow some bad case happen when wait_count is not accurente. + auto wait_count = ParkingLot::_waiting_count.load(butil::memory_order_relaxed); + if (wait_count > 0) { + num_task -= pl[start_index].signal(1); + } + if (num_task > 0 && wait_count > 0) { for (int i = 1; i < PARKING_LOT_NUM && num_task > 0; ++i) { if (++start_index >= PARKING_LOT_NUM) { start_index = 0; @@ -505,7 +495,7 @@ void TaskControl::signal_task(int num_task, bthread_tag_t tag) { num_task -= pl[start_index].signal(1); } } - if (num_task > 0 && + if (num_task > 0 && wait_count >0 && FLAGS_bthread_min_concurrency > 0 && // test min_concurrency for performance _concurrency.load(butil::memory_order_relaxed) < FLAGS_bthread_concurrency) { // TODO: Reduce this lock @@ -600,7 +590,6 @@ bvar::LatencyRecorder* TaskControl::create_exposed_pending_time() { } void TaskControl::set_group_epoll_tid(bthread_tag_t tag, bthread_t tid) { - _epoll_tid_states[tag][tid] = false; auto groups = tag_group(tag); const size_t ngroup = tag_ngroup(tag).load(butil::memory_order_acquire); for (size_t i = 0; i < ngroup; i++) { diff --git a/src/bthread/task_control.h b/src/bthread/task_control.h index 75fb32995d..44b21cbdbb 100644 --- a/src/bthread/task_control.h +++ b/src/bthread/task_control.h @@ -100,15 +100,14 @@ friend bthread_t init_for_pthread_stack_trace(); // Only deal once when init epoll bthread. void set_group_epoll_tid(bthread_tag_t tag, bthread_t tid); - void epoll_waiting(bthread_tag_t tag, bthread_t tid) { - _epoll_tid_states[tag][tid].store(true, butil::memory_order_release); + void push_priority_q(bthread_tag_t tag, bthread_t tid) { + _priority_qs[tag].push(tid); } private: typedef std::array TaggedGroups; static const int PARKING_LOT_NUM = 4; typedef std::array TaggedParkingLot; - typedef std::unordered_map> EpollTidState; // Add/Remove a TaskGroup. // Returns 0 on success, -1 otherwise. int _add_group(TaskGroup*, bthread_tag_t tag); @@ -161,14 +160,13 @@ friend bthread_t init_for_pthread_stack_trace(); std::vector*> _tagged_cumulated_worker_time; std::vector>*> _tagged_worker_usage_second; std::vector*> _tagged_nbthreads; + std::vector> _priority_qs; std::vector _pl; #ifdef BRPC_BTHREAD_TRACER TaskTracer _task_tracer; #endif // BRPC_BTHREAD_TRACER - - std::vector _epoll_tid_states; }; inline bvar::LatencyRecorder& TaskControl::exposed_pending_time() { diff --git a/src/bthread/task_group.cpp b/src/bthread/task_group.cpp index 0a82f214f3..90bb6bc21d 100644 --- a/src/bthread/task_group.cpp +++ b/src/bthread/task_group.cpp @@ -430,7 +430,7 @@ int TaskGroup::start_foreground(TaskGroup** pg, // NOSIGNAL affects current task, not the new task. RemainedFn fn = NULL; if (g->cur_epoll_tid()) { - fn = ready_to_run_epoll; + fn = priority_to_run; } else if (g->current_task()->about_to_quit) { fn = ready_to_run_in_worker_ignoresignal; } else { @@ -803,9 +803,9 @@ void TaskGroup::ready_to_run_in_worker_ignoresignal(void* args_in) { return tls_task_group->push_rq(args->meta->tid); } -void TaskGroup::ready_to_run_epoll(void* args_in) { +void TaskGroup::priority_to_run(void* args_in) { ReadyToRunArgs* args = static_cast(args_in); - return tls_task_group->control()->epoll_waiting(args->tag, args->meta->tid); + return tls_task_group->control()->push_priority_q(args->tag, args->meta->tid); } struct SleepArgs { diff --git a/src/bthread/task_group.h b/src/bthread/task_group.h index 0de0692ce4..658454650f 100644 --- a/src/bthread/task_group.h +++ b/src/bthread/task_group.h @@ -229,7 +229,7 @@ friend class TaskControl; }; static void ready_to_run_in_worker(void*); static void ready_to_run_in_worker_ignoresignal(void*); - static void ready_to_run_epoll(void*); + static void priority_to_run(void*); // Wait for a task to run. // Returns true on success, false is treated as permanent error and the From 01aba4d7288f7882b6f939eda5e3d6abcf3b6273 Mon Sep 17 00:00:00 2001 From: "jiazheng.jia" Date: Fri, 10 Jan 2025 15:18:51 +0800 Subject: [PATCH 6/8] fix setconcurrency ut error --- src/bthread/task_control.cpp | 11 ++++++----- test/bthread_setconcurrency_unittest.cpp | 1 - 2 files changed, 6 insertions(+), 6 deletions(-) diff --git a/src/bthread/task_control.cpp b/src/bthread/task_control.cpp index 7ef58a0a69..b10fc535a5 100644 --- a/src/bthread/task_control.cpp +++ b/src/bthread/task_control.cpp @@ -483,19 +483,20 @@ void TaskControl::signal_task(int num_task, bthread_tag_t tag) { auto& pl = tag_pl(tag); int start_index = butil::fmix64(pthread_numeric_id()) % PARKING_LOT_NUM; // WARNING: This allow some bad case happen when wait_count is not accurente. - auto wait_count = ParkingLot::_waiting_count.load(butil::memory_order_relaxed); - if (wait_count > 0) { + if (ParkingLot::_waiting_count.load(butil::memory_order_relaxed) > 0) { num_task -= pl[start_index].signal(1); } - if (num_task > 0 && wait_count > 0) { + if (num_task > 0) { for (int i = 1; i < PARKING_LOT_NUM && num_task > 0; ++i) { if (++start_index >= PARKING_LOT_NUM) { start_index = 0; } - num_task -= pl[start_index].signal(1); + if (ParkingLot::_waiting_count.load(butil::memory_order_relaxed) > 0) { + num_task -= pl[start_index].signal(1); + } } } - if (num_task > 0 && wait_count >0 && + if (num_task > 0 && FLAGS_bthread_min_concurrency > 0 && // test min_concurrency for performance _concurrency.load(butil::memory_order_relaxed) < FLAGS_bthread_concurrency) { // TODO: Reduce this lock diff --git a/test/bthread_setconcurrency_unittest.cpp b/test/bthread_setconcurrency_unittest.cpp index 4d269b642b..f9272c5e84 100644 --- a/test/bthread_setconcurrency_unittest.cpp +++ b/test/bthread_setconcurrency_unittest.cpp @@ -214,7 +214,6 @@ int concurrency_by_tag(int num) { TEST(BthreadTest, concurrency_by_tag) { ASSERT_EQ(concurrency_by_tag(1), false); - auto tag_con = bthread_getconcurrency_by_tag(0); auto con = bthread_getconcurrency(); ASSERT_EQ(concurrency_by_tag(con), true); ASSERT_EQ(concurrency_by_tag(con + 1), true); From 708725a8c240850a3064b66f988fa9ce0fc73ce0 Mon Sep 17 00:00:00 2001 From: "jiazheng.jia" Date: Wed, 12 Mar 2025 15:59:09 +0800 Subject: [PATCH 7/8] fix rebase error --- src/bthread/parking_lot.cpp | 27 --------------------------- src/bthread/parking_lot.h | 1 - src/bthread/task_control.cpp | 8 ++------ 3 files changed, 2 insertions(+), 34 deletions(-) delete mode 100644 src/bthread/parking_lot.cpp diff --git a/src/bthread/parking_lot.cpp b/src/bthread/parking_lot.cpp deleted file mode 100644 index 8f4b8b46d2..0000000000 --- a/src/bthread/parking_lot.cpp +++ /dev/null @@ -1,27 +0,0 @@ -// Licensed to the Apache Software Foundation (ASF) under one -// or more contributor license agreements. See the NOTICE file -// distributed with this work for additional information -// regarding copyright ownership. The ASF licenses this file -// to you under the Apache License, Version 2.0 (the -// "License"); you may not use this file except in compliance -// with the License. You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, -// software distributed under the License is distributed on an -// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -// KIND, either express or implied. See the License for the -// specific language governing permissions and limitations -// under the License. - -// bthread - An M:N threading library to make applications more concurrent. - -// Date: Thu Nov 14 13:40:57 CST 2024 -#include "parking_lot.h" - -namespace bthread { - -butil::atomic ParkingLot::_waiting_count{0}; - -} // namespace bthread \ No newline at end of file diff --git a/src/bthread/parking_lot.h b/src/bthread/parking_lot.h index d095155a0b..b8c3f36c5f 100644 --- a/src/bthread/parking_lot.h +++ b/src/bthread/parking_lot.h @@ -30,7 +30,6 @@ namespace bthread { // Park idle workers. class BAIDU_CACHELINE_ALIGNMENT ParkingLot { public: - static butil::atomic _waiting_count; class State { public: State(): val(0) {} diff --git a/src/bthread/task_control.cpp b/src/bthread/task_control.cpp index b10fc535a5..2bbd2118f0 100644 --- a/src/bthread/task_control.cpp +++ b/src/bthread/task_control.cpp @@ -483,17 +483,13 @@ void TaskControl::signal_task(int num_task, bthread_tag_t tag) { auto& pl = tag_pl(tag); int start_index = butil::fmix64(pthread_numeric_id()) % PARKING_LOT_NUM; // WARNING: This allow some bad case happen when wait_count is not accurente. - if (ParkingLot::_waiting_count.load(butil::memory_order_relaxed) > 0) { - num_task -= pl[start_index].signal(1); - } + num_task -= pl[start_index].signal(1); if (num_task > 0) { for (int i = 1; i < PARKING_LOT_NUM && num_task > 0; ++i) { if (++start_index >= PARKING_LOT_NUM) { start_index = 0; } - if (ParkingLot::_waiting_count.load(butil::memory_order_relaxed) > 0) { - num_task -= pl[start_index].signal(1); - } + num_task -= pl[start_index].signal(1); } } if (num_task > 0 && From d10137cc5a5d6e287a0551e577d50c3ba2079888 Mon Sep 17 00:00:00 2001 From: "jiazheng.jia" Date: Fri, 14 Mar 2025 16:32:21 +0800 Subject: [PATCH 8/8] add global priority bthread flags --- src/brpc/event_dispatcher.cpp | 1 - src/brpc/event_dispatcher.h | 2 -- src/brpc/event_dispatcher_epoll.cpp | 5 +++-- src/bthread/bthread.cpp | 6 ------ src/bthread/bthread.h | 3 --- src/bthread/task_control.cpp | 8 -------- src/bthread/task_control.h | 4 +--- src/bthread/task_group.cpp | 4 +++- src/bthread/task_group.h | 6 +----- src/bthread/types.h | 5 +++++ 10 files changed, 13 insertions(+), 31 deletions(-) diff --git a/src/brpc/event_dispatcher.cpp b/src/brpc/event_dispatcher.cpp index 76e889bb97..a8f1b9dc0e 100644 --- a/src/brpc/event_dispatcher.cpp +++ b/src/brpc/event_dispatcher.cpp @@ -62,7 +62,6 @@ void InitializeGlobalDispatchers() { FLAGS_usercode_in_pthread ? BTHREAD_ATTR_PTHREAD : BTHREAD_ATTR_NORMAL; attr.tag = (BTHREAD_TAG_DEFAULT + i) % FLAGS_task_group_ntags; CHECK_EQ(0, g_edisp[i * FLAGS_event_dispatcher_num + j].Start(&attr)); - bthread_epoll_tid_set(i, g_edisp[i].Tid()); } } // This atexit is will be run before g_task_control.stop() because above diff --git a/src/brpc/event_dispatcher.h b/src/brpc/event_dispatcher.h index 61a0710b58..51c404e2a0 100644 --- a/src/brpc/event_dispatcher.h +++ b/src/brpc/event_dispatcher.h @@ -133,8 +133,6 @@ template friend class IOEvent; // Returns 0 on success, -1 otherwise and errno is set int UnregisterEvent(IOEventDataId event_data_id, int fd, bool pollin); - bthread_t Tid() const { return _tid; } - private: DISALLOW_COPY_AND_ASSIGN(EventDispatcher); diff --git a/src/brpc/event_dispatcher_epoll.cpp b/src/brpc/event_dispatcher_epoll.cpp index 0ea404fff6..fac9512d54 100644 --- a/src/brpc/event_dispatcher_epoll.cpp +++ b/src/brpc/event_dispatcher_epoll.cpp @@ -26,7 +26,7 @@ EventDispatcher::EventDispatcher() : _event_dispatcher_fd(-1) , _stop(false) , _tid(0) - , _thread_attr(BTHREAD_ATTR_NORMAL) { + , _thread_attr(BTHREAD_ATTR_EPOLL) { _event_dispatcher_fd = epoll_create(1024 * 1024); if (_event_dispatcher_fd < 0) { PLOG(FATAL) << "Fail to create epoll"; @@ -70,7 +70,8 @@ int EventDispatcher::Start(const bthread_attr_t* consumer_thread_attr) { // Set _thread_attr before creating epoll thread to make sure // everyting seems sane to the thread. _thread_attr = consumer_thread_attr ? - *consumer_thread_attr : BTHREAD_ATTR_NORMAL; + *consumer_thread_attr : _thread_attr; + _thread_attr = _thread_attr | BTHREAD_GLOBAL_PRIORITY; //_thread_attr is used in StartInputEvent(), assign flag NEVER_QUIT to it will cause new bthread // that created by epoll_wait() never to quit. diff --git a/src/bthread/bthread.cpp b/src/bthread/bthread.cpp index f127011038..e124f4bc6f 100644 --- a/src/bthread/bthread.cpp +++ b/src/bthread/bthread.cpp @@ -625,10 +625,4 @@ uint64_t bthread_cpu_clock_ns(void) { return 0; } -void bthread_epoll_tid_set(bthread_tag_t tag, bthread_t tid) { - CHECK(tag >= BTHREAD_TAG_DEFAULT && tag < FLAGS_task_group_ntags); - auto c = bthread::get_task_control(); - return c->set_group_epoll_tid(tag, tid); -} - } // extern "C" diff --git a/src/bthread/bthread.h b/src/bthread/bthread.h index b05df28c89..7e42c96c9f 100644 --- a/src/bthread/bthread.h +++ b/src/bthread/bthread.h @@ -401,9 +401,6 @@ extern void* bthread_getspecific(bthread_key_t key); // Return current bthread tag extern bthread_tag_t bthread_self_tag(void); -// set task_groups epoll tid by tag -extern void bthread_epoll_tid_set(bthread_tag_t tag, bthread_t tid); - // The first call to bthread_once() by any thread in a process, with a given // once_control, will call the init_routine() with no arguments. Subsequent // calls of bthread_once() with the same once_control will not call the diff --git a/src/bthread/task_control.cpp b/src/bthread/task_control.cpp index 2bbd2118f0..e04793379e 100644 --- a/src/bthread/task_control.cpp +++ b/src/bthread/task_control.cpp @@ -482,7 +482,6 @@ void TaskControl::signal_task(int num_task, bthread_tag_t tag) { } auto& pl = tag_pl(tag); int start_index = butil::fmix64(pthread_numeric_id()) % PARKING_LOT_NUM; - // WARNING: This allow some bad case happen when wait_count is not accurente. num_task -= pl[start_index].signal(1); if (num_task > 0) { for (int i = 1; i < PARKING_LOT_NUM && num_task > 0; ++i) { @@ -586,11 +585,4 @@ bvar::LatencyRecorder* TaskControl::create_exposed_pending_time() { return pt; } -void TaskControl::set_group_epoll_tid(bthread_tag_t tag, bthread_t tid) { - auto groups = tag_group(tag); - const size_t ngroup = tag_ngroup(tag).load(butil::memory_order_acquire); - for (size_t i = 0; i < ngroup; i++) { - groups[i]->add_epoll_tid(tid); - } -} } // namespace bthread diff --git a/src/bthread/task_control.h b/src/bthread/task_control.h index 44b21cbdbb..87ddee5450 100644 --- a/src/bthread/task_control.h +++ b/src/bthread/task_control.h @@ -30,7 +30,6 @@ #include #include #include -#include #include "butil/atomicops.h" // butil::atomic #include "bvar/bvar.h" // bvar::PassiveStatus #include "bthread/task_tracer.h" @@ -97,8 +96,6 @@ friend bthread_t init_for_pthread_stack_trace(); void stack_trace(std::ostream& os, bthread_t tid); std::string stack_trace(bthread_t tid); #endif // BRPC_BTHREAD_TRACER - // Only deal once when init epoll bthread. - void set_group_epoll_tid(bthread_tag_t tag, bthread_t tid); void push_priority_q(bthread_tag_t tag, bthread_t tid) { _priority_qs[tag].push(tid); @@ -167,6 +164,7 @@ friend bthread_t init_for_pthread_stack_trace(); #ifdef BRPC_BTHREAD_TRACER TaskTracer _task_tracer; #endif // BRPC_BTHREAD_TRACER + }; inline bvar::LatencyRecorder& TaskControl::exposed_pending_time() { diff --git a/src/bthread/task_group.cpp b/src/bthread/task_group.cpp index 90bb6bc21d..0b60e2f518 100644 --- a/src/bthread/task_group.cpp +++ b/src/bthread/task_group.cpp @@ -429,7 +429,8 @@ int TaskGroup::start_foreground(TaskGroup** pg, } else { // NOSIGNAL affects current task, not the new task. RemainedFn fn = NULL; - if (g->cur_epoll_tid()) { + auto& cur_attr = g->get_current_attr(); + if (cur_attr.flags & BTHREAD_GLOBAL_PRIORITY) { fn = priority_to_run; } else if (g->current_task()->about_to_quit) { fn = ready_to_run_in_worker_ignoresignal; @@ -565,6 +566,7 @@ void TaskGroup::ending_sched(TaskGroup** pg) { // Jump to main task if there's no task to run. next_tid = g->_main_tid; } + TaskMeta* const cur_meta = g->_cur_meta; TaskMeta* next_meta = address_meta(next_tid); if (next_meta->stack == NULL) { diff --git a/src/bthread/task_group.h b/src/bthread/task_group.h index 658454650f..58a0426238 100644 --- a/src/bthread/task_group.h +++ b/src/bthread/task_group.h @@ -22,7 +22,6 @@ #ifndef BTHREAD_TASK_GROUP_H #define BTHREAD_TASK_GROUP_H -#include #include "butil/time.h" // cpuwide_time_ns #include "bthread/task_control.h" #include "bthread/task_meta.h" // bthread_t, TaskMeta @@ -200,10 +199,8 @@ class TaskGroup { total_ns += butil::cputhread_time_ns() - _last_cpu_clock_ns; return total_ns; } - // Thread Unsafe - void add_epoll_tid(bthread_t tid) { _epoll_tids.emplace(tid); } - bool cur_epoll_tid() { return _epoll_tids.count(current_tid()) > 0; } + const bthread_attr_t& get_current_attr() { return _cur_meta->attr; } private: friend class TaskControl; @@ -285,7 +282,6 @@ friend class TaskControl; // Worker thread id. pid_t _tid; - std::unordered_set _epoll_tids; }; } // namespace bthread diff --git a/src/bthread/types.h b/src/bthread/types.h index 60f5c7fee6..c0f23f1c29 100644 --- a/src/bthread/types.h +++ b/src/bthread/types.h @@ -53,6 +53,7 @@ static const bthread_attrflags_t BTHREAD_LOG_CONTEXT_SWITCH = 16; static const bthread_attrflags_t BTHREAD_NOSIGNAL = 32; static const bthread_attrflags_t BTHREAD_NEVER_QUIT = 64; static const bthread_attrflags_t BTHREAD_INHERIT_SPAN = 128; +static const bthread_attrflags_t BTHREAD_GLOBAL_PRIORITY = 256; // Key of thread-local data, created by bthread_key_create. typedef struct { @@ -137,6 +138,10 @@ static const bthread_attr_t BTHREAD_ATTR_NORMAL = {BTHREAD_STACKTYPE_NORMAL, 0, static const bthread_attr_t BTHREAD_ATTR_LARGE = {BTHREAD_STACKTYPE_LARGE, 0, NULL, BTHREAD_TAG_INVALID}; +// epoll bthread +static const bthread_attr_t BTHREAD_ATTR_EPOLL = { + BTHREAD_STACKTYPE_NORMAL, BTHREAD_GLOBAL_PRIORITY, NULL, BTHREAD_TAG_INVALID}; + // bthreads created with this attribute will print log when it's started, // context-switched, finished. static const bthread_attr_t BTHREAD_ATTR_DEBUG = {