Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -99,7 +99,7 @@ deno_task_shell = "=0.29.0"
deno_terminal = "=0.2.3"
deno_unsync = { version = "0.4.4", default-features = false }
deno_whoami = "0.1.0"
v8 = { version = "146.3.0", default-features = false }
v8 = { version = "146.4.0", default-features = false }

denokv_proto = "0.13.0"
denokv_remote = "0.13.0"
Expand Down
34 changes: 33 additions & 1 deletion libs/core/runtime/jsruntime.rs
Original file line number Diff line number Diff line change
Expand Up @@ -180,6 +180,10 @@ impl InnerIsolateState {
self.main_realm.0.context_state.pending_ops.shutdown();
let inspector = self.state.inspector.take();

// Unregister isolate waker before dropping the isolate
let isolate_ptr = unsafe { self.v8_isolate.as_raw_isolate_ptr() };
setup::unregister_isolate_waker(setup::isolate_ptr_to_key(isolate_ptr));

let state_ptr = self.v8_isolate.get_data(STATE_DATA_OFFSET);
// SAFETY: We are sure that it's a valid pointer for whole lifetime of
// the runtime.
Expand Down Expand Up @@ -450,6 +454,7 @@ pub struct JsRuntimeState {
pub(crate) function_templates: Rc<RefCell<FunctionTemplateData>>,
pub(crate) callsite_prototype: RefCell<Option<v8::Global<v8::Object>>>,
waker: Arc<AtomicWaker>,
safety_net_active: Arc<std::sync::atomic::AtomicBool>,
/// Accessed through [`JsRuntimeState::with_inspector`].
inspector: RefCell<Option<Rc<JsRuntimeInspector>>>,
has_inspector: Cell<bool>,
Expand Down Expand Up @@ -776,7 +781,8 @@ impl JsRuntime {
eval_context_code_cache_ready_cb: RefCell::new(
eval_context_set_code_cache_cb,
),
waker,
waker: waker.clone(),
safety_net_active: Arc::new(std::sync::atomic::AtomicBool::new(false)),
// Some fields are initialized later after isolate is created
inspector: None.into(),
has_inspector: false.into(),
Expand Down Expand Up @@ -852,6 +858,14 @@ impl JsRuntime {
);

let isolate_ptr = unsafe { isolate.as_raw_isolate_ptr() };

// Register this isolate's waker so the custom platform can wake
// the event loop when V8 posts foreground tasks from background threads.
setup::register_isolate_waker(
setup::isolate_ptr_to_key(isolate_ptr),
waker.clone(),
);

// ...isolate is fully set up, we can forward its pointer to the ops to finish
// their' setup...
for op_ctx in op_ctxs.iter_mut() {
Expand Down Expand Up @@ -2268,6 +2282,24 @@ impl JsRuntime {
scope.perform_microtask_checkpoint();
}

// Safety net: if V8 has pending background tasks (e.g. module compilation),
// schedule a delayed wake to pump the message loop in case the platform
// callback was missed due to a race condition.
if pending_state.has_pending_background_tasks
&& !self
.inner
.state
.safety_net_active
.swap(true, std::sync::atomic::Ordering::SeqCst)
{
cx.waker().wake_by_ref();
self
.inner
.state
.safety_net_active
.store(false, std::sync::atomic::Ordering::SeqCst);
}

// Re-wake logic for next iteration
#[allow(clippy::suspicious_else_formatting, clippy::if_same_then_else)]
{
Expand Down
221 changes: 214 additions & 7 deletions libs/core/runtime/setup.rs
Original file line number Diff line number Diff line change
@@ -1,15 +1,223 @@
// Copyright 2018-2026 the Deno authors. MIT license.

use std::borrow::Cow;
use std::collections::BinaryHeap;
use std::collections::HashMap;
use std::ffi::c_void;
use std::sync::Mutex;
use std::sync::Once;
use std::sync::atomic::AtomicBool;
use std::sync::atomic::Ordering;
use std::time::Duration;
use std::time::Instant;

use futures::task::AtomicWaker;

use super::bindings;
use super::snapshot;
use super::snapshot::V8Snapshot;

/// Extract the raw isolate address from an `UnsafeRawIsolatePtr`.
///
/// `UnsafeRawIsolatePtr` is `#[repr(transparent)]` over `*mut RealIsolate`,
/// so its bit-pattern is a single pointer-sized value. We use transmute
/// because the inner field is private.
///
/// The compile-time assert below guarantees the layout assumption holds.
const _: () = assert!(
std::mem::size_of::<v8::UnsafeRawIsolatePtr>()
== std::mem::size_of::<usize>()
);

pub(crate) fn isolate_ptr_to_key(ptr: v8::UnsafeRawIsolatePtr) -> usize {
// SAFETY: UnsafeRawIsolatePtr is #[repr(transparent)] over *mut RealIsolate,
// which is pointer-sized. The compile-time assert above guarantees this.
unsafe { std::mem::transmute::<v8::UnsafeRawIsolatePtr, usize>(ptr) }
}

/// Per-isolate state shared between the V8 platform callback and
/// the event loop.
struct IsolateWakeEntry {
waker: std::sync::Arc<AtomicWaker>,
}

/// Global registry mapping isolate pointers to their event loop wake state.
/// When V8 posts a foreground task for an isolate, the callback looks up
/// the state here, sets the notification flag, and wakes the event loop.
/// Isolates that received a notification before their state was registered
/// are tracked in `pending_wakes` so `register_isolate_waker` can wake
/// them immediately.
struct IsolateWakerRegistry {
entries: HashMap<usize, IsolateWakeEntry>,
pending_wakes: std::collections::HashSet<usize>,
}

static ISOLATE_WAKERS: std::sync::LazyLock<Mutex<IsolateWakerRegistry>> =
std::sync::LazyLock::new(|| {
Mutex::new(IsolateWakerRegistry {
entries: HashMap::new(),
pending_wakes: std::collections::HashSet::new(),
})
});

/// Register a waker and notification flag for an isolate so foreground
/// task notifications wake the correct event loop. If a notification
/// arrived before registration, the waker is triggered immediately.
pub fn register_isolate_waker(
isolate_ptr: usize,
waker: std::sync::Arc<AtomicWaker>,
) {
let mut reg = ISOLATE_WAKERS.lock().unwrap();
if reg.pending_wakes.remove(&isolate_ptr) {
waker.wake();
}
reg.entries.insert(isolate_ptr, IsolateWakeEntry { waker });
}

/// Unregister an isolate's wake state (called on isolate drop).
pub fn unregister_isolate_waker(isolate_ptr: usize) {
let mut reg = ISOLATE_WAKERS.lock().unwrap();
reg.entries.remove(&isolate_ptr);
reg.pending_wakes.remove(&isolate_ptr);
}

/// Wake the event loop for a given isolate. Sets the notification flag
/// and wakes the AtomicWaker. If the isolate's state is not yet
/// registered, marks it as pending so registration notifies it.
fn wake_isolate(key: usize) {
let mut reg = ISOLATE_WAKERS.lock().unwrap();
if let Some(entry) = reg.entries.get(&key) {
entry.waker.wake();
} else {
reg.pending_wakes.insert(key);
}
}

/// Entry in the delayed-task timer queue.
struct TimerEntry {
deadline: Instant,
isolate_key: usize,
}

impl PartialEq for TimerEntry {
fn eq(&self, other: &Self) -> bool {
self.deadline == other.deadline
}
}

impl Eq for TimerEntry {}

impl PartialOrd for TimerEntry {
fn partial_cmp(&self, other: &Self) -> Option<std::cmp::Ordering> {
Some(self.cmp(other))
}
}

impl Ord for TimerEntry {
fn cmp(&self, other: &Self) -> std::cmp::Ordering {
// Reverse so BinaryHeap (max-heap) yields the earliest deadline first.
other.deadline.cmp(&self.deadline)
}
}

/// Single shared timer thread that processes all delayed V8 foreground
/// task wake-ups, avoiding one OS thread per delayed task.
static DELAYED_TASK_SENDER: std::sync::LazyLock<
Mutex<std::sync::mpsc::Sender<TimerEntry>>,
> = std::sync::LazyLock::new(|| {
let (tx, rx) = std::sync::mpsc::channel();
std::thread::Builder::new()
.name("deno-v8-timer".into())
.spawn(move || delayed_task_thread(rx))
.unwrap();
Mutex::new(tx)
});

fn delayed_task_thread(rx: std::sync::mpsc::Receiver<TimerEntry>) {
let mut heap: BinaryHeap<TimerEntry> = BinaryHeap::new();
loop {
// Block until either a new entry arrives or the next timer fires.
if heap.is_empty() {
match rx.recv() {
Ok(entry) => heap.push(entry),
Err(_) => break,
}
} else {
let timeout = heap
.peek()
.unwrap()
.deadline
.saturating_duration_since(Instant::now());
match rx.recv_timeout(timeout) {
Ok(entry) => heap.push(entry),
Err(std::sync::mpsc::RecvTimeoutError::Timeout) => {}
Err(std::sync::mpsc::RecvTimeoutError::Disconnected) => break,
}
}

// Drain any additional entries that arrived.
while let Ok(entry) = rx.try_recv() {
heap.push(entry);
}

// Fire all expired timers.
let now = Instant::now();
while let Some(entry) = heap.peek() {
if entry.deadline <= now {
let entry = heap.pop().unwrap();
wake_isolate(entry.isolate_key);
} else {
break;
}
}
}
}

/// Custom V8 platform implementation that wakes isolate event loops
/// when foreground tasks are posted from any thread (including V8
/// background compilation threads).
struct DenoPlatformImpl;

impl DenoPlatformImpl {
fn wake_immediate(&self, isolate_ptr: *mut c_void) {
wake_isolate(isolate_ptr as usize);
}

fn wake_delayed(&self, isolate_ptr: *mut c_void, delay_in_seconds: f64) {
let entry = TimerEntry {
deadline: Instant::now() + Duration::from_secs_f64(delay_in_seconds),
isolate_key: isolate_ptr as usize,
};
let _ = DELAYED_TASK_SENDER.lock().unwrap().send(entry);
}
}

impl v8::PlatformImpl for DenoPlatformImpl {
fn post_task(&self, isolate_ptr: *mut c_void) {
self.wake_immediate(isolate_ptr);
}

fn post_non_nestable_task(&self, isolate_ptr: *mut c_void) {
self.wake_immediate(isolate_ptr);
}

fn post_delayed_task(&self, isolate_ptr: *mut c_void, delay_in_seconds: f64) {
self.wake_delayed(isolate_ptr, delay_in_seconds);
}

fn post_non_nestable_delayed_task(
&self,
isolate_ptr: *mut c_void,
delay_in_seconds: f64,
) {
self.wake_delayed(isolate_ptr, delay_in_seconds);
}

fn post_idle_task(&self, isolate_ptr: *mut c_void) {
self.wake_immediate(isolate_ptr);
}
}

fn v8_init(
v8_platform: Option<v8::SharedRef<v8::Platform>>,
snapshot: bool,
Expand Down Expand Up @@ -51,13 +259,12 @@ fn v8_init(
v8::V8::set_flags_from_string(&flags);

let v8_platform = v8_platform.unwrap_or_else(|| {
if cfg!(any(test, feature = "unsafe_use_unprotected_platform")) {
// We want to use the unprotected platform for unit tests
v8::new_unprotected_default_platform(0, false)
} else {
v8::new_default_platform(0, false)
}
.make_shared()
// Use a custom platform that notifies isolate event loops when V8
// background threads post foreground tasks.
let unprotected =
cfg!(any(test, feature = "unsafe_use_unprotected_platform"));
v8::new_custom_platform(0, false, unprotected, DenoPlatformImpl)
.make_shared()
});
v8::V8::initialize_platform(v8_platform.clone());
v8::V8::initialize();
Expand Down
5 changes: 5 additions & 0 deletions tests/specs/worker/atomics_wait_async_notify/__test__.jsonc
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
{
"args": "test --quiet --no-check -A main.ts",
"output": "main.out",
"exitCode": 0
}
3 changes: 3 additions & 0 deletions tests/specs/worker/atomics_wait_async_notify/main.out
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
[WILDCARD]Atomics.waitAsync resolves in worker ... ok [WILDCARD]

ok | 1 passed | 0 failed [WILDCARD]
29 changes: 29 additions & 0 deletions tests/specs/worker/atomics_wait_async_notify/main.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
// Regression test for https://github.com/denoland/deno/issues/14786
// Atomics.waitAsync in a worker requires V8 to post a foreground task
// to resolve the promise. Without the custom platform waking the event
// loop, the worker hangs forever.
Deno.test("Atomics.waitAsync resolves in worker", async () => {
const sab = new SharedArrayBuffer(Int32Array.BYTES_PER_ELEMENT);
const ia = new Int32Array(sab);

const w = new Worker(new URL("./worker.ts", import.meta.url), {
type: "module",
});

await new Promise<void>((resolve, reject) => {
w.onmessage = (ev) => {
if (ev.data === "waiting") {
// Worker called waitAsync and is blocked — notify it.
Atomics.notify(ia, 0);
} else if (ev.data.ok) {
resolve();
} else {
reject(new Error(ev.data.err));
}
};
w.onerror = (e) => reject(e);
w.postMessage(ia);
});

w.terminate();
});
11 changes: 11 additions & 0 deletions tests/specs/worker/atomics_wait_async_notify/worker.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
self.onmessage = async (ev) => {
const ia: Int32Array = ev.data;
try {
const { value } = Atomics.waitAsync(ia, 0, 0);
self.postMessage("waiting");
const result = await value;
self.postMessage({ ok: true, result });
} catch (err) {
self.postMessage({ ok: false, err: String(err) });
}
};
2 changes: 1 addition & 1 deletion tests/wpt/runner/expectations/html.json
Original file line number Diff line number Diff line change
Expand Up @@ -900,7 +900,7 @@
"requires-failure.https.any.html": false,
"requires-success.any.worker.html": true,
"atomics-wait-async.https.any.html": false,
"atomics-wait-async.https.any.worker.html": false
"atomics-wait-async.https.any.worker.html": true
},
"addEventListener.html": false,
"body-onerror-compile-error-data-url.html": false,
Expand Down
Loading