# pulls

pulls - readable streams for Node.js and browsers.
Naturally, readable streams are anything that implements the async iterable protocol or the iterable protocol.

The library is lightweight (about 200 LOC) and fast: more than 2M items/s throughput.
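For instance, both of the following already qualify as readable sources, since an array implements the iterable protocol and an async generator implements the async iterable protocol. A minimal sketch using only standard JavaScript, independent of pulls:

```ts
// An array implements the iterable protocol: for...of works on it.
const arraySource: number[] = [1, 2, 3];

// An async generator implements the async iterable protocol:
// for await...of works on it.
async function* asyncSource() {
  yield 1;
  yield 2;
  yield 3;
}

async function collectAll(): Promise<number[]> {
  const out: number[] = [];
  for (const value of arraySource) out.push(value);
  for await (const value of asyncSource()) out.push(value);
  return out;
}

collectAll().then((values) => console.log(values)); // [ 1, 2, 3, 1, 2, 3 ]
```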
## Basic usage examples
For example, consume an array as a stream and run it through a map -> filter -> reduce pipeline:
```ts
import {pipeline, stream} from "pulls";
import {map} from "pulls/map";
import {filter} from "pulls/filter";
import {reduce} from "pulls/reduce";
import {tap} from "pulls/tap";
import {sequence} from "pulls/sequence";

const expected = await pipeline(
  stream([1, 2, 3]),
  map((x) => x * 2),
  filter((x) => x > 2),
  reduce<number, number[]>((acc, value) => [...acc, value], []),
  tap(console.log)
);

expect(expected).toEqual([4, 6]);
```
One more example, with an async stream. It takes about 1s to process a 1M-item dataset:
```ts
const expected = await pipeline(
  sequence(1000000),
  map((x) => x * 2),
  filter((x) => x > 2),
  reduce<number, number>((acc, value) => acc + value, 0),
  tap(console.log)
);

expect(expected).toEqual(999998999998);
```
A pipeline can also consume an async generator as a stream:
```ts
import {pipeline} from "pulls";
import {tap} from "pulls/tap";

async function* Generate123() {
  yield 1;
  yield 2;
  yield Promise.resolve(3);
}

const expected: number[] = [];
const collect = (value: number) => expected.push(value);

await pipeline(
  Generate123(),
  tap<number>(collect)
);

expect(expected).toEqual([1, 2, 3]);
```
## Guide
TBD
## Documentation
TBD
## Stream examples
To get a feel for what counts as a readable source, check the async iterable protocol and iterable protocol mentioned above, and read up on iterating over async iterables. Or the examples below may be enough.
```ts
const asyncIterable = {
  [Symbol.asyncIterator]() {
    return {
      i: 0,
      next() {
        if (this.i < 3) {
          return Promise.resolve({ value: this.i++, done: false });
        }
        return Promise.resolve({ done: true });
      }
    };
  }
};
```
```ts
const myAsyncIterable = {
  async* [Symbol.asyncIterator]() {
    yield "hello";
    yield "async";
    yield "iteration!";
  }
};
```
```ts
let mySyncIterator = {
  i: 0,
  next: function() {
    if (this.i < 3) {
      return { value: this.i++, done: false };
    }
    return { value: undefined, done: true };
  },
  [Symbol.iterator]: function() { return this; }
};
```
```ts
import * as fs from 'fs';

const readableStream = fs.createReadStream(
  'tmp/test.txt', {encoding: 'utf8'});
```
```ts
import {Readable} from 'stream';

function* gen() {
  yield 'One line\n';
  yield 'Another line\n';
}

const readableStream = Readable.from(gen(), {encoding: 'utf8'});
```
```ts
import {Readable} from 'stream';

const str = 'Some text!';
const readable = Readable.from(str, {encoding: 'utf8'});
```
## Node.js streams cooperation
Node.js readable streams implement the async iterable protocol and can be consumed in pipelines.
```ts
import {pipeline} from "pulls";
import {tap} from "pulls/tap";
import {Readable} from 'stream';

function* gen() {
  yield 'One line\n';
  yield 'Another line\n';
}

const readableStream = Readable.from(gen(), {encoding: 'utf8'});

await pipeline(readableStream, tap(console.log));
```
A good article on converting async iterables to Node.js readable streams and back can help you manage Node.js streams.
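The round trip can be sketched as follows: `Readable.from` turns any (async) iterable into a Node.js readable stream, and since Node.js readables are themselves async iterable, `for await...of` drains them back into plain values. This uses only standard Node.js APIs, no pulls involved:

```ts
import { Readable } from 'stream';

// Any async iterable can become a Node.js readable stream via Readable.from.
async function* lines() {
  yield 'One line\n';
  yield 'Another line\n';
}

const readable = Readable.from(lines(), { encoding: 'utf8' });

// A Node.js readable stream is itself async iterable,
// so for await...of converts it back into plain values.
async function drainToString(source: AsyncIterable<string>): Promise<string> {
  let text = '';
  for await (const chunk of source) {
    text += chunk;
  }
  return text;
}

drainToString(readable).then((text) => console.log(JSON.stringify(text)));
// "One line\nAnother line\n"
```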
## TODOs
More information is coming soon about transformations, composition, stream management, and scalability.