Security News
The Risks of Misguided Research in Supply Chain Security
Snyk's use of malicious npm packages for research raises ethical concerns, highlighting risks in public deployment, data exfiltration, and unauthorized testing.
Node stream multiplexing with back-pressure on each stream.
Multiplex more than one stream.Duplex over a carrier Duplex.
Works with http2.
The API is described here.
Multiplexing multiple streams over a single TCP stream:
// Example: multiplex ten streams of random data over a single TCP connection
// and check that the server receives exactly what the client sent.
const net = require('net');
const crypto = require('crypto');
const assert = require('assert');
const { BPMux } = require('bpmux');

// Hex-encoded payloads written by the client, one entry per multiplexed stream.
const sent = [];

net.createServer(function (c)
{
    // Hex-encoded payloads fully received on this connection so far.
    const received = [];
    let ended = 0;

    // A 'handshake' event is emitted for each new stream multiplexed by the peer.
    new BPMux(c).on('handshake', function (duplex)
    {
        let accum = '';

        duplex.on('readable', function ()
        {
            // Drain everything currently buffered; read() returns null when
            // there is nothing left to consume.
            let data;
            while ((data = this.read()) !== null)
            {
                accum += data.toString('hex');
            }
        });

        duplex.on('end', function ()
        {
            received.push(accum);
            ended += 1;
            assert(ended <= 10);
            if (ended === 10)
            {
                // Streams may end in any order, so compare sorted copies
                // (copies, so the shared arrays are not reordered in place).
                assert.deepStrictEqual([...received].sort(), [...sent].sort());
            }
        });
    });
}).listen(7000, function ()
{
    const mux = new BPMux(net.createConnection(7000));

    // Open a new multiplexed stream and send n * 100 random bytes over it.
    function multiplex(n)
    {
        const data = crypto.randomBytes(n * 100);
        mux.multiplex().end(data);
        sent.push(data.toString('hex'));
    }

    for (let i = 1; i <= 10; i += 1)
    {
        multiplex(i);
    }
});
Multiple return pipes to the browser, multiplexed over a single Primus connection:
// Example server: for every Primus connection, multiplex ten streams to the
// browser, send 10 KiB of random data down each one and check that the same
// bytes come back.
const { PrimusDuplex } = require('primus-backpressure');
const { BPMux } = require('bpmux');
const http = require('http');
const path = require('path');
const crypto = require('crypto');
const stream = require('stream');
const assert = require('assert');
const finalhandler = require('finalhandler');
const serve_static = require('serve-static');
const Primus = require('primus');

// Serves the files in this directory (loader.html is the page to open).
const serve = serve_static(__dirname);

http.createServer(function (req, res)
{
    serve(req, res, finalhandler(req, res));
}).listen(7500, function ()
{
    // `this` is the HTTP server that has just started listening.
    const primus = new Primus(this);

    primus.on('connection', function (spark)
    {
        const mux = new BPMux(new PrimusDuplex(spark));
        let ended = 0;

        // Multiplex stream number n, write random data to it and verify the
        // data read back from the stream matches what was written.
        function multiplex(n)
        {
            const buf = crypto.randomBytes(10 * 1024);
            const buf_stream = new stream.PassThrough();
            const bufs = [];
            // The stream number travels to the peer as one byte of handshake data.
            const duplex = mux.multiplex({ handshake_data: Buffer.from([n]) });

            buf_stream.end(buf);
            buf_stream.pipe(duplex);

            duplex.on('readable', function ()
            {
                // Collect every chunk currently available; read() returns
                // null once the buffer is empty.
                let data;
                while ((data = this.read()) !== null)
                {
                    bufs.push(data);
                }
            });

            duplex.on('end', function ()
            {
                console.log('end', n);
                ended += 1;
                assert(ended <= 10);
                assert.deepStrictEqual(Buffer.concat(bufs), buf);
            });
        }

        for (let i = 0; i < 10; i += 1)
        {
            multiplex(i);
        }
    });

    console.log('Point your browser to http://localhost:7500/loader.html');
});
The HTML (loader.html) for the browser side of this example:
<!-- Test page for the browser side of the Primus example. -->
<html>
<head>
<title>BPMux Test Runner</title>
<!-- Expected to provide the Primus global used by loader.js. -->
<script type="text/javascript" src="/primus/primus.js"></script>
<!-- webpack bundle providing the PrimusDuplex and BPMux globals. -->
<script type="text/javascript" src="bundle.js"></script>
<!-- Defines doit(), which is called from the body's onload handler. -->
<script type="text/javascript" src="loader.js"></script>
</head>
<!-- Start the example once the page has loaded. -->
<body onload='doit()'>
</body>
</html>
The browser-side code (loader.js):
/**
 * Browser entry point, called from the page's onload handler.
 *
 * Wraps a new Primus connection in a PrimusDuplex, runs a BPMux over it and
 * echoes every stream multiplexed by the peer straight back to the peer.
 * The first element of each stream's handshake data is logged when the
 * handshake arrives and again when the stream ends.
 */
function doit() {
    const primus = new Primus({ strategy: false });
    const carrier = new PrimusDuplex(primus);
    const mux = new BPMux(carrier);

    mux.on('handshake', (stream, hs_data) => {
        console.log("handshake", hs_data[0]);

        // Echo: whatever is read from the stream is written back to it.
        stream.pipe(stream);

        stream.on('end', () => {
            console.log('end', hs_data[0]);
        });
    });
}
The browser-side dependencies (bundle.js) can be produced by webpack from:
// Entry point for the webpack bundle (bundle.js).
// NOTE(review): both names are assigned without var/let/const, which appears
// to be intentional so that they become globals loader.js can reference.
// Presumably this relies on the bundle being evaluated in non-strict mode;
// confirm before adding declarations or 'use strict'.
PrimusDuplex = require('primus-backpressure').PrimusDuplex;
BPMux = require('bpmux').BPMux;
Multiplexing libraries which don't exert backpressure on individual streams suffer from starvation. A stream which doesn't read its data stops other streams on the multiplex getting their data.
Here's a test using the multiplex library:
// Uses https://github.com/maxogden/multiplex (npm install multiplex)
// Backpressure is exerted across the multiplex as a whole, not individual streams.
// This means a stream which doesn't read its data starves the other streams.
const fs = require('fs');
const net = require('net');
const multiplex = require('multiplex');

net.createServer(c => {
    c.pipe(multiplex((stream, id) => {
        // A regular function is used so `this` refers to the stream emitting 'data'.
        stream.on('data', function (d) {
            console.log('data', id, d.length);
            // Pause only the first stream; the second stream is never paused.
            if (id === '0') {
                this.pause();
            }
        });
    }));
}).listen(7000, () => {
    const plex = multiplex();
    plex.pipe(net.createConnection(7000));

    // Feed both streams with an endless supply of random data.
    const stream1 = plex.createStream();
    const stream2 = plex.createStream();
    fs.createReadStream('/dev/urandom').pipe(stream1);
    fs.createReadStream('/dev/urandom').pipe(stream2);
});
When the first stream is paused, backpressure is applied to the second stream too, even though it hasn't been paused. If you run this example, you'll see:
$ node multiplex.js
data 0 65536
data 1 65536
BPMux doesn't suffer from this problem since backpressure is exerted on each stream separately. Here's the same test:
// BPMux exerts backpressure on individual streams so a stream which doesn't
// read its data doesn't starve the other streams.
const fs = require('fs');
const net = require('net');
const { BPMux } = require('bpmux');

net.createServer(c => {
    new BPMux(c).on('handshake', stream => {
        // A regular function is used so `this` refers to the stream emitting 'data'.
        stream.on('data', function (d) {
            // get_channel() is the documented accessor for a stream's channel number.
            const channel = stream.get_channel();
            console.log('data', channel, d.length);
            // Pause only the first stream; the second stream is never paused.
            if (channel === 0) {
                this.pause();
            }
        });
    });
}).listen(7000, () => {
    const mux = new BPMux(net.createConnection(7000));

    // Feed both streams with an endless supply of random data.
    const stream1 = mux.multiplex();
    const stream2 = mux.multiplex();
    fs.createReadStream('/dev/urandom').pipe(stream1);
    fs.createReadStream('/dev/urandom').pipe(stream2);
});
The second stream continues to receive data when the first stream is paused:
data 0 16384
data 1 16384
data 1 16384
data 1 16384
data 1 16384
data 1 16384
data 1 16384
data 1 16384
data 1 16384
data 1 16384
data 1 16384
data 1 16384
data 1 16384
data 1 16384
...
npm install bpmux
Over TCP (long test):
grunt test
Over TCP (quick test):
grunt test-fast
Over Primus (using nwjs to run browser- and server-side):
grunt test-browser
The examples at the top of this page:
grunt test-examples
grunt coverage
Istanbul results are available here.
Coveralls page is here.
grunt lint
Constructor for a BPMux object which multiplexes more than one stream.Duplex over a carrier Duplex.
Parameters:
{Duplex} carrier
The Duplex stream over which other Duplex streams will be multiplexed.
{Object} [options]
Configuration options. This is passed down to frame-stream. It also supports the following additional properties:
{Object} [peer_multiplex_options]
When your BPMux
object detects a new multiplexed stream from the peer on the carrier, it creates a new Duplex
and emits a peer_multiplex
event. When it creates the Duplex
, it uses peer_multiplex_options
to configure it with the following options:
{Integer} [max_write_size]
Maximum number of bytes to write to the Duplex
at once, regardless of how many bytes the peer is free to receive. Defaults to 0 (no limit).
{Boolean} [check_read_overflow]
Whether to check if more data than expected is being received. If true
and the Duplex
's high-water mark for reading is exceeded then the Duplex
emits an error
event. This should not normally occur unless you add data yourself using readable.unshift
— in which case you should set check_read_overflow
to false
. Defaults to true
.
{Function} [parse_handshake_data(handshake_data)]
When a new stream is multiplexed, the BPMux
objects at each end of the carrier exchange a handshake message. You can supply application-specific handshake data to add to the handshake message (see BPMux.prototype.multiplex
and BPMux.events.handshake
). By default, when handshake data from the peer is received, it's passed to your application as a raw Buffer
. Use parse_handshake_data
to specify a custom parser. It will receive the Buffer
as an argument and should return a value which makes sense to your application.
{Boolean} [coalesce_writes]
Whether to batch together writes to the carrier. When the carrier indicates it's ready to receive data, its spare capacity is shared equally between the multiplexed streams. By default, the data from each stream is written separately to the carrier. Specify true
to write all the data to the carrier in a single write. Depending on the carrier, this can be more performant.
{Boolean} [high_channels]
BPMux
assigns unique channel numbers to multiplexed streams. By default, it assigns numbers in the range [0..2^31). If your application can synchronise the two BPMux
instances on each end of the carrier stream so they never call multiplex
at the same time then you don't need to worry about channel number clashes. For example, one side of the carrier could always call multiplex
and the other listen for handshake
events. Or they could take it in turns. If you can't synchronise both sides of the carrier, you can get one side to use a different range by specifying high_channels
as true
. The BPMux
with high_channels
set to true
will assign channel numbers in the range [2^31..2^32).
{Integer} [max_open]
Maximum number of multiplexed streams that can be open at a time. Defaults to 0 (no maximum).
{Integer} [max_header_size]
BPMux
adds a control header to each message it sends, which the receiver reads into memory. The header is of variable length — for example, handshake messages contain handshake data which can be supplied by the application. max_header_size
is the maximum number of header bytes to read into memory. If a larger header is received, BPMux
emits an error
event. Defaults to 0 (no limit).
{Integer|false} [keep_alive]
Send a single byte keep-alive message every N milliseconds. Defaults to 30000 (30 seconds). Pass false to disable.
Go: TOC
Multiplex a new
stream.Duplex
over the carrier.
Parameters:
{Object} [options]
Configuration options:
{Buffer} [handshake_data]
Application-specific handshake data to send to the peer. When a new stream is multiplexed, the BPMux
objects at each end of the carrier exchange a handshake message. You can optionally supply handshake data to add to the handshake message here. The peer application will receive this when its BPMux
object emits a handshake
event. Defaults to a zero-length Buffer
.
{Integer} [max_write_size]
Maximum number of bytes to write to the Duplex
at once, regardless of how many bytes the peer is free to receive. Defaults to 0 (no limit).
{Boolean} [check_read_overflow]
Whether to check if more data than expected is being received. If true
and the Duplex
's high-water mark for reading is exceeded then the Duplex
emits an error
event. This should not normally occur unless you add data yourself using readable.unshift
— in which case you should set check_read_overflow
to false
. Defaults to true
.
{Integer} [channel]
Unique number for the new stream. BPMux
identifies each multiplexed stream by giving it a unique number, which it allocates automatically. If you want to do the allocation yourself, specify a channel number here. It's very unlikely you'll need to do this but the option is there. Duplex
objects managed by BPMux
expose a get_channel
method to retrieve their channel number. Defaults to automatic allocation.
Return:
{Duplex}
The new Duplex
which is multiplexed over the carrier. This supports back-pressure using the stream readable
event and write
method.
Throws:
{Error}
If there are no channel numbers left to allocate to the new stream, the maximum number of open multiplexed streams would be exceeded or the carrier has finished or ended.
Go: TOC | BPMux.prototype
peer_multiplex
event
A BPMux
object emits a peer_multiplex
event when it detects a new multiplexed stream from its peer on the carrier stream.
Parameters:
{Duplex} duplex
The new stream.
Go: TOC | BPMux.events
handshake
event
A BPMux
object emits a handshake
event when it receives a handshake message from its peer on the carrier stream. This can happen in two cases:
1. The BPMux object is processing a handshake message for a new multiplexed stream the peer created and it hasn't seen before. Note the handshake event is emitted after the peer_multiplex event.
2. Your application previously called multiplex on its BPMux object to multiplex a new stream over the carrier and now the peer has replied with a handshake message.
Parameters:
{Duplex} duplex
The multiplexed stream for which a handshake message has been received. Please note that a handshake
event is also emitted on duplex
immediately after BPMux
's handshake
event finishes processing. duplex
's handshake
event is passed the same handshake_data
and delay_handshake
parameters described below.
{Object} handshake_data
Application-specific data which the peer sent along with the handshake message. If you specified a parse_handshake_data
function in the BPMux
constructor then handshake_data
will be the return value from calling that function.
{Function} [delay_handshake]
This parameter will be null
in case 2 (your application previously created duplex
). Otherwise (case 1), this parameter will be a function. By default, the BPMux
object replies to the peer's handshake message as soon as your event handler returns and doesn't attach any application-specific handshake data. If you wish to delay the handshake message or provide handshake data, call delay_handshake
. It returns another function which you can call at any time to send the handshake message. The returned function takes a single argument:
{Buffer} [handshake_data]
Application-specific handshake data to attach to the handshake message sent to the peer. Defaults to a zero-length Buffer.
Go: TOC | BPMux.events
handshake_sent
event
A BPMux
object emits a handshake_sent
event after it sends a handshake message to its peer on the carrier stream.
Parameters:
{Duplex} duplex
The multiplexed stream for which a handshake has been sent. Please note that a handshake_sent
event is also emitted on duplex
immediately after BPMux
's handshake_sent
event finishes processing. duplex
's handshake_sent
event is passed the same complete
parameter described below.
{Boolean} complete
Whether the handshake message was completely sent (true
) or the carrier stream buffered it (false
). You can use this to apply back-pressure to stream multiplexing. For example, if complete
is false
then you could avoid calling multiplex
until a drain
event is emitted.
Go: TOC | BPMux.events
drain
event
A BPMux
object emits a drain
event when its carrier stream emits a
drain
event.
Go: TOC | BPMux.events
end
event
A BPMux
object emits an end
event after the carrier stream ends (will receive
no more data).
Go: TOC | BPMux.events
finish
event
A BPMux
object emits a finish
event after the carrier stream finishes (won't
write any more data).
Go: TOC | BPMux.events
full
event
A BPMux
object emits a full
event when it wants to add a new multiplexed stream on the carrier stream but the number of multiplexed streams is at its maximum. It will remain at maximum until a removed
event is emitted.
Go: TOC | BPMux.events
removed
event
A BPMux
object emits a removed
event when a multiplexed stream has closed
(finished and ended) and been removed from the list of multiplexed streams.
Parameters:
{Duplex} duplex
The stream which has closed.
Go: TOC | BPMux.events
keep_alive
event
A BPMux
object emits a keep_alive
event when it receives a keep-alive
message from its peer.
Go: TOC | BPMux.events
—generated by apidox—
FAQs
Node stream multiplexing with back-pressure on each stream
We found that bpmux demonstrated an unhealthy version release cadence and project activity because the last version was released a year ago. It has 1 open source maintainer collaborating on the project.
Did you know?
Socket for GitHub automatically highlights issues in each pull request and monitors the health of all your open source dependencies. Discover the contents of your packages and block harmful activity before you install or update your dependencies.
Security News
Snyk's use of malicious npm packages for research raises ethical concerns, highlighting risks in public deployment, data exfiltration, and unauthorized testing.
Research
Security News
Socket researchers found several malicious npm packages typosquatting Chalk and Chokidar, targeting Node.js developers with kill switches and data theft.
Security News
pnpm 10 blocks lifecycle scripts by default to improve security, addressing supply chain attack risks but sparking debate over compatibility and workflow changes.