Skip to content
Merged
Show file tree
Hide file tree
Changes from 8 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -98,7 +98,7 @@ deno_task_shell = "=0.29.0"
deno_terminal = "=0.2.3"
deno_unsync = { version = "0.4.4", default-features = false }
deno_whoami = "0.1.0"
v8 = { version = "146.1.0", default-features = false }
v8 = { version = "146.2.0", default-features = false }

denokv_proto = "0.13.0"
denokv_remote = "0.13.0"
Expand Down
80 changes: 78 additions & 2 deletions cli/lib/worker.rs
Original file line number Diff line number Diff line change
Expand Up @@ -369,6 +369,54 @@ impl<TSys: DenoLibSys> LibWorkerFactorySharedState<TSys> {
bundle_provider: shared.bundle_provider.clone(),
};
let maybe_initial_cwd = shared.options.maybe_initial_cwd.clone();
// Apply resource limits to v8::CreateParams if specified.
// Uses individual V8 ResourceConstraints setters to match Node.js
// behavior (node_worker.cc UpdateResourceConstraints).
let (create_params, resolved_limits) = if let Some(ref limits) =
args.resource_limits
{
let mut params =
create_isolate_create_params(&shared.sys).unwrap_or_default();

if let Some(max_old) =
limits.max_old_generation_size_mb.filter(|&v| v > 0)
{
params =
params.set_max_old_generation_size_in_bytes(max_old * 1024 * 1024);
}
if let Some(max_young) =
limits.max_young_generation_size_mb.filter(|&v| v > 0)
{
params = params
.set_max_young_generation_size_in_bytes(max_young * 1024 * 1024);
}
if let Some(code_range) = limits.code_range_size_mb.filter(|&v| v > 0) {
params =
params.set_code_range_size_in_bytes(code_range * 1024 * 1024);
}

let mb = 1024 * 1024;
// Read back resolved values (including V8 defaults for
// unspecified fields), matching Node.js behavior.
// Note: integer division truncates sub-MB fractions, which is fine
// since V8 and Node.js both work in whole-MB granularity here.
let resolved = deno_node::ops::worker_threads::ResolvedResourceLimits {
max_young_generation_size_mb: params
.max_young_generation_size_in_bytes()
/ mb,
max_old_generation_size_mb: params.max_old_generation_size_in_bytes()
/ mb,
code_range_size_mb: params.code_range_size_in_bytes() / mb,
stack_size_mb: limits
.stack_size_mb
.unwrap_or(deno_node::ops::worker_threads::DEFAULT_STACK_SIZE_MB),
};

(Some(params), Some(resolved))
} else {
(create_isolate_create_params(&shared.sys), None)
};

let options = WebWorkerOptions {
name: args.name,
main_module: args.main_module.clone(),
Expand Down Expand Up @@ -403,7 +451,7 @@ impl<TSys: DenoLibSys> LibWorkerFactorySharedState<TSys> {
},
extensions: vec![],
startup_snapshot: shared.options.startup_snapshot,
create_params: create_isolate_create_params(&shared.sys),
create_params,
unsafely_ignore_certificate_errors: shared
.options
.unsafely_ignore_certificate_errors
Expand All @@ -424,7 +472,35 @@ impl<TSys: DenoLibSys> LibWorkerFactorySharedState<TSys> {
enable_stack_trace_arg_in_ops: has_trace_permissions_enabled(),
};

WebWorker::bootstrap_from_options(services, options)
let has_resource_limits = args.resource_limits.is_some();
let (mut worker, handle, bootstrap_options) =
WebWorker::from_options(services, options);

// Store resolved resource limits in the worker's op state BEFORE
// bootstrapping, so the worker_threads polyfill can read them
// via op during init.
if let Some(resolved) = resolved_limits {
worker.js_runtime.op_state().borrow_mut().put(resolved);
}

worker.bootstrap(&bootstrap_options);

// When resource limits are set, install a near-heap-limit callback
// that terminates the worker's isolate gracefully instead of
// crashing the entire process with a V8 fatal OOM.
if has_resource_limits {
let ts_handle = worker.js_runtime.v8_isolate().thread_safe_handle();
let oom_flag = worker.oom_triggered.clone();
worker.js_runtime.add_near_heap_limit_callback(
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

fwiw this callback runs when v8 performs a last-resort gc. that gc might reclaim enough space that js is able to continue running normally.

move |current_limit, _initial_limit| {
oom_flag.store(true, std::sync::atomic::Ordering::SeqCst);
ts_handle.terminate_execution();
current_limit * 2
},
);
}

(worker, handle)
})
}
}
Expand Down
1 change: 1 addition & 0 deletions ext/node/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -305,6 +305,7 @@ deno_core::extension!(deno_node,
ops::util::op_node_in_npm_package<TInNpmPackageChecker, TNpmPackageFolderResolver, TSys>,
ops::util::op_node_parse_env,
ops::worker_threads::op_worker_threads_filename<TSys>,
ops::worker_threads::op_worker_get_resource_limits,
ops::ipc::op_node_child_ipc_pipe,
ops::ipc::op_node_ipc_write_json,
ops::ipc::op_node_ipc_read_json,
Expand Down
25 changes: 25 additions & 0 deletions ext/node/ops/worker_threads.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,20 @@ use deno_permissions::PermissionsContainer;
use crate::ExtNodeSys;
use crate::NodeRequireLoaderRc;

/// Default thread stack size in MB, matching Node.js default.
pub const DEFAULT_STACK_SIZE_MB: usize = 4;

/// Resolved resource limits with V8 defaults filled in for unspecified values.
/// Stored in the worker's OpState so the JS polyfill can read actual values.
#[derive(deno_core::serde::Serialize, Clone, Debug, Default, PartialEq, Eq)]
#[serde(rename_all = "camelCase")]
pub struct ResolvedResourceLimits {
pub max_young_generation_size_mb: usize,
pub max_old_generation_size_mb: usize,
pub code_range_size_mb: usize,
pub stack_size_mb: usize,
}

#[must_use = "the resolved return value to mitigate time-of-check to time-of-use issues"]
fn ensure_read_permission<'a>(
state: &mut OpState,
Expand Down Expand Up @@ -100,3 +114,14 @@ pub fn op_worker_threads_filename<TSys: ExtNodeSys + 'static>(
.map_err(WorkerThreadsFilenameError::Permission)?;
Ok(Some(url.into()))
}

/// Returns the resolved resource limits for this worker, or None if
/// no resource limits were configured. Called from worker_threads
/// polyfill during init to get actual V8 values (with defaults filled in).
#[op2]
#[serde]
pub fn op_worker_get_resource_limits(
state: &mut OpState,
) -> Option<ResolvedResourceLimits> {
state.try_borrow::<ResolvedResourceLimits>().cloned()
}
58 changes: 39 additions & 19 deletions ext/node/polyfills/worker_threads.ts
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ import {
op_host_recv_message,
op_host_terminate_worker,
op_message_port_recv_message_sync,
op_worker_get_resource_limits,
op_worker_threads_filename,
} from "ext:core/ops";
import {
Expand Down Expand Up @@ -198,14 +199,7 @@ class NodeWorker extends EventEmitter {
// https://nodejs.org/api/worker_threads.html#workerthreadid
threadId = this.#id;
// https://nodejs.org/api/worker_threads.html#workerresourcelimits
resourceLimits: Required<
NonNullable<WorkerOptions["resourceLimits"]>
> = {
maxYoungGenerationSizeMb: -1,
maxOldGenerationSizeMb: -1,
codeRangeSizeMb: -1,
stackSizeMb: 4,
};
resourceLimits: WorkerOptions["resourceLimits"] = {};
// https://nodejs.org/api/worker_threads.html#workerstdin
stdin: Writable | null = null;
// https://nodejs.org/api/worker_threads.html#workerstdout
Expand Down Expand Up @@ -350,6 +344,8 @@ class NodeWorker extends EventEmitter {
}
}

const resourceLimits_ = options?.resourceLimits ?? undefined;

const serializedWorkerMetadata = serializeJsMessageData({
workerData: options?.workerData,
environmentData: environmentData,
Expand All @@ -360,6 +356,7 @@ class NodeWorker extends EventEmitter {
isEval: !!options?.eval,
isWorkerThread: true,
hasStdin: !!options?.stdin,
resourceLimits: resourceLimits_,
}, options?.transferList ?? []);

if (options?.eval) {
Expand Down Expand Up @@ -406,12 +403,17 @@ class NodeWorker extends EventEmitter {
name: this.#name,
workerType: "node",
closeOnIdle: true,
resourceLimits: resourceLimits_,
},
serializedWorkerMetadata,
);
this.#id = id;
this.threadId = id;

if (resourceLimits_) {
this.resourceLimits = { ...resourceLimits_ };
}

if (options?.stdin) {
// deno-lint-ignore no-this-alias
const worker = this;
Expand Down Expand Up @@ -499,15 +501,25 @@ class NodeWorker extends EventEmitter {
this.#status = "CLOSED";
this.#closeStdio();
if (this.listenerCount("error") > 0) {
const err = new Error(data.errorMessage ?? data.message);
if (data.name) {
err.name = data.name;
const errMsg = data.errorMessage ?? data.message;
const errName = data.name;
let err;
if (errName === "ERR_WORKER_OUT_OF_MEMORY") {
err = new Error(errMsg);
err.code = errName;
err.name = "Error";
} else {
err = new Error(errMsg);
if (errName) {
err.name = errName;
}
}
// Stack is unavailable from the worker context (e.g. prepareStackTrace
// may have thrown). Match Node.js behavior of setting stack to undefined.
err.stack = undefined;
this.emit("error", err);
}
this.resourceLimits = {};
if (!this.#exited) {
this.#exited = true;
this.emit("exit", data.exitCode ?? 1);
Expand All @@ -522,6 +534,7 @@ class NodeWorker extends EventEmitter {
debugWT(`Host got "close" message from worker: ${this.#name}`);
this.#status = "CLOSED";
this.#closeStdio();
this.resourceLimits = {};
if (!this.#exited) {
this.#exited = true;
this.emit("exit", data ?? 0);
Expand Down Expand Up @@ -747,14 +760,11 @@ internals.__initWorkerThreads = (
internals.__isWorkerThread = !runningOnMainThread;

defaultExport.isMainThread = isMainThread;
// fake resourceLimits
resourceLimits = isMainThread ? {} : {
maxYoungGenerationSizeMb: 48,
maxOldGenerationSizeMb: 2048,
codeRangeSizeMb: 0,
stackSizeMb: 4,
};
defaultExport.resourceLimits = resourceLimits;

if (isMainThread) {
resourceLimits = {};
defaultExport.resourceLimits = resourceLimits;
}

if (!isMainThread) {
// TODO(bartlomieju): this is a really hacky way to provide
Expand Down Expand Up @@ -789,6 +799,16 @@ internals.__initWorkerThreads = (
process.env = env;
}

// Get resolved resource limits from the Rust side (includes V8
// defaults for unspecified fields), matching Node.js behavior.
const resolvedLimits = op_worker_get_resource_limits();
if (resolvedLimits) {
resourceLimits = resolvedLimits;
} else {
resourceLimits = {};
}
defaultExport.resourceLimits = resourceLimits;

// Set process.argv for worker threads.
// In Node.js, worker process.argv is [execPath, scriptPath, ...argv].
if (isWorkerThread) {
Expand Down
24 changes: 22 additions & 2 deletions runtime/ops/worker_host.rs
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,16 @@ use crate::worker::FormatJsErrorFn;

pub const UNSTABLE_FEATURE_NAME: &str = "worker-options";

/// V8 resource limits for worker isolates, matching Node.js `resourceLimits`.
#[derive(Deserialize, Default, Clone)]
#[serde(rename_all = "camelCase")]
pub struct ResourceLimits {
pub max_young_generation_size_mb: Option<usize>,
pub max_old_generation_size_mb: Option<usize>,
pub code_range_size_mb: Option<usize>,
pub stack_size_mb: Option<usize>,
}

pub struct CreateWebWorkerArgs {
pub name: String,
pub worker_id: WorkerId,
Expand All @@ -43,6 +53,7 @@ pub struct CreateWebWorkerArgs {
pub worker_type: WorkerThreadType,
pub close_on_idle: bool,
pub maybe_worker_metadata: Option<WorkerMetadata>,
pub resource_limits: Option<ResourceLimits>,
}

pub type CreateWebWorkerCb = dyn Fn(CreateWebWorkerArgs) -> (WebWorker, SendableWebWorkerHandle)
Expand Down Expand Up @@ -123,6 +134,7 @@ pub struct CreateWorkerArgs {
specifier: String,
worker_type: WorkerThreadType,
close_on_idle: bool,
resource_limits: Option<ResourceLimits>,
}

#[derive(Debug, thiserror::Error, deno_error::JsError)]
Expand Down Expand Up @@ -194,8 +206,15 @@ fn op_create_worker(
let (handle_sender, handle_receiver) =
std::sync::mpsc::sync_channel::<SendableWebWorkerHandle>(1);

// Setup new thread
let thread_builder = std::thread::Builder::new().name(format!("{worker_id}"));
// Setup new thread. If stackSizeMb is specified in resourceLimits,
// set the OS thread stack size to match Node.js behavior.
let mut thread_builder =
std::thread::Builder::new().name(format!("{worker_id}"));
if let Some(ref limits) = args.resource_limits
&& let Some(stack_mb) = limits.stack_size_mb.filter(|&v| v > 0)
{
thread_builder = thread_builder.stack_size(stack_mb * 1024 * 1024);
}
let maybe_worker_metadata = if let Some(data) = maybe_worker_metadata {
let transferables =
deserialize_js_transferables(state, data.transferables)?;
Expand Down Expand Up @@ -230,6 +249,7 @@ fn op_create_worker(
worker_type,
close_on_idle: args.close_on_idle,
maybe_worker_metadata,
resource_limits: args.resource_limits,
});

// Send thread safe handle from newly created worker to host thread
Expand Down
Loading