sflow is a powerful, highly extensible library for processing and manipulating streams of data, built on the Web Streams API. It is designed async-first: every pipeline method accepts async functions, which makes it a natural fit for web applications that handle async I/O and need to manage concurrency efficiently. Inspired by the functional programming paradigm, it provides a rich set of utilities for transforming streams, including chunking, filtering, mapping, reducing, and many more. It's a perfect companion for anyone who works extensively with streams and wants more efficient, concise data-processing pipelines.
ONLINE DEMO available here: sflow online Examples and Use Cases
Documentation: snomiao/sflow | DeepWiki
- Async-first design: Every pipeline method accepts async functions, enabling seamless async I/O operations and concurrency management for web applications.
- Chunking and buffering: Easily divide your stream into chunks based on different criteria such as count, time intervals, or custom conditions.
- Transformations: Map, filter, reduce, and various other transformations to process your stream data.
- Stream utilities: Merge, throttle, debounce, and more utilities for advanced stream control.
- Error handling: Prevent or handle errors during stream processing, ensuring robustness (see the sketch after this list).
- Integration with other libraries: Seamlessly integrates with tools like web-streams-extensions, making it versatile for different streaming needs.
- TypeScript support: Fully typed for a richer developer experience and better code quality.
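
Error handling in practice can be as simple as catching the rejected promise from a terminal method. Below is a minimal sketch, assuming an error thrown inside an async stage rejects the awaited `toArray()` call; only `sflow`, `map`, and `toArray` from this README are used, and `lookupUser` is a hypothetical failing lookup:

```ts
import { sflow } from "sflow";

// Hypothetical async lookup that fails for one of the ids
async function lookupUser(id: number) {
  if (id === 3) throw new Error(`user ${id} not found`);
  return { id };
}

try {
  const users = await sflow([1, 2, 3])
    .map(async (id) => await lookupUser(id))
    .toArray();
  console.log(users);
} catch (err) {
  // Assumption: an error thrown in a stage surfaces when the flow is consumed
  console.error("stream failed:", err);
}
```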
Install sflow using npm or bun:

```sh
npm install sflow
# or if you are using bun
bun add sflow
```

Here's a basic example of how to use sflow to process a stream:
```ts
import { sflow } from "sflow";

async function run() {
  let result = await sflow([1, 2, 3, 4])
    .map((n) => n * 2)
    .log() // this stage prints 2, 4, 6, 8
    .filter((n) => n > 4)
    .log() // this stage prints 6, 8
    .reduce((a, b) => a + b, 0) // first emit 0+6=6, second emit 0+6+8=14
    .log() // this stage prints 6, 14
    .toArray();
  console.log(result); // Outputs: [6, 14]
}
await run();
```

sflow excels at handling async operations in pipelines, which makes it a great fit for web applications:
```ts
import { sflow } from "sflow";

// Example: Fetch user data and process concurrently
async function processUsers() {
  const result = await sflow([1, 2, 3, 4, 5])
    // Every method accepts async functions!
    .map(async (userId) => {
      const response = await fetch(`https://api.example.com/users/${userId}`);
      return response.json();
    })
    .filter(async (user) => {
      // Async filtering for complex checks
      const isActive = await checkUserStatus(user.id);
      return isActive;
    })
    .map(async (user) => {
      // Transform with async operations
      const profile = await enrichUserProfile(user);
      return profile;
    })
    .toArray();
  console.log(result); // Array of processed user profiles
}

async function checkUserStatus(id: number) {
  // Simulate async check
  return id % 2 === 0;
}

async function enrichUserProfile(user: any) {
  // Simulate async enrichment
  return { ...user, enriched: true };
}
```
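One simple way to bound how many requests are in flight, using only methods documented in this README, is to batch ids with `chunk` before fetching. This is a sketch, not sflow's only (or necessarily best) concurrency mechanism, and it assumes each pipeline stage processes one chunk at a time, which is the Web Streams default:

```ts
import { sflow } from "sflow";

// Fetch at most two user records at a time by batching the ids first
const profiles = await sflow([1, 2, 3, 4, 5])
  .chunk(2) // e.g. [[1, 2], [3, 4], [5]]
  .map(async (ids) =>
    Promise.all(
      ids.map((id) =>
        fetch(`https://api.example.com/users/${id}`).then((r) => r.json())
      )
    )
  )
  .toArray(); // an array of batches of user records
console.log(profiles);
```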
Initialize a flow from various types of data sources:

```ts
import { sflow } from "sflow";

// From an array
const flow1 = sflow([1, 2, 3, 4]);

// From a promise
const flow2 = sflow(Promise.resolve([1, 2, 3, 4]));

// From an async iterable
async function* asyncGenerator() {
  yield 1;
  yield 2;
  yield 3;
}
const flow3 = sflow(asyncGenerator());
```
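Whatever the source, the resulting flow exposes the same pipeline methods used throughout this README; for example, the async-generator flow above can be transformed and collected just like an array-backed one:

```ts
// Consume the async-generator-backed flow like any other flow
const doubled = await sflow(asyncGenerator())
  .map((n) => n * 2)
  .toArray();
console.log(doubled); // [2, 4, 6]
```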
Transform your flow with various transformation methods. All methods support async functions:

```ts
// Synchronous mapping
flow1.map((n) => n * 2);

// Async mapping - great for API calls, database queries, etc.
flow1.map(async (n) => {
  const data = await fetchData(n);
  return data.value * 2;
});

// Synchronous filtering
flow1.filter((n) => n % 2 === 0);

// Async filtering - perfect for complex validation
flow1.filter(async (n) => {
  const isValid = await validateAsync(n);
  return isValid;
});

// Async reducing
flow1.reduce(async (a, b) => {
  const result = await computeAsync(a, b);
  return result;
}, 0);
```
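These calls each return a flow that can be chained further; actual results are obtained by awaiting a terminal method such as `toArray()`, as in the earlier examples:

```ts
// End-to-end: keep the even numbers, double them asynchronously, collect
const evensDoubled = await sflow([1, 2, 3, 4])
  .filter((n) => n % 2 === 0)
  .map(async (n) => n * 2)
  .toArray();
console.log(evensDoubled); // [4, 8]
```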
sflow provides methods for chunking, buffering, and grouping data:

```ts
// Chunking by count
flow1.chunk(2); // [[1, 2], [3, 4]]

// Buffering within a time interval
flow1.chunkByInterval(1000);

// Custom chunking
flow1.chunkBy((x) => Math.floor(x / 2));
```
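Chunks arrive as plain arrays, so they compose naturally with the other transforms; here is a small sketch combining `chunk` with `map`:

```ts
// Process items in batches of two, e.g. to limit downstream fan-out
const batchSums = await sflow([1, 2, 3, 4])
  .chunk(2) // [[1, 2], [3, 4]]
  .map((batch) => batch.reduce((a, b) => a + b, 0)) // sum each batch
  .toArray();
console.log(batchSums); // [3, 7]
```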
sflow comes with a plethora of utilities to manipulate streams efficiently:

```ts
// Throttling
flow1.throttle(100);

// Debouncing
flow1.debounce(200);

// Converting to array
flow1.toArray();

// Merging multiple streams
const mergedFlow = sflow([flow1, flow2]).merge();

// Use chunkIfs to split tokens by line
await sflow("a,b,c\n\n1,2,3\nd,s,f".split(""))
  .through(chunkIfs((e: string) => e.indexOf("\n") === -1))
  .map((chars) => chars.join(""))
  .toArray(); // ["a,b,c\n", "\n", "1,2,3\n", "d,s,f"]
```
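As a concrete sketch of `merge`, the snippet below combines two small flows into one and collects everything; no particular interleaving order is assumed:

```ts
// Merge two flows; every value reaches the output, interleaving may vary
const merged = await sflow([sflow([1, 2]), sflow([3, 4])])
  .merge()
  .toArray();
console.log(merged); // contains 1, 2, 3 and 4 in some order
```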
With TypeScript, sflow ensures your transformations are type-safe:

```ts
import { sflow } from "sflow";

const typedFlow = sflow([{ a: 1, b: [1, 2, 3] }])
  .unwind("b") // Use `unwind` for objects with nested arrays
  .mapAddField("newField", (item) => item.a + item.b);
```
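Assuming `unwind` behaves like MongoDB's `$unwind` (linked in the references below), emitting one object per element of the nested array, the flow above should produce roughly the following:

```ts
const rows = await typedFlow.toArray();
console.log(rows);
// Expected shape under the $unwind-like assumption:
// [
//   { a: 1, b: 1, newField: 2 },
//   { a: 1, b: 2, newField: 3 },
//   { a: 1, b: 3, newField: 4 },
// ]
```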
Contributions to sflow are always welcome! If you have any ideas, suggestions, or bug reports, feel free to open an issue on GitHub or submit a pull request. Try creating your first PR starting from here: https://github.dev/snomiao/sflow
- Join the sflow community by posting comments here:
- Infinite Streams with Elixir
- web-streams-extensions - npm
- $unwind (aggregation) - MongoDB Manual v7.0
- Introducing sflow: A New Era of Web Stream Processing for TypeScript/JavaScript Developers! : r/javascript
sflow is released under the MIT License. See the LICENSE file for more details.
sflow aims to simplify stream processing and bring functional programming paradigms to modern JavaScript and TypeScript development. Happy streaming!