Skip to content

Commit 9c5a19e

Browse files
metcoder95ronagdependabot[bot]alan-agius4groozin
authored
feat: support Atomics.waitAsync (#687)
Co-authored-by: Robert Nagy <[email protected]> Co-authored-by: dependabot[bot] <49699333+dependabot[bot]@users.noreply.github.com> Co-authored-by: Alan Agius <[email protected]> Co-authored-by: Tomasz Mikos <[email protected]> Co-authored-by: Chocobozzz <[email protected]> Co-authored-by: Charles <[email protected]> Co-authored-by: Rafael Gonzaga <[email protected]>
1 parent 4e1683a commit 9c5a19e

File tree

17 files changed

+277
-133
lines changed

17 files changed

+277
-133
lines changed

README.md

Lines changed: 10 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -278,7 +278,7 @@ const { resolve } = require('path');
278278
const Piscina = require('piscina');
279279
const piscina = new Piscina({
280280
filename: resolve(__dirname, 'worker.js'),
281-
useAtomics: false
281+
atomics: 'disabled'
282282
});
283283

284284
async function main () {
@@ -362,12 +362,16 @@ This class extends [`EventEmitter`][] from Node.js.
362362
only makes sense to specify if there is some kind of asynchronous component
363363
to the task. Keep in mind that Worker threads are generally not built for
364364
handling I/O in parallel.
365-
* `useAtomics`: (`boolean`) Use the [`Atomics`][] API for faster communication
365+
* `atomics`: (`sync` | `async` | `disabled`) Use the [`Atomics`](https://developer.mozilla.org/en-US/docs/Web/JavaScript/Reference/Global_Objects/Atomics) API for faster communication
366366
between threads. This is on by default. You can disable `Atomics` globally by
367-
setting the environment variable `PISCINA_DISABLE_ATOMICS` to `1`.
368-
If `useAtomics` is `true`, it will cause to pause threads (stoping all execution)
369-
between tasks. Ideally, threads should wait for all operations to finish before
370-
returning control to the main thread (avoid having open handles within a thread).
367+
setting the environment variable `PISCINA_DISABLE_ATOMICS` to `1` .
368+
If `atomics` is `sync`, it will cause to pause threads (stoping all execution)
369+
between tasks. Ideally, threads should wait for all operations to finish before
370+
returning control to the main thread (avoid having open handles within a thread). If still want to have the possibility
371+
of having open handles or handle asynchrnous tasks, you can set the environment variable `PISCINA_ENABLE_ASYNC_ATOMICS` to `1` or setting `options.atomics` to `async`.
372+
373+
> **Note**: The `async` mode comes with performance penalties and can lead to undesired behaviour if open handles are not tracked correctly.
374+
371375
* `resourceLimits`: (`object`) See [Node.js new Worker options][]
372376
* `maxOldGenerationSizeMb`: (`number`) The maximum size of each worker threads
373377
main heap in MB.
Lines changed: 29 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,29 @@
1+
'use strict';
2+
const { Piscina } = require('../dist');
3+
const { resolve } = require('path');
4+
5+
async function simpleBenchmark ({ duration = 10000 } = {}) {
6+
const pool = new Piscina({ filename: resolve(__dirname, 'fixtures/add.js'), atomics: 'async' });
7+
let done = 0;
8+
9+
const results = [];
10+
const start = process.hrtime.bigint();
11+
while (pool.queueSize === 0) {
12+
results.push(scheduleTasks());
13+
}
14+
15+
async function scheduleTasks () {
16+
while ((process.hrtime.bigint() - start) / 1_000_000n < duration) {
17+
await pool.run({ a: 4, b: 6 });
18+
done++;
19+
}
20+
}
21+
22+
await Promise.all(results);
23+
24+
return done / duration * 1e3;
25+
}
26+
27+
simpleBenchmark().then((opsPerSecond) => {
28+
console.log(`opsPerSecond: ${opsPerSecond} (with default taskQueue)`);
29+
});

docs/docs/api-reference/class.md

Lines changed: 15 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -47,12 +47,20 @@ This class extends [`EventEmitter`](https://nodejs.org/api/events.html) from Nod
4747
only makes sense to specify if there is some kind of asynchronous component
4848
to the task. Keep in mind that Worker threads are generally not built for
4949
handling I/O in parallel.
50-
- `useAtomics`: (`boolean`) Use the [`Atomics`](https://developer.mozilla.org/en-US/docs/Web/JavaScript/Reference/Global_Objects/Atomics) API for faster communication
50+
- `atomics`: (`sync` | `async` | `disabled`) Use the [`Atomics`](https://developer.mozilla.org/en-US/docs/Web/JavaScript/Reference/Global_Objects/Atomics) API for faster communication
5151
between threads. This is on by default. You can disable `Atomics` globally by
52-
setting the environment variable `PISCINA_DISABLE_ATOMICS` to `1`.
53-
If `useAtomics` is `true`, it will cause to pause threads (stoping all execution)
52+
setting the environment variable `PISCINA_DISABLE_ATOMICS` to `1` .
53+
If `atomics` is `sync`, it will cause to pause threads (stoping all execution)
5454
between tasks. Ideally, threads should wait for all operations to finish before
55-
returning control to the main thread (avoid having open handles within a thread).
55+
returning control to the main thread (avoid having open handles within a thread). If still want to have the possibility
56+
of having open handles or handle asynchrnous tasks, you can set the environment variable `PISCINA_ENABLE_ASYNC_ATOMICS` to `1` or setting `options.atomics` to `async`.
57+
58+
** :::info
59+
**Note**: The `async` mode comes with performance penalties and can lead to undesired behaviour if open handles are not tracked correctly.
60+
Workers should be designed to wait for all operations to finish before returning control to the main thread, if any background operations are still running
61+
`async` can be of help (e.g. for cache warming, etc).
62+
:::
63+
**
5664
- `resourceLimits`: (`object`) See [Node.js new Worker options](https://nodejs.org/api/worker_threads.html#worker_threads_new_worker_filename_options)
5765
- `maxOldGenerationSizeMb`: (`number`) The maximum size of each worker threads
5866
main heap in MB.
@@ -93,6 +101,9 @@ This class extends [`EventEmitter`](https://nodejs.org/api/events.html) from Nod
93101
- `workerHistogram`: (`boolean`) By default `false`. It will hint the Worker pool to record statistics for each individual Worker
94102
- `loadBalancer`: ([`PiscinaLoadBalancer`](#piscinaloadbalancer)) By default, Piscina uses a least-busy algorithm. The `loadBalancer`
95103
option can be used to provide an alternative implementation. See [Custom Load Balancers](../advanced-topics/loadbalancer.mdx) for additional detail.
104+
- `workerHistogram`: (`boolean`) By default `false`. It will hint the Worker pool to record statistics for each individual Worker
105+
- `loadBalancer`: ([`PiscinaLoadBalancer`](#piscinaloadbalancer)) By default, Piscina uses a least-busy algorithm. The `loadBalancer`
106+
option can be used to provide an alternative implementation. See [Custom Load Balancers](../advanced-topics/loadbalancer.mdx) for additional detail.
96107

97108
:::caution
98109
Use caution when setting resource limits. Setting limits that are too low may

docs/docs/examples/broadcast.mdx

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -20,7 +20,7 @@ const { resolve } = require('path');
2020
const Piscina = require('piscina');
2121
const piscina = new Piscina({
2222
filename: resolve(__dirname, 'worker.js'),
23-
useAtomics: false
23+
atomics: 'disabled'
2424
});
2525

2626
async function main () {

examples/broadcast/main.js

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -6,8 +6,8 @@ const { resolve } = require('path');
66
const Piscina = require('piscina');
77
const piscina = new Piscina({
88
filename: resolve(__dirname, 'worker.js'),
9-
// Set useAtomics to false to avoid threads being blocked when idle
10-
useAtomics: false
9+
// Set atomics to disabled to avoid threads being blocked when idle
10+
atomics: 'disabled'
1111
});
1212

1313
async function main () {

package.json

Lines changed: 8 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -19,13 +19,14 @@
1919
"test:ci": "npm run lint && npm run build && npm run test:coverage",
2020
"test:coverage": "c8 --reporter=lcov tap --cov",
2121
"prepack": "npm run build",
22-
"bench": "npm run bench:taskqueue && npm run bench:piscina",
23-
"bench:piscina": "npm run benchmark:piscina-default &&npm run benchmark:piscina-fixed-queue && npm run benchmark:piscina-comparison",
24-
"bench:taskqueue": "npm run benchmark:queue-comparison",
25-
"benchmark:piscina-default": "node benchmark/simple-benchmark.js",
26-
"benchmark:piscina-fixed-queue": "node benchmark/simple-benchmark-fixed-queue.js",
27-
"benchmark:piscina-comparison": "node benchmark/piscina-queue-comparison.js",
28-
"benchmark:queue-comparison": "node benchmark/queue-comparison.js"
22+
"benchmark": "npm run bench:queue && npm run benchmark:piscina",
23+
"benchmark:piscina": "npm run benchmark:default &&npm run benchmark:queue:fixed && npm run benchmark:default:comparison",
24+
"benchmark:default": "node benchmark/simple-benchmark.js",
25+
"benchmark:default:async": "node benchmark/simple-benchmark.js",
26+
"benchmark:default:comparison": "node benchmark/piscina-queue-comparison.js",
27+
"benchmark:queue": "npm run benchmark:queue-comparison",
28+
"benchmark:queue:fixed": "node benchmark/simple-benchmark-fixed-queue.js",
29+
"benchmark:queue:comparison": "node benchmark/queue-comparison.js"
2930
},
3031
"repository": {
3132
"type": "git",

src/index.ts

Lines changed: 8 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -65,7 +65,7 @@ interface Options {
6565
idleTimeout? : number,
6666
maxQueue? : number | 'auto',
6767
concurrentTasksPerWorker? : number,
68-
useAtomics? : boolean,
68+
atomics? : 'sync' | 'async' | 'disabled',
6969
resourceLimits? : ResourceLimits,
7070
argv? : string[],
7171
execArgv? : string[],
@@ -88,7 +88,7 @@ interface FilledOptions extends Options {
8888
idleTimeout : number,
8989
maxQueue : number,
9090
concurrentTasksPerWorker : number,
91-
useAtomics: boolean,
91+
atomics: Options['atomics'],
9292
taskQueue : TaskQueue,
9393
niceIncrement : number,
9494
closeTimeout : number,
@@ -122,7 +122,7 @@ const kDefaultOptions : FilledOptions = {
122122
idleTimeout: 0,
123123
maxQueue: Infinity,
124124
concurrentTasksPerWorker: 1,
125-
useAtomics: true,
125+
atomics: 'sync',
126126
taskQueue: new ArrayTaskQueue(),
127127
niceIncrement: 0,
128128
trackUnmanagedFds: true,
@@ -274,7 +274,7 @@ class ThreadPool {
274274
name: this.options.name,
275275
port: port2,
276276
sharedBuffer: workerInfo.sharedBuffer,
277-
useAtomics: this.options.useAtomics,
277+
atomics: this.options.atomics!,
278278
niceIncrement: this.options.niceIncrement
279279
};
280280
worker.postMessage(message, [port2]);
@@ -379,7 +379,7 @@ class ThreadPool {
379379
}
380380

381381
_processPendingMessages () {
382-
if (this.inProcessPendingMessages || !this.options.useAtomics) {
382+
if (this.inProcessPendingMessages || this.options.atomics === 'disabled') {
383383
return;
384384
}
385385

@@ -755,9 +755,9 @@ export default class Piscina<T = any, R = any> extends EventEmitterAsyncResource
755755
throw new TypeError(
756756
'options.concurrentTasksPerWorker must be a positive integer');
757757
}
758-
if (options.useAtomics !== undefined &&
759-
typeof options.useAtomics !== 'boolean') {
760-
throw new TypeError('options.useAtomics must be a boolean value');
758+
if (options.atomics != null && (typeof options.atomics !== 'string' ||
759+
!['sync', 'async', 'disabled'].includes(options.atomics))) {
760+
throw new TypeError('options.atomics should be a value of sync, sync or disabled.');
761761
}
762762
if (options.resourceLimits !== undefined &&
763763
(typeof options.resourceLimits !== 'object' ||

src/types.ts

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -8,7 +8,7 @@ export interface StartupMessage {
88
name: string
99
port: MessagePort
1010
sharedBuffer: Int32Array
11-
useAtomics: boolean
11+
atomics: 'async' | 'sync' | 'disabled'
1212
niceIncrement: number
1313
}
1414

0 commit comments

Comments
 (0)