
Product
Introducing Tier 1 Reachability: Precision CVE Triage for Enterprise Teams
Socket’s new Tier 1 Reachability filters out up to 80% of irrelevant CVEs, so security teams can focus on the vulnerabilities that matter.
@pipes/utils
Advanced tools
The pipesjs/utils
module is to web streams
what highland.js
is to node streams
.
It contains utility functions to make working with web streams
a lot easier.
For more about Web Streams
, refer to the spec.
The utils
module is to web streams
what highland.js
is to node streams
. It contains utility functions to make working with web streams
a lot easier. Here's more about Web Streams
from the spec itself:
Large swathes of the web platform are built on streaming data: that is, data that is created, processed, and consumed in an incremental fashion, without ever reading all of it into memory. The Streams Standard provides a common set of APIs for creating and interfacing with such streaming data, embodied in readable streams, writable streams, and transform streams.
The spec is still evolving but has reached a fairly stable stage with a reference implementation as well. The API has almost been finalized and Stream
s are coming to the web very soon!
At it's core, the API exposes three major components:
ReadableStream
encapsulates a source producing values and emits them.TransformStream
are essentially { readable, writable}
pairs that take a function which can be used to transform the values flowing through it.WritableStream
encapsulates a sink that receives values and writes to it.Stream
s are essentially data structures that handle sequential flow of values. You can split streams, merge them and connect them together in various ways. What's amazing is that, in most cases, they can handle backpressure automatically, so you don't have to mess with the underlying details.
For further information, the spec is quite informative and easy to read. Jake Archibald also wrote a great blog post on them.
Heads up: If you're coming from node
land, web streams
are quite a lot different from node streams
and incompatible with each other.
The library depends on @pipes/core, so make sure you include it in before including the library.
You can use either of the builds from the dist
folder:
<script src="path/to/web-streams-polyfill.js"></script>
<script src="path/to/pipes.utils.js"></script>
And in your code, all the functions will be available on the window.Pipes.utils
variable.
let { uniq, compact } = window.Pipes.utils;
The library has a peer-dependency on @pipes/core, so to install it:
npm install @pipes/core @pipes/utils
The library is split up into modules, so you can both require the whole library or only parts of it:
let { compact } = require("@pipes/utils");
let compact = require("@pipes/utils/compact");
If you want, you can directly import the es6 modules like so:
import pipesUtils from "@pipes/utils/src";
import { compact } from "@pipes/utils/src";
import compact from "@pipes/utils/src/compact";
The utils
library only consists of the following functions:
Set up code for examples
// Setup
let createReadable = data => new ReadableStream({
start (controller) {
this.data = data || [1,2,3];
// Kickstart stream
controller.enqueue( this.data.pop() );
},
pull (controller) {
if ( !this.data.length )
return controller.close()
controller.enqueue( this.data.pop() );
}
}),
createWritable = () => new WritableStream({
write (chunk) {
console.log( chunk );
}
});
This function takes an int n
and returns a transform stream
that batches the incoming values in arrays of lengths no
more than n
.
Parameters
size
numberExamples
let
input = [1,2,3,4,5],
expected = [[1,2],[3,4],[5]];
let readable, writable, res=[];
// Create test streams
readable = createTestReadable( input );
writable = createTestWritable( c => res.push( c ));
// Connect the streams
connect(
readable,
batch( 2 ),
writable
); // res == expected
Returns TransformStream
This function returns a transform stream that spits out only truthy values from the input stream.
Examples
let readable, writable,
count = 0;
// Create test streams
readable = createTestReadable( [true, false, 0, "", "hello", 1] );
writable = createTestWritable( () => count++ );
// Connect the streams
connect(
readable,
compact(),
writable
); // count == 3
This function takes an iterable
as argument and returns
a readable stream that repeatedly emits values generated by the emitter.
Examples
let readable, writable, values=[1,2,3], sum=0;
// Create test streams
readable = cycle( values );
writable = createTestWritable( c => { sum+=c });
// Connect the streams
const expected = 2 * values.reduce( (a, b) => a+b );
connect(
readable,
take( 2 * values.length ),
writable
); // sum == expected
This function takes an int n
and returns a
transform stream
that debounces the incoming values by n
ms,
only producing values with n
ms delay between them and dropping the rest.
Parameters
Returns ReadableWritable
This function takes an int n
and returns a transform stream
that drops the first n
values from the input stream.
Parameters
count
numberExamples
let readable, writable,
count = 0;
// Create test streams
readable = createTestReadable( [1,2,3,4,5,6] );
writable = createTestWritable( () => count++ );
// Connect the streams
connect(
readable,
drop(3),
writable
); // count == 3
Returns ReadableWritable
This function takes a predicate function as argument and returns
a transform stream
that only emits values that satisfy the predicate.
Parameters
pred
function (any?): booleanExamples
let readable, writable;
// Create test streams
readable = createTestReadable( [1,2,3,4,5,6] );
writable = createTestWritable( c => assert( c > 3 ) );
// Connect the streams
connect(
readable,
filter( a => a > 3 ),
writable
);
Returns ReadableWritable
This function returns a transform stream
that takes the first value
from the input stream and enqueues it on the output stream.
Examples
let readable, writable, el;
// Create test streams
readable = createTestReadable( [1,2,3,4,5,6] );
writable = createTestWritable( e => { el = e; });
// Connect the streams
connect(
readable,
head(),
writable
); // el == 1
Returns ReadableWritable
This function takes any value a
and returns a transform stream
that intersperses the values from the input stream with the a
.
Parameters
val
anyExamples
let readable, writable,
res = [];
// Create test streams
readable = createTestReadable( [1,2,3] );
writable = createTestWritable( c => res.push(c) );
// Connect the streams
connect(
readable,
intersperse(0),
writable
); // res == [1,0,2,0,3]
Returns ReadableWritable
This function returns a transform stream
that takes
the last value from the input stream and enqueues it on the output stream.
Examples
let readable, writable, el;
// Create test streams
readable = createTestReadable( [1,2,3,4,5,6] );
writable = createTestWritable( e => { el = e; });
// Connect the streams
connect(
readable,
last(),
writable
); // el == 6
Returns ReadableWritable
This function takes any number of string
s as arguments and
returns a transform stream
that extracts the passed property names from
incoming values.
Parameters
Examples
let readable, writable,
o = {
'a': 1,
'b': 2
};
// Create test streams
readable = createTestReadable( [o, o, o] );
writable = createTestWritable( c => assert( c.a ) && assert( !c.b ) );
// Connect the streams
connect(
readable,
pick('a'),
writable
);
Returns ReadableWritable
This function takes a string
as argument and returns
a transform stream
that extracts the passed property from
incoming values.
Parameters
prop
stringExamples
let readable, writable,
o = {
'a': 1,
'b': 2
};
// Create test streams
readable = createTestReadable( [o, o, o] );
writable = createTestWritable( c => assert( c == 1 ) );
// Connect the streams
connect(
readable,
pluck('a'),
writable
);
Returns ReadableWritable
This function takes a value as argument and returns a readable stream
that repeatedly emits that value.
Parameters
value
anyExamples
let readable, writable, val=1, len=6, sum=0;
// Create test streams
readable = repeat(val);
writable = createTestWritable( c => { sum+=c });
// Connect the streams
connect(
readable,
take( len ),
writable
); // sum == (val * len)
Returns ReadableWritable
This function takes a reducer function and an optional init
value
as arguments and returns a transform stream
that applies the
function to the incoming values and enqueues the accumulation of the results.
If an init
value is not passed, the first incoming value is treated as one.
Parameters
func
function ((T2? | T1?), T1?): T2init
T1?Examples
let readable, writable, res;
// Create test streams
readable = createTestReadable( [1,2,3] );
writable = createTestWritable( c => {
// Check last element is number
let n = c[c.length-1];
assert( n === +n );
res = c;
});
// Connect the streams
connect(
readable,
scan( add, 0 ),
writable
); // res[res.length-1], [1,2,3].reduce( add )
Returns ReadableWritable
This function takes an int m
and an int n
and returns
a transform stream
that drops the first m
values and
takes the next (m-n)
values from the input stream.
Parameters
Examples
let readable, writable,
count = 0;
// Create test streams
readable = createTestReadable( [1,2,3,4,5,6] );
writable = createTestWritable( () => count++ );
// Connect the streams
connect(
readable,
slice(2,5),
writable
); // count == 3
Returns ReadableWritable
This function takes an int n
and returns a transform stream
that takes the first n
values from the input stream.
Parameters
count
numberExamples
let readable, writable,
count = 0;
// Create test streams
readable = createTestReadable( [1,2,3,4,5,6] );
writable = createTestWritable( () => count++ );
// Connect the streams
connect(
readable,
take(3),
writable
); // count == 3
Returns ReadableWritable
This function takes a function as rgument and returns
a transform stream
that applies the function to the incoming
values before re-emitting them.
Parameters
func
anyFnExamples
let readable, writable;
// Create test streams
readable = createTestReadable( [1,2,3] );
writable = createTestWritable( c => assert( !Number.isNaN( c*1 ) ) );
// Connect the streams
connect(
readable,
tap( console.log.bind(console) ),
writable
);
Returns ReadableWritable
This function takes an int n
and returns a transform stream
that throttles the incoming values by n
ms, only producing
values every n
ms and dropping the rest.
Parameters
Returns TransformStream
This function returns a transform stream
that keeps only unique
values from the input stream and enqueues it on the output stream.
Examples
let readable, writable,
res = [];
// Create test streams
readable = createTestReadable( [1,1,2,2,3,3] );
writable = createTestWritable( c => res.push(c) );
// Connect the streams
connect(
readable,
uniq(),
writable
); // res == [1,2,3]
Returns ReadableWritable
FAQs
Basic utilities for web streams
The npm package @pipes/utils receives a total of 2 weekly downloads. As such, @pipes/utils popularity was classified as not popular.
We found that @pipes/utils demonstrated a not healthy version release cadence and project activity because the last version was released a year ago. It has 1 open source maintainer collaborating on the project.
Did you know?
Socket for GitHub automatically highlights issues in each pull request and monitors the health of all your open source dependencies. Discover the contents of your packages and block harmful activity before you install or update your dependencies.
Product
Socket’s new Tier 1 Reachability filters out up to 80% of irrelevant CVEs, so security teams can focus on the vulnerabilities that matter.
Research
/Security News
Ongoing npm supply chain attack spreads to DuckDB: multiple packages compromised with the same wallet-drainer malware.
Security News
The MCP Steering Committee has launched the official MCP Registry in preview, a central hub for discovering and publishing MCP servers.