stream-chain
The stream-chain npm package is designed to facilitate the creation and management of processing pipelines for streams. It allows you to chain together multiple stream processing steps in a flexible and efficient manner.
Creating a Stream Chain
This code demonstrates how to create a stream chain that parses JSON data, picks a specific part of the JSON, and then streams the values. The pipeline processes a JSON string and logs the value of the 'data' key.
const { chain } = require('stream-chain');
const { parser } = require('stream-json');
const { pick } = require('stream-json/filters/Pick');
const { streamValues } = require('stream-json/streamers/StreamValues');
const pipeline = chain([
  parser(),
  pick({ filter: 'data' }),
  streamValues(),
]);
pipeline.on('data', (data) => {
  console.log(data.value);
});
pipeline.write('{"data": {"key": "value"}}');
pipeline.end();
Combining Multiple Streams
This example shows how to combine multiple streams in a chain to process an array of JSON objects. The pipeline parses the JSON, picks the 'items' array, and then streams each value in the array.
const { chain } = require('stream-chain');
const { parser } = require('stream-json');
const { pick } = require('stream-json/filters/Pick');
const { streamValues } = require('stream-json/streamers/StreamValues');
const { streamArray } = require('stream-json/streamers/StreamArray');
const pipeline = chain([
  parser(),
  pick({ filter: 'items' }),
  streamArray(),
]);
pipeline.on('data', (data) => {
  console.log(data.value);
});
pipeline.write('{"items": [{"key": "value1"}, {"key": "value2"}]}');
pipeline.end();
The through2 package is a tiny wrapper around Node.js streams.Transform. It simplifies the creation of transform streams, allowing you to easily process data as it passes through the stream. Unlike stream-chain, through2 focuses on creating individual transform streams rather than chaining multiple streams together.
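For a sense of that API, here is a minimal object-mode sketch using through2's documented through2.obj() helper; the doubled n field is just an illustrative payload:
const through2 = require('through2');
// object-mode transform that doubles a numeric field on each chunk
const double = through2.obj(function (chunk, enc, callback) {
  chunk.n = chunk.n * 2;
  callback(null, chunk);
});
double.on('data', obj => console.log(obj)); // { n: 2 }, then { n: 4 }
double.write({ n: 1 });
double.write({ n: 2 });
double.end();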
Highland is a high-level stream library for Node.js that provides a more functional approach to working with streams. It allows you to create and manipulate streams using a variety of functional programming techniques. Highland offers more comprehensive stream manipulation capabilities compared to stream-chain, which is more focused on chaining existing streams.
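As a rough illustration of that functional style, a minimal Highland sketch (the numeric data is made up for the example):
const _ = require('highland');
// wrap an array (a Node stream would work the same way) and apply functional operators
_([1, 2, 3, 4])
  .map(x => x * x)
  .filter(x => x % 2) // keep odd squares
  .toArray(xs => console.log(xs)); // [1, 9]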
Mississippi is a collection of useful stream utility modules that make working with streams easier. It includes modules for creating, combining, and consuming streams. Mississippi provides a broader set of utilities for stream management compared to stream-chain, which is specifically designed for chaining streams together.
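A minimal sketch of that utility style, using mississippi's pipe and through helpers; the file names are placeholders:
const miss = require('mississippi');
const fs = require('fs');
// connect a source, a transform, and a sink, with a single error callback
miss.pipe(
  fs.createReadStream('input.txt'),
  miss.through((chunk, enc, callback) => callback(null, chunk.toString().toUpperCase())),
  fs.createWriteStream('output.txt'),
  err => {
    if (err) return console.error('pipeline failed', err);
    console.log('pipeline succeeded');
  }
);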
stream-chain creates a chain of streams out of regular functions, asynchronous functions, generator functions, and existing streams, while properly handling backpressure. The resulting chain is represented as a Duplex stream, which can be combined with other streams the usual way. It eliminates boilerplate, helping you concentrate on functionality without losing performance, and makes it especially easy to build object-mode data processing pipelines.
Originally, stream-chain was used internally with stream-fork and stream-json to create flexible data processing pipelines.
stream-chain is a lightweight, no-dependencies micro-package. It is distributed under the New BSD license.
import chain from 'stream-chain';
// or: const chain = require('stream-chain');
import fs from 'fs';
import zlib from 'zlib';
import {Transform} from 'stream';
// this chain object will work on a stream of numbers
const pipeline = chain([
  // transforms a value
  x => x * x,
  // returns several values
  x => chain.many([x - 1, x, x + 1]),
  // waits for an asynchronous operation
  async x => await getTotalFromDatabaseByKey(x),
  // returns multiple values with a generator
  function* (x) {
    for (let i = x; i > 0; --i) {
      yield i;
    }
    return 0;
  },
  // filters out even values
  x => x % 2 ? x : null,
  // uses an arbitrary transform stream
  new Transform({
    objectMode: true,
    transform(x, _, callback) {
      callback(null, x + 1);
    }
  }),
  // transform to strings
  x => '' + x,
  // compress
  zlib.createGzip()
]);
// the chain object is a regular stream
// it can be used with normal stream methods
// log errors
pipeline.on('error', error => console.log(error));
// use the chain object, and save the result to a file
dataSource.pipe(pipeline).pipe(fs.createWriteStream('output.txt.gz'));
Making processing pipelines appears to be easy: just chain functions one after another, and we are done. Real-life pipelines filter objects out and/or produce more objects from a few. On top of that, we have to deal with asynchronous operations while processing or producing data: networking, databases, files, user responses, and so on. Unequal numbers of values per stage and unequal throughput of stages introduce problems like backpressure, which requires the algorithms implemented by streams.
While many API improvements have made streams easier to use, in reality a lot of boilerplate is still required when creating a pipeline. stream-chain eliminates most of it.
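For comparison, here is a minimal sketch (not taken from the package documentation) of the hand-written object-mode Transform boilerplate that a single chain step such as x => x * x replaces:
const { Transform } = require('stream');
// the hand-rolled equivalent of the one-line chain step: x => x * x
const square = new Transform({
  writableObjectMode: true,
  readableObjectMode: true,
  transform(chunk, _encoding, callback) {
    callback(null, chunk * chunk);
  }
});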
npm i --save stream-chain
# or: yarn add stream-chain
All documentation can be found in the wiki. It documents in detail the main function and the various utilities and helpers that can simplify stream programming. Additionally, it includes support for JSONL (line-separated JSON files).
An object returned by chain() is based on Duplex. It chains its dependents in a single pipeline, optionally binding error events.
Many details about this package can be discovered by looking at the test files located in tests/ and in the source code (src/).
chain(fns[, options])
The factory function accepts the following arguments:
fns is an array of functions, arrays, or stream instances.
If an item is a function, it is wrapped in a transform stream that calls it with two parameters: chunk (an object) and an optional encoding. See Node's documentation for more details on those parameters.
If the function returns undefined or null, no value shall be passed.
// produces no values:
x => null
x => undefined
// produces one value:
x => x
If the returned value is a Promise (or a thenable, i.e., an object with a then() method), it will be waited for. Its result should be a regular value.
// delays by 0.5s:
x => new Promise(resolve => setTimeout(() => resolve(x), 500))
If the returned value has a next() method (an iterator), it will be iterated according to the generator protocol. The results should be regular values.
// produces multiple values:
class Nextable {
  constructor(x) {
    this.x = x;
    this.i = -1;
  }
  next() {
    return {
      done: this.i > 1,
      value: this.x + this.i++
    };
  }
}
x => new Nextable(x)
next() can return a Promise according to the asynchronous generator protocol.
// fails
x => { throw new Error('Bad!'); }
// delays by 0.5s:
async x => {
  await new Promise(resolve => setTimeout(() => resolve(), 500));
  return x;
}
// produces multiple values:
function* (x) {
  for (let i = -1; i <= 1; ++i) {
    if (i) yield x + i;
  }
  return x;
}
// produces multiple values:
async function* (x) {
  for (let i = -1; i <= 1; ++i) {
    if (i) {
      await new Promise(resolve => setTimeout(() => resolve(), 50));
      yield x + i;
    }
  }
  return x;
}
If the 'data' event is not used in this case, the instance resumes itself automatically. Read about it in Node's documentation.
If an item is a web stream (ReadableStream or WritableStream), it is adapted to a corresponding Node stream and included in the pipeline.
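For illustration, a minimal sketch of assumed usage (not taken from this README): a chain whose first element is a Readable stream acting as the data source; 'input.txt' is a hypothetical file.
const { chain } = require('stream-chain');
const fs = require('fs');
// a Readable stream at the head of the chain feeds the following steps
const pipeline = chain([
  fs.createReadStream('input.txt'),
  chunk => chunk.toString().toUpperCase()
]);
pipeline.pipe(process.stdout);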
options is an optional object detailed in Node's documentation. If it is not specified, the defaults are:
{writableObjectMode: true, readableObjectMode: true}
If options is specified, it is copied over the default options. writableObjectMode is the same as the corresponding object mode of the first stream, and readableObjectMode is the same as the corresponding object mode of the last stream.
skipEvents is an optional boolean flag. If it is falsy (the default), 'error' events from all streams are forwarded to the created instance. If it is truthy, no event forwarding is made. A user can always do so externally or in a constructor of derived classes.
noGrouping is an optional boolean flag. If it is falsy (the default), all subsequent functions are grouped together using the gen() utility for improved performance. If it is specified and truthy, all functions will be wrapped as streams individually. This mode is compatible with how the 2.x version works.
An instance can be used to attach handlers for stream events.
const pipeline = chain([x => x * x, x => [x - 1, x, x + 1]]);
pipeline.on('error', error => console.error(error));
dataSource.pipe(pipeline);
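For illustration, a short sketch (assumed usage based on the options described above) that passes skipEvents and noGrouping to the factory; the numeric steps are only for the example:
const { chain } = require('stream-chain');
const pipeline = chain(
  [x => x * 2, x => (x > 10 ? x : null)],
  // do not forward internal 'error' events; wrap every function as its own stream
  { skipEvents: true, noGrouping: true }
);
pipeline.on('data', x => console.log(x)); // logs 20
pipeline.write(10);
pipeline.end();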
BSD-3-Clause
Release notes (highlights): const-ness in the async generator branch (thx Patrick Pang); take, takeWhile, skip, skipWhile, fold, scan, Reduce, comp; 'finish' event instead of _final().
FAQs
Chain functions as transform streams.
We found that stream-chain demonstrated a healthy version release cadence and project activity because the last version was released less than a year ago. It has 0 open source maintainers collaborating on the project.