Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
80 changes: 78 additions & 2 deletions cli/lib/worker.rs
Original file line number Diff line number Diff line change
Expand Up @@ -369,6 +369,54 @@ impl<TSys: DenoLibSys> LibWorkerFactorySharedState<TSys> {
bundle_provider: shared.bundle_provider.clone(),
};
let maybe_initial_cwd = shared.options.maybe_initial_cwd.clone();
// Apply resource limits to v8::CreateParams if specified.
// Uses individual V8 ResourceConstraints setters to match Node.js
// behavior (node_worker.cc UpdateResourceConstraints).
let (create_params, resolved_limits) = if let Some(ref limits) =
args.resource_limits
{
let mut params =
create_isolate_create_params(&shared.sys).unwrap_or_default();

if let Some(max_old) =
limits.max_old_generation_size_mb.filter(|&v| v > 0)
{
params =
params.set_max_old_generation_size_in_bytes(max_old * 1024 * 1024);
}
if let Some(max_young) =
limits.max_young_generation_size_mb.filter(|&v| v > 0)
{
params = params
.set_max_young_generation_size_in_bytes(max_young * 1024 * 1024);
}
if let Some(code_range) = limits.code_range_size_mb.filter(|&v| v > 0) {
params =
params.set_code_range_size_in_bytes(code_range * 1024 * 1024);
}

let mb = 1024 * 1024;
// Read back resolved values (including V8 defaults for
// unspecified fields), matching Node.js behavior.
// Note: integer division truncates sub-MB fractions, which is fine
// since V8 and Node.js both work in whole-MB granularity here.
let resolved = deno_node::ops::worker_threads::ResolvedResourceLimits {
max_young_generation_size_mb: params
.max_young_generation_size_in_bytes()
/ mb,
max_old_generation_size_mb: params.max_old_generation_size_in_bytes()
/ mb,
code_range_size_mb: params.code_range_size_in_bytes() / mb,
stack_size_mb: limits
.stack_size_mb
.unwrap_or(deno_node::ops::worker_threads::DEFAULT_STACK_SIZE_MB),
};

(Some(params), Some(resolved))
} else {
(create_isolate_create_params(&shared.sys), None)
};

let options = WebWorkerOptions {
name: args.name,
main_module: args.main_module.clone(),
Expand Down Expand Up @@ -403,7 +451,7 @@ impl<TSys: DenoLibSys> LibWorkerFactorySharedState<TSys> {
},
extensions: vec![],
startup_snapshot: shared.options.startup_snapshot,
create_params: create_isolate_create_params(&shared.sys),
create_params,
unsafely_ignore_certificate_errors: shared
.options
.unsafely_ignore_certificate_errors
Expand All @@ -424,7 +472,35 @@ impl<TSys: DenoLibSys> LibWorkerFactorySharedState<TSys> {
enable_stack_trace_arg_in_ops: has_trace_permissions_enabled(),
};

WebWorker::bootstrap_from_options(services, options)
let has_resource_limits = args.resource_limits.is_some();
let (mut worker, handle, bootstrap_options) =
WebWorker::from_options(services, options);

// Store resolved resource limits in the worker's op state BEFORE
// bootstrapping, so the worker_threads polyfill can read them
// via op during init.
if let Some(resolved) = resolved_limits {
worker.js_runtime.op_state().borrow_mut().put(resolved);
}

worker.bootstrap(&bootstrap_options);

// When resource limits are set, install a near-heap-limit callback
// that terminates the worker's isolate gracefully instead of
// crashing the entire process with a V8 fatal OOM.
if has_resource_limits {
let ts_handle = worker.js_runtime.v8_isolate().thread_safe_handle();
let oom_flag = worker.oom_triggered.clone();
worker.js_runtime.add_near_heap_limit_callback(
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

fwiw this callback runs when v8 performs a last-resort gc. that gc might reclaim enough space that js is able to continue running normally.

move |current_limit, _initial_limit| {
oom_flag.store(true, std::sync::atomic::Ordering::SeqCst);
ts_handle.terminate_execution();
current_limit * 2
},
);
}

(worker, handle)
})
}
}
Expand Down
1 change: 1 addition & 0 deletions ext/node/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -309,6 +309,7 @@ deno_core::extension!(deno_node,
ops::util::op_node_in_npm_package<TInNpmPackageChecker, TNpmPackageFolderResolver, TSys>,
ops::util::op_node_parse_env,
ops::worker_threads::op_worker_threads_filename<TSys>,
ops::worker_threads::op_worker_get_resource_limits,
ops::ipc::op_node_child_ipc_pipe,
ops::ipc::op_node_ipc_write_json,
ops::ipc::op_node_ipc_read_json,
Expand Down
25 changes: 25 additions & 0 deletions ext/node/ops/worker_threads.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,20 @@ use deno_permissions::PermissionsContainer;
use crate::ExtNodeSys;
use crate::NodeRequireLoaderRc;

/// Default thread stack size in MB, matching Node.js default.
pub const DEFAULT_STACK_SIZE_MB: usize = 4;

/// Resolved resource limits with V8 defaults filled in for unspecified values.
/// Stored in the worker's OpState so the JS polyfill can read actual values.
#[derive(deno_core::serde::Serialize, Clone, Debug, Default, PartialEq, Eq)]
#[serde(rename_all = "camelCase")]
pub struct ResolvedResourceLimits {
pub max_young_generation_size_mb: usize,
pub max_old_generation_size_mb: usize,
pub code_range_size_mb: usize,
pub stack_size_mb: usize,
}

#[must_use = "the resolved return value to mitigate time-of-check to time-of-use issues"]
fn ensure_read_permission<'a>(
state: &mut OpState,
Expand Down Expand Up @@ -100,3 +114,14 @@ pub fn op_worker_threads_filename<TSys: ExtNodeSys + 'static>(
.map_err(WorkerThreadsFilenameError::Permission)?;
Ok(Some(url.into()))
}

/// Returns the resolved resource limits for this worker, or None if
/// no resource limits were configured. Called from worker_threads
/// polyfill during init to get actual V8 values (with defaults filled in).
#[op2]
#[serde]
pub fn op_worker_get_resource_limits(
state: &mut OpState,
) -> Option<ResolvedResourceLimits> {
state.try_borrow::<ResolvedResourceLimits>().cloned()
}
58 changes: 39 additions & 19 deletions ext/node/polyfills/worker_threads.ts
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ import {
op_host_recv_message,
op_host_terminate_worker,
op_message_port_recv_message_sync,
op_worker_get_resource_limits,
op_worker_threads_filename,
} from "ext:core/ops";
import {
Expand Down Expand Up @@ -198,14 +199,7 @@ class NodeWorker extends EventEmitter {
// https://nodejs.org/api/worker_threads.html#workerthreadid
threadId = this.#id;
// https://nodejs.org/api/worker_threads.html#workerresourcelimits
resourceLimits: Required<
NonNullable<WorkerOptions["resourceLimits"]>
> = {
maxYoungGenerationSizeMb: -1,
maxOldGenerationSizeMb: -1,
codeRangeSizeMb: -1,
stackSizeMb: 4,
};
resourceLimits: WorkerOptions["resourceLimits"] = {};
// https://nodejs.org/api/worker_threads.html#workerstdin
stdin: Writable | null = null;
// https://nodejs.org/api/worker_threads.html#workerstdout
Expand Down Expand Up @@ -350,6 +344,8 @@ class NodeWorker extends EventEmitter {
}
}

const resourceLimits_ = options?.resourceLimits ?? undefined;

const serializedWorkerMetadata = serializeJsMessageData({
workerData: options?.workerData,
environmentData: environmentData,
Expand All @@ -360,6 +356,7 @@ class NodeWorker extends EventEmitter {
isEval: !!options?.eval,
isWorkerThread: true,
hasStdin: !!options?.stdin,
resourceLimits: resourceLimits_,
}, options?.transferList ?? []);

if (options?.eval) {
Expand Down Expand Up @@ -406,12 +403,17 @@ class NodeWorker extends EventEmitter {
name: this.#name,
workerType: "node",
closeOnIdle: true,
resourceLimits: resourceLimits_,
},
serializedWorkerMetadata,
);
this.#id = id;
this.threadId = id;

if (resourceLimits_) {
this.resourceLimits = { ...resourceLimits_ };
}

if (options?.stdin) {
// deno-lint-ignore no-this-alias
const worker = this;
Expand Down Expand Up @@ -499,15 +501,25 @@ class NodeWorker extends EventEmitter {
this.#status = "CLOSED";
this.#closeStdio();
if (this.listenerCount("error") > 0) {
const err = new Error(data.errorMessage ?? data.message);
if (data.name) {
err.name = data.name;
const errMsg = data.errorMessage ?? data.message;
const errName = data.name;
let err;
if (errName === "ERR_WORKER_OUT_OF_MEMORY") {
err = new Error(errMsg);
err.code = errName;
err.name = "Error";
} else {
err = new Error(errMsg);
if (errName) {
err.name = errName;
}
}
// Stack is unavailable from the worker context (e.g. prepareStackTrace
// may have thrown). Match Node.js behavior of setting stack to undefined.
err.stack = undefined;
this.emit("error", err);
}
this.resourceLimits = {};
if (!this.#exited) {
this.#exited = true;
this.emit("exit", data.exitCode ?? 1);
Expand All @@ -522,6 +534,7 @@ class NodeWorker extends EventEmitter {
debugWT(`Host got "close" message from worker: ${this.#name}`);
this.#status = "CLOSED";
this.#closeStdio();
this.resourceLimits = {};
if (!this.#exited) {
this.#exited = true;
this.emit("exit", data ?? 0);
Expand Down Expand Up @@ -747,14 +760,11 @@ internals.__initWorkerThreads = (
internals.__isWorkerThread = !runningOnMainThread;

defaultExport.isMainThread = isMainThread;
// fake resourceLimits
resourceLimits = isMainThread ? {} : {
maxYoungGenerationSizeMb: 48,
maxOldGenerationSizeMb: 2048,
codeRangeSizeMb: 0,
stackSizeMb: 4,
};
defaultExport.resourceLimits = resourceLimits;

if (isMainThread) {
resourceLimits = {};
defaultExport.resourceLimits = resourceLimits;
}

if (!isMainThread) {
// TODO(bartlomieju): this is a really hacky way to provide
Expand Down Expand Up @@ -789,6 +799,16 @@ internals.__initWorkerThreads = (
process.env = env;
}

// Get resolved resource limits from the Rust side (includes V8
// defaults for unspecified fields), matching Node.js behavior.
const resolvedLimits = op_worker_get_resource_limits();
if (resolvedLimits) {
resourceLimits = resolvedLimits;
} else {
resourceLimits = {};
}
defaultExport.resourceLimits = resourceLimits;

// Set process.argv for worker threads.
// In Node.js, worker process.argv is [execPath, scriptPath, ...argv].
if (isWorkerThread) {
Expand Down
24 changes: 22 additions & 2 deletions runtime/ops/worker_host.rs
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,16 @@ use crate::worker::FormatJsErrorFn;

pub const UNSTABLE_FEATURE_NAME: &str = "worker-options";

/// V8 resource limits for worker isolates, matching Node.js `resourceLimits`.
#[derive(Deserialize, Default, Clone)]
#[serde(rename_all = "camelCase")]
pub struct ResourceLimits {
pub max_young_generation_size_mb: Option<usize>,
pub max_old_generation_size_mb: Option<usize>,
pub code_range_size_mb: Option<usize>,
pub stack_size_mb: Option<usize>,
}

pub struct CreateWebWorkerArgs {
pub name: String,
pub worker_id: WorkerId,
Expand All @@ -43,6 +53,7 @@ pub struct CreateWebWorkerArgs {
pub worker_type: WorkerThreadType,
pub close_on_idle: bool,
pub maybe_worker_metadata: Option<WorkerMetadata>,
pub resource_limits: Option<ResourceLimits>,
}

pub type CreateWebWorkerCb = dyn Fn(CreateWebWorkerArgs) -> (WebWorker, SendableWebWorkerHandle)
Expand Down Expand Up @@ -123,6 +134,7 @@ pub struct CreateWorkerArgs {
specifier: String,
worker_type: WorkerThreadType,
close_on_idle: bool,
resource_limits: Option<ResourceLimits>,
}

#[derive(Debug, thiserror::Error, deno_error::JsError)]
Expand Down Expand Up @@ -194,8 +206,15 @@ fn op_create_worker(
let (handle_sender, handle_receiver) =
std::sync::mpsc::sync_channel::<SendableWebWorkerHandle>(1);

// Setup new thread
let thread_builder = std::thread::Builder::new().name(format!("{worker_id}"));
// Setup new thread. If stackSizeMb is specified in resourceLimits,
// set the OS thread stack size to match Node.js behavior.
let mut thread_builder =
std::thread::Builder::new().name(format!("{worker_id}"));
if let Some(ref limits) = args.resource_limits
&& let Some(stack_mb) = limits.stack_size_mb.filter(|&v| v > 0)
{
thread_builder = thread_builder.stack_size(stack_mb * 1024 * 1024);
}
let maybe_worker_metadata = if let Some(data) = maybe_worker_metadata {
let transferables =
deserialize_js_transferables(state, data.transferables)?;
Expand Down Expand Up @@ -230,6 +249,7 @@ fn op_create_worker(
worker_type,
close_on_idle: args.close_on_idle,
maybe_worker_metadata,
resource_limits: args.resource_limits,
});

// Send thread safe handle from newly created worker to host thread
Expand Down
Loading
Loading