-
Notifications
You must be signed in to change notification settings - Fork 4k
/
Copy pathtask_group.h
295 lines (242 loc) · 10.3 KB
/
task_group.h
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
// bthread - An M:N threading library to make applications more concurrent.
// Date: Tue Jul 10 17:40:58 CST 2012
#ifndef BTHREAD_TASK_GROUP_H
#define BTHREAD_TASK_GROUP_H
#include <unordered_set>
#include "butil/time.h" // cpuwide_time_ns
#include "bthread/task_control.h"
#include "bthread/task_meta.h" // bthread_t, TaskMeta
#include "bthread/work_stealing_queue.h" // WorkStealingQueue
#include "bthread/remote_task_queue.h" // RemoteTaskQueue
#include "butil/resource_pool.h" // ResourceId
#include "bthread/parking_lot.h"
namespace bthread {
// For exiting a bthread.
// Wraps the opaque pointer given at construction; value() hands it back
// unchanged. what() always yields the fixed string "ExitException".
class ExitException : public std::exception {
public:
    // `value' is stored as-is and never dereferenced by this class.
    explicit ExitException(void* value) : _value(value) {}

    // `noexcept' replaces the deprecated dynamic specification `throw()',
    // which is no longer valid from C++20 on.
    ~ExitException() noexcept override = default;

    // Fixed description of this exception type.
    const char* what() const noexcept override {
        return "ExitException";
    }

    // The pointer passed to the constructor.
    void* value() const {
        return _value;
    }

private:
    void* _value;
};
// Thread-local group of tasks.
// Notice that most methods involving context switching are static otherwise
// pointer `this' may change after wakeup. The **pg parameters in the
// following functions are updated before returning.
class TaskGroup {
public:
    // Create task `fn(arg)' with attributes `attr' in TaskGroup *pg and put
    // the identifier into `tid'. Switch to the new task and schedule old task
    // to run.
    // Return 0 on success, errno otherwise.
    static int start_foreground(TaskGroup** pg,
                                bthread_t* __restrict tid,
                                const bthread_attr_t* __restrict attr,
                                void * (*fn)(void*),
                                void* __restrict arg);

    // Create task `fn(arg)' with attributes `attr' in this TaskGroup, put the
    // identifier into `tid'. Schedule the new thread to run.
    //   Called from worker: start_background<false>
    //   Called from non-worker: start_background<true>
    // Return 0 on success, errno otherwise.
    template <bool REMOTE>
    int start_background(bthread_t* __restrict tid,
                         const bthread_attr_t* __restrict attr,
                         void * (*fn)(void*),
                         void* __restrict arg);

    // Suspend caller and run next bthread in TaskGroup *pg.
    static void sched(TaskGroup** pg);
    static void ending_sched(TaskGroup** pg);

    // Suspend caller and run bthread `next_tid' in TaskGroup *pg.
    // Purpose of this function is to avoid pushing `next_tid' to _rq and
    // then being popped by sched(pg), which is not necessary.
    static void sched_to(TaskGroup** pg, TaskMeta* next_meta);
    static void sched_to(TaskGroup** pg, bthread_t next_tid);
    static void exchange(TaskGroup** pg, TaskMeta* next_meta);

    // The callback will be run in the beginning of next-run bthread.
    // Can't be called by current bthread directly because it often needs
    // the target to be suspended already.
    typedef void (*RemainedFn)(void*);
    // Records `cb' and `arg' in _last_context_remained and
    // _last_context_remained_arg; nothing is invoked here.
    void set_remained(RemainedFn cb, void* arg) {
        _last_context_remained = cb;
        _last_context_remained_arg = arg;
    }

    // Suspend caller for at least |timeout_us| microseconds.
    // If |timeout_us| is 0, this function does nothing.
    // If |group| is NULL or current thread is non-bthread, call usleep(3)
    // instead. This function does not create thread-local TaskGroup.
    // Returns: 0 on success, -1 otherwise and errno is set.
    static int usleep(TaskGroup** pg, uint64_t timeout_us);

    // Suspend caller and run another bthread. When the caller will resume
    // is undefined.
    static void yield(TaskGroup** pg);

    // Suspend caller until bthread `tid' terminates.
    static int join(bthread_t tid, void** return_value);

    // Returns true iff the bthread `tid' still exists. Notice that it is
    // just the result at this very moment which may change soon.
    // Don't use this function unless you have to. Never write code like this:
    //    if (exists(tid)) {
    //        Wait for events of the thread.   // Racy, may block indefinitely.
    //    }
    static bool exists(bthread_t tid);

    // Put attribute associated with `tid' into `*attr'.
    // Returns 0 on success, -1 otherwise and errno is set.
    static int get_attr(bthread_t tid, bthread_attr_t* attr);

    // Get/set TaskMeta.stop of the tid.
    static void set_stopped(bthread_t tid);
    static bool is_stopped(bthread_t tid);

    // The bthread running run_main_task();
    bthread_t main_tid() const { return _main_tid; }
    TaskStatistics main_stat() const;
    // Routine of the main task which should be called from a dedicated pthread.
    void run_main_task();

    // current_task is a function in macOS 10.0+
#ifdef current_task
#undef current_task
#endif
    // Meta/Identifier of current task in this group.
    TaskMeta* current_task() const { return _cur_meta; }
    bthread_t current_tid() const { return _cur_meta->tid; }
    // Uptime of current task in nanoseconds.
    int64_t current_uptime_ns() const
    { return butil::cpuwide_time_ns() - _cur_meta->cpuwide_start_ns; }

    // True iff current task is the one running run_main_task()
    bool is_current_main_task() const { return current_tid() == _main_tid; }
    // True iff current task is in pthread-mode.
    bool is_current_pthread_task() const
    { return _cur_meta->stack == _main_stack; }

    // Active time in nanoseconds spent by this TaskGroup.
    int64_t cumulated_cputime_ns() const { return _cumulated_cputime_ns; }

    // Push a bthread into the runqueue
    void ready_to_run(TaskMeta* meta, bool nosignal = false);
    // Flush tasks pushed to rq but not yet signalled.
    void flush_nosignal_tasks();

    // Push a bthread into the runqueue from another non-worker thread.
    void ready_to_run_remote(TaskMeta* meta, bool nosignal = false);
    void flush_nosignal_tasks_remote_locked(butil::Mutex& locked_mutex);
    void flush_nosignal_tasks_remote();

    // Automatically decide the caller is remote or local, and call
    // the corresponding function.
    void ready_to_run_general(TaskMeta* meta, bool nosignal = false);
    void flush_nosignal_tasks_general();

    // The TaskControl that this TaskGroup belongs to.
    TaskControl* control() const { return _control; }

    // Call this instead of delete.
    void destroy_self();

    // Wake up blocking ops in the thread.
    // Returns 0 on success, errno otherwise.
    static int interrupt(bthread_t tid, TaskControl* c, bthread_tag_t tag);

    // Get the meta associated with the task.
    static TaskMeta* address_meta(bthread_t tid);

    // Push a task into _rq, if _rq is full, retry after some time. This
    // process may go on indefinitely.
    void push_rq(bthread_t tid);

    // Returns size of local run queue.
    size_t rq_size() const {
        return _rq.volatile_size();
    }

    bthread_tag_t tag() const { return _tag; }

    pid_t tid() const { return _tid; }

    // Returns 0 while _last_cpu_clock_ns is 0. Otherwise returns the current
    // task's stat.cpu_usage_ns plus the difference between
    // butil::cputhread_time_ns() and _last_cpu_clock_ns.
    // Read-only, hence const like the other accessors of this class.
    int64_t current_task_cpu_clock_ns() const {
        if (_last_cpu_clock_ns == 0) {
            return 0;
        }
        int64_t total_ns = _cur_meta->stat.cpu_usage_ns;
        total_ns += butil::cputhread_time_ns() - _last_cpu_clock_ns;
        return total_ns;
    }

    // Thread Unsafe
    // Inserts `tid' into _epoll_tids.
    void add_epoll_tid(bthread_t tid) { _epoll_tids.emplace(tid); }

    // Thread Unsafe
    // True iff the tid of the current task is present in _epoll_tids.
    bool cur_epoll_tid() const { return _epoll_tids.count(current_tid()) > 0; }

private:
friend class TaskControl;

    // You shall use TaskControl::create_group to create new instance.
    explicit TaskGroup(TaskControl*);

    int init(size_t runqueue_capacity);

    // You shall call destroy_self() instead of destructor because deletion
    // of groups are postponed to avoid race.
    ~TaskGroup();

    static void task_runner(intptr_t skip_remained);

    // Callbacks for set_remained()
    static void _release_last_context(void*);
    static void _add_sleep_event(void*);
    struct ReadyToRunArgs {
        bthread_tag_t tag;
        TaskMeta* meta;
        bool nosignal;
    };
    static void ready_to_run_in_worker(void*);
    static void ready_to_run_in_worker_ignoresignal(void*);
    static void priority_to_run(void*);

    // Wait for a task to run.
    // Returns true on success, false is treated as permanent error and the
    // loop calling this function should end.
    bool wait_task(bthread_t* tid);

    // Tries _remote_rq first; when that yields nothing, delegates to
    // _control->steal_task(). Unless BTHREAD_DONT_SAVE_PARKING_STATE is
    // defined, the state of _pl is saved into _last_pl_state before
    // delegating.
    bool steal_task(bthread_t* tid) {
        if (_remote_rq.pop(tid)) {
            return true;
        }
#ifndef BTHREAD_DONT_SAVE_PARKING_STATE
        _last_pl_state = _pl->get_state();
#endif
        return _control->steal_task(tid, &_steal_seed, _steal_offset);
    }

    void set_tag(bthread_tag_t tag) { _tag = tag; }

    void set_pl(ParkingLot* pl) { _pl = pl; }

    // NOTE: keep the declaration order of the data members below; it fixes
    // both the object layout and the order in which members are initialized.
    TaskMeta* _cur_meta;

    // the control that this group belongs to
    TaskControl* _control;
    int _num_nosignal;
    int _nsignaled;
    // last scheduling time
    int64_t _last_run_ns;
    int64_t _cumulated_cputime_ns;
    // last thread cpu clock
    int64_t _last_cpu_clock_ns;

    size_t _nswitch;
    RemainedFn _last_context_remained;
    void* _last_context_remained_arg;

    ParkingLot* _pl;
#ifndef BTHREAD_DONT_SAVE_PARKING_STATE
    ParkingLot::State _last_pl_state;
#endif
    size_t _steal_seed;
    size_t _steal_offset;
    ContextualStack* _main_stack;
    bthread_t _main_tid;
    WorkStealingQueue<bthread_t> _rq;
    RemoteTaskQueue _remote_rq;
    int _remote_num_nosignal;
    int _remote_nsignaled;

    int _sched_recursive_guard;
    // tag of this taskgroup
    bthread_tag_t _tag;

    // Worker thread id.
    pid_t _tid;

    // Tids registered through add_epoll_tid().
    std::unordered_set<bthread_t> _epoll_tids;
};
} // namespace bthread
#include "task_group_inl.h"
#endif // BTHREAD_TASK_GROUP_H