-
Notifications
You must be signed in to change notification settings - Fork 4k
epoll bthread deal first #2819
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
epoll bthread deal first #2819
Changes from 7 commits
c8531be
0b18def
95e5b84
76cbfa3
56276a2
01aba4d
708725a
d10137c
c331f11
4bf0042
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -183,6 +183,7 @@ TaskControl::TaskControl() | |
, _signal_per_second(&_cumulated_signal_count) | ||
, _status(print_rq_sizes_in_the_tc, this) | ||
, _nbthreads("bthread_count") | ||
, _priority_qs(FLAGS_task_group_ntags) | ||
, _pl(FLAGS_task_group_ntags) | ||
{} | ||
|
||
|
@@ -207,6 +208,10 @@ int TaskControl::init(int concurrency) { | |
_tagged_worker_usage_second.push_back(new bvar::PerSecond<bvar::PassiveStatus<double>>( | ||
"bthread_worker_usage", tag_str, _tagged_cumulated_worker_time[i], 1)); | ||
_tagged_nbthreads.push_back(new bvar::Adder<int64_t>("bthread_count", tag_str)); | ||
if (_priority_qs[i].init(BTHREAD_MAX_CONCURRENCY) != 0) { | ||
LOG(FATAL) << "Fail to init _priority_q"; | ||
return -1; | ||
} | ||
} | ||
|
||
// Make sure TimerThread is ready. | ||
|
@@ -430,6 +435,11 @@ int TaskControl::_destroy_group(TaskGroup* g) { | |
|
||
bool TaskControl::steal_task(bthread_t* tid, size_t* seed, size_t offset) { | ||
auto tag = tls_task_group->tag(); | ||
|
||
if (_priority_qs[tag].steal(tid)) { | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. 所有线程都访问这个是否会造成全局竞争? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. 不是说线程安全的问题,我的意思是这个可能影响性能。我不确定,因为实际场景有很多,可能在你的场景里这个性能不是问题,但是在某些场景里可能会有问题,比如event_dispatcher_num很大,有很多epoll thread的情况? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. 同一个 tag 下的 worker 肯定会存在竞争的,这个应该算是预期之内,性能损失量级跟下文的 g->_rq.steal(tid) 相当,原来 steal 的竞争产生的损失约为 N * M(N 为 tag 数量,常数级,M 为 g->_rq.steal(tid) 单次时间),现在为 (N + 1) * M |
||
return true; | ||
} | ||
|
||
// 1: Acquiring fence is paired with releasing fence in _add_group to | ||
// avoid accessing uninitialized slot of _groups. | ||
const size_t ngroup = tag_ngroup(tag).load(butil::memory_order_acquire/*1*/); | ||
|
@@ -472,6 +482,7 @@ void TaskControl::signal_task(int num_task, bthread_tag_t tag) { | |
} | ||
auto& pl = tag_pl(tag); | ||
int start_index = butil::fmix64(pthread_numeric_id()) % PARKING_LOT_NUM; | ||
// WARNING: This allow some bad case happen when wait_count is not accurente. | ||
num_task -= pl[start_index].signal(1); | ||
if (num_task > 0) { | ||
for (int i = 1; i < PARKING_LOT_NUM && num_task > 0; ++i) { | ||
|
@@ -575,4 +586,11 @@ bvar::LatencyRecorder* TaskControl::create_exposed_pending_time() { | |
return pt; | ||
} | ||
|
||
void TaskControl::set_group_epoll_tid(bthread_tag_t tag, bthread_t tid) { | ||
auto groups = tag_group(tag); | ||
const size_t ngroup = tag_ngroup(tag).load(butil::memory_order_acquire); | ||
chenBright marked this conversation as resolved.
Show resolved
Hide resolved
|
||
for (size_t i = 0; i < ngroup; i++) { | ||
groups[i]->add_epoll_tid(tid); | ||
} | ||
} | ||
} // namespace bthread |
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -30,6 +30,7 @@ | |
#include <vector> | ||
#include <array> | ||
#include <memory> | ||
#include <unordered_map> | ||
#include "butil/atomicops.h" // butil::atomic | ||
#include "bvar/bvar.h" // bvar::PassiveStatus | ||
#include "bthread/task_tracer.h" | ||
|
@@ -96,6 +97,12 @@ friend bthread_t init_for_pthread_stack_trace(); | |
void stack_trace(std::ostream& os, bthread_t tid); | ||
std::string stack_trace(bthread_t tid); | ||
#endif // BRPC_BTHREAD_TRACER | ||
// Only deal once when init epoll bthread. | ||
void set_group_epoll_tid(bthread_tag_t tag, bthread_t tid); | ||
|
||
void push_priority_q(bthread_tag_t tag, bthread_t tid) { | ||
chenBright marked this conversation as resolved.
Show resolved
Hide resolved
|
||
_priority_qs[tag].push(tid); | ||
chenBright marked this conversation as resolved.
Show resolved
Hide resolved
|
||
} | ||
|
||
private: | ||
typedef std::array<TaskGroup*, BTHREAD_MAX_CONCURRENCY> TaggedGroups; | ||
|
@@ -153,13 +160,13 @@ friend bthread_t init_for_pthread_stack_trace(); | |
std::vector<bvar::PassiveStatus<double>*> _tagged_cumulated_worker_time; | ||
std::vector<bvar::PerSecond<bvar::PassiveStatus<double>>*> _tagged_worker_usage_second; | ||
std::vector<bvar::Adder<int64_t>*> _tagged_nbthreads; | ||
std::vector<WorkStealingQueue<bthread_t>> _priority_qs; | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. _priority_queues |
||
|
||
std::vector<TaggedParkingLot> _pl; | ||
|
||
#ifdef BRPC_BTHREAD_TRACER | ||
TaskTracer _task_tracer; | ||
#endif // BRPC_BTHREAD_TRACER | ||
|
||
}; | ||
|
||
inline bvar::LatencyRecorder& TaskControl::exposed_pending_time() { | ||
|
Uh oh!
There was an error while loading. Please reload this page.