-
Notifications
You must be signed in to change notification settings - Fork 2
Expand file tree
/
Copy pathBlockDigestsStorage.js
More file actions
67 lines (61 loc) · 1.96 KB
/
BlockDigestsStorage.js
File metadata and controls
67 lines (61 loc) · 1.96 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
const async = require('async');
const stream = require('stream');
const Level = require('level');
const MAX_BATCH_SIZE = 100;
const MAX_QUEUE_SIZE = 1000;
/**
* Writable stream to store block digests coming from a
* BlockDigestsStream instance into a LevelDB database or other
* API-compatible storage layer
*
* @class BlockDigestsStorage
*/
class BlockDigestsStorage extends stream.Writable {
/**
* @constructor
* @param {object} params - constructor params, must contain
* either "levelPath" or "db" attribute
* @param {string} [params.levelPath] - path to LevelDB database
* to write to
* @param {string} [params.db] - custom db object, must contain
* the methods "batch" and "close" following levelup's API -
* currently used for unit tests
*/
constructor(params) {
super({ objectMode: true });
const { levelPath, db } = params;
if (levelPath) {
this.db = new Level(params.levelPath);
} else {
this.db = db;
}
this.cargo = async.cargo((tasks, cb) => {
this.db.batch(tasks, cb);
}, MAX_BATCH_SIZE);
}
_write(blockInfo, encoding, callback) {
const { size, digest, lastKey } = blockInfo;
this.cargo.push({
type: 'put',
key: lastKey, // index by last block key for efficient lookup
value: JSON.stringify({ size, digest }),
});
// heuristic to have basic flow control: delay the callback
// while queue size is above a reasonable size
async.whilst(
async () => this.cargo.length() > MAX_QUEUE_SIZE,
cb => setTimeout(cb, 100),
() => callback(),
);
}
_final(callback) {
if (this.cargo.idle()) {
this.db.close(callback);
} else {
this.cargo.drain(() => {
this.db.close(callback);
});
}
}
}
module.exports = BlockDigestsStorage;