3 changes: 2 additions & 1 deletion core/Cargo.toml
@@ -20,6 +20,7 @@ phf = { version = "0.13", optional = true }
indexmap = { version = "2", optional = true }
either = { version = "1", optional = true }
async-lock = { version = "3", optional = true, default-features = false }
parking_lot = { version = "0.12", optional = true }
chrono = { version = "0.4", optional = true }
dlopen2 = { version = "0.8", optional = true }
relative-path = { version = "2.0", optional = true, default-features = false, features = [
@@ -57,7 +58,7 @@ chrono = ["dep:chrono"]
bindgen = ["rquickjs-sys/bindgen"]

# Enable support for parallel execution
parallel = ["std", "tokio/rt-multi-thread"]
parallel = ["std", "tokio/rt-multi-thread", "parking_lot"]

# Enable user-defined module loader support
loader = ["relative-path"]
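The `parallel` feature now enables `parking_lot` alongside tokio's multi-threaded runtime. A minimal sketch of the usual pattern for feature-gated lock selection (the `RuntimeLock` alias and the `RefCell` fallback are illustrative, not rquickjs types):

```rust
// Hypothetical sketch of picking a lock type behind the `parallel` feature.
// `RuntimeLock` is an illustrative alias, not part of the rquickjs API.
#[cfg(feature = "parallel")]
pub(crate) type RuntimeLock<T> = parking_lot::Mutex<T>;

// Without `parallel` everything runs on one thread, so an unsynchronized
// RefCell (no atomics, no poisoning) suffices.
#[cfg(not(feature = "parallel"))]
pub(crate) type RuntimeLock<T> = core::cell::RefCell<T>;
```

`parking_lot::Mutex` has no poisoning and a cheap uncontended path, which fits the try-lock-heavy code in the rest of this diff.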
18 changes: 7 additions & 11 deletions core/src/context/async.rs
@@ -91,19 +91,14 @@ macro_rules! async_with{

impl DropContext for AsyncRuntime {
unsafe fn drop_context(&self, ctx: NonNull<qjs::JSContext>) {
//TODO
let guard = match self.inner.try_lock() {
let guard = match self.try_lock() {
Some(x) => x,
None => {
#[cfg(not(feature = "parallel"))]
{
let p =
unsafe { &mut *(ctx.as_ptr() as *mut crate::context::ctx::RefCountHeader) };
if p.ref_count <= 1 {
// Lock was poisoned, this should only happen on a panic.
// We should still free the context.
// TODO see if there is a way to recover from a panic which could cause the
// following assertion to trigger
#[cfg(feature = "std")]
assert!(std::thread::panicking());
}
@@ -121,7 +116,6 @@ impl DropContext for AsyncRuntime {
};
guard.runtime.update_stack_top();
unsafe { qjs::JS_FreeContext(ctx.as_ptr()) }
// Explicitly drop the guard to ensure it is valid during the entire use of runtime
mem::drop(guard);
}
}
@@ -145,17 +139,19 @@ impl AsyncContext {
}

/// Creates a base context with only the required functions registered.
///
/// If additional functions are required use [`AsyncContext::custom`],
/// [`AsyncContext::builder`] or [`AsyncContext::full`].
pub async fn base(runtime: &AsyncRuntime) -> Result<Self> {
Self::custom::<intrinsic::None>(runtime).await
}

/// Creates a context with only the required intrinsics registered.
///
/// If additional functions are required use [`AsyncContext::custom`],
/// [`AsyncContext::builder`] or [`AsyncContext::full`].
pub async fn custom<I: Intrinsic>(runtime: &AsyncRuntime) -> Result<Self> {
let guard = runtime.inner.lock().await;
let guard = runtime.lock().await;
let ctx = NonNull::new(unsafe { qjs::JS_NewContextRaw(guard.runtime.rt.as_ptr()) })
.ok_or(Error::Allocation)?;
unsafe { qjs::JS_AddIntrinsicBaseObjects(ctx.as_ptr()) };
@@ -168,14 +164,14 @@
}

/// Creates a context with all standard available intrinsics registered.
///
/// If precise control is required of which functions are available use
/// [`AsyncContext::custom`] or [`AsyncContext::builder`].
pub async fn full(runtime: &AsyncRuntime) -> Result<Self> {
let guard = runtime.inner.lock().await;
let guard = runtime.lock().await;
let ctx = NonNull::new(unsafe { qjs::JS_NewContext(guard.runtime.rt.as_ptr()) })
.ok_or(Error::Allocation)?;
let res = unsafe { ContextOwner::new(ctx, runtime.clone()) };
// Explicitly drop the guard to ensure it is valid during the entire use of runtime
guard.drop_pending();
mem::drop(guard);

@@ -218,7 +214,7 @@ impl AsyncContext {
F: for<'js> FnOnce(Ctx<'js>) -> R + ParallelSend,
R: ParallelSend,
{
let guard = self.0.rt().inner.lock().await;
let guard = self.0.rt().lock().await;
guard.runtime.update_stack_top();
let ctx = unsafe { Ctx::new_async(self) };
let res = f(ctx);
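`drop_context` leans on an invariant worth spelling out: failing to take the lock during a drop should only ever happen while a panic is unwinding. A standalone sketch of that try-lock-on-drop check (the `Handle` type and its field are illustrative stand-ins):

```rust
// Sketch of the try-lock-on-drop pattern used by `drop_context` above.
// `Handle` is an illustrative stand-in, not an rquickjs type.
use std::sync::Mutex;

struct Handle {
    state: Mutex<Vec<u32>>, // placeholder for shared runtime state
}

impl Drop for Handle {
    fn drop(&mut self) {
        match self.state.try_lock() {
            // Normal path: clean up while holding the lock.
            Ok(mut guard) => guard.clear(),
            // The lock is unavailable while we are being dropped. Short of a
            // panic unwinding through the owner, that should be impossible,
            // so assert that a panic is indeed in progress.
            Err(_) => assert!(std::thread::panicking()),
        }
    }
}
```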
116 changes: 25 additions & 91 deletions core/src/context/async/future.rs
@@ -1,43 +1,23 @@
use alloc::boxed::Box;
use core::{
future::Future,
mem::{self, ManuallyDrop},
mem,
pin::Pin,
task::{ready, Context, Poll},
task::{Context, Poll},
};

use async_lock::futures::Lock;

use crate::{
markers::ParallelSend,
runtime::{schedular::SchedularPoll, InnerRuntime},
AsyncContext, Ctx,
};
use crate::{markers::ParallelSend, runtime::task_queue::TaskPoll, AsyncContext, Ctx};

pub struct WithFuture<'a, F, R> {
context: &'a AsyncContext,
lock_state: LockState<'a>,
state: WithFutureState<'a, F, R>,
}

enum LockState<'a> {
Initial,
Pending(ManuallyDrop<Lock<'a, InnerRuntime>>),
}

impl<'a> Drop for LockState<'a> {
fn drop(&mut self) {
if let LockState::Pending(ref mut x) = self {
unsafe { ManuallyDrop::drop(x) }
}
}
}

enum WithFutureState<'a, F, R> {
Initial {
closure: F,
},
FutureCreated {
Running {
future: Pin<Box<dyn Future<Output = R> + 'a + Send>>,
},
Done,
@@ -51,7 +31,6 @@ where
pub fn new(context: &'a AsyncContext, f: F) -> Self {
Self {
context,
lock_state: LockState::Initial,
state: WithFutureState::Initial { closure: f },
}
}
@@ -63,98 +42,53 @@
R: ParallelSend + 'static,
{
type Output = R;

fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
// Implementation ensures we don't break pin guarantees.
let this = unsafe { self.get_unchecked_mut() };

let mut lock = loop {
// We don't move the lock_state as long as it is pending
if let LockState::Pending(ref mut fut) = &mut this.lock_state {
// SAFETY: Sound as we don't move future while it is pending.
let pin = unsafe { Pin::new_unchecked(&mut **fut) };
let lock = ready!(pin.poll(cx));
// at this point we have acquired a lock, so we will now drop the future allowing
// us to reused the memory space.
unsafe { ManuallyDrop::drop(fut) };
// The pinned memory is dropped so now we can freely move into it.
this.lock_state = LockState::Initial;
break lock;
} else {
// we assign a state with manually drop so we can drop the value when we need to
// replace it.
// Assign
this.lock_state =
LockState::Pending(ManuallyDrop::new(this.context.0.rt().inner.lock()));
}
// Try to acquire the runtime lock; yield to the executor if it is unavailable.
let Some(mut lock) = this.context.0.rt().try_lock() else {
cx.waker().wake_by_ref();
return Poll::Pending;
};

lock.runtime.update_stack_top();

// At this point we have locked the runtime so we start running the actual future
// we can move this memory since the future is boxed and thus movable.
// Create the future on first poll, or resume the one stored earlier.
let mut future = match mem::replace(&mut this.state, WithFutureState::Done) {
WithFutureState::Initial { closure } => {
// SAFETY: we have a lock, so creating this ctx is safe.
let ctx = unsafe { Ctx::new_async(this.context) };
Box::pin(closure(ctx))
}
WithFutureState::FutureCreated { future } => future,
// The future was called an additional time,
// We don't have anything valid to do here so just panic.
WithFutureState::Done => panic!("With future called after it returned"),
WithFutureState::Running { future } => future,
WithFutureState::Done => panic!("WithFuture polled after completion"),
};

let res = loop {
let mut made_progress = false;

// Poll the future and spawned tasks
loop {
if let Poll::Ready(x) = future.as_mut().poll(cx) {
break Poll::Ready(x);
};
return Poll::Ready(x);
}

let opaque = lock.runtime.get_opaque();
match opaque.poll(cx) {
SchedularPoll::Empty => {
// if the schedular is empty that means the future is waiting on an external or
// on a promise.
}
SchedularPoll::ShouldYield => {
this.state = WithFutureState::FutureCreated { future };
return Poll::Pending;
}
SchedularPoll::Pending => {
// we couldn't drive any futures so we should run some jobs to see we can get
// some progress.
}
SchedularPoll::PendingProgress => {
// We did make some progress so the root future might not be blocked, but it is
// probably still a good idea to run some jobs as most futures first require a
// single job to run before unblocking.
made_progress = true;
}
};
let mut made_progress = false;

if lock.runtime.get_opaque().poll(cx) == TaskPoll::Progress {
made_progress = true;
}

loop {
match lock.runtime.execute_pending_job() {
Ok(false) => break,
Ok(true) => made_progress = true,
Err(_ctx) => {
// TODO figure out what to do with a job error.
made_progress = true;
}
Err(_) => made_progress = true,
}
}

// If no work could be done we should yield back.
if !made_progress {
this.state = WithFutureState::FutureCreated { future };
break Poll::Pending;
this.state = WithFutureState::Running { future };
return Poll::Pending;
}
};

// Manually drop the lock so it isn't accidentally moved into somewhere.
mem::drop(lock);

res
}
}
}

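The rewritten `poll` drops the self-referential `ManuallyDrop<Lock>` state machine in favor of a simpler contract: when the runtime lock is contended, wake the task again and return `Pending`, letting the executor retry on its next tick. A self-contained illustration of that wake-and-yield pattern:

```rust
// Minimal illustration of the wake-and-yield pattern in WithFuture::poll:
// rather than parking in a lock's waiter queue, the task re-schedules itself
// before returning Pending, so the executor will poll it again shortly.
use core::future::Future;
use core::pin::Pin;
use core::task::{Context, Poll};

struct YieldNow {
    yielded: bool,
}

impl Future for YieldNow {
    type Output = ();

    fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<()> {
        if self.yielded {
            Poll::Ready(())
        } else {
            self.yielded = true;
            // Arrange to be polled again, then yield.
            cx.waker().wake_by_ref();
            Poll::Pending
        }
    }
}
```

The trade-off is possible busy re-polling under heavy lock contention, in exchange for removing the unsafely pinned lock future and its `ManuallyDrop` bookkeeping.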
64 changes: 63 additions & 1 deletion core/src/context/ctx.rs
@@ -403,7 +403,8 @@ impl<'js> Ctx<'js> {
Opaque::from_runtime_ptr(qjs::JS_GetRuntime(self.ctx.as_ptr()))
}

/// Spawn future using configured async runtime
/// Spawn future on QuickJS's task queue (same thread as JS).
/// Use this when the future needs to access JS values.
#[cfg(feature = "futures")]
#[cfg_attr(feature = "doc-cfg", doc(cfg(feature = "futures")))]
pub fn spawn<F>(&self, future: F)
@@ -413,6 +414,67 @@
unsafe { self.get_opaque().push(future) }
}

/// Execute pending JS jobs and poll spawned futures once.
#[cfg(feature = "futures")]
fn poll_once(&self, cx: &mut core::task::Context) -> bool {
let mut did_work = false;

// Execute JS pending jobs
while self.execute_pending_job() {
did_work = true;
}

// Poll spawned futures
let opaque = unsafe { self.get_opaque() };
match opaque.poll(cx) {
crate::runtime::task_queue::TaskPoll::Progress
| crate::runtime::task_queue::TaskPoll::Done => did_work = true,
_ => {}
}

did_work
}

/// Await a promise, blocking until resolved while driving QuickJS jobs.
///
/// The `yield_fn` is called **only when idle** (no JS jobs or futures made progress)
/// to let the async runtime process timers and I/O.
///
/// For tokio multi-threaded runtime:
/// ```ignore
/// ctx.await_promise(promise, || {
/// tokio::task::block_in_place(|| {
/// tokio::runtime::Handle::current().block_on(tokio::task::yield_now())
/// })
/// })
/// ```
#[cfg(feature = "futures")]
#[cfg_attr(feature = "doc-cfg", doc(cfg(feature = "futures")))]
pub fn await_promise<T: FromJs<'js>>(
&self,
promise: Promise<'js>,
yield_fn: impl Fn(),
) -> Result<T> {
use crate::value::promise::PromiseState;
use core::task::{Context, Waker};

let waker = Waker::noop();
let mut cx = Context::from_waker(waker);

loop {
match promise.state() {
PromiseState::Resolved => return promise.result().unwrap(),
PromiseState::Rejected => return promise.result().unwrap(),
PromiseState::Pending => {
// Only yield if no work was done - avoids expensive runtime calls
if !self.poll_once(&mut cx) {
yield_fn();
}
}
}
}
}

/// Create a new `Ctx` from a pointer to the context and an invariant lifetime.
///
/// # Safety
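A hedged usage sketch of the new `await_promise`: the `asyncWork` script and the single-threaded `yield_fn` below are assumptions for illustration, and on a multi-threaded tokio runtime the `block_in_place` variant from the doc comment applies instead:

```rust
// Illustrative only: driving a JS promise to completion from sync code.
// Assumes the `futures` feature is enabled and that an `asyncWork()`
// function exists in the JS global scope.
use rquickjs::{Ctx, Promise, Result};

fn run_async_work<'js>(ctx: &Ctx<'js>) -> Result<String> {
    let promise: Promise<'js> = ctx.eval("asyncWork()")?;
    // On a single-threaded executor, yielding the OS thread is a simple idle
    // strategy; real applications may prefer to block on runtime timers/IO.
    ctx.await_promise(promise, || std::thread::yield_now())
}
```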
6 changes: 2 additions & 4 deletions core/src/runtime.rs
@@ -8,18 +8,16 @@ mod userdata
#[cfg(feature = "futures")]
mod r#async;
#[cfg(feature = "futures")]
pub(crate) mod schedular;
#[cfg(feature = "futures")]
mod spawner;
#[cfg(feature = "futures")]
pub(crate) mod task_queue;
#[cfg(feature = "futures")]
pub use spawner::DriveFuture;

use alloc::boxed::Box;
pub use base::{Runtime, WeakRuntime};
pub use userdata::{UserDataError, UserDataGuard};

#[cfg(feature = "futures")]
pub(crate) use r#async::InnerRuntime;
#[cfg(feature = "futures")]
pub use r#async::{AsyncRuntime, AsyncWeakRuntime};
