diff --git a/core/Cargo.toml b/core/Cargo.toml index b4621b78..84ee1c7f 100644 --- a/core/Cargo.toml +++ b/core/Cargo.toml @@ -20,6 +20,7 @@ phf = { version = "0.13", optional = true } indexmap = { version = "2", optional = true } either = { version = "1", optional = true } async-lock = { version = "3", optional = true, default-features = false } +parking_lot = { version = "0.12", optional = true } chrono = { version = "0.4", optional = true } dlopen2 = { version = "0.8", optional = true } relative-path = { version = "2.0", optional = true, default-features = false, features = [ @@ -57,7 +58,7 @@ chrono = ["dep:chrono"] bindgen = ["rquickjs-sys/bindgen"] # Enable support of parallel execution -parallel = ["std", "tokio/rt-multi-thread"] +parallel = ["std", "tokio/rt-multi-thread", "parking_lot"] # Enable user-defined module loader support loader = ["relative-path"] diff --git a/core/src/context/async.rs b/core/src/context/async.rs index 0fda4aef..0bff66ef 100644 --- a/core/src/context/async.rs +++ b/core/src/context/async.rs @@ -91,8 +91,7 @@ macro_rules! async_with{ impl DropContext for AsyncRuntime { unsafe fn drop_context(&self, ctx: NonNull) { - //TODO - let guard = match self.inner.try_lock() { + let guard = match self.try_lock() { Some(x) => x, None => { #[cfg(not(feature = "parallel"))] @@ -100,10 +99,6 @@ impl DropContext for AsyncRuntime { let p = unsafe { &mut *(ctx.as_ptr() as *mut crate::context::ctx::RefCountHeader) }; if p.ref_count <= 1 { - // Lock was poisoned, this should only happen on a panic. - // We should still free the context. - // TODO see if there is a way to recover from a panic which could cause the - // following assertion to trigger #[cfg(feature = "std")] assert!(std::thread::panicking()); } @@ -121,7 +116,6 @@ impl DropContext for AsyncRuntime { }; guard.runtime.update_stack_top(); unsafe { qjs::JS_FreeContext(ctx.as_ptr()) } - // Explicitly drop the guard to ensure it is valid during the entire use of runtime mem::drop(guard); } } @@ -145,6 +139,7 @@ impl AsyncContext { } /// Creates a base context with only the required functions registered. + /// /// If additional functions are required use [`AsyncContext::custom`], /// [`AsyncContext::builder`] or [`AsyncContext::full`]. pub async fn base(runtime: &AsyncRuntime) -> Result { @@ -152,10 +147,11 @@ impl AsyncContext { } /// Creates a context with only the required intrinsics registered. + /// /// If additional functions are required use [`AsyncContext::custom`], /// [`AsyncContext::builder`] or [`AsyncContext::full`]. pub async fn custom(runtime: &AsyncRuntime) -> Result { - let guard = runtime.inner.lock().await; + let guard = runtime.lock().await; let ctx = NonNull::new(unsafe { qjs::JS_NewContextRaw(guard.runtime.rt.as_ptr()) }) .ok_or(Error::Allocation)?; unsafe { qjs::JS_AddIntrinsicBaseObjects(ctx.as_ptr()) }; @@ -168,14 +164,14 @@ impl AsyncContext { } /// Creates a context with all standard available intrinsics registered. + /// /// If precise control is required of which functions are available use /// [`AsyncContext::custom`] or [`AsyncContext::builder`]. 
pub async fn full(runtime: &AsyncRuntime) -> Result { - let guard = runtime.inner.lock().await; + let guard = runtime.lock().await; let ctx = NonNull::new(unsafe { qjs::JS_NewContext(guard.runtime.rt.as_ptr()) }) .ok_or(Error::Allocation)?; let res = unsafe { ContextOwner::new(ctx, runtime.clone()) }; - // Explicitly drop the guard to ensure it is valid during the entire use of runtime guard.drop_pending(); mem::drop(guard); @@ -218,7 +214,7 @@ impl AsyncContext { F: for<'js> FnOnce(Ctx<'js>) -> R + ParallelSend, R: ParallelSend, { - let guard = self.0.rt().inner.lock().await; + let guard = self.0.rt().lock().await; guard.runtime.update_stack_top(); let ctx = unsafe { Ctx::new_async(self) }; let res = f(ctx); diff --git a/core/src/context/async/future.rs b/core/src/context/async/future.rs index 7b5706f9..0ba4df89 100644 --- a/core/src/context/async/future.rs +++ b/core/src/context/async/future.rs @@ -1,43 +1,23 @@ use alloc::boxed::Box; use core::{ future::Future, - mem::{self, ManuallyDrop}, + mem, pin::Pin, - task::{ready, Context, Poll}, + task::{Context, Poll}, }; -use async_lock::futures::Lock; - -use crate::{ - markers::ParallelSend, - runtime::{schedular::SchedularPoll, InnerRuntime}, - AsyncContext, Ctx, -}; +use crate::{markers::ParallelSend, runtime::task_queue::TaskPoll, AsyncContext, Ctx}; pub struct WithFuture<'a, F, R> { context: &'a AsyncContext, - lock_state: LockState<'a>, state: WithFutureState<'a, F, R>, } -enum LockState<'a> { - Initial, - Pending(ManuallyDrop>), -} - -impl<'a> Drop for LockState<'a> { - fn drop(&mut self) { - if let LockState::Pending(ref mut x) = self { - unsafe { ManuallyDrop::drop(x) } - } - } -} - enum WithFutureState<'a, F, R> { Initial { closure: F, }, - FutureCreated { + Running { future: Pin + 'a + Send>>, }, Done, @@ -51,7 +31,6 @@ where pub fn new(context: &'a AsyncContext, f: F) -> Self { Self { context, - lock_state: LockState::Initial, state: WithFutureState::Initial { closure: f }, } } @@ -63,98 +42,53 @@ where R: ParallelSend + 'static, { type Output = R; + fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { - // Implementation ensures we don't break pin guarantees. let this = unsafe { self.get_unchecked_mut() }; - let mut lock = loop { - // We don't move the lock_state as long as it is pending - if let LockState::Pending(ref mut fut) = &mut this.lock_state { - // SAFETY: Sound as we don't move future while it is pending. - let pin = unsafe { Pin::new_unchecked(&mut **fut) }; - let lock = ready!(pin.poll(cx)); - // at this point we have acquired a lock, so we will now drop the future allowing - // us to reused the memory space. - unsafe { ManuallyDrop::drop(fut) }; - // The pinned memory is dropped so now we can freely move into it. - this.lock_state = LockState::Initial; - break lock; - } else { - // we assign a state with manually drop so we can drop the value when we need to - // replace it. - // Assign - this.lock_state = - LockState::Pending(ManuallyDrop::new(this.context.0.rt().inner.lock())); - } + // Try to get lock - yields to executor if unavailable + let Some(mut lock) = this.context.0.rt().try_lock() else { + cx.waker().wake_by_ref(); + return Poll::Pending; }; lock.runtime.update_stack_top(); - // At this point we have locked the runtime so we start running the actual future - // we can move this memory since the future is boxed and thus movable. 
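A minimal sketch (illustration only, not part of the patch) of the try-lock-or-yield pattern the new `poll` above relies on, assuming a `RefCell`-backed handle as a stand-in for `InnerRuntime`; `Shared` and `with_resource` are invented names:

    use std::{cell::RefCell, future::poll_fn, rc::Rc, task::Poll};

    // Stand-in for the runtime's shared inner state.
    type Shared = Rc<RefCell<u32>>;

    // If the borrow is taken, wake ourselves and return Pending so the executor
    // re-polls this task later, instead of parking on an async lock future.
    async fn with_resource(shared: Shared) -> u32 {
        poll_fn(move |cx| {
            let Ok(mut guard) = shared.try_borrow_mut() else {
                cx.waker().wake_by_ref();
                return Poll::Pending;
            };
            *guard += 1;
            Poll::Ready(*guard)
        })
        .await
    }

Compared with awaiting an `async_lock` future, this trades queued fairness for simplicity: under contention the task is simply re-woken until the borrow (or the mutex, in the `parallel` build) becomes available.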
+ // Create or get the future let mut future = match mem::replace(&mut this.state, WithFutureState::Done) { WithFutureState::Initial { closure } => { - // SAFETY: we have a lock, so creating this ctx is save. let ctx = unsafe { Ctx::new_async(this.context) }; Box::pin(closure(ctx)) } - WithFutureState::FutureCreated { future } => future, - // The future was called an additional time, - // We don't have anything valid to do here so just panic. - WithFutureState::Done => panic!("With future called after it returned"), + WithFutureState::Running { future } => future, + WithFutureState::Done => panic!("WithFuture polled after completion"), }; - let res = loop { - let mut made_progress = false; - + // Poll the future and spawned tasks + loop { if let Poll::Ready(x) = future.as_mut().poll(cx) { - break Poll::Ready(x); - }; + return Poll::Ready(x); + } - let opaque = lock.runtime.get_opaque(); - match opaque.poll(cx) { - SchedularPoll::Empty => { - // if the schedular is empty that means the future is waiting on an external or - // on a promise. - } - SchedularPoll::ShouldYield => { - this.state = WithFutureState::FutureCreated { future }; - return Poll::Pending; - } - SchedularPoll::Pending => { - // we couldn't drive any futures so we should run some jobs to see we can get - // some progress. - } - SchedularPoll::PendingProgress => { - // We did make some progress so the root future might not be blocked, but it is - // probably still a good idea to run some jobs as most futures first require a - // single job to run before unblocking. - made_progress = true; - } - }; + let mut made_progress = false; + + if lock.runtime.get_opaque().poll(cx) == TaskPoll::Progress { + made_progress = true; + } loop { match lock.runtime.execute_pending_job() { Ok(false) => break, Ok(true) => made_progress = true, - Err(_ctx) => { - // TODO figure out what to do with a job error. - made_progress = true; - } + Err(_) => made_progress = true, } } - // If no work could be done we should yield back. if !made_progress { - this.state = WithFutureState::FutureCreated { future }; - break Poll::Pending; + this.state = WithFutureState::Running { future }; + return Poll::Pending; } - }; - - // Manually drop the lock so it isn't accidentally moved into somewhere. - mem::drop(lock); - - res + } } } diff --git a/core/src/context/ctx.rs b/core/src/context/ctx.rs index 570fc4a3..2846edd4 100644 --- a/core/src/context/ctx.rs +++ b/core/src/context/ctx.rs @@ -403,7 +403,8 @@ impl<'js> Ctx<'js> { Opaque::from_runtime_ptr(qjs::JS_GetRuntime(self.ctx.as_ptr())) } - /// Spawn future using configured async runtime + /// Spawn future on QuickJS's task queue (same thread as JS). + /// Use this when the future needs to access JS values. #[cfg(feature = "futures")] #[cfg_attr(feature = "doc-cfg", doc(cfg(feature = "futures")))] pub fn spawn(&self, future: F) @@ -413,6 +414,67 @@ impl<'js> Ctx<'js> { unsafe { self.get_opaque().push(future) } } + /// Execute pending JS jobs and poll spawned futures once. + #[cfg(feature = "futures")] + fn poll_once(&self, cx: &mut core::task::Context) -> bool { + let mut did_work = false; + + // Execute JS pending jobs + while self.execute_pending_job() { + did_work = true; + } + + // Poll spawned futures + let opaque = unsafe { self.get_opaque() }; + match opaque.poll(cx) { + crate::runtime::task_queue::TaskPoll::Progress + | crate::runtime::task_queue::TaskPoll::Done => did_work = true, + _ => {} + } + + did_work + } + + /// Await a promise, blocking until resolved while driving QuickJS jobs. 
+ /// + /// The `yield_fn` is called **only when idle** (no JS jobs or futures made progress) + /// to let the async runtime process timers and I/O. + /// + /// For tokio multi-threaded runtime: + /// ```ignore + /// ctx.await_promise(promise, || { + /// tokio::task::block_in_place(|| { + /// tokio::runtime::Handle::current().block_on(tokio::task::yield_now()) + /// }) + /// }) + /// ``` + #[cfg(feature = "futures")] + #[cfg_attr(feature = "doc-cfg", doc(cfg(feature = "futures")))] + pub fn await_promise>( + &self, + promise: Promise<'js>, + yield_fn: impl Fn(), + ) -> Result { + use crate::value::promise::PromiseState; + use core::task::{Context, Waker}; + + let waker = Waker::noop(); + let mut cx = Context::from_waker(waker); + + loop { + match promise.state() { + PromiseState::Resolved => return promise.result().unwrap(), + PromiseState::Rejected => return promise.result().unwrap(), + PromiseState::Pending => { + // Only yield if no work was done - avoids expensive runtime calls + if !self.poll_once(&mut cx) { + yield_fn(); + } + } + } + } + } + /// Create a new `Ctx` from a pointer to the context and a invariant lifetime. /// /// # Safety diff --git a/core/src/runtime.rs b/core/src/runtime.rs index dabe4dac..be8b04b9 100644 --- a/core/src/runtime.rs +++ b/core/src/runtime.rs @@ -8,18 +8,16 @@ mod userdata; #[cfg(feature = "futures")] mod r#async; #[cfg(feature = "futures")] -pub(crate) mod schedular; -#[cfg(feature = "futures")] mod spawner; #[cfg(feature = "futures")] +pub(crate) mod task_queue; +#[cfg(feature = "futures")] pub use spawner::DriveFuture; use alloc::boxed::Box; pub use base::{Runtime, WeakRuntime}; pub use userdata::{UserDataError, UserDataGuard}; -#[cfg(feature = "futures")] -pub(crate) use r#async::InnerRuntime; #[cfg(feature = "futures")] pub use r#async::{AsyncRuntime, AsyncWeakRuntime}; diff --git a/core/src/runtime/async.rs b/core/src/runtime/async.rs index 45f6e22b..7df98dd4 100644 --- a/core/src/runtime/async.rs +++ b/core/src/runtime/async.rs @@ -1,32 +1,50 @@ -use alloc::{ - ffi::CString, - sync::{Arc, Weak}, - vec::Vec, -}; +use alloc::{ffi::CString, vec::Vec}; use core::{ptr::NonNull, result::Result as StdResult, task::Poll}; -#[cfg(feature = "std")] -use std::println; +#[cfg(feature = "parallel")] +use alloc::sync::Arc; +#[cfg(feature = "parallel")] +use async_lock::Mutex; #[cfg(feature = "parallel")] use std::sync::mpsc::{self, Receiver, Sender}; -use async_lock::Mutex; +#[cfg(not(feature = "parallel"))] +use alloc::rc::Rc; +#[cfg(not(feature = "parallel"))] +use core::cell::RefCell; use super::{ - opaque::Opaque, raw::RawRuntime, schedular::SchedularPoll, spawner::DriveFuture, - InterruptHandler, MemoryUsage, PromiseHook, RejectionTracker, + opaque::Opaque, raw::RawRuntime, spawner::DriveFuture, task_queue::TaskPoll, InterruptHandler, + MemoryUsage, PromiseHook, RejectionTracker, }; use crate::allocator::Allocator; #[cfg(feature = "loader")] use crate::loader::{Loader, Resolver}; -use crate::{ - context::AsyncContext, result::AsyncJobException, util::ManualPoll, Ctx, Exception, Result, -}; #[cfg(feature = "parallel")] -use crate::{ - qjs, - util::{AssertSendFuture, AssertSyncFuture}, -}; +use crate::qjs; +use crate::{context::AsyncContext, result::AsyncJobException, Ctx, Result}; + +// Type aliases for lock abstraction +#[cfg(feature = "parallel")] +pub(crate) type RuntimeLock = Mutex; +#[cfg(not(feature = "parallel"))] +pub(crate) type RuntimeLock = RefCell; + +#[cfg(feature = "parallel")] +pub(crate) type RuntimeRef = Arc; +#[cfg(not(feature = 
"parallel"))] +pub(crate) type RuntimeRef = Rc; + +#[cfg(feature = "parallel")] +pub(crate) type RuntimeWeak = alloc::sync::Weak; +#[cfg(not(feature = "parallel"))] +pub(crate) type RuntimeWeak = alloc::rc::Weak; + +// Guard type aliases +#[cfg(feature = "parallel")] +pub(crate) type RuntimeGuard<'a, T> = async_lock::MutexGuard<'a, T>; +#[cfg(not(feature = "parallel"))] +pub(crate) type RuntimeGuard<'a, T> = core::cell::RefMut<'a, T>; #[derive(Debug)] pub(crate) struct InnerRuntime { @@ -36,6 +54,7 @@ pub(crate) struct InnerRuntime { } impl InnerRuntime { + #[inline] pub fn drop_pending(&self) { #[cfg(feature = "parallel")] while let Ok(x) = self.drop_recv.try_recv() { @@ -59,9 +78,9 @@ unsafe impl Send for InnerRuntime {} #[cfg_attr(feature = "doc-cfg", doc(cfg(feature = "futures")))] #[derive(Clone)] pub struct AsyncWeakRuntime { - inner: Weak>, + pub(crate) inner: RuntimeWeak>, #[cfg(feature = "parallel")] - drop_send: Sender>, + pub(crate) drop_send: Sender>, } impl AsyncWeakRuntime { @@ -78,22 +97,15 @@ impl AsyncWeakRuntime { #[cfg_attr(feature = "doc-cfg", doc(cfg(feature = "futures")))] #[derive(Clone)] pub struct AsyncRuntime { - // use Arc instead of Ref so we can use OwnedLock - pub(crate) inner: Arc>, + pub(crate) inner: RuntimeRef>, #[cfg(feature = "parallel")] pub(crate) drop_send: Sender>, } -// Since all functions which use runtime are behind a mutex -// sending the runtime to other threads should be fine. #[cfg(feature = "parallel")] unsafe impl Send for AsyncRuntime {} #[cfg(feature = "parallel")] unsafe impl Send for AsyncWeakRuntime {} - -// Since a global lock needs to be locked for safe use -// using runtime in a sync way should be safe as -// simultaneous accesses is synchronized behind a lock. #[cfg(feature = "parallel")] unsafe impl Sync for AsyncRuntime {} #[cfg(feature = "parallel")] @@ -106,43 +118,27 @@ impl AsyncRuntime { /// /// # Features /// *If the `"rust-alloc"` feature is enabled the Rust's global allocator will be used in favor of libc's one.* - // Annoying false positive clippy lint #[allow(clippy::arc_with_non_send_sync)] pub fn new() -> Result { - let opaque = Opaque::with_spawner(); - let runtime = unsafe { RawRuntime::new(opaque) }?; - - #[cfg(feature = "parallel")] - let (drop_send, drop_recv) = mpsc::channel(); - - Ok(Self { - inner: Arc::new(Mutex::new(InnerRuntime { - runtime, - #[cfg(feature = "parallel")] - drop_recv, - })), - #[cfg(feature = "parallel")] - drop_send, - }) + Self::new_inner(unsafe { RawRuntime::new(Opaque::with_spawner()) }?) } - /// Create a new runtime using specified allocator + /// Create a new runtime using specified allocator. /// /// Will generally only fail if not enough memory was available. - // Annoying false positive clippy lint #[allow(clippy::arc_with_non_send_sync)] - pub fn new_with_alloc(allocator: A) -> Result - where - A: Allocator + 'static, - { - let opaque = Opaque::with_spawner(); - let runtime = unsafe { RawRuntime::new_with_allocator(opaque, allocator) }?; + pub fn new_with_alloc(allocator: A) -> Result { + Self::new_inner(unsafe { + RawRuntime::new_with_allocator(Opaque::with_spawner(), allocator) + }?) + } + fn new_inner(runtime: RawRuntime) -> Result { #[cfg(feature = "parallel")] let (drop_send, drop_recv) = mpsc::channel(); Ok(Self { - inner: Arc::new(Mutex::new(InnerRuntime { + inner: RuntimeRef::new(RuntimeLock::new(InnerRuntime { runtime, #[cfg(feature = "parallel")] drop_recv, @@ -152,68 +148,76 @@ impl AsyncRuntime { }) } - /// Get weak ref to runtime + /// Get weak ref to runtime. 
pub fn weak(&self) -> AsyncWeakRuntime { AsyncWeakRuntime { + #[cfg(feature = "parallel")] inner: Arc::downgrade(&self.inner), + #[cfg(not(feature = "parallel"))] + inner: Rc::downgrade(&self.inner), #[cfg(feature = "parallel")] drop_send: self.drop_send.clone(), } } - /// Set a closure which is called when a Promise is rejected. - #[inline] + // Lock helpers - zero-cost for non-parallel + #[cfg(feature = "parallel")] + pub(crate) async fn lock(&self) -> RuntimeGuard<'_, InnerRuntime> { + self.inner.lock().await + } + + #[cfg(not(feature = "parallel"))] + pub(crate) async fn lock(&self) -> RuntimeGuard<'_, InnerRuntime> { + self.inner.borrow_mut() + } + + #[cfg(feature = "parallel")] + pub(crate) fn try_lock(&self) -> Option> { + self.inner.try_lock() + } + + #[cfg(not(feature = "parallel"))] + pub(crate) fn try_lock(&self) -> Option> { + self.inner.try_borrow_mut().ok() + } + + /// Set a closure which is called when a promise is rejected. pub async fn set_host_promise_rejection_tracker(&self, tracker: Option) { unsafe { - self.inner - .lock() + self.lock() .await .runtime - .set_host_promise_rejection_tracker(tracker); + .set_host_promise_rejection_tracker(tracker) } } /// Set a closure which is called when a promise is created, resolved, or chained. - #[inline] pub async fn set_promise_hook(&self, tracker: Option) { - unsafe { - self.inner.lock().await.runtime.set_promise_hook(tracker); - } + unsafe { self.lock().await.runtime.set_promise_hook(tracker) } } /// Set a closure which is regularly called by the engine when it is executing code. - /// If the provided closure returns `true` the interpreter will raise and uncatchable + /// + /// If the provided closure returns `true` the interpreter will raise an uncatchable /// exception and return control flow to the caller. - #[inline] pub async fn set_interrupt_handler(&self, handler: Option) { - unsafe { - self.inner - .lock() - .await - .runtime - .set_interrupt_handler(handler); - } + unsafe { self.lock().await.runtime.set_interrupt_handler(handler) } } - /// Set the module loader + /// Set the module loader. #[cfg(feature = "loader")] #[cfg_attr(feature = "doc-cfg", doc(cfg(feature = "loader")))] - pub async fn set_loader(&self, resolver: R, loader: L) - where - R: Resolver + 'static, - L: Loader + 'static, - { - unsafe { - self.inner.lock().await.runtime.set_loader(resolver, loader); - } + pub async fn set_loader( + &self, + resolver: R, + loader: L, + ) { + unsafe { self.lock().await.runtime.set_loader(resolver, loader) } } - /// Set the info of the runtime + /// Set the info of the runtime. pub async fn set_info>>(&self, info: S) -> Result<()> { - let string = CString::new(info)?; - unsafe { - self.inner.lock().await.runtime.set_info(string); - } + unsafe { self.lock().await.runtime.set_info(CString::new(info)?) }; Ok(()) } @@ -222,27 +226,21 @@ impl AsyncRuntime { /// Setting the limit to 0 is equivalent to unlimited memory. /// /// Note that is a Noop when a custom allocator is being used, - /// as is the case for the "rust-alloc" or "allocator" features. + /// as is the case for the `"rust-alloc"` or `"allocator"` features. pub async fn set_memory_limit(&self, limit: usize) { - unsafe { - self.inner.lock().await.runtime.set_memory_limit(limit); - } + unsafe { self.lock().await.runtime.set_memory_limit(limit) } } /// Set a limit on the max size of stack the runtime will use. /// /// The default values is 256x1024 bytes. 
pub async fn set_max_stack_size(&self, limit: usize) { - unsafe { - self.inner.lock().await.runtime.set_max_stack_size(limit); - } + unsafe { self.lock().await.runtime.set_max_stack_size(limit) } } /// Set a memory threshold for garbage collection. pub async fn set_gc_threshold(&self, threshold: usize) { - unsafe { - self.inner.lock().await.runtime.set_gc_threshold(threshold); - } + unsafe { self.lock().await.runtime.set_gc_threshold(threshold) } } /// Manually run the garbage collection. @@ -252,111 +250,93 @@ impl AsyncRuntime { /// references. The garbage collector is only for collecting /// cyclic references. pub async fn run_gc(&self) { - unsafe { - let mut lock = self.inner.lock().await; - lock.drop_pending(); - lock.runtime.run_gc(); - } + let mut lock = self.lock().await; + lock.drop_pending(); + unsafe { lock.runtime.run_gc() } } - /// Get memory usage stats + /// Get memory usage stats. pub async fn memory_usage(&self) -> MemoryUsage { - unsafe { self.inner.lock().await.runtime.memory_usage() } + unsafe { self.lock().await.runtime.memory_usage() } } - /// Test for pending jobs + /// Test for pending jobs. /// /// Returns true when at least one job is pending. - #[inline] pub async fn is_job_pending(&self) -> bool { - let lock = self.inner.lock().await; - + let lock = self.lock().await; lock.runtime.is_job_pending() || !lock.runtime.get_opaque().spawner_is_empty() } - /// Execute first pending job + /// Execute first pending job. /// /// Returns true when job was executed or false when queue is empty or error when exception thrown under execution. - #[inline] pub async fn execute_pending_job(&self) -> StdResult { - let mut lock = self.inner.lock().await; + let mut lock = self.lock().await; lock.runtime.update_stack_top(); lock.drop_pending(); - let f = ManualPoll::new(|cx| { - let job_res = lock.runtime.execute_pending_job().map_err(|e| { - let ptr = NonNull::new(e) - .expect("executing pending job returned a null context on error"); - AsyncJobException(unsafe { AsyncContext::from_raw(ptr, self.clone()) }) - })?; - - if job_res { - return Poll::Ready(Ok(true)); - } - - match lock.runtime.get_opaque().poll(cx) { - SchedularPoll::ShouldYield => Poll::Pending, - SchedularPoll::Empty => Poll::Ready(Ok(false)), - SchedularPoll::Pending => Poll::Ready(Ok(false)), - SchedularPoll::PendingProgress => Poll::Ready(Ok(true)), - } - }); - - #[cfg(feature = "parallel")] - let f = unsafe { AssertSendFuture::assert(AssertSyncFuture::assert(f)) }; + if let Err(e) = lock.runtime.execute_pending_job() { + let ptr = NonNull::new(e).expect("null context on error"); + return Err(AsyncJobException(unsafe { + AsyncContext::from_raw(ptr, self.clone()) + })); + } - f.await + Ok(lock.runtime.is_job_pending() || !lock.runtime.get_opaque().spawner_is_empty()) } - /// Run all futures and jobs in the runtime until all are finished. - #[inline] + /// Run all futures and jobs until finished. 
pub async fn idle(&self) { - let mut lock = self.inner.lock().await; - lock.runtime.update_stack_top(); - lock.drop_pending(); + core::future::poll_fn(|cx| { + let Some(mut lock) = self.try_lock() else { + cx.waker().wake_by_ref(); + return Poll::Pending; + }; + + lock.runtime.update_stack_top(); + lock.drop_pending(); - let f = ManualPoll::new(|cx| { + // Run all pending JS jobs loop { - let pending = lock.runtime.execute_pending_job().map_err(|e| { - let ptr = NonNull::new(e) - .expect("executing pending job returned a null context on error"); - AsyncJobException(unsafe { AsyncContext::from_raw(ptr, self.clone()) }) - }); - match pending { + match lock.runtime.execute_pending_job() { + Ok(true) => continue, + Ok(false) => break, Err(e) => { - // SAFETY: Runtime is already locked so creating a context is safe. - let ctx = unsafe { Ctx::from_ptr(e.0 .0.ctx().as_ptr()) }; + let ctx = unsafe { Ctx::from_ptr(e) }; let err = ctx.catch(); - if let Some(_x) = err.clone().into_object().and_then(Exception::from_object) + #[cfg(feature = "std")] { - // TODO do something better with errors. - #[cfg(feature = "std")] - println!("error executing job: {}", _x); - } else { - #[cfg(feature = "std")] - println!("error executing job: {:?}", err); + use std::println; + if let Some(ex) = err + .clone() + .into_object() + .and_then(crate::Exception::from_object) + { + println!("error executing job: {}", ex); + } else { + println!("error executing job: {:?}", err); + } } + let _ = err; } - Ok(true) => continue, - Ok(false) => {} } + } - match lock.runtime.get_opaque().poll(cx) { - SchedularPoll::ShouldYield => return Poll::Pending, - SchedularPoll::Empty => return Poll::Ready(()), - SchedularPoll::Pending => return Poll::Pending, - SchedularPoll::PendingProgress => {} + match lock.runtime.get_opaque().poll(cx) { + TaskPoll::Empty | TaskPoll::Done => Poll::Ready(()), + TaskPoll::Progress => { + cx.waker().wake_by_ref(); + Poll::Pending } + TaskPoll::Pending => Poll::Pending, } - }); - - #[cfg(feature = "parallel")] - let f = unsafe { AssertSendFuture::assert(AssertSyncFuture::assert(f)) }; - - f.await + }) + .await } /// Returns a future that completes when the runtime is dropped. + /// /// If the future is polled it will drive futures spawned inside the runtime completing them /// even if runtime is currently not in use. pub fn drive(&self) -> DriveFuture { @@ -371,34 +351,25 @@ macro_rules! async_test_case { fn $name() { #[cfg(feature = "parallel")] let mut new_thread = tokio::runtime::Builder::new_multi_thread(); - #[cfg(not(feature = "parallel"))] let mut new_thread = tokio::runtime::Builder::new_current_thread(); - let rt = new_thread - .enable_all() - .build() - .unwrap(); + let rt = new_thread.enable_all().build().unwrap(); #[cfg(feature = "parallel")] - { - rt.block_on(async { - let $rt = crate::AsyncRuntime::new().unwrap(); - let $ctx = crate::AsyncContext::full(&$rt).await.unwrap(); - - $($t)* - - }) - } + rt.block_on(async { + let $rt = crate::AsyncRuntime::new().unwrap(); + let $ctx = crate::AsyncContext::full(&$rt).await.unwrap(); + $($t)* + }); #[cfg(not(feature = "parallel"))] { let set = tokio::task::LocalSet::new(); set.block_on(&rt, async { let $rt = crate::AsyncRuntime::new().unwrap(); let $ctx = crate::AsyncContext::full(&$rt).await.unwrap(); - $($t)* - }) + }); } } }; @@ -406,11 +377,9 @@ macro_rules! 
async_test_case { #[cfg(test)] mod test { - use std::time::Duration; - - use crate::*; - use self::context::EvalOptions; + use crate::*; + use std::time::Duration; async_test_case!(basic => (_rt,ctx){ async_with!(&ctx => |ctx|{ @@ -420,11 +389,9 @@ mod test { }); async_test_case!(sleep_closure => (_rt,ctx){ - let mut a = 1; let a_ref = &mut a; - async_with!(&ctx => |ctx|{ tokio::time::sleep(Duration::from_secs_f64(0.01)).await; ctx.globals().set("foo","bar").unwrap(); @@ -441,7 +408,6 @@ mod test { #[cfg(not(feature = "parallel"))] tokio::task::spawn_local(rt.drive()); - // Give drive time to start. tokio::time::sleep(Duration::from_secs_f64(0.01)).await; let number = Arc::new(AtomicUsize::new(0)); @@ -454,10 +420,8 @@ mod test { }); }).await; assert_eq!(number.load(Ordering::SeqCst),0); - // Give drive time to finish the task. tokio::time::sleep(Duration::from_secs_f64(0.01)).await; assert_eq!(number.load(Ordering::SeqCst),1); - }); async_test_case!(no_drive => (rt,ctx){ @@ -475,7 +439,6 @@ mod test { assert_eq!(number.load(Ordering::SeqCst),0); tokio::time::sleep(Duration::from_secs_f64(0.01)).await; assert_eq!(number.load(Ordering::SeqCst),0); - }); async_test_case!(idle => (rt,ctx){ @@ -493,7 +456,6 @@ mod test { assert_eq!(number.load(Ordering::SeqCst),0); rt.idle().await; assert_eq!(number.load(Ordering::SeqCst),1); - }); async_test_case!(recursive_spawn => (rt,ctx){ @@ -505,9 +467,7 @@ mod test { let (tx2,rx2) = oneshot::channel::<()>(); ctx.spawn(async move { tokio::task::yield_now().await; - let ctx = ctx_clone.clone(); - ctx_clone.spawn(async move { tokio::task::yield_now().await; ctx.spawn(async move { @@ -518,18 +478,11 @@ mod test { tokio::task::yield_now().await; tx.send(()).unwrap(); }); - - // Add a bunch of futures just to make sure possible segfaults are more likely to - // happen - for _ in 0..32{ - ctx_clone.spawn(async move {}) - } - + for _ in 0..32 { ctx_clone.spawn(async move {}) } }); tokio::time::timeout(Duration::from_millis(500), rx).await.unwrap().unwrap(); tokio::time::timeout(Duration::from_millis(500), rx2).await.unwrap().unwrap(); }).await; - }); async_test_case!(recursive_spawn_from_script => (rt,ctx) { @@ -538,93 +491,50 @@ mod test { static COUNT: AtomicUsize = AtomicUsize::new(0); static SCRIPT: &str = r#" - async function main() { - setTimeout(() => { inc_count() - setTimeout(async () => { - inc_count() - }, 100); + setTimeout(async () => { inc_count() }, 100); }, 100); } - main().catch(print); - - "#; - fn inc_count(){ - COUNT.fetch_add(1,Ordering::Relaxed); - } + fn inc_count() { COUNT.fetch_add(1,Ordering::Relaxed); } fn set_timeout_spawn<'js>(ctx: Ctx<'js>, callback: Function<'js>, millis: usize) -> Result<()> { ctx.spawn(async move { tokio::time::sleep(Duration::from_millis(millis as u64)).await; callback.call::<_, ()>(()).unwrap(); }); - Ok(()) } - async_with!(ctx => |ctx|{ - let res: Result = (|| { let globals = ctx.globals(); - globals.set("inc_count", Func::from(inc_count))?; - globals.set("setTimeout", Func::from(set_timeout_spawn))?; - let options = EvalOptions{ - promise: true, - strict: false, - ..EvalOptions::default() - }; - - ctx.eval_with_options(SCRIPT, options)? 
+ ctx.eval_with_options(SCRIPT, EvalOptions { promise: true, strict: false, ..Default::default() }) })(); - match res.catch(&ctx){ - Ok(promise) => { - if let Err(err) = promise.into_future::().await.catch(&ctx){ - eprintln!("{}", err) - } - }, - Err(err) => { - eprintln!("{}", err) - }, + match res.catch(&ctx) { + Ok(promise) => { let _ = promise.into_future::().await.catch(&ctx); }, + Err(err) => { #[cfg(feature = "std")] std::println!("{}", err); }, }; - - }) - .await; + }).await; rt.idle().await; - - assert_eq!(COUNT.load(Ordering::Relaxed),2); + assert_eq!(COUNT.load(Ordering::Relaxed), 2); }); - #[cfg(feature = "parallel")] - fn assert_is_send(t: T) -> T { - t - } - - #[cfg(feature = "parallel")] - fn assert_is_sync(t: T) -> T { - t - } - #[cfg(feature = "parallel")] #[tokio::test] - async fn ensure_types_are_send_sync() { + async fn ensure_types_are_send() { + fn assert_send(_: &T) {} let rt = AsyncRuntime::new().unwrap(); - - std::mem::drop(assert_is_sync(rt.idle())); - std::mem::drop(assert_is_sync(rt.execute_pending_job())); - std::mem::drop(assert_is_sync(rt.drive())); - - std::mem::drop(assert_is_send(rt.idle())); - std::mem::drop(assert_is_send(rt.execute_pending_job())); - std::mem::drop(assert_is_send(rt.drive())); + assert_send(&rt.idle()); + assert_send(&rt.execute_pending_job()); + assert_send(&rt.drive()); } } diff --git a/core/src/runtime/opaque.rs b/core/src/runtime/opaque.rs index 4716b3e7..9910457e 100644 --- a/core/src/runtime/opaque.rs +++ b/core/src/runtime/opaque.rs @@ -22,7 +22,7 @@ use std::collections::{hash_map::Entry, HashMap}; use hashbrown::{hash_map::Entry, HashMap}; #[cfg(feature = "futures")] -use super::{schedular::SchedularPoll, spawner::Spawner}; +use super::task_queue::{TaskPoll, TaskQueue}; #[cfg(feature = "futures")] use core::{ @@ -54,7 +54,7 @@ pub(crate) struct Opaque<'js> { userdata: UserDataMap, #[cfg(feature = "futures")] - spawner: Option>, + queue: Option>, _marker: PhantomData<&'js ()>, } @@ -80,14 +80,14 @@ impl<'js> Opaque<'js> { _marker: PhantomData, #[cfg(feature = "futures")] - spawner: None, + queue: None, } } #[cfg(feature = "futures")] pub fn with_spawner() -> Self { let mut this = Opaque::new(); - this.spawner = Some(UnsafeCell::new(Spawner::new())); + this.queue = Some(UnsafeCell::new(TaskQueue::new())); this } @@ -127,8 +127,8 @@ impl<'js> Opaque<'js> { } #[cfg(feature = "futures")] - fn spawner(&self) -> &UnsafeCell { - self.spawner + fn queue(&self) -> &UnsafeCell { + self.queue .as_ref() .expect("tried to use async function in non async runtime") } @@ -138,22 +138,22 @@ impl<'js> Opaque<'js> { where F: Future, { - (*self.spawner().get()).push(f) + (*self.queue().get()).push(f) } #[cfg(feature = "futures")] pub fn listen(&self, wake: Waker) { - unsafe { (*self.spawner().get()).listen(wake) }; + unsafe { (*self.queue().get()).listen(wake) }; } #[cfg(feature = "futures")] pub fn spawner_is_empty(&self) -> bool { - unsafe { (*self.spawner().get()).is_empty() } + unsafe { (*self.queue().get()).is_empty() } } #[cfg(feature = "futures")] - pub fn poll(&self, cx: &mut Context) -> SchedularPoll { - unsafe { (*self.spawner().get()).poll(cx) } + pub fn poll(&self, cx: &mut Context) -> TaskPoll { + unsafe { (*self.queue().get()).poll(cx) } } pub fn insert_userdata(&self, data: U) -> Result>, UserDataError> @@ -262,7 +262,7 @@ impl<'js> Opaque<'js> { self.panic.take(); self.prototypes.get_mut().clear(); #[cfg(feature = "futures")] - self.spawner.take(); + self.queue.take(); self.userdata.clear() } } diff --git 
a/core/src/runtime/schedular.rs b/core/src/runtime/schedular.rs deleted file mode 100644 index 15b314a0..00000000 --- a/core/src/runtime/schedular.rs +++ /dev/null @@ -1,240 +0,0 @@ -use alloc::sync::Arc; -use core::{ - cell::Cell, - future::Future, - mem::offset_of, - pin::Pin, - sync::atomic::Ordering, - task::{Context, Poll}, -}; - -mod atomic_waker; -mod queue; -mod task; -mod vtable; -mod waker; - -use crate::{ - runtime::schedular::task::{ErasedTask, Task}, - util::Defer, -}; -use queue::Queue; - -use self::task::ErasedTaskPtr; - -#[derive(Debug)] -pub enum SchedularPoll { - /// Returns that the schedular should yield back to the root schedular. - ShouldYield, - /// There was no work to be done. - Empty, - /// No work could be done. - Pending, - /// Work was done, but we didn't finish. - PendingProgress, -} - -pub struct Schedular { - len: Cell, - should_poll: Arc, - all_next: Cell>, - all_prev: Cell>, -} - -impl Schedular { - /// Create a new schedular. - pub fn new() -> Self { - let queue = Arc::new(Queue::new()); - unsafe { - Pin::new_unchecked(&*queue).init(); - } - Schedular { - len: Cell::new(0), - should_poll: queue, - all_prev: Cell::new(None), - all_next: Cell::new(None), - } - } - - /// Returns if there are no pending tasks. - pub fn is_empty(&self) -> bool { - self.all_next.get().is_none() - } - - /// # Safety - /// This function erases any lifetime associated with the future. - /// Caller must ensure that either the future completes or is dropped before the lifetime - pub unsafe fn push(&self, f: F) - where - F: Future, - { - let queue = Arc::downgrade(&self.should_poll); - - // These should always be the same as task has a repr(C); - assert_eq!(offset_of!(Task, head), offset_of!(Task, head)); - assert_eq!(offset_of!(Task, body), offset_of!(Task, body)); - - let task = Arc::new(Task::new(queue, f)); - - // One count for the all list and one for the should_poll list. - let task = ErasedTask::new(task); - self.push_task_to_all(task.clone()); - - let task_ptr = ErasedTask::into_ptr(task); - Pin::new_unchecked(&*self.should_poll).push(task_ptr.as_node_ptr()); - self.len.set(self.len.get() + 1); - } - - /// Add a new task to the all task list. - /// The all task list owns a reference to the task while it is in the list. - unsafe fn push_task_to_all(&self, task: ErasedTask) { - let task = ErasedTask::into_ptr(task); - - task.body().next.set(self.all_next.get()); - - if let Some(x) = self.all_next.get() { - x.body().prev.set(Some(task)); - } - self.all_next.set(Some(task)); - if self.all_prev.get().is_none() { - self.all_prev.set(Some(task)); - } - } - - /// Removes the task from the all task list. - /// Dropping the ownership the list has. - unsafe fn pop_task_all(&self, task: ErasedTaskPtr) { - task.body().queued.store(true, Ordering::Release); - if !task.body().done.replace(true) { - task.task_drop(); - } - - // detach the task from the all list - if let Some(next) = task.body().next.get() { - next.body().prev.set(task.body().prev.get()) - } else { - self.all_prev.set(task.body().prev.get()); - } - if let Some(prev) = task.body().prev.get() { - prev.body().next.set(task.body().next.get()) - } else { - self.all_next.set(task.body().next.get()); - } - - let _ = unsafe { ErasedTask::from_ptr(task) }; - // drop the ownership of the all list, - // Task is now dropped or only owned by wakers or - self.len.set(self.len.get() - 1); - } - - pub unsafe fn poll(&self, cx: &mut Context) -> SchedularPoll { - // A task it's ownership is shared among a number of different places. 
- // - The all-task list - // - One or multiple wakers - // - The should_poll list if scheduled. - // - // When a task is retrieved from the should_poll list we transfer it's arc count to a - // waker. When a waker is cloned it also increments the arc count. If the waker is then - // woken up the count is transfered back to the should_poll list. - - if self.is_empty() { - // No tasks, nothing to be done. - return SchedularPoll::Empty; - } - - self.should_poll.waker().register(cx.waker()); - - let mut iteration = 0; - let mut yielded = 0; - - loop { - // Popped a task, ownership taken from the queue - let cur = match Pin::new_unchecked(&*self.should_poll).pop() { - queue::Pop::Empty => { - if iteration > 0 { - return SchedularPoll::PendingProgress; - } else { - return SchedularPoll::Pending; - } - } - queue::Pop::Value(x) => x, - queue::Pop::Inconsistant => { - cx.waker().wake_by_ref(); - return SchedularPoll::ShouldYield; - } - }; - - // Take ownership of the task from the schedular. - let cur_ptr = ErasedTaskPtr::from_nonnull(cur.cast()); - let cur = ErasedTask::from_ptr(cur_ptr); - - if cur.body().done.get() { - continue; - } - - let prev = cur.body().queued.swap(false, Ordering::AcqRel); - assert!(prev); - - // wakers owns the arc count of cur now until the end of the scope. - // So we can use cur_ptr until the end of the scope waker is only dropped then. - let waker = waker::get(cur); - let mut ctx = Context::from_waker(&waker); - - // if drive_task panics we still want to remove the task from the list. - // So handle it with a drop - let remove = Defer::new((), |_| self.pop_task_all(cur_ptr)); - - iteration += 1; - - match cur_ptr.task_drive(&mut ctx) { - Poll::Ready(_) => { - // Nothing todo the defer will remove the task from the list. - } - Poll::Pending => { - // don't remove task from the list. - remove.take(); - - // we had a pending and test if a yielded future immediatily queued itself - // again. - yielded += cur_ptr.body().queued.load(Ordering::Relaxed) as usize; - - // If we polled all the futures atleas once, - // or more then one future immediatily queued itself after being polled, - // yield back to the parent schedular. - if yielded > 2 || iteration > self.len.get() { - cx.waker().wake_by_ref(); - return SchedularPoll::ShouldYield; - } - } - } - } - } - - /// Remove all tasks from the list. - pub fn clear(&self) { - // Clear all pending futures from the all list - while let Some(c) = self.all_next.get() { - unsafe { self.pop_task_all(c) } - } - - loop { - let cur = match unsafe { Pin::new_unchecked(&*self.should_poll).pop() } { - queue::Pop::Empty => break, - queue::Pop::Value(x) => x, - queue::Pop::Inconsistant => { - #[cfg(feature = "std")] - std::thread::yield_now(); - continue; - } - }; - - unsafe { ErasedTask::from_ptr(ErasedTaskPtr::from_nonnull(cur.cast())) }; - } - } -} - -impl Drop for Schedular { - fn drop(&mut self) { - self.clear() - } -} diff --git a/core/src/runtime/schedular/atomic_waker.rs b/core/src/runtime/schedular/atomic_waker.rs deleted file mode 100644 index afda460a..00000000 --- a/core/src/runtime/schedular/atomic_waker.rs +++ /dev/null @@ -1,335 +0,0 @@ -//! The atomic waker from the futures crate pulled into its own file. -//! This is done to avoid having to pull in the entire futures crate just for a single struct. -//! -//! All copyright of this file belongs to the futures authors. 
- -use atomic::{ - AtomicUsize, - Ordering::{AcqRel, Acquire, Release}, -}; -use core::{cell::UnsafeCell, fmt, sync::atomic, task::Waker}; - -/// A synchronization primitive for task wakeup. -/// -/// Sometimes the task interested in a given event will change over time. -/// An `AtomicWaker` can coordinate concurrent notifications with the consumer -/// potentially "updating" the underlying task to wake up. This is useful in -/// scenarios where a computation completes in another thread and wants to -/// notify the consumer, but the consumer is in the process of being migrated to -/// a new logical task. -/// -/// Consumers should call `register` before checking the result of a computation -/// and producers should call `wake` after producing the computation (this -/// differs from the usual `thread::park` pattern). It is also permitted for -/// `wake` to be called **before** `register`. This results in a no-op. -/// -/// A single `AtomicWaker` may be reused for any number of calls to `register` or -/// `wake`. -/// -/// # Memory ordering -/// -/// Calling `register` "acquires" all memory "released" by calls to `wake` -/// before the call to `register`. Later calls to `wake` will wake the -/// registered waker (on contention this wake might be triggered in `register`). -/// -/// For concurrent calls to `register` (should be avoided) the ordering is only -/// guaranteed for the winning call. -pub struct AtomicWaker { - state: AtomicUsize, - waker: UnsafeCell>, -} - -// `AtomicWaker` is a multi-consumer, single-producer transfer cell. The cell -// stores a `Waker` value produced by calls to `register` and many threads can -// race to take the waker (to wake it) by calling `wake`. -// -// If a new `Waker` instance is produced by calling `register` before an -// existing one is consumed, then the existing one is overwritten. -// -// While `AtomicWaker` is single-producer, the implementation ensures memory -// safety. In the event of concurrent calls to `register`, there will be a -// single winner whose waker will get stored in the cell. The losers will not -// have their tasks woken. As such, callers should ensure to add synchronization -// to calls to `register`. -// -// The implementation uses a single `AtomicUsize` value to coordinate access to -// the `Waker` cell. There are two bits that are operated on independently. -// These are represented by `REGISTERING` and `WAKING`. -// -// The `REGISTERING` bit is set when a producer enters the critical section. The -// `WAKING` bit is set when a consumer enters the critical section. Neither bit -// being set is represented by `WAITING`. -// -// A thread obtains an exclusive lock on the waker cell by transitioning the -// state from `WAITING` to `REGISTERING` or `WAKING`, depending on the operation -// the thread wishes to perform. When this transition is made, it is guaranteed -// that no other thread will access the waker cell. -// -// # Registering -// -// On a call to `register`, an attempt to transition the state from WAITING to -// REGISTERING is made. On success, the caller obtains a lock on the waker cell. -// -// If the lock is obtained, then the thread sets the waker cell to the waker -// provided as an argument. Then it attempts to transition the state back from -// `REGISTERING` -> `WAITING`. -// -// If this transition is successful, then the registering process is complete -// and the next call to `wake` will observe the waker. 
-// -// If the transition fails, then there was a concurrent call to `wake` that was -// unable to access the waker cell (due to the registering thread holding the -// lock). To handle this, the registering thread removes the waker it just set -// from the cell and calls `wake` on it. This call to wake represents the -// attempt to wake by the other thread (that set the `WAKING` bit). The state is -// then transitioned from `REGISTERING | WAKING` back to `WAITING`. This -// transition must succeed because, at this point, the state cannot be -// transitioned by another thread. -// -// # Waking -// -// On a call to `wake`, an attempt to transition the state from `WAITING` to -// `WAKING` is made. On success, the caller obtains a lock on the waker cell. -// -// If the lock is obtained, then the thread takes ownership of the current value -// in the waker cell, and calls `wake` on it. The state is then transitioned -// back to `WAITING`. This transition must succeed as, at this point, the state -// cannot be transitioned by another thread. -// -// If the thread is unable to obtain the lock, the `WAKING` bit is still. This -// is because it has either been set by the current thread but the previous -// value included the `REGISTERING` bit **or** a concurrent thread is in the -// `WAKING` critical section. Either way, no action must be taken. -// -// If the current thread is the only concurrent call to `wake` and another -// thread is in the `register` critical section, when the other thread **exits** -// the `register` critical section, it will observe the `WAKING` bit and handle -// the wake itself. -// -// If another thread is in the `wake` critical section, then it will handle -// waking the task. -// -// # A potential race (is safely handled). -// -// Imagine the following situation: -// -// * Thread A obtains the `wake` lock and wakes a task. -// -// * Before thread A releases the `wake` lock, the woken task is scheduled. -// -// * Thread B attempts to wake the task. In theory this should result in the -// task being woken, but it cannot because thread A still holds the wake lock. -// -// This case is handled by requiring users of `AtomicWaker` to call `register` -// **before** attempting to observe the application state change that resulted -// in the task being awoken. The wakers also change the application state before -// calling wake. -// -// Because of this, the waker will do one of two things. -// -// 1) Observe the application state change that Thread B is woken for. In this -// case, it is OK for Thread B's wake to be lost. -// -// 2) Call register before attempting to observe the application state. Since -// Thread A still holds the `wake` lock, the call to `register` will result -// in the task waking itself and get scheduled again. - -/// Idle state -const WAITING: usize = 0; - -/// A new waker value is being registered with the `AtomicWaker` cell. -const REGISTERING: usize = 0b01; - -/// The waker currently registered with the `AtomicWaker` cell is being woken. -const WAKING: usize = 0b10; - -impl AtomicWaker { - /// Create an `AtomicWaker`. - pub const fn new() -> Self { - // Make sure that task is Sync - #[allow(dead_code)] - trait AssertSync: Sync {} - impl AssertSync for Waker {} - - Self { - state: AtomicUsize::new(WAITING), - waker: UnsafeCell::new(None), - } - } - - /// Registers the waker to be notified on calls to `wake`. - /// - /// The new task will take place of any previous tasks that were registered - /// by previous calls to `register`. 
Any calls to `wake` that happen after - /// a call to `register` (as defined by the memory ordering rules), will - /// notify the `register` caller's task and deregister the waker from future - /// notifications. Because of this, callers should ensure `register` gets - /// invoked with a new `Waker` **each** time they require a wakeup. - /// - /// It is safe to call `register` with multiple other threads concurrently - /// calling `wake`. This will result in the `register` caller's current - /// task being notified once. - /// - /// This function is safe to call concurrently, but this is generally a bad - /// idea. Concurrent calls to `register` will attempt to register different - /// tasks to be notified. One of the callers will win and have its task set, - /// but there is no guarantee as to which caller will succeed. - pub fn register(&self, waker: &Waker) { - match self - .state - .compare_exchange(WAITING, REGISTERING, Acquire, Acquire) - .unwrap_or_else(|x| x) - { - WAITING => { - unsafe { - // Locked acquired, update the waker cell - - // Avoid cloning the waker if the old waker will awaken the same task. - match &*self.waker.get() { - Some(old_waker) if old_waker.will_wake(waker) => (), - _ => *self.waker.get() = Some(waker.clone()), - } - - // Release the lock. If the state transitioned to include - // the `WAKING` bit, this means that at least one wake has - // been called concurrently. - // - // Start by assuming that the state is `REGISTERING` as this - // is what we just set it to. If this holds, we know that no - // other writes were performed in the meantime, so there is - // nothing to acquire, only release. In case of concurrent - // wakers, we need to acquire their releases, so success needs - // to do both. - let res = self - .state - .compare_exchange(REGISTERING, WAITING, AcqRel, Acquire); - - match res { - Ok(_) => { - // memory ordering: acquired self.state during CAS - // - if previous wakes went through it syncs with - // their final release (`fetch_and`) - // - if there was no previous wake the next wake - // will wake us, no sync needed. - } - Err(actual) => { - // This branch can only be reached if at least one - // concurrent thread called `wake`. In this - // case, `actual` **must** be `REGISTERING | - // `WAKING`. - debug_assert_eq!(actual, REGISTERING | WAKING); - - // Take the waker to wake once the atomic operation has - // completed. - let waker = (*self.waker.get()).take().unwrap(); - - // We need to return to WAITING state (clear our lock and - // concurrent WAKING flag). This needs to acquire all - // WAKING fetch_or releases and it needs to release our - // update to self.waker, so we need a `swap` operation. - self.state.swap(WAITING, AcqRel); - - // memory ordering: we acquired the state for all - // concurrent wakes, but future wakes might still - // need to wake us in case we can't make progress - // from the pending wakes. - // - // So we simply schedule to come back later (we could - // also simply leave the registration in place above). - waker.wake(); - } - } - } - } - WAKING => { - // Currently in the process of waking the task, i.e., - // `wake` is currently being called on the old task handle. - // - // memory ordering: we acquired the state for all - // concurrent wakes, but future wakes might still - // need to wake us in case we can't make progress - // from the pending wakes. - // - // So we simply schedule to come back later (we - // could also spin here trying to acquire the lock - // to register). 
- waker.wake_by_ref(); - } - state => { - // In this case, a concurrent thread is holding the - // "registering" lock. This probably indicates a bug in the - // caller's code as racing to call `register` doesn't make much - // sense. - // - // memory ordering: don't care. a concurrent register() is going - // to succeed and provide proper memory ordering. - // - // We just want to maintain memory safety. It is ok to drop the - // call to `register`. - debug_assert!(state == REGISTERING || state == REGISTERING | WAKING); - } - } - } - - /// Calls `wake` on the last `Waker` passed to `register`. - /// - /// If `register` has not been called yet, then this does nothing. - pub fn wake(&self) { - if let Some(waker) = self.take() { - waker.wake(); - } - } - - /// Returns the last `Waker` passed to `register`, so that the user can wake it. - /// - /// - /// Sometimes, just waking the AtomicWaker is not fine grained enough. This allows the user - /// to take the waker and then wake it separately, rather than performing both steps in one - /// atomic action. - /// - /// If a waker has not been registered, this returns `None`. - pub fn take(&self) -> Option { - // AcqRel ordering is used in order to acquire the value of the `task` - // cell as well as to establish a `release` ordering with whatever - // memory the `AtomicWaker` is associated with. - match self.state.fetch_or(WAKING, AcqRel) { - WAITING => { - // The waking lock has been acquired. - let waker = unsafe { (*self.waker.get()).take() }; - - // Release the lock - self.state.fetch_and(!WAKING, Release); - - waker - } - state => { - // There is a concurrent thread currently updating the - // associated task. - // - // Nothing more to do as the `WAKING` bit has been set. It - // doesn't matter if there are concurrent registering threads or - // not. - // - debug_assert!( - state == REGISTERING || state == REGISTERING | WAKING || state == WAKING - ); - None - } - } - } -} - -impl Default for AtomicWaker { - fn default() -> Self { - Self::new() - } -} - -impl fmt::Debug for AtomicWaker { - fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { - write!(f, "AtomicWaker") - } -} - -unsafe impl Send for AtomicWaker {} -unsafe impl Sync for AtomicWaker {} diff --git a/core/src/runtime/schedular/queue.rs b/core/src/runtime/schedular/queue.rs deleted file mode 100644 index 250ffb89..00000000 --- a/core/src/runtime/schedular/queue.rs +++ /dev/null @@ -1,112 +0,0 @@ -use core::{ - cell::Cell, - pin::Pin, - ptr::{self, NonNull}, - sync::atomic::{AtomicPtr, Ordering}, -}; - -use super::atomic_waker::AtomicWaker; - -pub struct NodeHeader { - next: AtomicPtr, -} - -impl NodeHeader { - pub fn new() -> NodeHeader { - NodeHeader { - next: AtomicPtr::new(ptr::null_mut()), - } - } -} - -pub struct Queue { - waker: AtomicWaker, - head: AtomicPtr, - tail: Cell<*const NodeHeader>, - stub: NodeHeader, -} - -unsafe impl Send for Queue {} -unsafe impl Sync for Queue {} - -#[derive(Debug)] -pub enum Pop { - Empty, - Value(NonNull), - Inconsistant, -} - -/// Intrusive MPSC queue from 1024cores blog. 
-/// Similar to the one used int the FuturesUnordered implementation -impl Queue { - pub fn new() -> Self { - Queue { - waker: AtomicWaker::new(), - head: AtomicPtr::new(ptr::null_mut()), - tail: Cell::new(ptr::null_mut()), - stub: NodeHeader { - next: AtomicPtr::new(ptr::null_mut()), - }, - } - } - - pub fn waker(&self) -> &AtomicWaker { - &self.waker - } - - pub unsafe fn init(self: Pin<&Self>) { - let ptr = &self.stub as *const _ as *mut _; - self.head.store(ptr, Ordering::Release); - self.tail.set(ptr); - } - - /// # Safety - /// - node must be a valid pointer - /// - Queue must have been properly initialized. - pub unsafe fn push(self: Pin<&Self>, node: NonNull) { - node.as_ref().next.store(ptr::null_mut(), Ordering::Release); - - let prev = self.get_ref().head.swap(node.as_ptr(), Ordering::AcqRel); - - (*prev).next.store(node.as_ptr(), Ordering::Release); - } - - /// # Safety - /// - Queue must have been properly initialized. - /// - Can only be called from a single thread. - pub unsafe fn pop(self: Pin<&Self>) -> Pop { - let mut tail = self.tail.get(); - let mut next = (*tail).next.load(Ordering::Acquire); - - if core::ptr::eq(tail, &self.get_ref().stub) { - if next.is_null() { - return Pop::Empty; - } - - self.tail.set(next); - tail = next; - next = (*next).next.load(core::sync::atomic::Ordering::Acquire); - } - - if !next.is_null() { - self.tail.set(next); - return Pop::Value(NonNull::new_unchecked(tail as *mut NodeHeader)); - } - - let head = self.head.load(Ordering::Acquire); - if !core::ptr::eq(head, tail) { - return Pop::Inconsistant; - } - - self.push(NonNull::from(&self.get_ref().stub)); - - next = (*tail).next.load(Ordering::Acquire); - - if !next.is_null() { - self.tail.set(next); - return Pop::Value(NonNull::new_unchecked(tail as *mut NodeHeader)); - } - - Pop::Empty - } -} diff --git a/core/src/runtime/schedular/task.rs b/core/src/runtime/schedular/task.rs deleted file mode 100644 index c13fd185..00000000 --- a/core/src/runtime/schedular/task.rs +++ /dev/null @@ -1,135 +0,0 @@ -use alloc::sync::{Arc, Weak}; -use core::{ - cell::{Cell, UnsafeCell}, - future::Future, - mem::ManuallyDrop, - ptr::{addr_of_mut, NonNull}, - sync::atomic::AtomicBool, - task::{Context, Poll}, -}; - -use super::{ - queue::{NodeHeader, Queue}, - vtable::VTable, -}; - -#[repr(C)] -pub struct Task { - /// Header for the intrusive list, - /// Must be first. - pub(crate) head: NodeHeader, - /// Data the schedular uses to run the future. - pub(crate) body: TaskBody, - /// The future itself. - pub(crate) future: UnsafeCell>, -} - -impl> Task { - pub fn new(queue: Weak, f: F) -> Self { - Self { - head: NodeHeader::new(), - body: TaskBody { - queue, - vtable: VTable::get::(), - next: Cell::new(None), - prev: Cell::new(None), - queued: AtomicBool::new(true), - done: Cell::new(false), - }, - future: UnsafeCell::new(ManuallyDrop::new(f)), - } - } -} - -// Seperate struct to not have everything in task be repr(C) -pub struct TaskBody { - pub(crate) queue: Weak, - pub(crate) vtable: &'static VTable, - // The double linked list of tasks. - pub(crate) next: Cell>, - pub(crate) prev: Cell>, - // wether the task is currently in the queue to be re-polled. - pub(crate) queued: AtomicBool, - pub(crate) done: Cell, -} - -/// A raw pointer to a task with it's type erased. 
-#[derive(Clone, Copy, Debug)] -#[repr(transparent)] -pub struct ErasedTaskPtr(NonNull>); - -impl ErasedTaskPtr { - pub fn from_nonnull(ptr: NonNull>) -> Self { - Self(ptr) - } - - pub unsafe fn body<'a>(self) -> &'a TaskBody { - let ptr = self.0.as_ptr(); - unsafe { &*addr_of_mut!((*ptr).body) } - } - - pub fn as_node_ptr(self) -> NonNull { - self.0.cast() - } - - pub fn as_nonnull(self) -> NonNull> { - self.0 - } - - pub unsafe fn task_drive(self, cx: &mut Context) -> Poll<()> { - unsafe { (self.body().vtable.task_drive)(self.0, cx) } - } - - pub unsafe fn task_incr(self) { - unsafe { (self.body().vtable.task_incr)(self.0) } - } - - pub unsafe fn task_decr(self) { - unsafe { (self.body().vtable.task_decr)(self.0) } - } - - pub unsafe fn task_drop(self) { - unsafe { (self.body().vtable.task_drop)(self.0) } - } -} - -/// An owning pointer to a task with it's type erased. -pub struct ErasedTask(ErasedTaskPtr); - -impl ErasedTask { - pub unsafe fn from_ptr(ptr: ErasedTaskPtr) -> Self { - Self(ptr) - } - - pub fn into_ptr(this: Self) -> ErasedTaskPtr { - let res = this.0; - core::mem::forget(this); - res - } - - pub fn new(task: Arc>) -> Self { - unsafe { - let ptr = NonNull::new_unchecked(Arc::into_raw(task) as *mut Task).cast(); - Self(ErasedTaskPtr(ptr)) - } - } - - pub fn body(&self) -> &TaskBody { - unsafe { self.0.body() } - } -} - -impl Clone for ErasedTask { - fn clone(&self) -> Self { - unsafe { - self.0.task_incr(); - Self(self.0) - } - } -} - -impl Drop for ErasedTask { - fn drop(&mut self) { - unsafe { self.0.task_decr() } - } -} diff --git a/core/src/runtime/schedular/vtable.rs b/core/src/runtime/schedular/vtable.rs deleted file mode 100644 index 4fd936a9..00000000 --- a/core/src/runtime/schedular/vtable.rs +++ /dev/null @@ -1,55 +0,0 @@ -use alloc::sync::Arc; -use core::{ - future::Future, - mem::ManuallyDrop, - pin::Pin, - ptr::NonNull, - task::{Context, Poll}, -}; - -use super::Task; - -#[derive(Debug, Clone)] -pub(crate) struct VTable { - pub(crate) task_drive: unsafe fn(NonNull>, cx: &mut Context) -> Poll<()>, - pub(crate) task_decr: unsafe fn(NonNull>), - pub(crate) task_incr: unsafe fn(NonNull>), - pub(crate) task_drop: unsafe fn(NonNull>), -} - -impl VTable { - pub const fn get>() -> &'static VTable { - trait HasVTable { - const V_TABLE: VTable; - } - - impl> HasVTable for F { - const V_TABLE: VTable = VTable { - task_decr: VTable::decr::, - task_drive: VTable::drive::, - task_incr: VTable::incr::, - task_drop: VTable::drop::, - }; - } - - &::V_TABLE - } - - unsafe fn decr>(ptr: NonNull>) { - Arc::decrement_strong_count(ptr.cast::>().as_ptr()) - } - - unsafe fn incr>(ptr: NonNull>) { - Arc::increment_strong_count(ptr.cast::>().as_ptr()) - } - - unsafe fn drive>(ptr: NonNull>, cx: &mut Context) -> Poll<()> { - let ptr = ptr.cast::>(); - Pin::new_unchecked(&mut *(*ptr.as_ref().future.get())).poll(cx) - } - - unsafe fn drop>(ptr: NonNull>) { - let ptr = ptr.cast::>(); - unsafe { ManuallyDrop::drop(&mut (*ptr.as_ref().future.get())) } - } -} diff --git a/core/src/runtime/schedular/waker.rs b/core/src/runtime/schedular/waker.rs deleted file mode 100644 index 09fb0e7f..00000000 --- a/core/src/runtime/schedular/waker.rs +++ /dev/null @@ -1,70 +0,0 @@ -use core::{ - pin::Pin, - ptr::NonNull, - sync::atomic::Ordering, - task::{RawWaker, RawWakerVTable, Waker}, -}; - -use super::{ - task::{ErasedTask, ErasedTaskPtr}, - Task, -}; - -unsafe fn inner_clone(ptr: *const ()) { - let task_ptr = ptr.cast::>(); - ErasedTaskPtr::from_nonnull(NonNull::new_unchecked(task_ptr as *mut 
Task<()>)).task_incr(); -} - -unsafe fn schedular_clone(ptr: *const ()) -> RawWaker { - inner_clone(ptr); - RawWaker::new(ptr, &SCHEDULAR_WAKER_V_TABLE) -} - -unsafe fn schedular_wake(ptr: *const ()) { - // We have ownership so take it. - let task = NonNull::new_unchecked(ptr as *mut ()).cast::>(); - let task = ErasedTaskPtr::from_nonnull(task); - let task = ErasedTask::from_ptr(task); - - if task - .body() - .queued - .compare_exchange(false, true, Ordering::AcqRel, Ordering::Acquire) - .is_err() - { - return; - } - - // retrieve the queue, if already dropped, just return as we don't need to awake anything. - let Some(queue) = task.body().queue.upgrade() else { - return; - }; - - // push to the que - Pin::new_unchecked(&*queue).push(ErasedTask::into_ptr(task).as_node_ptr()); - - // wake up the schedular. - queue.waker().wake() -} - -unsafe fn schedular_wake_ref(ptr: *const ()) { - inner_clone(ptr); - schedular_wake(ptr) -} - -unsafe fn schedular_drop(ptr: *const ()) { - let task_ptr = (ptr as *mut ()).cast(); - ErasedTaskPtr::from_nonnull(NonNull::new_unchecked(task_ptr)).task_decr(); -} - -static SCHEDULAR_WAKER_V_TABLE: RawWakerVTable = RawWakerVTable::new( - schedular_clone, - schedular_wake, - schedular_wake_ref, - schedular_drop, -); - -pub unsafe fn get(ptr: ErasedTask) -> Waker { - let ptr = ErasedTask::into_ptr(ptr).as_nonnull().as_ptr(); - unsafe { Waker::from_raw(RawWaker::new(ptr.cast(), &SCHEDULAR_WAKER_V_TABLE)) } -} diff --git a/core/src/runtime/spawner.rs b/core/src/runtime/spawner.rs index 2532cf30..bbf522dd 100644 --- a/core/src/runtime/spawner.rs +++ b/core/src/runtime/spawner.rs @@ -1,64 +1,13 @@ -use super::{ - schedular::{Schedular, SchedularPoll}, - AsyncWeakRuntime, InnerRuntime, -}; -use crate::AsyncRuntime; -use alloc::vec::Vec; +use super::{task_queue::TaskPoll, AsyncWeakRuntime}; use core::{ future::Future, pin::Pin, - task::{ready, Context, Poll, Waker}, + task::{Context, Poll}, }; -use async_lock::futures::LockArc; - -/// A structure to hold futures spawned inside the runtime. -pub struct Spawner { - schedular: Schedular, - wakeup: Vec, -} - -impl Spawner { - pub fn new() -> Self { - Spawner { - schedular: Schedular::new(), - wakeup: Vec::new(), - } - } - - pub unsafe fn push(&mut self, f: F) - where - F: Future, - { - unsafe { self.schedular.push(f) }; - self.wakeup.drain(..).for_each(Waker::wake); - } - - pub fn listen(&mut self, wake: Waker) { - self.wakeup.push(wake); - } - - pub fn is_empty(&mut self) -> bool { - self.schedular.is_empty() - } - - pub fn poll(&mut self, cx: &mut Context) -> SchedularPoll { - unsafe { self.schedular.poll(cx) } - } -} - -enum DriveFutureState { - Initial, - Lock { - lock_future: Option>, - // Here to ensure the lock remains valid. - _runtime: AsyncRuntime, - }, -} - +/// Future that drives the runtime's spawned tasks in the background. pub struct DriveFuture { rt: AsyncWeakRuntime, - state: DriveFutureState, } #[cfg(feature = "parallel")] @@ -68,68 +17,36 @@ unsafe impl Sync for DriveFuture {} impl DriveFuture { pub(crate) fn new(rt: AsyncWeakRuntime) -> Self { - Self { - rt, - state: DriveFutureState::Initial, - } + Self { rt } } } impl Future for DriveFuture { type Output = (); - fn poll(self: Pin<&mut Self>, cx: &mut core::task::Context<'_>) -> Poll { - // Safety: We manually ensure that pinned values remained properly pinned. 
- let this = unsafe { self.get_unchecked_mut() }; - loop { - let mut lock = match this.state { - DriveFutureState::Initial => { - let Some(_runtime) = this.rt.try_ref() else { - return Poll::Ready(()); - }; - - let lock_future = _runtime.inner.lock_arc(); - this.state = DriveFutureState::Lock { - lock_future: Some(lock_future), - _runtime, - }; - continue; - } - DriveFutureState::Lock { - ref mut lock_future, - .. - } => { - // Safety: The future will not be moved until it is ready and then dropped. - let res = unsafe { - ready!(Pin::new_unchecked(lock_future.as_mut().unwrap()).poll(cx)) - }; - // Assign none explicitly so it we don't move out of the future. - *lock_future = None; - res - } - }; + fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { + let Some(runtime) = self.rt.try_ref() else { + return Poll::Ready(()); + }; - lock.runtime.update_stack_top(); - - lock.runtime.get_opaque().listen(cx.waker().clone()); + let Some(mut lock) = runtime.try_lock() else { + cx.waker().wake_by_ref(); + return Poll::Pending; + }; - loop { - // TODO: Handle error. - if let Ok(true) = lock.runtime.execute_pending_job() { - continue; - } + lock.runtime.update_stack_top(); + lock.runtime.get_opaque().listen(cx.waker().clone()); - // TODO: Handle error. - match lock.runtime.get_opaque().poll(cx) { - SchedularPoll::ShouldYield | SchedularPoll::Empty | SchedularPoll::Pending => { - break - } - SchedularPoll::PendingProgress => {} - } + loop { + if let Ok(true) = lock.runtime.execute_pending_job() { + continue; + } + match lock.runtime.get_opaque().poll(cx) { + TaskPoll::Progress => continue, + _ => break, } - - this.state = DriveFutureState::Initial; - return Poll::Pending; } + + Poll::Pending } } diff --git a/core/src/runtime/task_queue.rs b/core/src/runtime/task_queue.rs new file mode 100644 index 00000000..2f247485 --- /dev/null +++ b/core/src/runtime/task_queue.rs @@ -0,0 +1,210 @@ +//! 
Task queue for spawned futures - optimized for both parallel and non-parallel modes
+
+#[cfg(not(feature = "parallel"))]
+use alloc::{boxed::Box, collections::VecDeque};
+#[cfg(feature = "parallel")]
+use alloc::{boxed::Box, collections::VecDeque, vec::Vec};
+use core::{
+    future::Future,
+    pin::Pin,
+    task::{Context, Poll, Waker},
+};
+
+#[cfg(feature = "parallel")]
+use parking_lot::Mutex;
+
+#[cfg(not(feature = "parallel"))]
+use core::cell::UnsafeCell;
+
+#[derive(Debug, Clone, Copy, PartialEq, Eq)]
+pub enum TaskPoll {
+    Empty,
+    Pending,
+    Progress,
+    Done,
+}
+
+type BoxedTask = Pin<Box<dyn Future<Output = ()> + 'static>>;
+
+#[cfg(feature = "parallel")]
+pub struct TaskQueue {
+    inner: Mutex<TaskQueueInner>,
+}
+
+#[cfg(feature = "parallel")]
+struct TaskQueueInner {
+    tasks: VecDeque<BoxedTask>,
+    waker: Option<Waker>,
+}
+
+#[cfg(not(feature = "parallel"))]
+pub struct TaskQueue {
+    inner: UnsafeCell<TaskQueueInner>,
+}
+
+#[cfg(not(feature = "parallel"))]
+struct TaskQueueInner {
+    tasks: VecDeque<BoxedTask>,
+    waker: Option<Waker>,
+}
+
+#[cfg(feature = "parallel")]
+impl TaskQueue {
+    pub fn new() -> Self {
+        TaskQueue {
+            inner: Mutex::new(TaskQueueInner {
+                tasks: VecDeque::new(),
+                waker: None,
+            }),
+        }
+    }
+
+    pub fn is_empty(&self) -> bool {
+        self.inner.lock().tasks.is_empty()
+    }
+
+    /// # Safety
+    /// Caller must ensure the future remains valid for as long as it is queued.
+    pub unsafe fn push<F: Future<Output = ()>>(&self, future: F) {
+        let future: BoxedTask =
+            core::mem::transmute(Box::pin(future) as Pin<Box<dyn Future<Output = ()> + '_>>);
+        let mut inner = self.inner.lock();
+        inner.tasks.push_back(future);
+        if let Some(w) = inner.waker.take() {
+            w.wake();
+        }
+    }
+
+    pub fn listen(&self, waker: Waker) {
+        self.inner.lock().waker = Some(waker);
+    }
+
+    /// Poll tasks - optimized to minimize lock contention
+    pub fn poll(&self, cx: &mut Context) -> TaskPoll {
+        // Take all tasks out in one lock acquisition
+        let mut batch: Vec<BoxedTask> = {
+            let mut inner = self.inner.lock();
+            if inner.tasks.is_empty() {
+                return TaskPoll::Empty;
+            }
+            inner.tasks.drain(..).collect()
+        };
+
+        let mut made_progress = false;
+        let mut pending = Vec::new();
+
+        // Poll all tasks without holding the lock
+        for mut task in batch.drain(..) {
+            match task.as_mut().poll(cx) {
+                Poll::Ready(()) => made_progress = true,
+                Poll::Pending => pending.push(task),
+            }
+        }
+
+        // Put pending tasks back in one lock acquisition
+        if !pending.is_empty() {
+            let mut inner = self.inner.lock();
+            for task in pending.into_iter().rev() {
+                inner.tasks.push_front(task);
+            }
+        }
+
+        // Check whether tasks remain: either re-queued pending ones or
+        // tasks spawned while the batch was being polled.
+        let has_tasks = !self.inner.lock().tasks.is_empty();
+
+        if !has_tasks {
+            if made_progress {
+                TaskPoll::Done
+            } else {
+                TaskPoll::Empty
+            }
+        } else if made_progress {
+            TaskPoll::Progress
+        } else {
+            TaskPoll::Pending
+        }
+    }
+}
+
+#[cfg(not(feature = "parallel"))]
+impl TaskQueue {
+    pub fn new() -> Self {
+        TaskQueue {
+            inner: UnsafeCell::new(TaskQueueInner {
+                tasks: VecDeque::new(),
+                waker: None,
+            }),
+        }
+    }
+
+    #[inline]
+    #[allow(clippy::mut_from_ref)]
+    fn inner(&self) -> &mut TaskQueueInner {
+        unsafe { &mut *self.inner.get() }
+    }
+
+    pub fn is_empty(&self) -> bool {
+        self.inner().tasks.is_empty()
+    }
+
+    /// # Safety
+    /// Caller must ensure the future remains valid for as long as it is queued.
+    pub unsafe fn push<F: Future<Output = ()>>(&self, future: F) {
+        let future: BoxedTask =
+            core::mem::transmute(Box::pin(future) as Pin<Box<dyn Future<Output = ()> + '_>>);
+        let inner = self.inner();
+        inner.tasks.push_back(future);
+        if let Some(w) = inner.waker.take() {
+            w.wake();
+        }
+    }
+
+    pub fn listen(&self, waker: Waker) {
+        self.inner().waker = Some(waker);
+    }
+
+    pub fn poll(&self, cx: &mut Context) -> TaskPoll {
+        let inner = self.inner();
+
+        if inner.tasks.is_empty() {
+            return TaskPoll::Empty;
+        }
+
+        let mut made_progress = false;
+        let count = inner.tasks.len();
+
+        for _ in 0..count {
+            let Some(mut task) = inner.tasks.pop_front() else {
+                break;
+            };
+
+            match task.as_mut().poll(cx) {
+                Poll::Ready(()) => made_progress = true,
+                Poll::Pending => inner.tasks.push_back(task),
+            }
+        }
+
+        if inner.tasks.is_empty() {
+            if made_progress {
+                TaskPoll::Done
+            } else {
+                TaskPoll::Empty
+            }
+        } else if made_progress {
+            TaskPoll::Progress
+        } else {
+            TaskPoll::Pending
+        }
+    }
+}
+
+impl Default for TaskQueue {
+    fn default() -> Self {
+        Self::new()
+    }
+}
+
+#[cfg(feature = "parallel")]
+unsafe impl Send for TaskQueue {}
+#[cfg(feature = "parallel")]
+unsafe impl Sync for TaskQueue {}
diff --git a/core/src/util.rs b/core/src/util.rs
index 9cac184b..4cc6bca4 100644
--- a/core/src/util.rs
+++ b/core/src/util.rs
@@ -5,151 +5,6 @@ use core::panic::UnwindSafe;
 /// A trait for preventing implementing traits which should not be implemented outside of rquickjs.
 pub trait Sealed {}
 
-#[cfg(feature = "futures")]
-pub use self::futures::*;
-
-#[cfg(feature = "futures")]
-mod futures {
-    use core::{
-        future::Future,
-        marker::PhantomData,
-        mem::ManuallyDrop,
-        ops::{Deref, DerefMut},
-        pin::Pin,
-        task::{Context, Poll},
-    };
-
-    /// Future which allows one to bail out of a async context, back to manually calling poll.
- pub struct ManualPoll { - f: F, - _marker: PhantomData, - } - - impl ManualPoll - where - F: FnMut(&mut Context) -> Poll, - { - pub fn new(f: F) -> Self { - ManualPoll { - f, - _marker: PhantomData, - } - } - } - - impl Unpin for ManualPoll {} - - impl Future for ManualPoll - where - F: FnMut(&mut Context) -> Poll, - { - type Output = R; - - fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { - let p = unsafe { self.get_unchecked_mut() }; - (p.f)(cx) - } - } - - #[cfg(feature = "parallel")] - pub use self::parallel::*; - - #[cfg(feature = "parallel")] - mod parallel { - use super::*; - - pub struct AssertSyncFuture(F); - - impl AssertSyncFuture { - pub unsafe fn assert(f: F) -> Self { - Self(f) - } - } - - unsafe impl Sync for AssertSyncFuture {} - unsafe impl Send for AssertSyncFuture {} - - impl Future for AssertSyncFuture - where - F: Future, - { - type Output = F::Output; - - fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { - let f = unsafe { self.map_unchecked_mut(|x| &mut x.0) }; - f.poll(cx) - } - } - - pub struct AssertSendFuture(F); - - impl AssertSendFuture { - pub unsafe fn assert(f: F) -> Self { - Self(f) - } - } - - unsafe impl Send for AssertSendFuture {} - unsafe impl Sync for AssertSendFuture {} - - impl Future for AssertSendFuture - where - F: Future, - { - type Output = F::Output; - - fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { - let f = unsafe { self.map_unchecked_mut(|x| &mut x.0) }; - f.poll(cx) - } - } - } - - pub struct Defer { - value: ManuallyDrop, - f: Option, - } - - impl Defer { - pub fn new(value: T, func: F) -> Self { - Defer { - value: ManuallyDrop::new(value), - f: Some(func), - } - } - - pub fn take(mut self) -> T { - self.f = None; - unsafe { ManuallyDrop::take(&mut self.value) } - } - } - - impl Deref for Defer { - type Target = T; - - fn deref(&self) -> &Self::Target { - &self.value - } - } - - impl DerefMut for Defer { - fn deref_mut(&mut self) -> &mut Self::Target { - &mut self.value - } - } - - impl Drop for Defer - where - F: FnOnce(T), - { - fn drop(&mut self) { - if let Some(x) = self.f.take() { - unsafe { (x)(ManuallyDrop::take(&mut self.value)) }; - } - } - } -} - #[cfg(feature = "std")] pub fn catch_unwind( f: impl FnOnce() -> R + UnwindSafe, @@ -161,7 +16,6 @@ pub fn catch_unwind( pub fn catch_unwind( f: impl FnOnce() -> R + UnwindSafe, ) -> Result> { - // with no-std we can't unwind, just call the function directly Ok(f()) } @@ -172,6 +26,5 @@ pub fn resume_unwind(payload: alloc::boxed::Box) -> ! #[cfg(not(feature = "std"))] pub fn resume_unwind(_payload: alloc::boxed::Box) -> ! { - // with no-std we can't unwind, just panic panic!() }
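
Note: the non-parallel TaskQueue introduced above keeps a VecDeque of boxed futures and polls each task queued at the start of a call exactly once, mapping the outcome onto TaskPoll. The self-contained sketch below mirrors those semantics so the variants are easy to check in isolation. MiniQueue, Step, and the no-op waker are illustrative only; the real type also registers a waker and uses an unsafe lifetime extension that this model omits.

use std::collections::VecDeque;
use std::future::Future;
use std::pin::Pin;
use std::ptr;
use std::task::{Context, Poll, RawWaker, RawWakerVTable, Waker};

type BoxedTask = Pin<Box<dyn Future<Output = ()>>>;

/// Result of driving the queue once, mirroring the TaskPoll variants above.
#[derive(Debug, PartialEq, Eq)]
enum Step {
    Empty,    // nothing was queued
    Pending,  // tasks remain, none completed this round
    Progress, // tasks remain, at least one completed
    Done,     // everything queued completed this round
}

struct MiniQueue {
    tasks: VecDeque<BoxedTask>,
}

impl MiniQueue {
    fn new() -> Self {
        MiniQueue { tasks: VecDeque::new() }
    }

    fn push(&mut self, future: impl Future<Output = ()> + 'static) {
        self.tasks.push_back(Box::pin(future));
    }

    /// Poll every task queued at the start of the call exactly once,
    /// dropping finished tasks and re-queueing the rest.
    fn poll(&mut self, cx: &mut Context<'_>) -> Step {
        if self.tasks.is_empty() {
            return Step::Empty;
        }
        let mut made_progress = false;
        let count = self.tasks.len();
        for _ in 0..count {
            let Some(mut task) = self.tasks.pop_front() else { break };
            match task.as_mut().poll(cx) {
                Poll::Ready(()) => made_progress = true,
                Poll::Pending => self.tasks.push_back(task),
            }
        }
        match (self.tasks.is_empty(), made_progress) {
            (true, true) => Step::Done,
            (true, false) => Step::Empty,
            (false, true) => Step::Progress,
            (false, false) => Step::Pending,
        }
    }
}

// A waker that does nothing; enough to drive futures that never wait.
const NOOP_VTABLE: RawWakerVTable = RawWakerVTable::new(noop_clone, noop, noop, noop);

fn noop_raw() -> RawWaker {
    RawWaker::new(ptr::null(), &NOOP_VTABLE)
}
unsafe fn noop_clone(_: *const ()) -> RawWaker {
    noop_raw()
}
unsafe fn noop(_: *const ()) {}

fn noop_waker() -> Waker {
    unsafe { Waker::from_raw(noop_raw()) }
}

fn main() {
    let mut queue = MiniQueue::new();
    queue.push(async {});
    queue.push(async {});

    let waker = noop_waker();
    let mut cx = Context::from_waker(&waker);

    assert_eq!(queue.poll(&mut cx), Step::Done);
    assert_eq!(queue.poll(&mut cx), Step::Empty);
}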
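
Note: the parallel TaskQueue::poll drains the whole queue in one lock acquisition, runs the batch without holding the lock, and then re-queues whatever is still unfinished. The sketch below shows just that locking pattern, assuming parking_lot (which this change adds as an optional dependency). BatchedQueue and Job are illustrative names, and the real queue holds boxed futures rather than closures; the point is that producers are only blocked during the two short drain/re-queue critical sections.

use std::collections::VecDeque;

use parking_lot::Mutex;

type Job = Box<dyn FnMut() -> bool + Send>; // returns true once finished

struct BatchedQueue {
    jobs: Mutex<VecDeque<Job>>,
}

impl BatchedQueue {
    fn new() -> Self {
        BatchedQueue {
            jobs: Mutex::new(VecDeque::new()),
        }
    }

    fn push(&self, job: Job) {
        self.jobs.lock().push_back(job);
    }

    /// Run every job queued so far, re-queueing the ones that are not
    /// finished yet. Returns how many jobs completed.
    fn run_batch(&self) -> usize {
        // One lock acquisition to take the whole batch.
        let batch: Vec<Job> = self.jobs.lock().drain(..).collect();

        let mut finished = 0;
        let mut pending = Vec::new();

        // Run the jobs without holding the lock, so other threads can keep
        // pushing new work concurrently.
        for mut job in batch {
            if job() {
                finished += 1;
            } else {
                pending.push(job);
            }
        }

        // One more lock acquisition to put unfinished jobs back in order.
        if !pending.is_empty() {
            let mut jobs = self.jobs.lock();
            for job in pending.into_iter().rev() {
                jobs.push_front(job);
            }
        }

        finished
    }
}

fn main() {
    let queue = BatchedQueue::new();
    let mut calls = 0;
    queue.push(Box::new(move || {
        calls += 1;
        calls == 2 // only finishes on its second run
    }));

    assert_eq!(queue.run_batch(), 0); // still pending, pushed back
    assert_eq!(queue.run_batch(), 1); // completes in the next batch
}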
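
Note: DriveFuture and WithFuture now use a "try_lock or yield" loop instead of awaiting an async-lock wait queue: if the runtime lock is contended, they call wake_by_ref on their own waker and return Pending, relying on the executor to poll them again soon. A minimal sketch of that pattern is below, assuming parking_lot plus a tokio runtime with the macros and rt-multi-thread features (only rt-multi-thread is a real dependency of the parallel mode; macros is assumed for the example's main). YieldingLock is an illustrative name, not a type in this patch. The trade-off is that a contended future effectively busy-yields until the holder releases the lock, instead of parking until it is woken exactly once.

use std::future::Future;
use std::pin::Pin;
use std::sync::Arc;
use std::task::{Context, Poll};
use std::time::Duration;

use parking_lot::Mutex;

/// Resolves once the shared lock could be acquired, yielding back to the
/// executor whenever it is contended.
struct YieldingLock<T> {
    lock: Arc<Mutex<T>>,
}

impl<T> Future for YieldingLock<T> {
    type Output = ();

    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
        match self.lock.try_lock() {
            Some(_guard) => {
                // Lock acquired: this is where DriveFuture would run pending
                // jobs and poll queued tasks before releasing the runtime lock.
                Poll::Ready(())
            }
            None => {
                // Contended: request another poll and yield, instead of
                // parking on an async-lock wait queue as before.
                cx.waker().wake_by_ref();
                Poll::Pending
            }
        }
    }
}

#[tokio::main(flavor = "multi_thread")]
async fn main() {
    let lock = Arc::new(Mutex::new(0u32));

    // Hold the lock on another thread for a moment so the future is likely
    // to observe contention and yield at least once.
    let held = lock.clone();
    let holder = std::thread::spawn(move || {
        let _guard = held.lock();
        std::thread::sleep(Duration::from_millis(10));
    });

    YieldingLock { lock }.await;
    holder.join().unwrap();
}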