diff --git a/src/brpc/event_dispatcher_epoll.cpp b/src/brpc/event_dispatcher_epoll.cpp index 0ea404fff6..fac9512d54 100644 --- a/src/brpc/event_dispatcher_epoll.cpp +++ b/src/brpc/event_dispatcher_epoll.cpp @@ -26,7 +26,7 @@ EventDispatcher::EventDispatcher() : _event_dispatcher_fd(-1) , _stop(false) , _tid(0) - , _thread_attr(BTHREAD_ATTR_NORMAL) { + , _thread_attr(BTHREAD_ATTR_EPOLL) { _event_dispatcher_fd = epoll_create(1024 * 1024); if (_event_dispatcher_fd < 0) { PLOG(FATAL) << "Fail to create epoll"; @@ -70,7 +70,8 @@ int EventDispatcher::Start(const bthread_attr_t* consumer_thread_attr) { // Set _thread_attr before creating epoll thread to make sure // everyting seems sane to the thread. _thread_attr = consumer_thread_attr ? - *consumer_thread_attr : BTHREAD_ATTR_NORMAL; + *consumer_thread_attr : _thread_attr; + _thread_attr = _thread_attr | BTHREAD_GLOBAL_PRIORITY; //_thread_attr is used in StartInputEvent(), assign flag NEVER_QUIT to it will cause new bthread // that created by epoll_wait() never to quit. diff --git a/src/bthread/parking_lot.h b/src/bthread/parking_lot.h index d597c409f4..b8c3f36c5f 100644 --- a/src/bthread/parking_lot.h +++ b/src/bthread/parking_lot.h @@ -70,6 +70,7 @@ class BAIDU_CACHELINE_ALIGNMENT ParkingLot { _pending_signal.fetch_or(1); futex_wake_private(&_pending_signal, 10000); } + private: // higher 31 bits for signalling, LSB for stopping. butil::atomic _pending_signal; diff --git a/src/bthread/task_control.cpp b/src/bthread/task_control.cpp index 55ed1f2e42..e04793379e 100644 --- a/src/bthread/task_control.cpp +++ b/src/bthread/task_control.cpp @@ -183,6 +183,7 @@ TaskControl::TaskControl() , _signal_per_second(&_cumulated_signal_count) , _status(print_rq_sizes_in_the_tc, this) , _nbthreads("bthread_count") + , _priority_qs(FLAGS_task_group_ntags) , _pl(FLAGS_task_group_ntags) {} @@ -207,6 +208,10 @@ int TaskControl::init(int concurrency) { _tagged_worker_usage_second.push_back(new bvar::PerSecond>( "bthread_worker_usage", tag_str, _tagged_cumulated_worker_time[i], 1)); _tagged_nbthreads.push_back(new bvar::Adder("bthread_count", tag_str)); + if (_priority_qs[i].init(BTHREAD_MAX_CONCURRENCY) != 0) { + LOG(FATAL) << "Fail to init _priority_q"; + return -1; + } } // Make sure TimerThread is ready. @@ -430,6 +435,11 @@ int TaskControl::_destroy_group(TaskGroup* g) { bool TaskControl::steal_task(bthread_t* tid, size_t* seed, size_t offset) { auto tag = tls_task_group->tag(); + + if (_priority_qs[tag].steal(tid)) { + return true; + } + // 1: Acquiring fence is paired with releasing fence in _add_group to // avoid accessing uninitialized slot of _groups. const size_t ngroup = tag_ngroup(tag).load(butil::memory_order_acquire/*1*/); diff --git a/src/bthread/task_control.h b/src/bthread/task_control.h index 95820a86f9..87ddee5450 100644 --- a/src/bthread/task_control.h +++ b/src/bthread/task_control.h @@ -97,6 +97,10 @@ friend bthread_t init_for_pthread_stack_trace(); std::string stack_trace(bthread_t tid); #endif // BRPC_BTHREAD_TRACER + void push_priority_q(bthread_tag_t tag, bthread_t tid) { + _priority_qs[tag].push(tid); + } + private: typedef std::array TaggedGroups; static const int PARKING_LOT_NUM = 4; @@ -153,6 +157,7 @@ friend bthread_t init_for_pthread_stack_trace(); std::vector*> _tagged_cumulated_worker_time; std::vector>*> _tagged_worker_usage_second; std::vector*> _tagged_nbthreads; + std::vector> _priority_qs; std::vector _pl; diff --git a/src/bthread/task_group.cpp b/src/bthread/task_group.cpp index 361efa5936..0b60e2f518 100644 --- a/src/bthread/task_group.cpp +++ b/src/bthread/task_group.cpp @@ -429,12 +429,19 @@ int TaskGroup::start_foreground(TaskGroup** pg, } else { // NOSIGNAL affects current task, not the new task. RemainedFn fn = NULL; - if (g->current_task()->about_to_quit) { + auto& cur_attr = g->get_current_attr(); + if (cur_attr.flags & BTHREAD_GLOBAL_PRIORITY) { + fn = priority_to_run; + } else if (g->current_task()->about_to_quit) { fn = ready_to_run_in_worker_ignoresignal; } else { fn = ready_to_run_in_worker; } - ReadyToRunArgs args = { g->_cur_meta, (bool)(using_attr.flags & BTHREAD_NOSIGNAL) }; + ReadyToRunArgs args = { + g->tag(), + g->_cur_meta, + (bool)(using_attr.flags & BTHREAD_NOSIGNAL) + }; g->set_remained(fn, &args); TaskGroup::sched_to(pg, m->tid); } @@ -798,6 +805,11 @@ void TaskGroup::ready_to_run_in_worker_ignoresignal(void* args_in) { return tls_task_group->push_rq(args->meta->tid); } +void TaskGroup::priority_to_run(void* args_in) { + ReadyToRunArgs* args = static_cast(args_in); + return tls_task_group->control()->push_priority_q(args->tag, args->meta->tid); +} + struct SleepArgs { uint64_t timeout_us; bthread_t tid; @@ -972,7 +984,7 @@ int TaskGroup::interrupt(bthread_t tid, TaskControl* c, bthread_tag_t tag) { void TaskGroup::yield(TaskGroup** pg) { TaskGroup* g = *pg; - ReadyToRunArgs args = { g->_cur_meta, false }; + ReadyToRunArgs args = { g->tag(), g->_cur_meta, false }; g->set_remained(ready_to_run_in_worker, &args); sched(pg); } diff --git a/src/bthread/task_group.h b/src/bthread/task_group.h index c3d2ae463e..58a0426238 100644 --- a/src/bthread/task_group.h +++ b/src/bthread/task_group.h @@ -200,6 +200,8 @@ class TaskGroup { return total_ns; } + const bthread_attr_t& get_current_attr() { return _cur_meta->attr; } + private: friend class TaskControl; @@ -218,11 +220,13 @@ friend class TaskControl; static void _release_last_context(void*); static void _add_sleep_event(void*); struct ReadyToRunArgs { + bthread_tag_t tag; TaskMeta* meta; bool nosignal; }; static void ready_to_run_in_worker(void*); static void ready_to_run_in_worker_ignoresignal(void*); + static void priority_to_run(void*); // Wait for a task to run. // Returns true on success, false is treated as permanent error and the diff --git a/src/bthread/task_group_inl.h b/src/bthread/task_group_inl.h index 75c377e12f..8ca29fde3a 100644 --- a/src/bthread/task_group_inl.h +++ b/src/bthread/task_group_inl.h @@ -51,7 +51,7 @@ inline void TaskGroup::exchange(TaskGroup** pg, TaskMeta* next_meta) { if (g->is_current_pthread_task()) { return g->ready_to_run(next_meta); } - ReadyToRunArgs args = { g->_cur_meta, false }; + ReadyToRunArgs args = { g->tag(), g->_cur_meta, false }; g->set_remained((g->current_task()->about_to_quit ? ready_to_run_in_worker_ignoresignal : ready_to_run_in_worker), diff --git a/src/bthread/types.h b/src/bthread/types.h index 60f5c7fee6..c0f23f1c29 100644 --- a/src/bthread/types.h +++ b/src/bthread/types.h @@ -53,6 +53,7 @@ static const bthread_attrflags_t BTHREAD_LOG_CONTEXT_SWITCH = 16; static const bthread_attrflags_t BTHREAD_NOSIGNAL = 32; static const bthread_attrflags_t BTHREAD_NEVER_QUIT = 64; static const bthread_attrflags_t BTHREAD_INHERIT_SPAN = 128; +static const bthread_attrflags_t BTHREAD_GLOBAL_PRIORITY = 256; // Key of thread-local data, created by bthread_key_create. typedef struct { @@ -137,6 +138,10 @@ static const bthread_attr_t BTHREAD_ATTR_NORMAL = {BTHREAD_STACKTYPE_NORMAL, 0, static const bthread_attr_t BTHREAD_ATTR_LARGE = {BTHREAD_STACKTYPE_LARGE, 0, NULL, BTHREAD_TAG_INVALID}; +// epoll bthread +static const bthread_attr_t BTHREAD_ATTR_EPOLL = { + BTHREAD_STACKTYPE_NORMAL, BTHREAD_GLOBAL_PRIORITY, NULL, BTHREAD_TAG_INVALID}; + // bthreads created with this attribute will print log when it's started, // context-switched, finished. static const bthread_attr_t BTHREAD_ATTR_DEBUG = { diff --git a/test/bthread_setconcurrency_unittest.cpp b/test/bthread_setconcurrency_unittest.cpp index 4d269b642b..f9272c5e84 100644 --- a/test/bthread_setconcurrency_unittest.cpp +++ b/test/bthread_setconcurrency_unittest.cpp @@ -214,7 +214,6 @@ int concurrency_by_tag(int num) { TEST(BthreadTest, concurrency_by_tag) { ASSERT_EQ(concurrency_by_tag(1), false); - auto tag_con = bthread_getconcurrency_by_tag(0); auto con = bthread_getconcurrency(); ASSERT_EQ(concurrency_by_tag(con), true); ASSERT_EQ(concurrency_by_tag(con + 1), true);