# lazy-pipeline

A super lightweight, tree-shakeable, reusable, lazy pipeline TypeScript library with functional APIs and no third-party dependencies.

The `lazy-pipeline` package is only 2.6KB minified (1KB minified + gzipped), and thanks to its tree-shakeable design, your build tool can reduce the final bundle size even further.

## Installation

```
npm i -S lazy-pipeline
```
## What is a reusable, lazy pipeline?

Consider the following common pattern with arrays:

```ts
const source = [0, 1, 2, 3, 4, 5, 6, 7, 8, 9];

source
  .map(value => value + 1)
  .filter(value => value % 2 === 0)
  .reduce((left, right) => left + right);
```
The combination of the `map`, `filter`, and `reduce` operations above constitutes a pipeline. Each operation executes eagerly as soon as it is chained, and, most notably, each operation iterates over every element before the next operation starts. The example above is roughly equivalent to the following `for-of` loops:
```ts
const source = [0, 1, 2, 3, 4, 5, 6, 7, 8, 9];

const mappedValues = [];
for (const value of source) {
  mappedValues.push(value + 1);
}

const filteredValues = [];
for (const value of mappedValues) {
  if (value % 2 === 0) {
    filteredValues.push(value);
  }
}

let sum = 0;
for (const value of filteredValues) {
  sum += value;
}
```
As you can see, each added operation (`map`, `filter`, and `reduce`) iterates through an entire intermediate array, which is not something that we want.

Now consider a similar pipeline using the `lazy-pipeline` APIs:
```ts
import { LazyPipeline } from 'lazy-pipeline';
import { filter, map, peek } from 'lazy-pipeline/operators';
import { sum } from 'lazy-pipeline/collectors';

const source = [0, 1, 2, 3, 4, 5, 6, 7, 8, 9];

const result = LazyPipeline.from(source)
  .add(
    peek(value => console.log(value)),
    map(value => value + 1),
    filter(value => value % 2 === 0)
  )
  .collect(sum());

console.log(result);
```
This second example produces the same final result as the first one, but a few things distinguish it. First of all, you can add as many intermediate stages (`map`, `filter`, or any other intermediate stage) to the pipeline as you like; those stages won't execute until `collect()` is called, which is what makes the evaluation lazy. `collect()` expects a terminal stage, the final stage that produces the result; a terminal stage's sole responsibility is to collect the remaining pipeline elements into some final result container. Second of all, all stages (`map`, `filter`, and `sum` in this example) run in the same iteration, which is much more efficient compared to the first example. The second example is roughly equivalent to the following `for-of` loop:
```ts
const source = [0, 1, 2, 3, 4, 5, 6, 7, 8, 9];

let sum = 0;
for (const value of source) {
  console.log(value);
  const mappedValue = value + 1;
  if (mappedValue % 2 === 0) {
    sum += mappedValue;
  }
}

console.log(sum);
```
As you can see, we only need to iterate once per element to evaluate all of our operations, which is much more efficient.
The most interesting bit of the `lazy-pipeline` module is its reusability. After constructing your pipeline and adding any number of intermediate stages to it, you can pass the pipeline object around and reuse it on as many iterable data sources as desired. Keep in mind that after each call to `collect()`, the pipeline is frozen to prevent any new stages from being added to it; to unfreeze it, simply call `unfreeze()` on the same pipeline instance. For example:
```ts
import { LazyPipeline } from 'lazy-pipeline';
import { filter, limit, map, skip } from 'lazy-pipeline/operators';
import { findFirst, findLast, max, min } from 'lazy-pipeline/collectors';

const source1 = [0, 1, 2, 3, 4, 5, 6, 7, 8, 9];

const pipeline = LazyPipeline.from(source1).add(
  map(value => value + 1),
  filter(value => value % 2 === 0)
);

const maxNumber = pipeline.collect(max());

pipeline.resume();
pipeline.unfreeze();

const minNumber = pipeline
  .add(limit(5))
  .collect(min());

pipeline.resume();
pipeline.unfreeze();

const firstElement = pipeline
  .add(skip(2))
  .collect(findFirst());

pipeline.resume();

const source2 = [1, 10, 100, 1000, 10000];
pipeline.readFrom(source2);

const lastElement = pipeline.collect(findLast());
```
Once again, all of the stages from the example above (`map`, `filter`, `limit`, `skip`, and the terminal stage passed to each `collect()` call) run in the same iteration.
## Terminologies

- Pipeline: A series of stages that execute in order and in the same iteration, ending with a terminal stage that returns a result. Once the pipeline has been consumed by triggering a terminal stage with `collect()`, it cannot be consumed again until `resume()` is called; otherwise, the pipeline will throw an error to indicate as such.
- Stage: A particular operation that executes on each element in the pipeline.
- Intermediate stage: A stage that performs some transformation on each element in the pipeline and forwards the new element to the next stage in line downstream. The built-in intermediate stages' creators are located inside the `operators` module.
- Terminal stage: The final stage that collects all remaining elements in the pipeline to produce a result. A pipeline doesn't execute until a terminal stage is triggered by calling `collect()` on the pipeline instance, which expects a `TerminalStage` object. The built-in terminal stages' creators are located inside the `collectors` module.
- Collector: A function that returns a new object instance of type `TerminalStage`.
- Operator: A function that returns a new object instance of type `IntermediateStage`.
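Based on the custom-stage examples later in this README, the stage contract can be sketched roughly as follows. This is an illustrative approximation, not the library's actual source; in particular, the `Stage` base class and the `setDownstream` helper below are assumptions made for the sketch:

```ts
// Illustrative approximation of the stage contract used by the
// custom operator/collector examples; not the library's real code.
abstract class Stage<IN, OUT> {
  // The next stage in line; wired up when the pipeline is assembled.
  protected _downstream!: Stage<OUT, unknown>;

  abstract consume(element: IN, hasMoreDataUpstream: boolean): void;
}

abstract class IntermediateStage<IN, OUT> extends Stage<IN, OUT> {}

abstract class TerminalStage<IN, OUT> extends Stage<IN, OUT> {
  // Returns the final result once the pipeline terminates.
  abstract get(): OUT;
}

// A tiny demo wiring one intermediate stage into one terminal stage.
class DoubleStage extends IntermediateStage<number, number> {
  override consume(element: number, hasMoreDataUpstream: boolean): void {
    this._downstream.consume(element * 2, hasMoreDataUpstream);
  }

  // Hypothetical wiring helper; the real library does this internally.
  setDownstream(stage: Stage<number, unknown>): void {
    this._downstream = stage;
  }
}

class SumStage extends TerminalStage<number, number> {
  private _sum = 0;

  override consume(element: number, hasMoreDataUpstream: boolean): void {
    this._sum += element;
  }

  override get(): number {
    return this._sum;
  }
}

const double = new DoubleStage();
const total = new SumStage();
double.setDownstream(total);

[1, 2, 3].forEach((value, i, arr) => double.consume(value, i < arr.length - 1));
console.log(total.get()); // 12
```

The key idea is that each element is pushed through every stage in turn via `consume()`, so the whole chain completes in one pass over the source.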
## Advantages of tree-shakeable, functional APIs

All of the stages (intermediate and terminal) are created using standalone functions. This not only makes the APIs easier to use, it also allows bundling tools to effortlessly tree-shake the functions that are not imported into your application. Additionally, with a functional programming model, it's straightforward to extend this library with your own operators and collectors.
## How to add a new operator

To add a new operator, create a function that returns a new object instance of type `IntermediateStage` each time it's called.

As an example, let's create an operator that increments each number in the pipeline by some provided value:
```ts
import { IntermediateStage } from 'lazy-pipeline/stages';

class IncrementByStage<IN extends number> extends IntermediateStage<IN, number> {
  constructor(private readonly _valueToIncrementBy: number) {
    super();
  }

  override consume(element: IN, hasMoreDataUpstream: boolean): void {
    this._throwErrorIfNotNumber(element);
    this._downstream.consume(element + this._valueToIncrementBy, hasMoreDataUpstream);
  }

  private _throwErrorIfNotNumber(element: IN) {
    if (typeof element !== 'number') {
      throw new Error(
        `[incrementBy() operator] Numbers expected, erroneous pipeline element received was ${JSON.stringify(element)}`
      );
    }
  }
}

export function incrementBy<IN extends number>(value: number): IntermediateStage<IN, number> {
  if (typeof value !== 'number') {
    throw new Error(
      `[incrementBy() operator] The provided value to increment by is not a number, it was ${JSON.stringify(value)}`
    );
  }

  return new IncrementByStage<IN>(value);
}
```
To use the new operator, simply add it to the pipeline, for example:

```ts
import { LazyPipeline } from 'lazy-pipeline';
import { sum } from 'lazy-pipeline/collectors';
import { incrementBy } from './incrementBy';

const result = LazyPipeline.from([1, 2, 3, 4, 5]).add(incrementBy(10)).collect(sum());
```

To view all built-in operators, please see the `operators` module.
## How to add a new collector

To add a new collector, create a function that returns a new object instance of type `TerminalStage` each time it's called. Each terminal stage must additionally implement a `get()` method that returns the final result when the pipeline terminates.

As an example, let's create a collector that multiplies together all numbers in the pipeline:
```ts
import { TerminalStage } from 'lazy-pipeline/stages';

class MultiplyStage<IN extends number> extends TerminalStage<IN, number> {
  private _product = 1;

  override consume(element: IN, hasMoreDataUpstream: boolean): void {
    this._throwErrorIfNotNumber(element);
    this._product *= element;
  }

  private _throwErrorIfNotNumber(element: IN) {
    if (typeof element !== 'number') {
      throw new Error(
        `[multiply() collector] Numbers expected, erroneous pipeline element received was ${JSON.stringify(element)}`
      );
    }
  }

  override get(): number {
    return this._product;
  }
}

export function multiply<IN extends number>(): TerminalStage<IN, number> {
  return new MultiplyStage<IN>();
}
```
To use the new collector, simply provide it to `collect()` as follows:

```ts
import { LazyPipeline } from 'lazy-pipeline';
import { filter, map } from 'lazy-pipeline/operators';
import { multiply } from './multiply';

const result = LazyPipeline.from([1, 2, 3, 4, 5])
  .add(
    filter(e => e % 2 === 0),
    map(e => e * e)
  )
  .collect(multiply());
```

To view all built-in collectors, please see the `collectors` module.