forked from compio-rs/compio
-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathmod.rs
More file actions
261 lines (226 loc) · 8.01 KB
/
mod.rs
File metadata and controls
261 lines (226 loc) · 8.01 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
use std::{
cell::RefCell,
future::Future,
marker::PhantomData,
rc::Rc,
sync::{Arc, Weak},
task::Waker,
};
use async_task::{Runnable, Task};
use compio_driver::NotifyHandle;
use crossbeam_queue::SegQueue;
use slab::Slab;
use crate::runtime::scheduler::{
drop_hook::DropHook, local_queue::LocalQueue, send_wrapper::SendWrapper,
};
mod drop_hook;
mod local_queue;
mod send_wrapper;
/// A task queue consisting of a local queue and a synchronized queue.
struct TaskQueue {
local_queue: Arc<SendWrapper<LocalQueue<Runnable>>>,
sync_queue: Arc<SegQueue<Runnable>>,
}
impl TaskQueue {
/// Creates a new `TaskQueue`.
fn new() -> Self {
Self {
local_queue: Arc::new(SendWrapper::new(LocalQueue::new())),
sync_queue: Arc::new(SegQueue::new()),
}
}
/// Pops at most one task from each queue and returns them as `(local_task,
/// sync_task)`.
///
/// # Safety
///
/// Call this method in the same thread as the creator.
unsafe fn pop(&self) -> (Option<Runnable>, Option<Runnable>) {
// SAFETY: See the safety comment of this method.
let local_queue = unsafe { self.local_queue.get_unchecked() };
let local_task = local_queue.pop();
// Perform an empty check as a fast path, since `SegQueue::pop()` is more
// expensive.
let sync_task = if self.sync_queue.is_empty() {
None
} else {
self.sync_queue.pop()
};
(local_task, sync_task)
}
/// Returns `true` if both queues are empty.
///
/// # Safety
///
/// Call this method in the same thread as the creator.
unsafe fn is_empty(&self) -> bool {
// SAFETY: See the safety comment of this method.
let local_queue = unsafe { self.local_queue.get_unchecked() };
local_queue.is_empty() && self.sync_queue.is_empty()
}
/// Clears both queues.
///
/// # Safety
///
/// Call this method in the same thread as the creator.
unsafe fn clear(&self) {
// SAFETY: See the safety comment of this method.
let local_queue = unsafe { self.local_queue.get_unchecked() };
while let Some(item) = local_queue.pop() {
drop(item);
}
while let Some(item) = self.sync_queue.pop() {
drop(item);
}
}
/// Downgrades the `TaskQueue` into a `WeakTaskQueue`.
fn downgrade(&self) -> WeakTaskQueue {
WeakTaskQueue {
local_queue: Arc::downgrade(&self.local_queue),
sync_queue: Arc::downgrade(&self.sync_queue),
local_thread: self.local_queue.tracker(),
}
}
}
/// A weak reference to a `TaskQueue`.
struct WeakTaskQueue {
local_queue: Weak<SendWrapper<LocalQueue<Runnable>>>,
sync_queue: Weak<SegQueue<Runnable>>,
// `()` is a trivial type, so it won't panic on drop even if moved to another thread.
local_thread: SendWrapper<()>,
}
impl WeakTaskQueue {
/// Upgrades the `WeakTaskQueue` and pushes the `runnable` into the
/// appropriate queue.
fn upgrade_and_push(&self, runnable: Runnable, notify: &NotifyHandle) {
if self.local_thread.valid() {
// It's ok to drop the runnable on the same thread.
if let Some(local_queue) = self.local_queue.upgrade() {
// SAFETY: already checked
unsafe { local_queue.get_unchecked() }.push(runnable);
#[cfg(feature = "notify-always")]
notify.notify().ok();
}
} else if let Some(sync_queue) = self.sync_queue.upgrade() {
sync_queue.push(runnable);
notify.notify().ok();
} else {
// We have to leak the runnable since it's not safe to drop it on another
// thread.
std::mem::forget(runnable);
}
}
}
/// A scheduler for managing and executing tasks.
pub(crate) struct Scheduler {
/// Queue for scheduled tasks.
task_queue: TaskQueue,
/// `Waker` of active tasks.
active_tasks: Rc<RefCell<Slab<Waker>>>,
/// Number of scheduler ticks for each `run` invocation.
event_interval: usize,
/// Makes this type `!Send` and `!Sync`.
_local_marker: PhantomData<*const ()>,
}
impl Scheduler {
/// Creates a new `Scheduler`.
pub(crate) fn new(event_interval: usize) -> Self {
Self {
task_queue: TaskQueue::new(),
active_tasks: Rc::new(RefCell::new(Slab::new())),
event_interval,
_local_marker: PhantomData,
}
}
/// Spawns a new asynchronous task, returning a [`Task`] for it.
///
/// # Safety
///
/// The caller should ensure the captured lifetime long enough.
pub(crate) unsafe fn spawn_unchecked<F>(
&self,
future: F,
notify: NotifyHandle,
) -> Task<F::Output>
where
F: Future,
{
let mut active_tasks = self.active_tasks.borrow_mut();
let task_entry = active_tasks.vacant_entry();
let future = {
let active_tasks = self.active_tasks.clone();
let index = task_entry.key();
// Wrap the future with a drop hook to remove the waker on drop.
DropHook::new(future, move || {
active_tasks.borrow_mut().remove(index);
})
};
let schedule = {
// The schedule closure is managed by the `Waker` and may be dropped on another
// thread, so use `WeakTaskQueue` to ensure the `TaskQueue` is always dropped
// on the creator thread.
let task_queue = self.task_queue.downgrade();
move |runnable| task_queue.upgrade_and_push(runnable, ¬ify)
};
let (runnable, task) = async_task::spawn_unchecked(future, schedule);
// Store the waker.
task_entry.insert(runnable.waker());
// Schedule the task for execution.
runnable.schedule();
task
}
/// Runs the scheduled tasks.
///
/// Returns `true` if there are still tasks in the queue.
pub(crate) fn run(&self) -> bool {
for _ in 0..self.event_interval {
// SAFETY:
// This method is only called on `TaskQueue`'s creator thread
// because `Scheduler` is `!Send` and `!Sync`.
let tasks = unsafe { self.task_queue.pop() };
// Run the tasks, which will poll the futures.
//
// SAFETY:
// Since spawned tasks are not required to be `Send`, they must always be polled
// on the same thread. Because `Scheduler` is `!Send` and `!Sync`, this is safe.
match tasks {
(Some(local), Some(sync)) => {
local.run();
sync.run();
}
(Some(local), None) => {
local.run();
}
(None, Some(sync)) => {
sync.run();
}
(None, None) => break,
}
}
// SAFETY:
// This method is only called on `TaskQueue`'s creator thread
// because `Scheduler` is `!Send` and `!Sync`.
!unsafe { self.task_queue.is_empty() }
}
/// Clears all active tasks.
///
/// This method **must** be called before the scheduler is dropped.
pub(crate) fn clear(&self) {
// Wake up all active tasks, which schedules them again.
self.active_tasks
.borrow()
.iter()
.for_each(|(_, waker)| waker.wake_by_ref());
// Then drop all scheduled tasks, which drops all futures and removes
// `Waker`s from `active_tasks` by drop hooks.
//
// SAFETY:
// Since spawned tasks are not required to be `Send`, they must always be
// dropped on the same thread. Because `Scheduler` is `!Send` and
// `!Sync`, this is safe.
//
// This method is only called on `TaskQueue`'s creator thread
// because `Scheduler` is `!Send` and `!Sync`.
unsafe { self.task_queue.clear() };
}
}