-
-
Notifications
You must be signed in to change notification settings - Fork 51
Replace
Replace is a token item filter based on FilterBase. It replaces objects from a stream passing the rest as is. Its filter is not called for subobjects of replaced objects. In certain conditions, it can replace items with nothing effectively ignoring values.
There is a helper filter Ignore, which is used to remove values completely. It is based on Replace.
const Pick = require('stream-json/filters/pick');
const {streamArray} = require('stream-json/streamers/StreamArray');
const {chain} = require('stream-chain');
const fs = require('fs');
// our data stream:
// {total: 123456789, meta: {...}, data: [...]}
// we are interested in 'data'
const pipeline = chain([
fs.createReadStream('sample.json'),
Pick.withParser({filter: 'data'}),
streamArray()
]);
pipeline.on('data', data => console.log(data));More complex example:
const {pick} = require('stream-json/filters/pick');
const {parser} = require('stream-json/Parser');
const {streamJsonObjects} = require('stream-json/streamers/StreamJsonObjects');
// our data stream: array of objects,
// each object looks like that:
// {total: 123456789, meta: {...}, data: [...]}
// we want the content of all 'data' subobjects
const pipeline = chain([
fs.createReadStream('sample.json'),
parser(),
pick({filter: /^\d+\.data\.\d+/}),
streamJsonObject()
]);
pipeline.on('data', data => console.log(data));Pick has no special API. Based on FilterBase it uses following option properties:
-
filter- If it is truthy, the current object is streamed out with all its possible subobjects.
pathSeparatoronce
See their definition in FilterBase.
make() and pick() are two aliases of the factory function. It takes options described above, and return a new instance of Pick. pick() helps to reduce a boilerplate when creating data processing pipelines:
const {chain} = require('stream-chain');
const {parser} = require('stream-json/Parser');
const {pick} = require('stream-json/filters/Pick');
const fs = require('fs');
const pipeline = chain([
fs.createReadStream("sample.json"),
parser(),
pick({filter: 'data'})
]);
let objectCounter = 0;
pipeline.on('data', data => data.name === 'startObject' && ++objectCounter);
pipeline.on('end', console.log(`Found ${objectCounter} objects.`));Constructor property of make() (and pick()) is set to Pick. It can be used for indirect creating of filters or metaprogramming if needed.
withParser() takes one argument:
-
optionsis an object described in Parser's options. It is used to initialize both streams (aParserinstance and a stream returned bymake()).
It returns a stream produced by stream-chain, which wraps the pipeline. The most important utility of withParser() is that it correctly sets object modes of the returned stream: object mode for the Readable part and text mode for the Writable part.
This static method is created using withParser() utility. It simplifies a case when a stream should be immediately preceded by a parser.
const Pick = require('stream-json/filters/Pick');
const fs = require('fs');
const pipeline = fs.createReadStream("sample.json")
.pipe(Pick.withParser({filter: 'data'}));
let objectCounter = 0;
pipeline.on('data', data => data.name === 'startObject' && ++objectCounter);
pipeline.on('end', console.log(`Found ${objectCounter} objects.`));