-
-
Notifications
You must be signed in to change notification settings - Fork 12
V2: Intro
Just examples. Think about how you would do them without stream-chain.
- Simple pipeline
- Using asynchronous generators
- Combined functions
- Create
Transformout of functions - Slicing streams
- Folding
- Combined functions on steroids!
const Chain = require('stream-chain');
const fs = require('fs');
const zlib = require('zlib');
const {Transform} = require('stream');
// the chain will work on a stream of number objects
const chain = new Chain([
// transforms a value
x => x * x,
// returns several values
x => [x - 1, x, x + 1],
// waits for an asynchronous operation
async x => await getTotalFromDatabaseByKey(x),
// returns multiple values with a generator
function* (x) {
for (let i = x; i >= 0; --i) {
yield i;
}
},
// filters out even values
x => x % 2 ? x : null,
// uses an arbitrary transform stream
new Transform({
writableObjectMode: true,
transform(x, _, callback) {
// transform to text
callback(null, x.toString());
}
}),
// compress
zlib.createGzip()
]);
// log errors
chain.on('error', error => console.log(error));
// use the chain, and save the result to a file
dataSource.pipe(chain).pipe(fs.createWriteStream('output.txt.gz'));const {chain} = require('stream-chain');
const family = chain([
async function*(person) {
yield person;
// asynchronously retrieve parents
if (person.father) {
yield await getPersonFromDB(person.father);
}
if (person.mother) {
yield await getPersonFromDB(person.mother);
}
// asynchronously retrieve children, if any
for (let i = 0; i < person.children; ++i) {
yield await getPersonFromDB(person.children[i]);
}
},
new Transform({
writableObjectMode: true,
transform(x, _, callback) {
// transform to text
callback(null, JSON.stringify(x));
}
}),
zlib.createGzip(),
fs.createWriteStream('families.json-stream.gz')
]);
people.pipe(family);A block of regular functions can be separated and included in an array. Functions in such block will be combined without using streams to improve the performance.
const lessEfficient = chain([
x => x * x,
x => 2 * x
// ... more stages
]);
const moreEfficient = chain([
[
x => x * x,
x => 2 * x
]
// ... more stages
]);
const comp = request('stream-chain/utils/comp');
const evenMoreEfficient = chain([
comp(
x => x * x,
x => 2 * x
)
// ... more stages
]);
// or use gen() the same way:
const gen = request('stream-chain/utils/gen');
const theCoolest = chain([
gen(
x => x * x,
x => 2 * x
)
// ... more stages
]);Returning Chain.none terminates the pipeline. In the example below, it is used to filter out all even values.
const {chain, none, final} = require('stream-chain');
const pipeline = chain([
[
x => x % 2 ? x : none,
x => x * x,
x => 2 * x
]
// ... more stages
]);
// input: 1, 2, 3, 4
// output: 2, 18Wrapping value in final() terminates a pipeline and uses the value as the final result. The example below does not double odd values.
const {chain, none, final} = require('stream-chain');
const pipeline = chain([
[
x => x * x,
x => x % 2 ? final(x) : x,
x => 2 * x
]
// ... more stages
]);
// input: 1, 2, 3, 4
// output: 1, 8, 9, 32Sometimes all we need is to wrap a function into a Transform. It works with combined functions as well.
const {convertToTransform} = require('stream-chain');
const stream = convertToTransform(x => x + 1);const {convertToTransform} = require('stream-chain');
const stream = convertToTransform([
x => x * x,
x => 2 * x
]);This example processes only 5 items from the beginning of a stream.
const take = require('stream-json/utils/take');
const pipeline = chain([
take(5)
// ... more stages
]);This example skips 5 items from the beginning of a stream.
const skip = require('stream-json/utils/skip');
const pipeline = chain([
skip(5)
// ... more stages
]);This example skips 5 items from the beginning of a stream and takes the next 5.
const lessEfficient = chain([
skip(5),
take(5)
// ... more stages
]);
const moreEfficient = chain([
take({n: 5, skip: 5})
// ... more stages
]);Takes while a condition is true.
const takeWhile = require('stream-json/utils/takeWhile');
const pipeline = chain([
takeWhile(item => item !== 'separator')
// ... more stages
]);Skips while a condition is true.
const skipWhile = require('stream-json/utils/skipWhile');
const pipeline = chain([
skipWhile(item => item !== 'separator')
// ... more stages
]);Processes data between first two separators.
const pipeline = chain([
skipWhile(item => item !== 'separator'),
skip(1), // skip the separator
takeWhile(item => item !== 'separator')
// ... more stages
]);It is the same as reduce() in JavaScript's arrays.
const fold = require('stream-json/utils/fold');
const pipeline = chain([
fold((acc, x) => acc + x, 0)
// ... more stages
]);
// input: 1, 2, 3
// output: 6scan() is like fold() but outputs all intermediate values of its accumulator.
const scan = require('stream-json/utils/scan');
const pipeline = chain([
scan((acc, x) => acc + x, 0)
// ... more stages
]);
// input: 1, 2, 3
// output: 1, 3, 6Reduce is a Writable stream, which is used at the end of a pipeline to accumulate items. Its accumulator is available as a property. It can be used like fold() and scan().
const {reduce} = require('stream-json/utils/Reduce');
const toArray = reduce((acc, x) => {
acc.push(x);
return acc;
}, []);
const pipeline = chain([
// ... more stages
toArray
]);
// input: 1, 2, 3
// toArray.accumulator is [1, 2, 3]Unlike array-combined functions comp() and gen() can combine asynchronous and regular functions and generators. It allows functions to return multiple values wrapped in many(). Of course, none and final() works too.
const {comp} = require('stream-chain/utils/comp');
const pipeline = chain([
comp(
function*(x) {
yield x;
yield 10 * x;
},
x => 2 * x
)
// ... more stages
]);
// input: 1, 2, 3
// output: 2, 20, 4, 40, 6, 60 const {chain, many} = require('stream-chain');
const {comp} = require('stream-chain/utils/comp');
const pipeline = chain([
comp(
x => many([x, 10 * x]),
x => 2 * x
)
// ... more stages
]);
// input: 1, 2, 3
// output: 2, 20, 4, 40, 6, 60 const pipeline = chain([
comp(
async x => await getItemNumberFromDB(x),
x => 2 * x
)
// ... more stages
]);Make a separate function has its benefits — you can use it with streams or without streams. comp.asFun() returns an asynchronous function.
const doubler = comp.asFun(
async x => await getItemNumberFromDB(x),
x => 2 * x
);
const pipeline = chain([
doubler
// ... more stages
]);
doubler(42)
.then(value => console.log(value))
.catch(error => console.error(error));