Piscina is a fast and efficient worker thread pool implementation for Node.js, designed for offloading CPU-intensive tasks from the main event loop.
Repository: https://github.com/piscinajs/piscina Website/Docs: https://piscinajs.dev/
Key Features:
- Fast inter-thread communication (using Atomics by default).
- Handles both fixed-task and variable-task workloads.
- Flexible pool size management (`minThreads`, `maxThreads`).
- Async tracking integration.
- Performance statistics (`runTime`, `waitTime`).
- Task cancellation support (`AbortController`, `EventEmitter`).
- Resource limits enforcement (`maxOldGenerationSizeMb`, etc.).
- Supports CommonJS, ESM, and TypeScript.
- Custom task queues.
- Optional CPU scheduling priorities (Linux).
Requirements: Node.js 18.x and higher.
npm install piscina
# or
yarn add piscina
- `main.js`: Initializes Piscina, pointing to the worker file.
- `worker.js`: Contains the code the worker threads will execute.
// main.js
const path = require('node:path');
const Piscina = require('piscina');

// Absolute path or file:// URL to the worker script, resolved next to this file.
const workerFile = path.resolve(__dirname, 'worker.js');
const piscina = new Piscina({ filename: workerFile });

/**
 * Submits one immediate and one delayed addition task to the pool and prints
 * each result. A rejection from either task is logged instead of propagating.
 */
async function main() {
  try {
    const sum = await piscina.run({ a: 5, b: 10 });
    console.log(`Result: ${sum}`); // Output: Result: 15

    const delayedSum = await piscina.run({ a: 2, b: 3, delayMs: 100 });
    console.log(`Async Result: ${delayedSum}`); // Output: Async Result: 5 (after ~100ms)
  } catch (error) {
    console.error('Task failed:', error);
  } finally {
    // Optional: Gracefully shutdown the pool when done
    // await piscina.close();
  }
}

main();
// worker.js - Synchronous Task
// module.exports = (taskData) => {
//   console.log(`Worker processing: ${JSON.stringify(taskData)}`);
//   if (typeof taskData.a !== 'number' || typeof taskData.b !== 'number') {
//     throw new Error('Invalid input: a and b must be numbers');
//   }
//   return taskData.a + taskData.b;
// };
// worker.js - Asynchronous Task (using Promise)
const { setTimeout: sleep } = require('node:timers/promises');

/**
 * Adds `taskData.a` and `taskData.b`, optionally waiting `taskData.delayMs`
 * milliseconds first.
 *
 * @param {{ a: number, b: number, delayMs?: number }} taskData
 * @returns {Promise<number>} The sum of `a` and `b`.
 * @throws {Error} When `a` or `b` is not a number.
 */
const handleTask = async (taskData) => {
  console.log(`Worker processing async: ${JSON.stringify(taskData)}`);

  const operandsAreNumbers =
    typeof taskData.a === 'number' && typeof taskData.b === 'number';
  if (!operandsAreNumbers) {
    throw new Error('Invalid input: a and b must be numbers');
  }

  // A missing or zero delay skips the wait entirely.
  if (taskData.delayMs) {
    await sleep(taskData.delayMs);
  }

  return taskData.a + taskData.b;
};

module.exports = handleTask;
- `main.mjs`: Uses `import` and `file://` URLs.
- `worker.mjs`: Uses `export default`.
// main.mjs
import { Piscina } from 'piscina';

const piscina = new Piscina({
  // Resolve the worker relative to this module. ES modules have no
  // __dirname/__filename, so a file:// URL built from import.meta.url is the
  // simplest way to point at a sibling file.
  filename: new URL('./worker.mjs', import.meta.url).href
});

// Runs a single task and prints its result; failures are logged, not rethrown.
(async () => {
  try {
    const result = await piscina.run({ x: 7, y: 3 });
    console.log(`ESM Result: ${result}`); // Output: ESM Result: 10
  } catch (error) {
    console.error('ESM Task failed:', error);
  }
})();
// worker.mjs
import { setTimeout as sleep } from 'node:timers/promises';
// export default ({ x, y }) => {
// console.log(`ESM Worker processing: ${JSON.stringify({ x, y })}`);
// return x + y;
// };
// OR async
export default async ({ x, y, delayMs = 50 }) => {
console.log(`ESM Worker processing async: ${JSON.stringify({ x, y })}`);
await sleep(delayMs);
return x + y;
};
// Annotated constructor call showing the pool options described in this document.
const piscina = new Piscina({
  // --- Core ---
  filename: path.resolve(__dirname, 'worker.js'), // Path to worker script (required if not passed in run())
  name: 'default', // Name of the exported function to run (default: 'default')
  // --- Pool Size ---
  // Floor the halved core count so an odd number of cores still yields a whole number of threads.
  minThreads: Math.max(1, Math.floor(os.availableParallelism() / 2)), // Default: os.availableParallelism()
  maxThreads: os.availableParallelism() * 1.5, // Default: os.availableParallelism() * 1.5
  idleTimeout: 0, // ms. Time before an idle thread is shut down (0 = immediate, Infinity = never). Default: 0
  // Consider increasing for better performance if tasks are frequent.
  // --- Queueing ---
  maxQueue: 'auto', // Max tasks waiting for a thread. Default: Infinity. 'auto' = maxThreads^2
  concurrentTasksPerWorker: 1, // Tasks per worker thread. Default: 1. Increase for I/O-bound async tasks in workers.
  taskQueue: new Piscina.FixedQueue(), // Optional: Custom task queue implementation. Default: Array-based FIFO. FixedQueue is faster.
  // --- Performance & Communication ---
  atomics: 'sync', // 'sync' (default), 'async', 'disabled'. Controls Atomics usage for communication. 'async' allows event loop activity between tasks but has overhead.
  recordTiming: true, // Collect runTime and waitTime stats. Default: true.
  // --- Worker Environment ---
  resourceLimits: { // See Node.js Worker docs
    maxOldGenerationSizeMb: 1024, // Max main heap size (MB) per worker
    // maxYoungGenerationSizeMb: ...,
    // codeRangeSizeMb: ...,
    stackSizeMb: 8, // Default: 4
  },
  env: { // Set environment variables inside workers
    ...process.env, // Inherit main process env
    WORKER_SPECIFIC_VAR: 'value'
  },
  argv: ['--worker-arg1'], // Append to process.argv in workers
  execArgv: ['--inspect=0'], // Node.js CLI options for workers (e.g., debugging)
  workerData: { sharedConfig: 'some value' }, // Data available via require('piscina').workerData or import { workerData } from 'piscina'
  // --- Scheduling & Cleanup ---
  niceIncrement: 0, // Linux only (requires @napi-rs/nice): Increase value to lower worker priority. Default: 0
  trackUnmanagedFds: true, // Auto-close fs.open/fs.close FDs on worker exit. Default: true (Node >= 12.19/14.6)
  closeTimeout: 30000, // ms. Max time to wait for tasks during piscina.close(). Default: 30000
});
`piscina.run()` returns a `Promise` that resolves with the worker function's return value or rejects on error/cancellation.
// Independent examples of piscina.run(task, options); each one shows a single option.
// Basic run
const result = await piscina.run({ data: 'some task data' });
// Override filename for this task
const resultOtherWorker = await piscina.run(
{ data: 'specific task' },
{ filename: path.resolve(__dirname, 'other_worker.js') }
);
// Specify exported function name (see 3.3)
const multiplyResult = await piscina.run(
{ a: 5, b: 3 },
{ name: 'multiply' }
);
// Transfer data instead of cloning (for ArrayBuffer, TypedArray, MessagePort etc.)
const buffer = new ArrayBuffer(1024);
// Fill buffer...
const resultWithTransfer = await piscina.run(
buffer, // Task data is the buffer itself
{ transferList: [buffer] } // Mark buffer for transfer
);
// Note: `buffer` in the main thread is now unusable after transfer.
// Cancelable Task (see 3.4)
const ac = new AbortController();
const taskPromise = piscina.run({ /* ... */ }, { signal: ac.signal });
// Later...
// ac.abort();
try {
await taskPromise;
} catch (err) {
// A cancelled task is told apart from a real failure by its error name or code.
if (err.name === 'AbortError' || err.code === 'ERR_WORKER_ABORTED') {
console.log('Task was cancelled');
} else {
console.error('Task failed:', err);
}
}
Define multiple named functions in your worker file and choose which one to run using the `name` option.
// worker_math.js

/**
 * Returns `a + b` for the given operand pair.
 * @param {{ a: number, b: number }} operands
 */
function add(operands) {
  const { a, b } = operands;
  return a + b;
}

/**
 * Returns `a - b` for the given operand pair.
 * @param {{ a: number, b: number }} operands
 */
function subtract(operands) {
  const { a, b } = operands;
  return a - b;
}

/**
 * Returns `a * b` for the given operand pair.
 * @param {{ a: number, b: number }} operands
 */
function multiply(operands) {
  const { a, b } = operands;
  return a * b;
}
// Two ways to expose several handlers from one worker file; only Method 2 is active here.
// Method 1: Assign properties to the default export
// add.add = add; // Optional: make 'add' runnable via name: 'add'
// add.subtract = subtract;
// add.multiply = multiply;
// module.exports = add; // 'add' is the default export
// Method 2: Export an object (Recommended for clarity)
module.exports = {
add, // Runnable via name: 'add'
subtract, // Runnable via name: 'subtract'
multiply, // Runnable via name: 'multiply'
// Keep an explicit `default` entry so run() without a name option has a handler.
// NOTE(review): an object export with no `default` key presumably has nothing to
// resolve for the default name — confirm against Piscina's handler lookup.
default: add // Runnable via name: 'default' (or if no name option is provided)
};
// main.js (using worker_math.js)
const piscina = new Piscina({
  filename: path.resolve(__dirname, 'worker_math.js'),
  // 'name' option in constructor sets the default for piscina.run()
  // name: 'add' // Optional: makes 'add' the default if not specified in run()
});

/**
 * Runs the default handler and two named handlers on the same operands, one
 * after another, then prints all three results together.
 */
async function runMathTasks() {
  const operands = { a: 10, b: 5 };

  // Runs default ('add' if set above, or the module's default export)
  const sum = await piscina.run({ ...operands });
  // OR explicitly specify name:
  // const sum = await piscina.run({ a: 10, b: 5 }, { name: 'add' });
  const difference = await piscina.run({ ...operands }, { name: 'subtract' });
  const product = await piscina.run({ ...operands }, { name: 'multiply' });

  console.log({ sum, difference, product });
  // Output: { sum: 15, difference: 5, product: 50 }
}

runMathTasks();
Use an `AbortController` or any `EventEmitter` that emits an `'abort'` event.
/**
 * Tells whether a rejection from piscina.run() represents a cancelled task
 * (by error name or error code) rather than a genuine failure.
 */
const isAbortRejection = (error) =>
  error.name === 'AbortError' || error.code === 'ERR_WORKER_ABORTED';

// Using AbortController (Recommended)
const ac = new AbortController();
const { signal } = ac;
const task = piscina.run({ /* ... */ }, { signal });

// To cancel:
setTimeout(() => {
  console.log('Aborting task...');
  ac.abort();
}, 50); // Example: Abort after 50ms

try {
  await task;
} catch (error) {
  // Piscina throws specific errors on abort
  if (isAbortRejection(error)) {
    console.log('Task successfully aborted by AbortController.');
  } else {
    console.error('Task failed for other reason:', error);
  }
}

// Using EventEmitter
const EventEmitter = require('node:events');
const eeSignal = new EventEmitter();
const task2 = piscina.run({ /* ... */ }, { signal: eeSignal });

// To cancel:
setTimeout(() => {
  console.log('Aborting task via EventEmitter...');
  eeSignal.emit('abort');
}, 50);

try {
  await task2;
} catch (error) {
  if (isAbortRejection(error)) {
    console.log('Task successfully aborted by EventEmitter.');
  } else {
    console.error('Task 2 failed for other reason:', error);
  }
}
- Note: Aborted tasks reject the promise returned by `piscina.run()`. If the task was already running, the worker thread executing it will be terminated and replaced.
Use `Piscina.move()` in the worker to mark transferable objects (like `ArrayBuffer`) to be transferred back to the main thread instead of cloned, improving performance for large data.
// worker_transfer.js
const { move } = require('piscina');
// or import { move } from 'piscina'; // for ESM
module.exports = (taskData) => {
const dataSize = taskData.size || 10 * 1024 * 1024; // 10MB
const buffer = new ArrayBuffer(dataSize);
const view = new Uint8Array(buffer);
// Fill the buffer with some data...
for (let i = 0; i < dataSize; i++) {
view[i] = i % 256;
}
console.log('Worker created buffer, moving it back.');
// Wrap the transferable object with move()
return move(buffer);
};
// main.js (using worker_transfer.js)
const transferWorkerFile = path.resolve(__dirname, 'worker_transfer.js');
const piscina = new Piscina({ filename: transferWorkerFile });

/**
 * Requests a 50MB buffer from the worker, timing the round trip under the
 * 'transfer' label, and prints the size of the buffer that came back.
 */
async function fetchTransferredBuffer() {
  const requestedBytes = 50 * 1024 * 1024; // 50MB

  console.time('transfer');
  const transferredBuffer = await piscina.run({ size: requestedBytes });
  console.timeEnd('transfer');

  console.log(`Received buffer of size: ${transferredBuffer.byteLength}`);
  // The buffer is now owned by the main thread.
}

fetchTransferredBuffer();
`Transferable` Interface: For complex objects containing transferable parts, implement a custom `Transferable` using `Piscina.transferableSymbol` and `Piscina.valueSymbol`. See the official README/docs for details.
Export a `Promise` from the worker file that resolves to the task function. Piscina waits for this promise before marking the worker as ready.
// worker_async_init.js
const { setTimeout: sleep } = require('node:timers/promises');

/**
 * Task handler handed to the pool once initialization has finished.
 * @param {{ value: unknown }} taskData
 * @returns {string}
 */
function processTask(taskData) {
  console.log(`Processing task: ${taskData.value}`);
  return `Processed: ${taskData.value}`;
}

/**
 * Performs the worker's one-time asynchronous setup (simulated with a 500ms
 * wait, standing in for e.g. a DB connection or data load) and resolves to
 * the function that will process tasks.
 *
 * @returns {Promise<(taskData: { value: unknown }) => string>}
 */
async function initializeWorker() {
  console.log('Worker initializing...');
  await sleep(500);
  console.log('Worker ready!');
  // Each call hands out its own wrapper around the shared handler.
  return (taskData) => processTask(taskData);
}

// Export the promise returned by the async function call
module.exports = initializeWorker();
// main.js (using worker_async_init.js)
const asyncInitWorkerFile = path.resolve(__dirname, 'worker_async_init.js');
const piscina = new Piscina({ filename: asyncInitWorkerFile });
// Workers will take ~500ms to become available

/**
 * Submits a single task and prints its result once the worker has finished
 * initializing and processed it.
 */
async function submitAndReport() {
  console.log('Submitting task...');
  const outcome = await piscina.run({ value: 'Test Data' });
  console.log(outcome); // Output: Processed: Test Data (after init completes)
}

submitAndReport();
Control the flow of tasks when the pool is busy.
// main_backpressure.js
// Producer that submits MAX_TASKS tasks, stops when the queue reaches
// maxQueue, and starts again from the 'drain' event handler.
const piscina = new Piscina({
filename: path.resolve(__dirname, 'worker.js'),
maxThreads: 2,
maxQueue: 4 // Limit queue size (or use 'auto')
// idleTimeout: 1000 // Prevent threads stopping immediately
});
let taskCounter = 0; // Number of tasks submitted so far; also used as the next task's id
let isPaused = false; // True while submission is stopped because the queue was full
const MAX_TASKS = 20;
/**
 * Submits at most one task, then schedules itself again with setImmediate.
 * Returns without submitting when all MAX_TASKS have been sent or when the
 * queue is already at maxQueue; in the second case nothing is rescheduled
 * here, and the 'drain' handler below is what restarts submission.
 */
function submitTask() {
if (taskCounter >= MAX_TASKS) {
console.log('All tasks submitted.');
// Consider closing the pool when truly done
// piscina.close().then(() => console.log('Pool closed.'));
return;
}
// Check if pool is overloaded BEFORE submitting
if (piscina.queueSize >= piscina.options.maxQueue) {
if (!isPaused) {
console.log(`Queue full (${piscina.queueSize}/${piscina.options.maxQueue}). Pausing submission...`);
isPaused = true;
// Stop adding tasks (e.g., pause reading from a stream)
}
// Optionally wait for drain event or check periodically
return;
}
// If paused and queue has space, resume
// Note: in this snippet isPaused only becomes true on the early-return path
// above, and the 'drain' handler clears it before calling submitTask again,
// so this branch is only reached if some other caller invokes submitTask
// while paused.
if (isPaused) {
console.log(`Queue has space (${piscina.queueSize}/${piscina.options.maxQueue}). Resuming submission...`);
isPaused = false;
// Resume adding tasks (e.g., resume reading from a stream)
}
const currentTask = taskCounter++;
console.log(`Submitting task ${currentTask}`);
// The promise is intentionally not awaited: completion and failure are only
// logged, so submission continues without waiting for the result.
piscina.run({ a: currentTask, b: 1, delayMs: 100 }) // Simulate work
.then(result => console.log(`Task ${currentTask} completed: ${result}`))
.catch(err => console.error(`Task ${currentTask} failed:`, err));
// Immediately try to submit the next task (or use setInterval/setTimeout)
setImmediate(submitTask);
}
// Listen for 'drain' event - emitted when queue is empty *and* tasks running equals pool capacity or less
// Useful for knowing when to resume a paused producer
piscina.on('drain', () => {
console.log('*** Pool drained (queue empty) ***');
if (isPaused) {
console.log('Resuming submission due to drain event.');
isPaused = false;
// Resume adding tasks
setImmediate(submitTask); // Kick off submission again
}
});
// Listen for 'needsDrain' event - emitted when queue size > 0 OR active tasks >= maxThreads
// Can also be used for pausing
// piscina.on('needsDrain', () => {
// if (!isPaused) {
// console.log('*** Pool needs drain *** -- Pausing');
// isPaused = true;
// }
// });
// Start the submission process
submitTask();
Communicate between the main thread and all worker threads (Node.js v18+).
// main_broadcast.js
const { BroadcastChannel } = require('node:worker_threads');
const Piscina = require('piscina');
const path = require('node:path');

const piscina = new Piscina({
  filename: path.resolve(__dirname, 'worker_broadcast.js'),
  minThreads: 2,
  maxThreads: 2,
  // Workers only receive broadcasts while their event loop is running. With
  // the default atomics: 'sync' an idle worker blocks until its next task
  // arrives, so messages sent between tasks would not be delivered. Disable
  // atomics (or use 'async') so idle workers stay responsive.
  atomics: 'disabled'
});

const channelName = 'my-app-config-updates';

/**
 * Starts both workers by running one task on each, then broadcasts a config
 * update after 1 second and a shutdown signal after 3 seconds, closing the
 * channel and the pool once the final message has been posted.
 */
async function main() {
  const bc = new BroadcastChannel(channelName);
  console.log('Starting workers...');
  // Run tasks to ensure workers are started and listening
  await Promise.all([
    piscina.run({ id: 1 }),
    piscina.run({ id: 2 })
  ]);
  console.log('Workers started. Broadcasting message in 1 second...');

  setTimeout(() => {
    const message = { type: 'config_update', data: { theme: 'dark' } };
    console.log('Main: Broadcasting:', message);
    bc.postMessage(message);
  }, 1000);

  setTimeout(() => {
    console.log('Broadcasting another message...');
    bc.postMessage({ type: 'shutdown_signal' });
    bc.close(); // Close the channel when done
    // close() returns a promise; report a failed shutdown instead of leaving
    // an unhandled rejection.
    piscina.close().catch((error) => {
      console.error('Failed to close pool:', error);
    });
  }, 3000);
}

main().catch((error) => {
  console.error('Broadcast example failed:', error);
});
// worker_broadcast.js
const { BroadcastChannel, isMainThread } = require('node:worker_threads');
const { workerData } = require('piscina'); // Access initial workerData if needed
if (isMainThread) {
throw new Error('This script should only run in a worker thread');
}
const channelName = 'my-app-config-updates';
let bc; // Define bc in the worker's scope
// Main task function
module.exports = async (taskData) => {
console.log(`Worker [${taskData.id}]: Received task.`);
// Initialize BroadcastChannel only once per worker instance
if (!bc) {
console.log(`Worker [${taskData.id}]: Setting up BroadcastChannel listener...`);
bc = new BroadcastChannel(channelName);
bc.onmessage = (event) => {
console.log(`Worker [${taskData.id}]: Received broadcast message:`, event.data);
if (event.data?.type === 'shutdown_signal') {
console.log(`Worker [${taskData.id}]: Received shutdown. Closing channel.`);
bc.close();
}
// Handle the message (e.g., update worker's internal state)
};
// Optional: Handle errors on the channel
bc.onmessageerror = (error) => {
console.error(`Worker [${taskData.id}]: BroadcastChannel message error:`, error);
};
}
// Simulate some work for the task itself
await new Promise(resolve => setTimeout(resolve, 500));
console.log(`Worker [${taskData.id}]: Task finished.`);
return `Worker ${taskData.id} done.`;
};
// Optional: Cleanup when the worker exits (e.g., due to idleTimeout or destroy)
// process.on('exit', () => {
// if (bc) {
// console.log('Worker exiting, closing BroadcastChannel');
// bc.close();
// }
// });
Replace the default FIFO queue for potentially better performance, especially with large queues.
const { Piscina, FixedQueue } = require('piscina'); // Import FixedQueue
const path = require('node:path');

// Use the built-in high-performance queue in place of the default FIFO queue.
const taskQueue = new FixedQueue();
const workerFile = path.resolve(__dirname, 'worker.js');

const piscina = new Piscina({ filename: workerFile, taskQueue });
// ... rest of your code using piscina.run() ...
- You can also implement your own queue by providing an object matching the `TaskQueue` interface (see README).
- `piscina.run(task, [options])`: Submits a task. Returns `Promise<Result>`. (See 3.2)
- `piscina.close([options])`: Gracefully shuts down the pool. Waits for running tasks to complete (up to `closeTimeout`). Returns `Promise<void>`.
  - `options.force` (boolean, default `false`): If `true`, cancels queued (not yet running) tasks immediately.

  await piscina.close(); // Wait for running tasks
  await piscina.close({ force: true }); // Abort queued tasks

- `piscina.destroy()`: Immediately terminates all workers and rejects all pending/running tasks. Returns `Promise<void>`. Use for forceful shutdown.

  await piscina.destroy();
Listen using `piscina.on('eventName', handler)`.
- `'error' (error)`: Emitted for unhandled errors within the pool/worker management itself (e.g., worker crashes unexpectedly, invalid messages). Task-specific errors reject the `run()` promise.
- `'drain'`: Emitted when the queue becomes empty and worker utilization allows for more tasks. Useful for resuming paused producers. (See 3.7)
- `'needsDrain'`: Emitted when `piscina.needsDrain` becomes true (pool is at or over capacity). Useful for pausing producers.
- `'message' (message)`: Emitted when a worker sends a message using `parentPort.postMessage()` (less common with Piscina's task model, but possible).
- `piscina.options`: Copy of the options used to configure the pool.
- `piscina.threads`: Array of the active `Worker` instances.
- `piscina.queueSize`: Number of tasks currently waiting in the queue.
- `piscina.completed`: Total number of tasks completed since the pool was created.
- `piscina.duration`: Time in milliseconds since the pool was created.
- `piscina.needsDrain`: Boolean indicating if the pool is operating at or beyond capacity (`queueSize > 0` or active tasks >= `maxThreads`). (See 3.7)
- `piscina.utilization`: A ratio (0-1) indicating how busy the pool has been (approx. total task run time / total pool capacity time).
- `piscina.runTime`: Histogram object summarizing task execution times (ms). Includes `.average`, `.mean`, `.stddev`, `.min`, `.max`, and percentiles (`.p50`, `.p99`, etc.). Requires `recordTiming: true`.
- `piscina.waitTime`: Histogram object summarizing task wait times in the queue (ms). Same structure as `runTime`. Requires `recordTiming: true`.
// Print a snapshot of the pool's queue state and timing statistics.
// The runTime / waitTime histograms are only populated when the pool was
// created with recordTiming: true.
console.log(`Queue size: ${piscina.queueSize}`);
console.log(`Pool needs drain? ${piscina.needsDrain}`);
console.log(`Average run time: ${piscina.runTime.average} ms`);
console.log(`99th percentile wait time: ${piscina.waitTime.p99} ms`);
- `Piscina.isWorkerThread`: Boolean, `true` if the current code is running inside a Piscina worker thread.
- `Piscina.version`: String, the version of the Piscina library.
- `Piscina.move(value)`: Marks a transferable value for transfer instead of cloning when returned from a worker. (See 3.5)
- `Piscina.transferableSymbol`, `Piscina.valueSymbol`: Symbols used for implementing the custom `Transferable` interface.
- `Piscina.queueOptionsSymbol`: Symbol used for passing options to custom task queues via the task object.
- `Piscina.FixedQueue`: The built-in high-performance task queue class. (See 3.9)
// worker.js (CJS)
const { isWorkerThread, workerData, move /* etc */ } = require('piscina');
// parentPort comes from Node's worker_threads module, not from piscina;
// destructuring it from piscina would yield undefined.
const { parentPort } = require('node:worker_threads');
// or const piscina = require('piscina'); piscina.isWorkerThread ...

if (isWorkerThread) {
  console.log('Running inside a Piscina worker.');
  console.log('Initial workerData:', workerData); // Access data passed in constructor
}

// Task handler placeholder: receives the task payload passed to piscina.run().
module.exports = (task) => { /* ... */ };
// worker.mjs (ESM)
import { isWorkerThread, workerData, move /* etc */ } from 'piscina';
// parentPort comes from Node's worker_threads module, not from piscina; a
// named import that the module does not export fails when the module loads.
import { parentPort } from 'node:worker_threads';

if (isWorkerThread) {
  console.log('Running inside a Piscina worker (ESM).');
  console.log('Initial workerData:', workerData);
}

// Task handler placeholder: receives the task payload passed to piscina.run().
export default (task) => { /* ... */ };
- Use Case: Best for CPU-bound synchronous tasks (complex calculations, data processing) to avoid blocking the main Node.js event loop. Asynchronous tasks (like I/O) already benefit from Node.js's internal thread pool; moving them to Piscina workers might offer limited gains unless they also involve significant CPU work.
- Worker Initialization: Costly operations (DB connections, loading large data) should ideally happen once per worker, potentially using async initialization (See 3.6).
- Data Transfer: Avoid cloning large data. Use `transferList` in `piscina.run()` options or `Piscina.move()` when returning from workers for `ArrayBuffer`s etc. (See 3.5). Pass only necessary data to tasks.
- Pool Sizing (`minThreads`, `maxThreads`): Tune based on available CPU cores and workload. Too few threads underutilize CPU; too many cause excessive context switching. Start with defaults and benchmark.
- Idle Timeout (`idleTimeout`): The default of `0` (immediate shutdown) can cause thread churn (creation/destruction overhead) if tasks arrive intermittently. Increase `idleTimeout` (e.g., `1000` ms to `60000` ms) or raise `minThreads` if you expect frequent tasks, to keep workers warm. Balance this against resource usage of idle threads.
- Queue Size (`maxQueue`): Default (`Infinity`) can lead to high memory usage if tasks arrive much faster than they're processed. Setting `'auto'` (`maxThreads^2`) or a fixed number provides backpressure but risks rejecting tasks if the producer isn't designed to handle it. Use `piscina.needsDrain` / `'drain'` event to manage flow (See 3.7).
- Out-of-Scope Async Code: Ensure all asynchronous operations within a worker task handler are `await`ed before returning. Unawaited async operations might pause and unexpectedly resume when the next task runs on that worker, leading to unpredictable behavior.

  // worker.js - BAD EXAMPLE
  const { setTimeout: sleep } = require('node:timers/promises');
  module.exports = ({ a, b }) => {
    // This promise is NOT awaited
    sleep(1000).then(() => {
      console.log('This might run during a LATER task!');
    });
    return a + b; // Returns immediately
  };
- Resource Limits: Use `resourceLimits` cautiously. Setting them too low can make workers unusable or cause crashes. Profile memory usage.
- Linux Priority (`niceIncrement`): If using on Linux and installing `@napi-rs/nice`, increasing `niceIncrement` lowers worker CPU priority. This can prevent workers from starving the main event loop but will slow down task completion. Profile carefully.
- Multiple Pools: Be aware that multiple `Piscina` instances in the same application create separate thread pools that compete for CPU resources. Consider sharing a single pool if appropriate, or document clearly if your library uses Piscina internally.
- Task Rejections: Check the error passed to the `.catch()` block of the `piscina.run()` promise. It could be an error thrown by your worker code, a cancellation error (`AbortError` / `ERR_WORKER_ABORTED`), or an error from Piscina itself (e.g., worker exited unexpectedly).
- Worker Crashes: Listen for the `'error'` event on the `Piscina` instance for errors not tied to a specific task promise (e.g., uncatchable exceptions during worker idle time, resource limit exceeded). Check worker `stderr` if possible (may require custom setup or observing thread exit codes).
- High Memory Usage: Check `maxQueue` size. Ensure large data isn't being unnecessarily cloned (use `move` / `transferList`). Check for memory leaks within your worker code. Monitor using `resourceLimits` and external tools.
- Slow Performance / High CPU: Profile task execution time (`piscina.runTime`). Check for thread churn (`idleTimeout`). Ensure you aren't creating too many threads (`maxThreads`). Consider `niceIncrement` on Linux. Ensure the tasks are genuinely CPU-bound.
- Deadlocks/Hangs: Ensure worker tasks always complete or throw errors. Unresolved promises in workers can block threads. Check for issues with `atomics: 'sync'` if workers rely on the event loop between tasks (consider `'async'` or `'disabled'` in that specific case).