-
Notifications
You must be signed in to change notification settings - Fork 121
fix(runtime): drop the runnables manually, and forget them on other threads #493
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -1,8 +1,10 @@ | ||
| use crate::runtime::scheduler::{local_queue::LocalQueue, send_wrapper::SendWrapper}; | ||
| use std::{future::Future, marker::PhantomData, sync::Arc}; | ||
|
|
||
| use async_task::{Runnable, Task}; | ||
| use compio_driver::NotifyHandle; | ||
| use crossbeam_queue::SegQueue; | ||
| use std::{future::Future, marker::PhantomData, sync::Arc}; | ||
|
|
||
| use crate::runtime::scheduler::{local_queue::LocalQueue, send_wrapper::SendWrapper}; | ||
|
|
||
| mod local_queue; | ||
| mod send_wrapper; | ||
|
|
@@ -24,8 +26,8 @@ impl TaskQueue { | |
|
|
||
| /// Pushes a `Runnable` task to the appropriate queue. | ||
| /// | ||
| /// If the current thread is the same as the creator thread, push to the local queue. | ||
| /// Otherwise, push to the sync queue. | ||
| /// If the current thread is the same as the creator thread, push to the | ||
| /// local queue. Otherwise, push to the sync queue. | ||
| fn push(&self, runnable: Runnable, notify: &NotifyHandle) { | ||
| if let Some(local_queue) = self.local_queue.get() { | ||
| local_queue.push(runnable); | ||
|
|
@@ -37,7 +39,8 @@ impl TaskQueue { | |
| } | ||
| } | ||
|
|
||
| /// Pops at most one task from each queue and returns them as `(local_task, sync_task)`. | ||
| /// Pops at most one task from each queue and returns them as `(local_task, | ||
| /// sync_task)`. | ||
| /// | ||
| /// # Safety | ||
| /// | ||
|
|
@@ -48,7 +51,8 @@ impl TaskQueue { | |
|
|
||
| let local_task = local_queue.pop(); | ||
|
|
||
| // Perform an empty check as a fast path, since `SegQueue::pop()` is more expensive. | ||
| // Perform an empty check as a fast path, since `SegQueue::pop()` is more | ||
| // expensive. | ||
| let sync_task = if self.sync_queue.is_empty() { | ||
| None | ||
| } else { | ||
|
|
@@ -105,10 +109,14 @@ impl Scheduler { | |
| // Use `Weak` to break reference cycle. | ||
| // `TaskQueue` -> `Runnable` -> `TaskQueue` | ||
| let task_queue = Arc::downgrade(&self.task_queue); | ||
| let thread_guard = SendWrapper::new(()); | ||
|
|
||
| move |runnable| { | ||
| if let Some(task_queue) = task_queue.upgrade() { | ||
| task_queue.push(runnable, ¬ify); | ||
| } else if thread_guard.get().is_none() { | ||
| // It's not safe to drop the runnable in another thread. | ||
| std::mem::forget(runnable); | ||
| } | ||
| } | ||
| }; | ||
|
|
@@ -151,4 +159,17 @@ impl Scheduler { | |
| // on `TaskQueue`'s creator thread. | ||
| !unsafe { self.task_queue.is_empty() } | ||
| } | ||
|
|
||
| pub(crate) fn clear(&self) { | ||
| loop { | ||
| // SAFETY: | ||
| // `Scheduler` is `!Send` and `!Sync`, so this method is only called | ||
| // on `TaskQueue`'s creator thread. | ||
| let tasks = unsafe { self.task_queue.pop() }; | ||
|
|
||
| if let (None, None) = tasks { | ||
| break; | ||
| } | ||
| } | ||
| } | ||
|
Comment on lines
+163
to
+174
|
||
| } | ||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,11 @@ | ||
| #[test] | ||
| fn test_drop() { | ||
| compio_runtime::Runtime::new().unwrap().block_on(async { | ||
| compio_runtime::spawn(async { | ||
| loop { | ||
| compio_runtime::time::sleep(std::time::Duration::from_secs(1)).await; | ||
| } | ||
| }) | ||
| .detach(); | ||
| }) | ||
| } |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The comment should explain why it's not safe to drop the runnable in another thread, and clarify that the condition
thread_guard.get().is_none()means we're on a different thread. Consider expanding: 'If we're on a different thread (thread_guard.get().is_none()) and the task queue has been dropped, we cannot safely drop the runnable because [reason], so we forget it to prevent the drop from running.'