diff --git a/cli/lib/worker.rs b/cli/lib/worker.rs index 887b2f4eb5c2ea..7e26898cccfee3 100644 --- a/cli/lib/worker.rs +++ b/cli/lib/worker.rs @@ -369,6 +369,54 @@ impl LibWorkerFactorySharedState { bundle_provider: shared.bundle_provider.clone(), }; let maybe_initial_cwd = shared.options.maybe_initial_cwd.clone(); + // Apply resource limits to v8::CreateParams if specified. + // Uses individual V8 ResourceConstraints setters to match Node.js + // behavior (node_worker.cc UpdateResourceConstraints). + let (create_params, resolved_limits) = if let Some(ref limits) = + args.resource_limits + { + let mut params = + create_isolate_create_params(&shared.sys).unwrap_or_default(); + + if let Some(max_old) = + limits.max_old_generation_size_mb.filter(|&v| v > 0) + { + params = + params.set_max_old_generation_size_in_bytes(max_old * 1024 * 1024); + } + if let Some(max_young) = + limits.max_young_generation_size_mb.filter(|&v| v > 0) + { + params = params + .set_max_young_generation_size_in_bytes(max_young * 1024 * 1024); + } + if let Some(code_range) = limits.code_range_size_mb.filter(|&v| v > 0) { + params = + params.set_code_range_size_in_bytes(code_range * 1024 * 1024); + } + + let mb = 1024 * 1024; + // Read back resolved values (including V8 defaults for + // unspecified fields), matching Node.js behavior. + // Note: integer division truncates sub-MB fractions, which is fine + // since V8 and Node.js both work in whole-MB granularity here. 
+ let resolved = deno_node::ops::worker_threads::ResolvedResourceLimits { + max_young_generation_size_mb: params + .max_young_generation_size_in_bytes() + / mb, + max_old_generation_size_mb: params.max_old_generation_size_in_bytes() + / mb, + code_range_size_mb: params.code_range_size_in_bytes() / mb, + stack_size_mb: limits + .stack_size_mb + .unwrap_or(deno_node::ops::worker_threads::DEFAULT_STACK_SIZE_MB), + }; + + (Some(params), Some(resolved)) + } else { + (create_isolate_create_params(&shared.sys), None) + }; + let options = WebWorkerOptions { name: args.name, main_module: args.main_module.clone(), @@ -403,7 +451,7 @@ impl LibWorkerFactorySharedState { }, extensions: vec![], startup_snapshot: shared.options.startup_snapshot, - create_params: create_isolate_create_params(&shared.sys), + create_params, unsafely_ignore_certificate_errors: shared .options .unsafely_ignore_certificate_errors @@ -424,7 +472,35 @@ impl LibWorkerFactorySharedState { enable_stack_trace_arg_in_ops: has_trace_permissions_enabled(), }; - WebWorker::bootstrap_from_options(services, options) + let has_resource_limits = args.resource_limits.is_some(); + let (mut worker, handle, bootstrap_options) = + WebWorker::from_options(services, options); + + // Store resolved resource limits in the worker's op state BEFORE + // bootstrapping, so the worker_threads polyfill can read them + // via op during init. + if let Some(resolved) = resolved_limits { + worker.js_runtime.op_state().borrow_mut().put(resolved); + } + + worker.bootstrap(&bootstrap_options); + + // When resource limits are set, install a near-heap-limit callback + // that terminates the worker's isolate gracefully instead of + // crashing the entire process with a V8 fatal OOM. 
+ if has_resource_limits { + let ts_handle = worker.js_runtime.v8_isolate().thread_safe_handle(); + let oom_flag = worker.oom_triggered.clone(); + worker.js_runtime.add_near_heap_limit_callback( + move |current_limit, _initial_limit| { + oom_flag.store(true, std::sync::atomic::Ordering::SeqCst); + ts_handle.terminate_execution(); + current_limit * 2 + }, + ); + } + + (worker, handle) }) } } diff --git a/ext/node/lib.rs b/ext/node/lib.rs index 3715b004a13680..7c350b175b277b 100644 --- a/ext/node/lib.rs +++ b/ext/node/lib.rs @@ -309,6 +309,7 @@ deno_core::extension!(deno_node, ops::util::op_node_in_npm_package, ops::util::op_node_parse_env, ops::worker_threads::op_worker_threads_filename, + ops::worker_threads::op_worker_get_resource_limits, ops::ipc::op_node_child_ipc_pipe, ops::ipc::op_node_ipc_write_json, ops::ipc::op_node_ipc_read_json, diff --git a/ext/node/ops/worker_threads.rs b/ext/node/ops/worker_threads.rs index f4c0438e7ef4cf..b4ee904253464f 100644 --- a/ext/node/ops/worker_threads.rs +++ b/ext/node/ops/worker_threads.rs @@ -13,6 +13,20 @@ use deno_permissions::PermissionsContainer; use crate::ExtNodeSys; use crate::NodeRequireLoaderRc; +/// Default thread stack size in MB, matching Node.js default. +pub const DEFAULT_STACK_SIZE_MB: usize = 4; + +/// Resolved resource limits with V8 defaults filled in for unspecified values. +/// Stored in the worker's OpState so the JS polyfill can read actual values. 
+#[derive(deno_core::serde::Serialize, Clone, Debug, Default, PartialEq, Eq)] +#[serde(rename_all = "camelCase")] +pub struct ResolvedResourceLimits { + pub max_young_generation_size_mb: usize, + pub max_old_generation_size_mb: usize, + pub code_range_size_mb: usize, + pub stack_size_mb: usize, +} + #[must_use = "the resolved return value to mitigate time-of-check to time-of-use issues"] fn ensure_read_permission<'a>( state: &mut OpState, @@ -100,3 +114,14 @@ pub fn op_worker_threads_filename( .map_err(WorkerThreadsFilenameError::Permission)?; Ok(Some(url.into())) } + +/// Returns the resolved resource limits for this worker, or None if +/// no resource limits were configured. Called from worker_threads +/// polyfill during init to get actual V8 values (with defaults filled in). +#[op2] +#[serde] +pub fn op_worker_get_resource_limits( + state: &mut OpState, +) -> Option<ResolvedResourceLimits> { + state.try_borrow::<ResolvedResourceLimits>().cloned() +} diff --git a/ext/node/polyfills/worker_threads.ts b/ext/node/polyfills/worker_threads.ts index f4426ee08c5baf..fa6353b48d538d 100644 --- a/ext/node/polyfills/worker_threads.ts +++ b/ext/node/polyfills/worker_threads.ts @@ -10,6 +10,7 @@ import { op_host_recv_message, op_host_terminate_worker, op_message_port_recv_message_sync, + op_worker_get_resource_limits, op_worker_threads_filename, } from "ext:core/ops"; import { @@ -198,14 +199,7 @@ class NodeWorker extends EventEmitter { // https://nodejs.org/api/worker_threads.html#workerthreadid threadId = this.#id; // https://nodejs.org/api/worker_threads.html#workerresourcelimits - resourceLimits: Required< - NonNullable<WorkerOptions["resourceLimits"]> - > = { - maxYoungGenerationSizeMb: -1, - maxOldGenerationSizeMb: -1, - codeRangeSizeMb: -1, - stackSizeMb: 4, - }; + resourceLimits: WorkerOptions["resourceLimits"] = {}; // https://nodejs.org/api/worker_threads.html#workerstdin stdin: Writable | null = null; // https://nodejs.org/api/worker_threads.html#workerstdout @@ -350,6 +344,8 @@ class NodeWorker extends EventEmitter { } } + const
resourceLimits_ = options?.resourceLimits ?? undefined; + const serializedWorkerMetadata = serializeJsMessageData({ workerData: options?.workerData, environmentData: environmentData, @@ -360,6 +356,7 @@ class NodeWorker extends EventEmitter { isEval: !!options?.eval, isWorkerThread: true, hasStdin: !!options?.stdin, + resourceLimits: resourceLimits_, }, options?.transferList ?? []); if (options?.eval) { @@ -406,12 +403,17 @@ class NodeWorker extends EventEmitter { name: this.#name, workerType: "node", closeOnIdle: true, + resourceLimits: resourceLimits_, }, serializedWorkerMetadata, ); this.#id = id; this.threadId = id; + if (resourceLimits_) { + this.resourceLimits = { ...resourceLimits_ }; + } + if (options?.stdin) { // deno-lint-ignore no-this-alias const worker = this; @@ -499,15 +501,25 @@ class NodeWorker extends EventEmitter { this.#status = "CLOSED"; this.#closeStdio(); if (this.listenerCount("error") > 0) { - const err = new Error(data.errorMessage ?? data.message); - if (data.name) { - err.name = data.name; + const errMsg = data.errorMessage ?? data.message; + const errName = data.name; + let err; + if (errName === "ERR_WORKER_OUT_OF_MEMORY") { + err = new Error(errMsg); + err.code = errName; + err.name = "Error"; + } else { + err = new Error(errMsg); + if (errName) { + err.name = errName; + } } // Stack is unavailable from the worker context (e.g. prepareStackTrace // may have thrown). Match Node.js behavior of setting stack to undefined. err.stack = undefined; this.emit("error", err); } + this.resourceLimits = {}; if (!this.#exited) { this.#exited = true; this.emit("exit", data.exitCode ?? 1); @@ -522,6 +534,7 @@ class NodeWorker extends EventEmitter { debugWT(`Host got "close" message from worker: ${this.#name}`); this.#status = "CLOSED"; this.#closeStdio(); + this.resourceLimits = {}; if (!this.#exited) { this.#exited = true; this.emit("exit", data ?? 
0); @@ -747,14 +760,11 @@ internals.__initWorkerThreads = ( internals.__isWorkerThread = !runningOnMainThread; defaultExport.isMainThread = isMainThread; - // fake resourceLimits - resourceLimits = isMainThread ? {} : { - maxYoungGenerationSizeMb: 48, - maxOldGenerationSizeMb: 2048, - codeRangeSizeMb: 0, - stackSizeMb: 4, - }; - defaultExport.resourceLimits = resourceLimits; + + if (isMainThread) { + resourceLimits = {}; + defaultExport.resourceLimits = resourceLimits; + } if (!isMainThread) { // TODO(bartlomieju): this is a really hacky way to provide @@ -789,6 +799,16 @@ internals.__initWorkerThreads = ( process.env = env; } + // Get resolved resource limits from the Rust side (includes V8 + // defaults for unspecified fields), matching Node.js behavior. + const resolvedLimits = op_worker_get_resource_limits(); + if (resolvedLimits) { + resourceLimits = resolvedLimits; + } else { + resourceLimits = {}; + } + defaultExport.resourceLimits = resourceLimits; + // Set process.argv for worker threads. // In Node.js, worker process.argv is [execPath, scriptPath, ...argv]. if (isWorkerThread) { diff --git a/runtime/ops/worker_host.rs b/runtime/ops/worker_host.rs index 3f35c8bc1ea648..45577ee834b406 100644 --- a/runtime/ops/worker_host.rs +++ b/runtime/ops/worker_host.rs @@ -34,6 +34,16 @@ use crate::worker::FormatJsErrorFn; pub const UNSTABLE_FEATURE_NAME: &str = "worker-options"; +/// V8 resource limits for worker isolates, matching Node.js `resourceLimits`. 
+#[derive(Deserialize, Default, Clone)] +#[serde(rename_all = "camelCase")] +pub struct ResourceLimits { + pub max_young_generation_size_mb: Option<usize>, + pub max_old_generation_size_mb: Option<usize>, + pub code_range_size_mb: Option<usize>, + pub stack_size_mb: Option<usize>, +} + pub struct CreateWebWorkerArgs { pub name: String, pub worker_id: WorkerId, @@ -43,6 +53,7 @@ pub worker_type: WorkerThreadType, pub close_on_idle: bool, pub maybe_worker_metadata: Option<WorkerMetadata>, + pub resource_limits: Option<ResourceLimits>, } pub type CreateWebWorkerCb = dyn Fn(CreateWebWorkerArgs) -> (WebWorker, SendableWebWorkerHandle) @@ -123,6 +134,7 @@ pub struct CreateWorkerArgs { specifier: String, worker_type: WorkerThreadType, close_on_idle: bool, + resource_limits: Option<ResourceLimits>, } #[derive(Debug, thiserror::Error, deno_error::JsError)] @@ -194,8 +206,15 @@ fn op_create_worker( let (handle_sender, handle_receiver) = std::sync::mpsc::sync_channel::<SendableWebWorkerHandle>(1); - // Setup new thread - let thread_builder = std::thread::Builder::new().name(format!("{worker_id}")); + // Setup new thread. If stackSizeMb is specified in resourceLimits, + // set the OS thread stack size to match Node.js behavior.
+ let mut thread_builder = + std::thread::Builder::new().name(format!("{worker_id}")); + if let Some(ref limits) = args.resource_limits + && let Some(stack_mb) = limits.stack_size_mb.filter(|&v| v > 0) + { + thread_builder = thread_builder.stack_size(stack_mb * 1024 * 1024); + } let maybe_worker_metadata = if let Some(data) = maybe_worker_metadata { let transferables = deserialize_js_transferables(state, data.transferables)?; @@ -230,6 +249,7 @@ fn op_create_worker( worker_type, close_on_idle: args.close_on_idle, maybe_worker_metadata, + resource_limits: args.resource_limits, }); // Send thread safe handle from newly created worker to host thread diff --git a/runtime/web_worker.rs b/runtime/web_worker.rs index f08f708f2e8c7f..29379b6bf75a33 100644 --- a/runtime/web_worker.rs +++ b/runtime/web_worker.rs @@ -167,6 +167,14 @@ impl Serialize for WorkerControlEvent { "exitCode": exit_code, }) } + CoreErrorKind::JsBox(js_error_box) => { + let class = js_error_box.get_class(); + json!({ + "message": js_error_box.to_string(), + "name": class, + "exitCode": exit_code, + }) + } _ => json!({ "message": error.to_string(), "exitCode": exit_code, @@ -442,6 +450,9 @@ pub struct WebWorker { memory_trim_handle: Option>, maybe_coverage_dir: Option, bootstrap_error: Option, + /// Set to `true` by the near-heap-limit callback when resource limits + /// are exceeded, so the error handler can emit `ERR_WORKER_OUT_OF_MEMORY`. 
+ pub oom_triggered: Arc<AtomicBool>, } impl Drop for WebWorker { @@ -474,7 +485,7 @@ (worker, handle) } - fn from_options< + pub fn from_options< TInNpmPackageChecker: InNpmPackageChecker + 'static, TNpmPackageFolderResolver: NpmPackageFolderResolver + 'static, TExtNodeSys: ExtNodeSys + 'static, @@ -755,6 +766,7 @@ memory_trim_handle: None, maybe_coverage_dir: options.maybe_coverage_dir, bootstrap_error: None, + oom_triggered: Arc::new(AtomicBool::new(false)), }, external_handle, options.bootstrap, @@ -1152,6 +1164,24 @@ pub async fn run_web_worker( }; if let Err(e) = result { + // For Node workers with OOM, skip script execution (V8 is terminated) + // and use a dedicated error type with exit code 1. + if internal_handle.worker_type == WorkerThreadType::Node + && worker.oom_triggered.load(Ordering::SeqCst) + { + let oom_error: CoreError = CoreErrorKind::JsBox( + deno_error::JsErrorBox::new( + "ERR_WORKER_OUT_OF_MEMORY", + "Worker terminated due to reaching memory limit: JS heap out of memory", + ), + ) + .into_box(); + internal_handle + .post_event(WorkerControlEvent::TerminalError(oom_error, 1)) + .expect("Failed to post message to host"); + return Ok(()); + } + print_worker_error(&e, &name, format_js_error_fn.as_deref()); // For Node workers, dispatch process 'exit' event before sending diff --git a/tests/node_compat/config.jsonc b/tests/node_compat/config.jsonc index 7660446629ff37..40955f432ba91c 100644 --- a/tests/node_compat/config.jsonc +++ b/tests/node_compat/config.jsonc @@ -1623,6 +1623,12 @@ "parallel/test-worker-ref-onexit.js": {}, "parallel/test-worker-relative-path-double-dot.js": {}, "parallel/test-worker-relative-path.js": {}, + "parallel/test-worker-resource-limits.js": { + "darwin": false, + "linux": false, + "windows": false, + "reason": "Depends on v8.getHeapSpaceStatistics() which is not yet implemented in Deno" + }, "parallel/test-worker-stdio-flush-inflight.js": {}, "parallel/test-worker-stdio-flush.js": {},
"parallel/test-worker-stdio.js": {}, diff --git a/tests/specs/node/worker_threads_resource_limits/__test__.jsonc b/tests/specs/node/worker_threads_resource_limits/__test__.jsonc new file mode 100644 index 00000000000000..b37779fd0df754 --- /dev/null +++ b/tests/specs/node/worker_threads_resource_limits/__test__.jsonc @@ -0,0 +1,17 @@ +{ + "tests": { + "resource_limits_passed": { + "args": "run --allow-read main.mjs", + "output": "main.out" + }, + "resource_limits_default": { + "args": "run --allow-read default_main.mjs", + "output": "default_main.out" + }, + "resource_limits_oom": { + "args": "run --allow-read oom_main.mjs", + "output": "oom_main.out", + "exitCode": 0 + } + } +} diff --git a/tests/specs/node/worker_threads_resource_limits/default_main.mjs b/tests/specs/node/worker_threads_resource_limits/default_main.mjs new file mode 100644 index 00000000000000..95605099f7c571 --- /dev/null +++ b/tests/specs/node/worker_threads_resource_limits/default_main.mjs @@ -0,0 +1,12 @@ +import { Worker } from "node:worker_threads"; + +// No resourceLimits specified - should get defaults +const worker = new Worker(new URL("./worker.mjs", import.meta.url)); + +worker.on("message", (msg) => { + console.log("resourceLimits:", JSON.stringify(msg)); +}); + +worker.on("exit", (code) => { + console.log("exit:", code); +}); diff --git a/tests/specs/node/worker_threads_resource_limits/default_main.out b/tests/specs/node/worker_threads_resource_limits/default_main.out new file mode 100644 index 00000000000000..6c024dd0dda531 --- /dev/null +++ b/tests/specs/node/worker_threads_resource_limits/default_main.out @@ -0,0 +1,2 @@ +resourceLimits: {} +exit: 0 diff --git a/tests/specs/node/worker_threads_resource_limits/main.mjs b/tests/specs/node/worker_threads_resource_limits/main.mjs new file mode 100644 index 00000000000000..bfd639963739a1 --- /dev/null +++ b/tests/specs/node/worker_threads_resource_limits/main.mjs @@ -0,0 +1,22 @@ +import { Worker } from "node:worker_threads"; + +const 
worker = new Worker(new URL("./worker.mjs", import.meta.url), { + resourceLimits: { + maxOldGenerationSizeMb: 128, + maxYoungGenerationSizeMb: 16, + codeRangeSizeMb: 64, + stackSizeMb: 4, + }, +}); + +worker.on("message", (msg) => { + console.log("resourceLimits:", JSON.stringify(msg)); +}); + +worker.on("error", (err) => { + console.error("error:", err.message); +}); + +worker.on("exit", (code) => { + console.log("exit:", code); +}); diff --git a/tests/specs/node/worker_threads_resource_limits/main.out b/tests/specs/node/worker_threads_resource_limits/main.out new file mode 100644 index 00000000000000..a6a99341bcafa6 --- /dev/null +++ b/tests/specs/node/worker_threads_resource_limits/main.out @@ -0,0 +1,2 @@ +resourceLimits: {"maxYoungGenerationSizeMb":16,"maxOldGenerationSizeMb":128,"codeRangeSizeMb":64,"stackSizeMb":4} +exit: 0 diff --git a/tests/specs/node/worker_threads_resource_limits/oom_main.mjs b/tests/specs/node/worker_threads_resource_limits/oom_main.mjs new file mode 100644 index 00000000000000..1644f5ba4da67b --- /dev/null +++ b/tests/specs/node/worker_threads_resource_limits/oom_main.mjs @@ -0,0 +1,16 @@ +import { Worker } from "node:worker_threads"; + +const worker = new Worker(new URL("./oom_worker.mjs", import.meta.url), { + resourceLimits: { + maxOldGenerationSizeMb: 64, + }, +}); + +worker.on("error", (err) => { + console.log("error code:", err.code); + console.log("error message:", err.message); +}); + +worker.on("exit", (code) => { + console.log("exit:", code); +}); diff --git a/tests/specs/node/worker_threads_resource_limits/oom_main.out b/tests/specs/node/worker_threads_resource_limits/oom_main.out new file mode 100644 index 00000000000000..fcade8bccdb64b --- /dev/null +++ b/tests/specs/node/worker_threads_resource_limits/oom_main.out @@ -0,0 +1,3 @@ +error code: ERR_WORKER_OUT_OF_MEMORY +error message: Worker terminated due to reaching memory limit: JS heap out of memory +exit: 1 diff --git 
a/tests/specs/node/worker_threads_resource_limits/oom_worker.mjs b/tests/specs/node/worker_threads_resource_limits/oom_worker.mjs new file mode 100644 index 00000000000000..1312c1fb71a8a2 --- /dev/null +++ b/tests/specs/node/worker_threads_resource_limits/oom_worker.mjs @@ -0,0 +1,5 @@ +// Allocate memory until the heap limit is reached. +const arrays = []; +while (true) { + arrays.push(new Array(1024 * 1024).fill(0)); +} diff --git a/tests/specs/node/worker_threads_resource_limits/worker.mjs b/tests/specs/node/worker_threads_resource_limits/worker.mjs new file mode 100644 index 00000000000000..010e3b4d38c27d --- /dev/null +++ b/tests/specs/node/worker_threads_resource_limits/worker.mjs @@ -0,0 +1,3 @@ +import { parentPort, resourceLimits } from "node:worker_threads"; + +parentPort.postMessage(resourceLimits);