Skip to content

Commit 0895897

Browse files
authored
fix: wake event loop when V8 posts foreground tasks from background threads (#32450)
When V8 background threads post foreground tasks (module compilation callbacks, Atomics.waitAsync timeouts, worker lifecycle events), the event loop had no way to know it should wake up and pump those tasks. This caused stalls where programs would hang until an unrelated timer or I/O event happened to fire. The fix uses rusty_v8's new PlatformImpl trait to create a custom V8 platform (DenoPlatformImpl) that intercepts foreground task posts. When a V8 background thread posts a task for an isolate, the callback looks up that isolate's AtomicWaker in a global registry and wakes the corresponding Tokio task.
1 parent bbd5282 commit 0895897

File tree

9 files changed

+299
-12
lines changed

9 files changed

+299
-12
lines changed

Cargo.lock

Lines changed: 2 additions & 2 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

Cargo.toml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -99,7 +99,7 @@ deno_task_shell = "=0.29.0"
9999
deno_terminal = "=0.2.3"
100100
deno_unsync = { version = "0.4.4", default-features = false }
101101
deno_whoami = "0.1.0"
102-
v8 = { version = "146.3.0", default-features = false }
102+
v8 = { version = "146.4.0", default-features = false }
103103

104104
denokv_proto = "0.13.0"
105105
denokv_remote = "0.13.0"

libs/core/runtime/jsruntime.rs

Lines changed: 33 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -180,6 +180,10 @@ impl InnerIsolateState {
180180
self.main_realm.0.context_state.pending_ops.shutdown();
181181
let inspector = self.state.inspector.take();
182182

183+
// Unregister isolate waker before dropping the isolate
184+
let isolate_ptr = unsafe { self.v8_isolate.as_raw_isolate_ptr() };
185+
setup::unregister_isolate_waker(setup::isolate_ptr_to_key(isolate_ptr));
186+
183187
let state_ptr = self.v8_isolate.get_data(STATE_DATA_OFFSET);
184188
// SAFETY: We are sure that it's a valid pointer for whole lifetime of
185189
// the runtime.
@@ -450,6 +454,7 @@ pub struct JsRuntimeState {
450454
pub(crate) function_templates: Rc<RefCell<FunctionTemplateData>>,
451455
pub(crate) callsite_prototype: RefCell<Option<v8::Global<v8::Object>>>,
452456
waker: Arc<AtomicWaker>,
457+
safety_net_active: Arc<std::sync::atomic::AtomicBool>,
453458
/// Accessed through [`JsRuntimeState::with_inspector`].
454459
inspector: RefCell<Option<Rc<JsRuntimeInspector>>>,
455460
has_inspector: Cell<bool>,
@@ -776,7 +781,8 @@ impl JsRuntime {
776781
eval_context_code_cache_ready_cb: RefCell::new(
777782
eval_context_set_code_cache_cb,
778783
),
779-
waker,
784+
waker: waker.clone(),
785+
safety_net_active: Arc::new(std::sync::atomic::AtomicBool::new(false)),
780786
// Some fields are initialized later after isolate is created
781787
inspector: None.into(),
782788
has_inspector: false.into(),
@@ -852,6 +858,14 @@ impl JsRuntime {
852858
);
853859

854860
let isolate_ptr = unsafe { isolate.as_raw_isolate_ptr() };
861+
862+
// Register this isolate's waker so the custom platform can wake
863+
// the event loop when V8 posts foreground tasks from background threads.
864+
setup::register_isolate_waker(
865+
setup::isolate_ptr_to_key(isolate_ptr),
866+
waker.clone(),
867+
);
868+
855869
// ...isolate is fully set up, we can forward its pointer to the ops to finish
856870
// their' setup...
857871
for op_ctx in op_ctxs.iter_mut() {
@@ -2268,6 +2282,24 @@ impl JsRuntime {
22682282
scope.perform_microtask_checkpoint();
22692283
}
22702284

2285+
// Safety net: if V8 has pending background tasks (e.g. module compilation),
2286+
// schedule a delayed wake to pump the message loop in case the platform
2287+
// callback was missed due to a race condition.
2288+
if pending_state.has_pending_background_tasks
2289+
&& !self
2290+
.inner
2291+
.state
2292+
.safety_net_active
2293+
.swap(true, std::sync::atomic::Ordering::SeqCst)
2294+
{
2295+
cx.waker().wake_by_ref();
2296+
self
2297+
.inner
2298+
.state
2299+
.safety_net_active
2300+
.store(false, std::sync::atomic::Ordering::SeqCst);
2301+
}
2302+
22712303
// Re-wake logic for next iteration
22722304
#[allow(clippy::suspicious_else_formatting, clippy::if_same_then_else)]
22732305
{

libs/core/runtime/setup.rs

Lines changed: 214 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -1,15 +1,223 @@
11
// Copyright 2018-2026 the Deno authors. MIT license.
22

33
use std::borrow::Cow;
4+
use std::collections::BinaryHeap;
5+
use std::collections::HashMap;
6+
use std::ffi::c_void;
47
use std::sync::Mutex;
58
use std::sync::Once;
69
use std::sync::atomic::AtomicBool;
710
use std::sync::atomic::Ordering;
11+
use std::time::Duration;
12+
use std::time::Instant;
13+
14+
use futures::task::AtomicWaker;
815

916
use super::bindings;
1017
use super::snapshot;
1118
use super::snapshot::V8Snapshot;
1219

20+
/// Extract the raw isolate address from an `UnsafeRawIsolatePtr`.
21+
///
22+
/// `UnsafeRawIsolatePtr` is `#[repr(transparent)]` over `*mut RealIsolate`,
23+
/// so its bit-pattern is a single pointer-sized value. We use transmute
24+
/// because the inner field is private.
25+
///
26+
/// The compile-time assert below guarantees the layout assumption holds.
27+
const _: () = assert!(
28+
std::mem::size_of::<v8::UnsafeRawIsolatePtr>()
29+
== std::mem::size_of::<usize>()
30+
);
31+
32+
pub(crate) fn isolate_ptr_to_key(ptr: v8::UnsafeRawIsolatePtr) -> usize {
33+
// SAFETY: UnsafeRawIsolatePtr is #[repr(transparent)] over *mut RealIsolate,
34+
// which is pointer-sized. The compile-time assert above guarantees this.
35+
unsafe { std::mem::transmute::<v8::UnsafeRawIsolatePtr, usize>(ptr) }
36+
}
37+
38+
/// Per-isolate state shared between the V8 platform callback and
39+
/// the event loop.
40+
struct IsolateWakeEntry {
41+
waker: std::sync::Arc<AtomicWaker>,
42+
}
43+
44+
/// Global registry mapping isolate pointers to their event loop wake state.
45+
/// When V8 posts a foreground task for an isolate, the callback looks up
46+
/// the state here, sets the notification flag, and wakes the event loop.
47+
/// Isolates that received a notification before their state was registered
48+
/// are tracked in `pending_wakes` so `register_isolate_waker` can wake
49+
/// them immediately.
50+
struct IsolateWakerRegistry {
51+
entries: HashMap<usize, IsolateWakeEntry>,
52+
pending_wakes: std::collections::HashSet<usize>,
53+
}
54+
55+
static ISOLATE_WAKERS: std::sync::LazyLock<Mutex<IsolateWakerRegistry>> =
56+
std::sync::LazyLock::new(|| {
57+
Mutex::new(IsolateWakerRegistry {
58+
entries: HashMap::new(),
59+
pending_wakes: std::collections::HashSet::new(),
60+
})
61+
});
62+
63+
/// Register a waker and notification flag for an isolate so foreground
64+
/// task notifications wake the correct event loop. If a notification
65+
/// arrived before registration, the waker is triggered immediately.
66+
pub fn register_isolate_waker(
67+
isolate_ptr: usize,
68+
waker: std::sync::Arc<AtomicWaker>,
69+
) {
70+
let mut reg = ISOLATE_WAKERS.lock().unwrap();
71+
if reg.pending_wakes.remove(&isolate_ptr) {
72+
waker.wake();
73+
}
74+
reg.entries.insert(isolate_ptr, IsolateWakeEntry { waker });
75+
}
76+
77+
/// Unregister an isolate's wake state (called on isolate drop).
78+
pub fn unregister_isolate_waker(isolate_ptr: usize) {
79+
let mut reg = ISOLATE_WAKERS.lock().unwrap();
80+
reg.entries.remove(&isolate_ptr);
81+
reg.pending_wakes.remove(&isolate_ptr);
82+
}
83+
84+
/// Wake the event loop for a given isolate. Sets the notification flag
85+
/// and wakes the AtomicWaker. If the isolate's state is not yet
86+
/// registered, marks it as pending so registration notifies it.
87+
fn wake_isolate(key: usize) {
88+
let mut reg = ISOLATE_WAKERS.lock().unwrap();
89+
if let Some(entry) = reg.entries.get(&key) {
90+
entry.waker.wake();
91+
} else {
92+
reg.pending_wakes.insert(key);
93+
}
94+
}
95+
96+
/// Entry in the delayed-task timer queue.
97+
struct TimerEntry {
98+
deadline: Instant,
99+
isolate_key: usize,
100+
}
101+
102+
impl PartialEq for TimerEntry {
103+
fn eq(&self, other: &Self) -> bool {
104+
self.deadline == other.deadline
105+
}
106+
}
107+
108+
impl Eq for TimerEntry {}
109+
110+
impl PartialOrd for TimerEntry {
111+
fn partial_cmp(&self, other: &Self) -> Option<std::cmp::Ordering> {
112+
Some(self.cmp(other))
113+
}
114+
}
115+
116+
impl Ord for TimerEntry {
117+
fn cmp(&self, other: &Self) -> std::cmp::Ordering {
118+
// Reverse so BinaryHeap (max-heap) yields the earliest deadline first.
119+
other.deadline.cmp(&self.deadline)
120+
}
121+
}
122+
123+
/// Single shared timer thread that processes all delayed V8 foreground
124+
/// task wake-ups, avoiding one OS thread per delayed task.
125+
static DELAYED_TASK_SENDER: std::sync::LazyLock<
126+
Mutex<std::sync::mpsc::Sender<TimerEntry>>,
127+
> = std::sync::LazyLock::new(|| {
128+
let (tx, rx) = std::sync::mpsc::channel();
129+
std::thread::Builder::new()
130+
.name("deno-v8-timer".into())
131+
.spawn(move || delayed_task_thread(rx))
132+
.unwrap();
133+
Mutex::new(tx)
134+
});
135+
136+
fn delayed_task_thread(rx: std::sync::mpsc::Receiver<TimerEntry>) {
137+
let mut heap: BinaryHeap<TimerEntry> = BinaryHeap::new();
138+
loop {
139+
// Block until either a new entry arrives or the next timer fires.
140+
if heap.is_empty() {
141+
match rx.recv() {
142+
Ok(entry) => heap.push(entry),
143+
Err(_) => break,
144+
}
145+
} else {
146+
let timeout = heap
147+
.peek()
148+
.unwrap()
149+
.deadline
150+
.saturating_duration_since(Instant::now());
151+
match rx.recv_timeout(timeout) {
152+
Ok(entry) => heap.push(entry),
153+
Err(std::sync::mpsc::RecvTimeoutError::Timeout) => {}
154+
Err(std::sync::mpsc::RecvTimeoutError::Disconnected) => break,
155+
}
156+
}
157+
158+
// Drain any additional entries that arrived.
159+
while let Ok(entry) = rx.try_recv() {
160+
heap.push(entry);
161+
}
162+
163+
// Fire all expired timers.
164+
let now = Instant::now();
165+
while let Some(entry) = heap.peek() {
166+
if entry.deadline <= now {
167+
let entry = heap.pop().unwrap();
168+
wake_isolate(entry.isolate_key);
169+
} else {
170+
break;
171+
}
172+
}
173+
}
174+
}
175+
176+
/// Custom V8 platform implementation that wakes isolate event loops
177+
/// when foreground tasks are posted from any thread (including V8
178+
/// background compilation threads).
179+
struct DenoPlatformImpl;
180+
181+
impl DenoPlatformImpl {
182+
fn wake_immediate(&self, isolate_ptr: *mut c_void) {
183+
wake_isolate(isolate_ptr as usize);
184+
}
185+
186+
fn wake_delayed(&self, isolate_ptr: *mut c_void, delay_in_seconds: f64) {
187+
let entry = TimerEntry {
188+
deadline: Instant::now() + Duration::from_secs_f64(delay_in_seconds),
189+
isolate_key: isolate_ptr as usize,
190+
};
191+
let _ = DELAYED_TASK_SENDER.lock().unwrap().send(entry);
192+
}
193+
}
194+
195+
impl v8::PlatformImpl for DenoPlatformImpl {
196+
fn post_task(&self, isolate_ptr: *mut c_void) {
197+
self.wake_immediate(isolate_ptr);
198+
}
199+
200+
fn post_non_nestable_task(&self, isolate_ptr: *mut c_void) {
201+
self.wake_immediate(isolate_ptr);
202+
}
203+
204+
fn post_delayed_task(&self, isolate_ptr: *mut c_void, delay_in_seconds: f64) {
205+
self.wake_delayed(isolate_ptr, delay_in_seconds);
206+
}
207+
208+
fn post_non_nestable_delayed_task(
209+
&self,
210+
isolate_ptr: *mut c_void,
211+
delay_in_seconds: f64,
212+
) {
213+
self.wake_delayed(isolate_ptr, delay_in_seconds);
214+
}
215+
216+
fn post_idle_task(&self, isolate_ptr: *mut c_void) {
217+
self.wake_immediate(isolate_ptr);
218+
}
219+
}
220+
13221
fn v8_init(
14222
v8_platform: Option<v8::SharedRef<v8::Platform>>,
15223
snapshot: bool,
@@ -51,13 +259,12 @@ fn v8_init(
51259
v8::V8::set_flags_from_string(&flags);
52260

53261
let v8_platform = v8_platform.unwrap_or_else(|| {
54-
if cfg!(any(test, feature = "unsafe_use_unprotected_platform")) {
55-
// We want to use the unprotected platform for unit tests
56-
v8::new_unprotected_default_platform(0, false)
57-
} else {
58-
v8::new_default_platform(0, false)
59-
}
60-
.make_shared()
262+
// Use a custom platform that notifies isolate event loops when V8
263+
// background threads post foreground tasks.
264+
let unprotected =
265+
cfg!(any(test, feature = "unsafe_use_unprotected_platform"));
266+
v8::new_custom_platform(0, false, unprotected, DenoPlatformImpl)
267+
.make_shared()
61268
});
62269
v8::V8::initialize_platform(v8_platform.clone());
63270
v8::V8::initialize();
Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,5 @@
1+
{
2+
"args": "test --quiet --no-check -A main.ts",
3+
"output": "main.out",
4+
"exitCode": 0
5+
}
Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,3 @@
1+
[WILDCARD]Atomics.waitAsync resolves in worker ... ok [WILDCARD]
2+
3+
ok | 1 passed | 0 failed [WILDCARD]
Lines changed: 29 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,29 @@
1+
// Regression test for https://github.com/denoland/deno/issues/14786
2+
// Atomics.waitAsync in a worker requires V8 to post a foreground task
3+
// to resolve the promise. Without the custom platform waking the event
4+
// loop, the worker hangs forever.
5+
Deno.test("Atomics.waitAsync resolves in worker", async () => {
6+
const sab = new SharedArrayBuffer(Int32Array.BYTES_PER_ELEMENT);
7+
const ia = new Int32Array(sab);
8+
9+
const w = new Worker(new URL("./worker.ts", import.meta.url), {
10+
type: "module",
11+
});
12+
13+
await new Promise<void>((resolve, reject) => {
14+
w.onmessage = (ev) => {
15+
if (ev.data === "waiting") {
16+
// Worker called waitAsync and is blocked — notify it.
17+
Atomics.notify(ia, 0);
18+
} else if (ev.data.ok) {
19+
resolve();
20+
} else {
21+
reject(new Error(ev.data.err));
22+
}
23+
};
24+
w.onerror = (e) => reject(e);
25+
w.postMessage(ia);
26+
});
27+
28+
w.terminate();
29+
});
Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,11 @@
1+
self.onmessage = async (ev) => {
2+
const ia: Int32Array = ev.data;
3+
try {
4+
const { value } = Atomics.waitAsync(ia, 0, 0);
5+
self.postMessage("waiting");
6+
const result = await value;
7+
self.postMessage({ ok: true, result });
8+
} catch (err) {
9+
self.postMessage({ ok: false, err: String(err) });
10+
}
11+
};

tests/wpt/runner/expectations/html.json

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -900,7 +900,7 @@
900900
"requires-failure.https.any.html": false,
901901
"requires-success.any.worker.html": true,
902902
"atomics-wait-async.https.any.html": false,
903-
"atomics-wait-async.https.any.worker.html": false
903+
"atomics-wait-async.https.any.worker.html": true
904904
},
905905
"addEventListener.html": false,
906906
"body-onerror-compile-error-data-url.html": false,

0 commit comments

Comments
 (0)