stream-chain
The stream-chain npm package is designed to facilitate the creation and management of processing pipelines for streams. It allows you to chain together multiple stream processing steps in a flexible and efficient manner.
Creating a Stream Chain
This code demonstrates how to create a stream chain that parses JSON data, picks a specific part of the JSON, and then streams the values. The pipeline processes a JSON string and logs the value of the 'data' key.
const { chain } = require('stream-chain');
const { parser } = require('stream-json');
const { pick } = require('stream-json/filters/Pick');
const { streamValues } = require('stream-json/streamers/StreamValues');
const pipeline = chain([
  parser(),
  pick({ filter: 'data' }),
  streamValues(),
]);

pipeline.on('data', (data) => {
  console.log(data.value);
});

pipeline.write('{"data": {"key": "value"}}');
pipeline.end();
Combining Multiple Streams
This example shows how to combine multiple streams in a chain to process an array of JSON objects. The pipeline parses the JSON, picks the 'items' array, and then streams each element of the array. Note that streamArray() already assembles and emits the array elements, so no additional streamer is needed after it.
const { chain } = require('stream-chain');
const { parser } = require('stream-json');
const { pick } = require('stream-json/filters/Pick');
const { streamArray } = require('stream-json/streamers/StreamArray');

const pipeline = chain([
  parser(),
  pick({ filter: 'items' }),
  streamArray(),
]);

pipeline.on('data', (data) => {
  console.log(data.value);
});

pipeline.write('{"items": [{"key": "value1"}, {"key": "value2"}]}');
pipeline.end();
The through2 package is a tiny wrapper around Node.js streams.Transform. It simplifies the creation of transform streams, allowing you to easily process data as it passes through the stream. Unlike stream-chain, through2 focuses on creating individual transform streams rather than chaining multiple streams together.
Highland is a high-level stream library for Node.js that provides a more functional approach to working with streams. It allows you to create and manipulate streams using a variety of functional programming techniques. Highland offers more comprehensive stream manipulation capabilities compared to stream-chain, which is more focused on chaining existing streams.
Mississippi is a collection of useful stream utility modules that make working with streams easier. It includes modules for creating, combining, and consuming streams. Mississippi provides a broader set of utilities for stream management compared to stream-chain, which is specifically designed for chaining streams together.
stream-chain creates a chain of streams out of regular functions, asynchronous functions, generator functions, and existing streams, while properly handling backpressure. The resulting chain is represented as a Duplex stream, which can be combined with other streams in the usual way. It eliminates boilerplate, letting you concentrate on functionality without losing performance, and makes it especially easy to build object-mode data processing pipelines.
Originally stream-chain was used internally with stream-fork and stream-json to create flexible data processing pipelines.
stream-chain is a lightweight micro-package with no dependencies. It is distributed under the New BSD license.
const Chain = require('stream-chain');

const fs = require('fs');
const zlib = require('zlib');
const {Transform} = require('stream');

// the chain will work on a stream of number objects
const chain = new Chain([
  // transforms a value
  x => x * x,
  // returns several values
  x => [x - 1, x, x + 1],
  // waits for an asynchronous operation
  async x => await getTotalFromDatabaseByKey(x),
  // returns multiple values with a generator
  function* (x) {
    for (let i = x; i > 0; --i) {
      yield i;
    }
    return 0;
  },
  // filters out even values
  x => x % 2 ? x : null,
  // uses an arbitrary transform stream
  new Transform({
    writableObjectMode: true,
    transform(x, _, callback) {
      // transform to text
      callback(null, x.toString());
    }
  }),
  // compresses
  zlib.createGzip()
]);

// log errors
chain.on('error', error => console.log(error));

// use the chain, and save the result to a file
dataSource.pipe(chain).pipe(fs.createWriteStream('output.txt.gz'));
Making processing pipelines appears easy: just chain functions one after another, and we are done. Real-life pipelines, however, filter objects out and/or produce more objects than they consume. On top of that, we have to deal with asynchronous operations while processing or producing data: networking, databases, files, user responses, and so on. An unequal number of values per stage and unequal throughput across stages introduce problems like backpressure, which requires the algorithms implemented by streams.
While a lot of API improvements were made to make streams easy to use, in reality a lot of boilerplate is still required when creating a pipeline. stream-chain eliminates most of it.
npm i --save stream-chain
# or: yarn add stream-chain
Chain, which is returned by require('stream-chain'), is based on Duplex. It chains its components into a single pipeline, optionally forwarding 'error' events.
Many details about this package can be discovered by looking at the test files located in tests/ and in the source code (index.js).
new Chain(fns[, options])
The constructor accepts the following arguments:
fns is an array of functions, arrays of functions, or stream instances.
Each function takes a chunk (an object) and an optional encoding. See Node's documentation for more details on those parameters. The function will be called in the context of the created stream.
If a function returns an array, its items are iterated over and passed as separate values:
// produces no values:
x => []
// produces two values:
x => [x, x + 1]
// produces one array value:
x => [[x, x + 1]]
If a function returns undefined or null, no value shall be passed:
// produces no values:
x => null
x => undefined
// produces one value:
x => x
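These conventions can be modeled with a plain helper function (purely illustrative, not part of stream-chain's API):

```javascript
// a plain-function model of the conventions above: null/undefined
// produce no values, an array produces several values, anything else
// produces exactly one value (illustrative helper, not stream-chain API)
const applyConvention = (fn, x) => {
  const result = fn(x);
  if (result === null || result === undefined) return [];
  return Array.isArray(result) ? result : [result];
};

console.log(applyConvention(x => [x, x + 1], 5)); // [ 5, 6 ]
console.log(applyConvention(x => null, 5));       // []
console.log(applyConvention(x => x, 5));          // [ 5 ]
```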
If the returned value is a promise (has then()), it will be waited for. Its result should be a regular value:
// delays by 0.5s:
x => new Promise(
  resolve => setTimeout(() => resolve(x), 500))
If the returned value implements the iterator protocol (has next()), it will be iterated according to the generator protocol. The results should be regular values:
// produces multiple values:
class Nextable {
  constructor(x) {
    this.x = x;
    this.i = -1;
  }
  next() {
    // done once we have produced x - 1, x, and x + 1
    if (this.i > 1) return {done: true};
    return {done: false, value: this.x + this.i++};
  }
}
x => new Nextable(x)
next() can return a Promise according to the asynchronous generator protocol.
// fails:
x => { throw new Error('Bad!'); }
// delays by 0.5s:
async x => {
  await new Promise(resolve => setTimeout(() => resolve(), 500));
  return x;
}
// produces multiple values:
function* (x) {
  for (let i = -1; i <= 1; ++i) {
    if (i) yield x + i;
  }
  return x;
}

// produces multiple values:
async function* (x) {
  for (let i = -1; i <= 1; ++i) {
    if (i) {
      await new Promise(resolve => setTimeout(() => resolve(), 50));
      yield x + i;
    }
  }
  return x;
}
Intermediate values yielded by generators (including null, undefined, and arrays) are allowed and passed without modifications. The last value is subject to the processing defined above for regular functions.
If a value is produced by Chain.final(value) (see below), it terminates the chain using value as the final value of the chain.
If the first stream has no writable part, the Chain instance ignores all possible writes to the front, and ends when the first stream ends.
If the last stream has no readable part, the Chain instance does not produce any output, and finishes when the last stream finishes. Because the 'data' event is not used in this case, the instance resumes itself automatically. Read about it in Node's documentation.
options is an optional object detailed in Node's documentation. If options is not specified, or falsy, it is assumed to be:
{writableObjectMode: true, readableObjectMode: true}
Note that writableObjectMode is the same as the corresponding object mode of the first stream, and readableObjectMode is the same as the corresponding object mode of the last stream.
skipEvents is an optional flag. If it is falsy (the default), 'error' events from all streams are forwarded to the created instance. If it is truthy, no event forwarding is done. A user can always forward events externally or in a constructor of derived classes.
An instance can be used to attach handlers for stream events:
const chain = new Chain([x => x * x, x => [x - 1, x, x + 1]]);
chain.on('error', error => console.error(error));
dataSource.pipe(chain);
The following public properties are available:
streams is an array of streams created by the constructor. Its values are either Transform streams that use corresponding functions from the constructor parameter, or user-provided streams. All streams are piped sequentially starting from the beginning.
input is the beginning of the pipeline. Effectively it is the first item of streams.
output is the end of the pipeline. Effectively it is the last item of streams.
Generally, a Chain instance should be used to represent a chain:
const chain = new Chain([
  x => x * x,
  x => [x - 1, x, x + 1],
  new Transform({
    writableObjectMode: true,
    transform(chunk, _, callback) {
      callback(null, chunk.toString());
    }
  })
]);

dataSource
  .pipe(chain)
  .pipe(zlib.createGzip())
  .pipe(fs.createWriteStream('output.txt.gz'));
But in some cases input and output provide better control over how a data processing pipeline should be organized:

chain.output
  .pipe(zlib.createGzip())
  .pipe(fs.createWriteStream('output.txt.gz'));
dataSource.pipe(chain.input);
Please select which style you want to use, and never mix them together on the same object.
The following static methods are available:
chain(fns[, options]) is a helper factory function, which has the same arguments as the constructor and returns a Chain instance.
const {chain} = require('stream-chain');

// simple
dataSource
  .pipe(chain([x => x * x, x => [x - 1, x, x + 1]]));

// all inclusive
chain([
  dataSource,
  x => x * x,
  x => [x - 1, x, x + 1],
  zlib.createGzip(),
  fs.createWriteStream('output.txt.gz')
]);
final(value) is a helper factory function, which can be used by chained functions (see the array of functions above). It returns a special value that terminates the chain and uses the passed value as the result of the chain.
const {chain, final} = require('stream-chain');

// simple
dataSource
  .pipe(chain([[x => x * x, x => 2 * x + 1]]));
// faster than [x => x * x, x => 2 * x + 1]

// final
dataSource
  .pipe(chain([[
    x => x * x,
    x => final(x),
    x => 2 * x + 1
  ]]));
// the same as [[x => x * x, x => x]]
// the same as [[x => x * x]]
// the same as [x => x * x]

// final as a terminator
dataSource
  .pipe(chain([[
    x => x * x,
    x => final(),
    x => 2 * x + 1
  ]]));
// produces no values, because the final value is undefined,
// which is interpreted as "no value shall be passed"
// see the doc above

// final() as a filter
dataSource
  .pipe(chain([[
    x => x * x,
    x => x % 2 ? final() : x,
    x => 2 * x + 1
  ]]));
// only even values are passed, odd values are ignored

// if you want to be really performant...
const none = final();
dataSource
  .pipe(chain([[
    x => x * x,
    x => x % 2 ? none : x,
    x => 2 * x + 1
  ]]));
many(array) is a helper factory function, which is used to wrap arrays so they are interpreted as multiple values returned from a function.
At the moment it is redundant: you can use a plain array to indicate that, but naked arrays are being deprecated, and in future versions they will be passed on as is. The thinking is that using many() better indicates the intention. Additionally, in future versions it will be usable by arrays of functions (see above).

const {chain, many} = require('stream-chain');

dataSource
  .pipe(chain([x => many([x, x + 1, x + 2])]));
// currently the same as [x => [x, x + 1, x + 2]]
Release highlights:
- Fixed const-ness in the async generator branch (thanks Patrick Pang).
- Added utilities: take, takeWhile, skip, skipWhile, fold, scan, Reduce, comp.
- Uses the 'finish' event instead of _final().
.FAQs
Chain functions as transform streams.
The npm package stream-chain receives a total of 863,762 weekly downloads. As such, its popularity is classified as popular.
We found that stream-chain demonstrated a healthy version release cadence and project activity because the last version was released less than a year ago. It has one open source maintainer collaborating on the project.