Skip to content

Commit 3f99ebb

Browse files
committed
fix: wake event loop when V8 posts foreground tasks from background threads
1 parent b7cc30f commit 3f99ebb

File tree

11 files changed

+326
-21
lines changed

11 files changed

+326
-21
lines changed

Cargo.lock

Lines changed: 2 additions & 2 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

Cargo.toml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -99,7 +99,7 @@ deno_task_shell = "=0.29.0"
9999
deno_terminal = "=0.2.3"
100100
deno_unsync = { version = "0.4.4", default-features = false }
101101
deno_whoami = "0.1.0"
102-
v8 = { version = "146.3.0", default-features = false }
102+
v8 = { version = "146.4.0", default-features = false }
103103

104104
denokv_proto = "0.13.0"
105105
denokv_remote = "0.13.0"

ext/node/ops/tls.rs

Lines changed: 39 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -336,6 +336,8 @@ struct JSDuplexResource {
336336
readable: Arc<Mutex<tokio::sync::mpsc::Receiver<Bytes>>>,
337337
writable: tokio::sync::mpsc::Sender<Bytes>,
338338
read_buffer: Arc<Mutex<VecDeque<Bytes>>>,
339+
closed: AtomicBool,
340+
close_notify: tokio::sync::Notify,
339341
}
340342

341343
impl JSDuplexResource {
@@ -347,6 +349,8 @@ impl JSDuplexResource {
347349
readable: Arc::new(Mutex::new(readable)),
348350
writable,
349351
read_buffer: Arc::new(Mutex::new(VecDeque::new())),
352+
closed: AtomicBool::new(false),
353+
close_notify: tokio::sync::Notify::new(),
350354
}
351355
}
352356

@@ -355,6 +359,10 @@ impl JSDuplexResource {
355359
self: Rc<Self>,
356360
data: &mut [u8],
357361
) -> Result<usize, std::io::Error> {
362+
if self.closed.load(Ordering::Relaxed) {
363+
return Ok(0);
364+
}
365+
358366
// First check if we have buffered data from previous partial read
359367
if let Ok(mut buffer) = self.read_buffer.lock()
360368
&& let Some(buffered_data) = buffer.pop_front()
@@ -370,13 +378,19 @@ impl JSDuplexResource {
370378
return Ok(len);
371379
}
372380

373-
// No buffered data, receive new data from channel
381+
// No buffered data, receive new data from channel.
382+
// We use select! so that close() can wake us up via close_notify
383+
// even though we hold the readable mutex across the await (the
384+
// close() method uses try_lock to avoid deadlock).
374385
let bytes = {
375386
let mut receiver = self
376387
.readable
377388
.lock()
378389
.map_err(|_| Error::other("Failed to acquire lock"))?;
379-
receiver.recv().await
390+
tokio::select! {
391+
result = receiver.recv() => result,
392+
_ = self.close_notify.notified() => None,
393+
}
380394
};
381395

382396
match bytes {
@@ -394,7 +408,7 @@ impl JSDuplexResource {
394408
Ok(len)
395409
}
396410
None => {
397-
// Channel closed
411+
// Channel closed or resource closing
398412
Ok(0)
399413
}
400414
}
@@ -423,6 +437,28 @@ impl Resource for JSDuplexResource {
423437
fn name(&self) -> Cow<'_, str> {
424438
"JSDuplexResource".into()
425439
}
440+
441+
fn close(self: Rc<Self>) {
442+
// Signal that this resource is closing. The read() method checks
443+
// this flag and the close_notify to break out of pending recv().
444+
//
445+
// Without this cleanup, a circular Rc dependency between
446+
// JSDuplexResource and JSStreamTlsResource prevents either from
447+
// being dropped, keeping the event loop alive indefinitely.
448+
self.closed.store(true, Ordering::Relaxed);
449+
450+
// Wake up any pending read via Notify. We use notify_one() which
451+
// stores a permit if no one is currently waiting, so the next
452+
// notified().await will complete immediately.
453+
self.close_notify.notify_one();
454+
455+
// Also try to close the receiver directly. We use try_lock()
456+
// because read() holds the mutex across an await point; using
457+
// lock() here would deadlock.
458+
if let Ok(mut rx) = self.readable.try_lock() {
459+
rx.close();
460+
}
461+
}
426462
}
427463

428464
#[derive(FromV8)]

ext/node/polyfills/_tls_wrap.js

Lines changed: 34 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -303,6 +303,8 @@ export class TLSSocket extends net.Socket {
303303

304304
class JSStreamSocket {
305305
#rid;
306+
#channelRid;
307+
#closed = false;
306308

307309
constructor(stream) {
308310
this.stream = stream;
@@ -311,7 +313,8 @@ class JSStreamSocket {
311313
init(options) {
312314
op_node_tls_start(options, tlsStreamRids);
313315
this.#rid = tlsStreamRids[0];
314-
const channelRid = tlsStreamRids[1];
316+
this.#channelRid = tlsStreamRids[1];
317+
const channelRid = this.#channelRid;
315318

316319
this.stream.on("data", (data) => {
317320
core.write(channelRid, data);
@@ -321,7 +324,9 @@ class JSStreamSocket {
321324
(async () => {
322325
while (true) {
323326
try {
324-
const nread = await core.read(channelRid, buf);
327+
const readPromise = core.read(channelRid, buf);
328+
core.unrefOpPromise(readPromise);
329+
const nread = await readPromise;
325330
this.stream.write(buf.slice(0, nread));
326331
} catch {
327332
break;
@@ -330,17 +335,41 @@ class JSStreamSocket {
330335
})();
331336

332337
this.stream.on("close", () => {
333-
core.close(this.#rid);
334-
core.close(channelRid);
338+
this.close();
335339
});
336340
}
337341

342+
// Called by stream_wrap's _onClose() via kStreamBaseField.close(),
343+
// or by event listeners when the transport/DuplexPair is destroyed.
344+
close() {
345+
if (this.#closed) return;
346+
this.#closed = true;
347+
if (this.#rid !== undefined) {
348+
try {
349+
core.close(this.#rid);
350+
} catch {
351+
// already closed
352+
}
353+
this.#rid = undefined;
354+
}
355+
if (this.#channelRid !== undefined) {
356+
try {
357+
core.close(this.#channelRid);
358+
} catch {
359+
// already closed
360+
}
361+
this.#channelRid = undefined;
362+
}
363+
}
364+
338365
handshake() {
339366
return op_node_tls_handshake(this.#rid);
340367
}
341368

342369
read(buf) {
343-
return core.read(this.#rid, buf);
370+
const promise = core.read(this.#rid, buf);
371+
core.unrefOpPromise(promise);
372+
return promise;
344373
}
345374

346375
write(data) {

ext/node/polyfills/internal_binding/stream_wrap.ts

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -386,7 +386,8 @@ export class LibuvStreamWrap extends HandleWrap {
386386

387387
if (
388388
ObjectPrototypeIsPrototypeOf(Deno.errors.Interrupted.prototype, e) ||
389-
ObjectPrototypeIsPrototypeOf(Deno.errors.BadResource.prototype, e)
389+
ObjectPrototypeIsPrototypeOf(Deno.errors.BadResource.prototype, e) ||
390+
ObjectPrototypeIsPrototypeOf(Deno.errors.UnexpectedEof.prototype, e)
390391
) {
391392
nread = MapPrototypeGet(codeMap, "EOF")!;
392393
} else if (

libs/core/runtime/jsruntime.rs

Lines changed: 37 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -180,6 +180,10 @@ impl InnerIsolateState {
180180
self.main_realm.0.context_state.pending_ops.shutdown();
181181
let inspector = self.state.inspector.take();
182182

183+
// Unregister isolate waker before dropping the isolate
184+
let isolate_ptr = unsafe { self.v8_isolate.as_raw_isolate_ptr() };
185+
setup::unregister_isolate_waker(setup::isolate_ptr_to_key(isolate_ptr));
186+
183187
let state_ptr = self.v8_isolate.get_data(STATE_DATA_OFFSET);
184188
// SAFETY: We are sure that it's a valid pointer for whole lifetime of
185189
// the runtime.
@@ -450,6 +454,8 @@ pub struct JsRuntimeState {
450454
pub(crate) function_templates: Rc<RefCell<FunctionTemplateData>>,
451455
pub(crate) callsite_prototype: RefCell<Option<v8::Global<v8::Object>>>,
452456
waker: Arc<AtomicWaker>,
457+
/// Guards the 100ms safety-net timer so at most one is active at a time.
458+
safety_net_active: Arc<std::sync::atomic::AtomicBool>,
453459
/// Accessed through [`JsRuntimeState::with_inspector`].
454460
inspector: RefCell<Option<Rc<JsRuntimeInspector>>>,
455461
has_inspector: Cell<bool>,
@@ -776,7 +782,8 @@ impl JsRuntime {
776782
eval_context_code_cache_ready_cb: RefCell::new(
777783
eval_context_set_code_cache_cb,
778784
),
779-
waker,
785+
waker: waker.clone(),
786+
safety_net_active: Arc::new(std::sync::atomic::AtomicBool::new(false)),
780787
// Some fields are initialized later after isolate is created
781788
inspector: None.into(),
782789
has_inspector: false.into(),
@@ -852,6 +859,14 @@ impl JsRuntime {
852859
);
853860

854861
let isolate_ptr = unsafe { isolate.as_raw_isolate_ptr() };
862+
863+
// Register this isolate's waker so the custom platform can wake
864+
// the event loop when V8 posts foreground tasks from background threads.
865+
setup::register_isolate_waker(
866+
setup::isolate_ptr_to_key(isolate_ptr),
867+
waker.clone(),
868+
);
869+
855870
// ...isolate is fully set up, we can forward its pointer to the ops to finish
856871
// their' setup...
857872
for op_ctx in op_ctxs.iter_mut() {
@@ -2268,6 +2283,27 @@ impl JsRuntime {
22682283
scope.perform_microtask_checkpoint();
22692284
}
22702285

2286+
// Safety net: if V8 has pending background tasks (e.g. module compilation),
2287+
// schedule a delayed wake to pump the message loop in case the platform
2288+
// callback was missed due to a race condition. Uses an OS thread (not
2289+
// tokio) to avoid depending on the async runtime being cooperative.
2290+
// The AtomicBool guard ensures at most one safety-net timer is active.
2291+
if pending_state.has_pending_background_tasks
2292+
&& !self
2293+
.inner
2294+
.state
2295+
.safety_net_active
2296+
.swap(true, std::sync::atomic::Ordering::Relaxed)
2297+
{
2298+
let waker = cx.waker().clone();
2299+
let flag = self.inner.state.safety_net_active.clone();
2300+
std::thread::spawn(move || {
2301+
std::thread::sleep(std::time::Duration::from_millis(100));
2302+
flag.store(false, std::sync::atomic::Ordering::Relaxed);
2303+
waker.wake_by_ref();
2304+
});
2305+
}
2306+
22712307
// Re-wake logic for next iteration
22722308
#[allow(clippy::suspicious_else_formatting, clippy::if_same_then_else)]
22732309
{

0 commit comments

Comments
 (0)