Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion compio-runtime/Cargo.toml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
[package]
name = "compio-runtime"
version = "0.9.4"
version = "0.9.5"
description = "High-level runtime for compio"
categories = ["asynchronous"]
keywords = ["async", "runtime"]
Expand Down
24 changes: 12 additions & 12 deletions compio-runtime/src/runtime/scheduler/mod.rs
Original file line number Diff line number Diff line change
@@ -1,16 +1,17 @@
use std::{cell::RefCell, future::Future, marker::PhantomData, rc::Rc, sync::Arc, task::Waker};
use std::{cell::RefCell, future::Future, marker::PhantomData, rc::Rc, task::Waker};

use async_task::{Runnable, Task};
use compio_driver::NotifyHandle;
use crossbeam_queue::SegQueue;
use slab::Slab;

use crate::runtime::scheduler::{
drop_hook::DropHook, local_queue::LocalQueue, send_wrapper::SendWrapper,
drop_hook::DropHook, local_queue::LocalQueue, raw_ref::Own, send_wrapper::SendWrapper,
};

mod drop_hook;
mod local_queue;
mod raw_ref;
mod send_wrapper;

/// A task queue consisting of a local queue and a synchronized queue.
Expand Down Expand Up @@ -99,7 +100,7 @@ impl TaskQueue {
/// A scheduler for managing and executing tasks.
pub(crate) struct Scheduler {
/// Queue for scheduled tasks.
task_queue: Arc<TaskQueue>,
task_queue: Own<TaskQueue>,

/// `Waker` of active tasks.
active_tasks: Rc<RefCell<Slab<Waker>>>,
Expand All @@ -115,7 +116,7 @@ impl Scheduler {
/// Creates a new `Scheduler`.
pub(crate) fn new(event_interval: usize) -> Self {
Self {
task_queue: Arc::new(TaskQueue::new()),
task_queue: Own::new(TaskQueue::new()),
active_tasks: Rc::new(RefCell::new(Slab::new())),
event_interval,
_local_marker: PhantomData,
Expand Down Expand Up @@ -149,16 +150,15 @@ impl Scheduler {
};

let schedule = {
// The schedule closure is managed by the `Waker` and may be dropped on another
// thread, so use `Weak` to ensure the `TaskQueue` is always dropped
// on the creator thread.
let task_queue = Arc::downgrade(&self.task_queue);
let task_queue = self.task_queue.raw_ref();

move |runnable| {
// The `upgrade()` never fails because all tasks are dropped when the
// `Scheduler` is dropped, if a `Waker` is used after that, the
// schedule closure will never be called.
task_queue.upgrade().unwrap().push(runnable, &notify);
// SAFETY:
// `Scheduler` drains and drops all `Runnable`s before it is dropped.
// If this closure is still invoked and can push a `Runnable`, the
// `Scheduler` is necessarily still alive, so the `task_queue` reference
// obtained from `raw_ref()` is valid.
unsafe { task_queue.as_ref() }.push(runnable, &notify);
Comment on lines +156 to +161
Copy link

Copilot AI Nov 17, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The safety reasoning here has a potential soundness issue. While the comment states that "Scheduler drains and drops all Runnables before it is dropped", this doesn't account for Wakers that may have been cloned and stored outside of the scheduler's control (e.g., in user code or external futures).

If a Waker is cloned and stored elsewhere, then used after the Runtime/Scheduler is dropped, the schedule closure would dereference the dangling RawRef, causing undefined behavior. The previous implementation using Arc::downgrade() and upgrade() was safer because it would fail gracefully (panic on unwrap) rather than cause UB.

Consider either:

  1. Documenting that Wakers must not outlive the Runtime/Scheduler, or
  2. Adding runtime checks to detect use-after-drop, or
  3. Reverting to a reference-counted approach with better drop ordering to avoid the SendWrapper issue

Copilot uses AI. Check for mistakes.
Comment on lines +157 to +161
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If this closure is still invoked and can push a Runnable, the Scheduler is necessarily still alive

How about the scheduler is being dropped while the waker is kept on another thread? Scheduler::drop and Waker::wake could be executed simultaneously.

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If a Waker::wake() is called from another thread and the schedule closure is successfully invoked, then the Scheduler cannot be in the process of being dropped. The latest possible state is that Scheduler::clear() is running (the Scheduler’s documentation requires calling Scheduler::clear() before drop, and the Scheduler must always be dropped with an empty TaskQueue). Once clear() finishes, any Waker::wake() becomes a no-op, and the schedule closure will no longer be invoked.

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

There's no such guarantee. Here's a possible case:

  1. Waker::wake() is called and enters the schedule closure.
  2. TaskQueue::push enters.
  3. The runtime is going to drop, and Scheduler::clear().
  4. The runtime is dropped, so is the task queue.
  5. TaskQueue::push tries to push a runnable. BOOM.

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

There’s an important detail that was overlooked. In Scheduler::clear(), it first explicitly wakes all the Wakers in active_tasks. So the latest state at which a Waker from another thread can invoke the schedule closure is before Scheduler::clear() wakes the corresponding Waker in active_tasks (async-task avoids duplicate scheduling).

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, but the given case says that the waker is waked before Scheduler::clear. There's no duplicate scheduling.

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Could you explain about the memory leaks? Especially in #513 .

At least memory leaks are safe and sound. Or we might need to rethink about writing our own Task.

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What I mean by “memory leak” refers to the case where the closure captures an Arc instead of a Weak, because it seems that using Weak has a chance of failing to upgrade if the closure can run at the same time as the runtime drop.

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Weak is atomic so the upgrade is always safe. But yes, if it fails, the only safe solution is to panic, I think...

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, that’s the choice right now: a memory leak or a panic. As a temporary workaround, the small chance of a memory leak is still the better option. Later we can look for a cleaner solution (maybe rewrite the task).

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

OK, let's focus on #513, then. I'll push a commit to forget instead of panic.

}
};

Expand Down
57 changes: 57 additions & 0 deletions compio-runtime/src/runtime/scheduler/raw_ref.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,57 @@
use std::{ops::Deref, ptr::NonNull};

/// A value with ownership.
pub(crate) struct Own<T: ?Sized>(Box<T>);

impl<T> Own<T> {
/// Creates a new [`Own`].
pub(crate) fn new(value: T) -> Self {
Own(Box::new(value))
}
}

impl<T: ?Sized> Own<T> {
/// Returns a [`RawRef`] to the owned value.
pub(crate) fn raw_ref(&self) -> RawRef<T> {
RawRef(NonNull::from(&*self.0))
}
}

impl<T: ?Sized> Deref for Own<T> {
type Target = T;

fn deref(&self) -> &Self::Target {
&self.0
}
}

/// A reference to an owned value without lifetime tracking.
pub(crate) struct RawRef<T: ?Sized>(NonNull<T>);

impl<T: ?Sized> RawRef<T> {
/// Returns a shared reference to the value.
///
/// # Safety
///
/// The caller must ensure the associated [`Own<T>`] outlives the returned
/// reference.
pub(crate) const unsafe fn as_ref(&self) -> &T {
// SAFETY:
// - The `NonNull` is created from a valid reference in `Own::raw_ref()`.
// - Only shared reference is returned, so aliasing rules are not violated.
// - The validity of the returned reference is ensured by the caller.
unsafe { self.0.as_ref() }
}
}

/// `Sync` and `Send` implementations follow `&T`.
unsafe impl<T: ?Sized + Sync> Sync for RawRef<T> {}
unsafe impl<T: ?Sized + Sync> Send for RawRef<T> {}

impl<T: ?Sized> Clone for RawRef<T> {
fn clone(&self) -> Self {
*self
}
}

impl<T: ?Sized> Copy for RawRef<T> {}
Loading