-
-
Notifications
You must be signed in to change notification settings - Fork 51
Assembler
Assembler receives a token stream and assembles JavaScript objects. It is used as a building block for streamers.
const Asm = require('stream-json/Assembler');
const {parser} = require('stream-json');
const {chain} = require('stream-chain');
const fs = require('fs');
const zlib = require('zlib');
const pipeline = chain([
fs.createReadStream('data.json.gz'),
zlib.createGunzip(),
parser()
]);
const asm = Asm.connectTo(pipeline);
asm.on('done', asm => console.log(asm.current));Assembler is based on EventEmitter. Its constructor takes no arguments.
The current object being assembled. If it is an array, new subobjects will be added to it. If it is an object, new properties will be added to it. Otherwise, it can be one of this: a string, a number, null, true, false.
A string for a property key, or null, if unused. This variable is used to keep a property name until a corresponding subobject is assembled.
An array of parent objects for an object in current. This array is used as a stack. Even values correspond to parent objects, while odd values contain a property key or null.
Essentially, when a new object comes in the current values of current and key are pushed on stack and reinitialized. When current is fully assembled, it is placed in its parent object possibly using key, and old values of current and key are restored from stack.
It is a boolean flag. It indicates that a current object is fully assembled. When this flag flips to true an event called done is issued as well.
In case of a stream of multiple JSON objects, done can be flipped to true multiple times.
This getter returns a current object depth. It is 0 for top-level objects.
For example, for [{a: 1}] when assembling 1, the depth will be 2.
This getter calculates and returns an array similar to FilterBase's stack.
For example, for [{a: 1}] when assembling 1, the path will be [0, 'a'].
This is a helper method, which encapsulates a common pattern used to consume a token.
The whole implementation is very simple:
consume(chunk) {
this[chunk.name] && this[chunk.name](chunk.value);
}This fragment relies on the fact that Assembler provides methods named as tokens. Not all tokens can be defined.
This method attaches to a token stream to run consume() cycle on data event. Additionally, after consuming a token, if done property is true, done event is emitted with the Assembler instance as a payload.
const asm = new Asm();
asm.connectTo(pipeline);
asm.on('done', asm => console.log(asm.current));This method discards subobjects until we hit a depth indicated by level. It is assumed that the current depth is higher than a requested level. Otherwise, no actions are performed.
Assembler can define some token names as method names to handle corresponding tokens. The common cycle is defined in consume():
asm[chunk.name] && asm[chunk.name](chunk.value);This static method creates an Assembler instance, connects it to a token stream and returns the instance.
const asm = Asm.connectTo(pipeline);
asm.on('done', asm => console.log(asm.current));The whole implementation is very simple:
static connectTo(stream) {
return new Assembler().connectTo(stream);
}