Skip to content

Commit 2a801eb

Browse files
committed
refactor(scheduler): use Own and RawRef for TaskQueue
1 parent 78afba6 commit 2a801eb

2 files changed

Lines changed: 69 additions & 12 deletions

File tree

compio-runtime/src/runtime/scheduler/mod.rs

Lines changed: 12 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -1,16 +1,17 @@
1-
use std::{cell::RefCell, future::Future, marker::PhantomData, rc::Rc, sync::Arc, task::Waker};
1+
use std::{cell::RefCell, future::Future, marker::PhantomData, rc::Rc, task::Waker};
22

33
use async_task::{Runnable, Task};
44
use compio_driver::NotifyHandle;
55
use crossbeam_queue::SegQueue;
66
use slab::Slab;
77

88
use crate::runtime::scheduler::{
9-
drop_hook::DropHook, local_queue::LocalQueue, send_wrapper::SendWrapper,
9+
drop_hook::DropHook, local_queue::LocalQueue, raw_ref::Own, send_wrapper::SendWrapper,
1010
};
1111

1212
mod drop_hook;
1313
mod local_queue;
14+
mod raw_ref;
1415
mod send_wrapper;
1516

1617
/// A task queue consisting of a local queue and a synchronized queue.
@@ -99,7 +100,7 @@ impl TaskQueue {
99100
/// A scheduler for managing and executing tasks.
100101
pub(crate) struct Scheduler {
101102
/// Queue for scheduled tasks.
102-
task_queue: Arc<TaskQueue>,
103+
task_queue: Own<TaskQueue>,
103104

104105
/// `Waker` of active tasks.
105106
active_tasks: Rc<RefCell<Slab<Waker>>>,
@@ -115,7 +116,7 @@ impl Scheduler {
115116
/// Creates a new `Scheduler`.
116117
pub(crate) fn new(event_interval: usize) -> Self {
117118
Self {
118-
task_queue: Arc::new(TaskQueue::new()),
119+
task_queue: Own::new(TaskQueue::new()),
119120
active_tasks: Rc::new(RefCell::new(Slab::new())),
120121
event_interval,
121122
_local_marker: PhantomData,
@@ -149,16 +150,15 @@ impl Scheduler {
149150
};
150151

151152
let schedule = {
152-
// The schedule closure is managed by the `Waker` and may be dropped on another
153-
// thread, so use `Weak` to ensure the `TaskQueue` is always dropped
154-
// on the creator thread.
155-
let task_queue = Arc::downgrade(&self.task_queue);
153+
let task_queue = self.task_queue.raw_ref();
156154

157155
move |runnable| {
158-
// The `upgrade()` never fails because all tasks are dropped when the
159-
// `Scheduler` is dropped, if a `Waker` is used after that, the
160-
// schedule closure will never be called.
161-
task_queue.upgrade().unwrap().push(runnable, &notify);
156+
// SAFETY:
157+
// `Scheduler` drains and drops all `Runnable`s before it is dropped.
158+
// If this closure is still invoked and can push a `Runnable`, the
159+
// `Scheduler` is necessarily still alive, so the `task_queue` reference
160+
// obtained from `raw_ref()` is valid.
161+
unsafe { task_queue.as_ref() }.push(runnable, &notify);
162162
}
163163
};
164164

Lines changed: 57 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,57 @@
1+
use std::{ops::Deref, ptr::NonNull};
2+
3+
/// A value with ownership.
4+
pub(crate) struct Own<T: ?Sized>(Box<T>);
5+
6+
impl<T> Own<T> {
7+
/// Creates a new [`Own`].
8+
pub(crate) fn new(value: T) -> Self {
9+
Own(Box::new(value))
10+
}
11+
}
12+
13+
impl<T: ?Sized> Own<T> {
14+
/// Returns a [`RawRef`] to the owned value.
15+
pub(crate) fn raw_ref(&self) -> RawRef<T> {
16+
RawRef(NonNull::from(&*self.0))
17+
}
18+
}
19+
20+
impl<T: ?Sized> Deref for Own<T> {
21+
type Target = T;
22+
23+
fn deref(&self) -> &Self::Target {
24+
&self.0
25+
}
26+
}
27+
28+
/// A reference to an owned value without lifetime tracking.
29+
pub(crate) struct RawRef<T: ?Sized>(NonNull<T>);
30+
31+
impl<T: ?Sized> RawRef<T> {
32+
/// Returns a shared reference to the value.
33+
///
34+
/// # Safety
35+
///
36+
/// The caller must ensure the associated [`Own<T>`] outlives the returned
37+
/// reference.
38+
pub(crate) const unsafe fn as_ref(&self) -> &T {
39+
// SAFETY:
40+
// - The `NonNull` is created from a valid reference in `Own::raw_ref()`.
41+
// - Only shared reference is returned, so aliasing rules are not violated.
42+
// - The validity of the returned reference is ensured by the caller.
43+
unsafe { self.0.as_ref() }
44+
}
45+
}
46+
47+
/// `Sync` and `Send` implementations follow `&T`.
48+
unsafe impl<T: ?Sized + Sync> Sync for RawRef<T> {}
49+
unsafe impl<T: ?Sized + Sync> Send for RawRef<T> {}
50+
51+
impl<T: ?Sized> Clone for RawRef<T> {
52+
fn clone(&self) -> Self {
53+
*self
54+
}
55+
}
56+
57+
impl<T: ?Sized> Copy for RawRef<T> {}

0 commit comments

Comments
 (0)