Commit 6e10fef

Fix unit test and add tests for multi-threaded buffer access
1 parent 094ad0c commit 6e10fef
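
The change revolves around lmdb-js's getUserSharedBuffer(key, defaultBuffer): it returns a buffer that is shared across threads under the given key, and the default buffer only supplies the initial contents when no shared buffer exists for that key yet. A minimal single-threaded usage sketch of the pattern the updated unit test exercises (a sketch, assuming the package is installed as lmdb, that ./testdata is writable, and that the 'counter' key name is purely illustrative):

    const { open } = require('lmdb');

    const db = open({ path: './testdata' });
    // The seed buffer is only consulted if no shared buffer exists for this key yet.
    let seed = new BigInt64Array(1);
    let counter = new BigInt64Array(db.getUserSharedBuffer('counter', seed.buffer));
    counter[0] = 4n; // write through the returned view, not through seed
    Atomics.add(counter, 0, 1n); // atomic increment; counter[0] is now 5n
    db.close();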

File tree

test/index.test.js
test/threads.cjs

2 files changed, +108 -81 lines changed

test/index.test.js (5 additions, 5 deletions)

@@ -914,21 +914,21 @@ describe('lmdb-js', function () {
 	});
 	it('getUserSharedBuffer', function () {
 		let defaultIncrementer = new BigInt64Array(1);
-		defaultIncrementer[0] = 4n;
 		let incrementer = new BigInt64Array(
 			db.getUserSharedBuffer('incrementer-test', defaultIncrementer.buffer),
 		);
+		incrementer[0] = 4n;
 		should.equal(Atomics.add(incrementer, 0, 1n), 4n);
 		let secondDefaultIncrementer = new BigInt64Array(1); //should not get used
-		incrementer = new BigInt64Array( // should return same incrementer
+		let nextIncrementer = new BigInt64Array( // should return same incrementer
 			db.getUserSharedBuffer(
 				'incrementer-test',
 				secondDefaultIncrementer.buffer,
 			),
 		);
-		should.equal(defaultIncrementer[0], 5n);
-		should.equal(Atomics.add(incrementer, 0, 1n), 5n);
-		should.equal(defaultIncrementer[0], 6n);
+		should.equal(incrementer[0], 5n);
+		should.equal(Atomics.add(nextIncrementer, 0, 1n), 5n);
+		should.equal(incrementer[0], 6n);
 		should.equal(secondDefaultIncrementer[0], 0n);
 	});
 	it('getUserSharedBuffer with callbacks', async function () {
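
The corrected assertions above lean on two behaviors the test itself demonstrates: a second getUserSharedBuffer call with the same key returns the same underlying memory while its default buffer is ignored, and the original default buffer is just a local array, so reading it back (the removed defaultIncrementer checks) says nothing about the shared state. A condensed restatement of those expectations, as a sketch assuming an open lmdb-js db handle like the one in the test (key and variable names are illustrative):

    let first = new BigInt64Array(db.getUserSharedBuffer('shared-key', new BigInt64Array(1).buffer));
    let second = new BigInt64Array(db.getUserSharedBuffer('shared-key', new BigInt64Array(1).buffer));
    first[0] = 4n; // visible through second as well: same shared memory
    Atomics.add(second, 0, 1n); // both views now read 5n
    // the second call's default buffer was ignored because the buffer already existed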

test/threads.cjs (103 additions, 76 deletions)

@@ -1,95 +1,122 @@
 var assert = require('assert');
-const { Worker, isMainThread, parentPort, threadId } = require('worker_threads');
+const {
+	Worker,
+	isMainThread,
+	parentPort,
+	threadId,
+} = require('worker_threads');
 var path = require('path');
 var numCPUs = require('os').cpus().length;
+const { setFlagsFromString } = require('v8');
+const { runInNewContext } = require('vm');
+
+setFlagsFromString('--expose_gc');
+const gc = runInNewContext('gc');
 
 const { open } = require('../dist/index.cjs');
 const MAX_DB_SIZE = 256 * 1024 * 1024;
 if (isMainThread) {
-  var inspector = require('inspector')
-  // inspector.open(9331, null, true);debugger
+	var inspector = require('inspector');
+	// inspector.open(9331, null, true);debugger
 
-  // The main thread
+	// The main thread
 
-  let db = open({
-    path: path.resolve(__dirname, './testdata'),
-    maxDbs: 10,
-    mapSize: MAX_DB_SIZE,
-    maxReaders: 126,
-    overlappingSync: true,
-  });
+	let db = open({
+		path: path.resolve(__dirname, './testdata'),
+		maxDbs: 10,
+		mapSize: MAX_DB_SIZE,
+		maxReaders: 126,
+		overlappingSync: true,
+	});
 
-  var workerCount = Math.min(numCPUs * 2, 20);
-  var value = {test: '48656c6c6f2c20776f726c6421'};
-  var str = 'this is supposed to be bigger than 16KB threshold for shared memory buffers';
-  for (let i = 0; i < 9; i++) {
-    str += str;
-  }
-  var bigValue = {test: str};
-  // This will start as many workers as there are CPUs available.
-  var workers = [];
-  for (var i = 0; i < workerCount; i++) {
-    var worker = new Worker(__filename);
-    workers.push(worker);
-  }
+	let incrementer = new BigInt64Array(1);
+	let incrementerBuffer = db.getUserSharedBuffer('test', incrementer.buffer);
+	incrementer = new BigInt64Array(incrementerBuffer);
+	incrementer[0] = 10000n;
 
-  var messages = [];
-  workers.forEach(function(worker) {
-    worker.on('message', function(msg) {
-      messages.push(msg);
-      // Once every worker has replied with a response for the value
-      // we can exit the test.
+	var workerCount = Math.min(numCPUs * 2, 20);
+	var value = { test: '48656c6c6f2c20776f726c6421' };
+	var str =
+		'this is supposed to be bigger than 16KB threshold for shared memory buffers';
+	for (let i = 0; i < 9; i++) {
+		str += str;
+	}
+	var bigValue = { test: str };
+	// This will start as many workers as there are CPUs available.
+	var workers = [];
+	for (var i = 0; i < workerCount; i++) {
+		var worker = new Worker(__filename);
+		workers.push(worker);
+	}
 
-      setTimeout(() => {
-        worker.terminate()
-      }, 100);
-      if (messages.length === workerCount) {
-        db.close();
-        for (var i = 0; i < messages.length; i ++) {
-          assert(messages[i] === value.toString('hex'));
-        }
-        console.log("done", threadId)
-        //setTimeout(() =>
-        //process.exit(0), 200);
-      }
-    });
-  });
+	var messages = [];
+	workers.forEach(function (worker) {
+		worker.on('message', function (msg) {
+			messages.push(msg);
+			// Once every worker has replied with a response for the value
+			// we can exit the test.
 
-  let last
-  for (var i = 0; i < workers.length; i++) {
-    last = db.put('key' + i, i % 2 === 1 ? bigValue : value);
-  }
+			setTimeout(() => {
+				worker.terminate();
+			}, 100);
+			if (messages.length === workerCount) {
+				db.close();
+				for (var i = 0; i < messages.length; i++) {
+					assert(messages[i] === value.toString('hex'));
+				}
+				assert(incrementer[0] === 10000n + BigInt(workerCount) * 10n);
+				console.log('done', threadId, incrementer[0]);
+				//setTimeout(() =>
+				//process.exit(0), 200);
+			}
+		});
+	});
 
-  last.then(() => {
-    for (var i = 0; i < workers.length; i++) {
-      var worker = workers[i];
-      worker.postMessage({key: 'key' + i});
-    };
-  });
+	let last;
+	for (var i = 0; i < workers.length; i++) {
+		last = db.put('key' + i, i % 2 === 1 ? bigValue : value);
+	}
 
+	last.then(() => {
+		for (var i = 0; i < workers.length; i++) {
+			var worker = workers[i];
+			worker.postMessage({ key: 'key' + i });
+		}
+	});
 } else {
-  // The worker process
-  let db = open({
-    path: path.resolve(__dirname, './testdata'),
-    maxDbs: 10,
-    mapSize: MAX_DB_SIZE,
-    maxReaders: 126,
-    overlappingSync: true,
-  });
+	// The worker process
+	let db = open({
+		path: path.resolve(__dirname, './testdata'),
+		maxDbs: 10,
+		mapSize: MAX_DB_SIZE,
+		maxReaders: 126,
+		overlappingSync: true,
+	});
 
+	parentPort.on('message', async function (msg) {
+		if (msg.key) {
+			for (let i = 0; i < 10; i++) {
+				let incrementer = new BigInt64Array(1);
+				incrementer[0] = 1n; // should be ignored
+				let incrementerBuffer = db.getUserSharedBuffer(
+					'test',
+					incrementer.buffer,
+				);
+				incrementer = new BigInt64Array(incrementerBuffer);
+				Atomics.add(incrementer, 0, 1n);
+				gc();
+				await new Promise((resolve) => setTimeout(resolve, 100));
+			}
 
-  parentPort.on('message', async function(msg) {
-    if (msg.key) {
-      var value = db.get(msg.key);
-      if (msg.key == 'key1' || msg.key == 'key3') {
-        await db.put(msg.key, 'updated');
-      }
-      if (value === null) {
-        parentPort.postMessage("");
-      } else {
-        parentPort.postMessage(value.toString('hex'));
-      }
-
-    }
-  });
+			var value = db.get(msg.key);
+			if (msg.key == 'key1' || msg.key == 'key3') {
+				await db.put(msg.key, 'updated');
+			}
+			if (value === null) {
+				parentPort.postMessage('');
+			} else {
+				parentPort.postMessage(value.toString('hex'));
+			}
+		}
+	});
 }
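
Stripped of the key/value round-trips, the new threads.cjs coverage boils down to one pattern: the main thread obtains the named shared buffer and seeds a counter, each worker attaches to the same buffer by key and bumps it with Atomics.add (forcing GC and yielding between increments to stress buffer lifetime), and the main thread finally checks that the total equals the seed plus ten increments per worker. A condensed sketch of that pattern, assuming db, assert, and workerCount as set up in the test above:

    // main thread: create and seed the named shared counter
    let counter = new BigInt64Array(db.getUserSharedBuffer('test', new BigInt64Array(1).buffer));
    counter[0] = 10000n;
    // ...spawn workers and wait for all of them to report back...

    // each worker: attach to the same buffer by key and increment it atomically
    let view = new BigInt64Array(db.getUserSharedBuffer('test', new BigInt64Array(1).buffer));
    for (let i = 0; i < 10; i++) Atomics.add(view, 0, 1n);

    // main thread, once every worker has replied
    assert(counter[0] === 10000n + BigInt(workerCount) * 10n);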
