-
-
Notifications
You must be signed in to change notification settings - Fork 146
Description
Piscina 3.0.0 (2.2.0 as well), Node.js 14.17.0 and 16.1.0
In the official example of using MessageChannel the order is not consistent. The task can finish before the message event or the other way round. I was naively expecting every postMessage inside the worker to appear before as a message event before runTask finishes. But the order is seemingly random causing funny side effects on my end.
Here's the code adapted from the official example. I haven't seen it ever happen on the first iteration, but often the second (sometimes a hand full of iterations) will stop because the task resolved before the message event happened.
Expected order: [message, done]
What I see often: [done, message]
index.js
'use strict';
const Piscina = require('piscina');
const { resolve } = require('path');
const { MessageChannel } = require('worker_threads');
const piscina = new Piscina({
filename: resolve(__dirname, 'worker.js'),
});
(async function () {
for (let i = 0; i < 1000; i++) {
const channel = new MessageChannel();
let events = [];
channel.port2.on('message', (message) => {
console.log('message');
events.push('message');
});
await piscina.runTask({ port: channel.port1 }, [channel.port1]);
console.log('done');
events.push('done');
// Make sure we wait for all message events to make it more reliable.
await new Promise((resolve) => {
setTimeout(resolve, 100);
});
if (events[0] !== 'message' || events[1] !== 'done') {
console.log(events);
console.error(new Error('Wrong order'));
process.exit(1);
}
console.log('-----------------------------');
}
process.exit(0);
})();worker.js
'use strict';
module.exports = ({ port }) => {
port.postMessage('hello from the worker pool');
};Now it gets more interesting with multiple message, let's send two messages from the worker:
Expected order: [message, message, done]
What I see often: [done, message, message] or even [message, done, message]
index.js
'use strict';
const Piscina = require('piscina');
const { resolve } = require('path');
const { MessageChannel } = require('worker_threads');
const piscina = new Piscina({
filename: resolve(__dirname, 'worker.js'),
});
(async function () {
for (let i = 0; i < 1000; i++) {
const channel = new MessageChannel();
let events = [];
channel.port2.on('message', (message) => {
console.log('message');
events.push('message');
});
await piscina.runTask({ port: channel.port1 }, [channel.port1]);
console.log('done');
events.push('done');
// Make sure we wait for all message events to make it more reliable.
await new Promise((resolve) => {
setTimeout(resolve, 100);
});
if (events[0] !== 'message' || events[1] !== 'message' || events[2] !== 'done') {
console.log(events);
console.error(new Error('Wrong order'));
process.exit(1);
}
console.log('-----------------------------');
}
process.exit(0);
})();worker.js
'use strict';
module.exports = ({ port }) => {
port.postMessage('hello from the worker pool');
port.postMessage('hello from the worker pool');
};With such a small message you usually get [done, message, message] with the second iteration. With a larger message you can get [message, done, message] as well and it often takes more than two iterations.
'use strict';
module.exports = ({ port }) => {
port.postMessage(Buffer.alloc(1024 * 1024 * 5));
port.postMessage(Buffer.alloc(1024 * 1024 * 5));
};E.g. here's a run that just took more than two iterations:
$ node index.js
message
message
done
-----------------------------
message
message
done
-----------------------------
message
message
done
-----------------------------
message
message
done
-----------------------------
message
done
message
[ 'message', 'done', 'message' ]
Error: Wrong order
at /home/alex/src/issues/piscina-race-condition/index.js:32:21
If this is expected behavior on Node.js's end I wonder if Piscina could do something about that? I'm not really into implementing flow control with ACK messages and what not when that's exactly why I'm using Piscina in the first place (native worker threads are somewhat painful to use 😅 ).