New Case Study:See how Anthropic automated 95% of dependency reviews with Socket.Learn More
Socket
Sign inDemoInstall
Socket

bpmux

Package Overview
Dependencies
Maintainers
1
Versions
65
Alerts
File Explorer

Advanced tools

Socket logo

Install Socket

Detect and block malicious and high-risk dependencies

Install

bpmux - npm Package Compare versions

Comparing version 8.1.3 to 8.2.0

test/comparison/http2.js

8

coverage/lcov-report/block-navigation.js

@@ -66,2 +66,10 @@ /* eslint-disable */

return function jump(event) {
if (
document.getElementById('fileSearch') === document.activeElement &&
document.activeElement != null
) {
// if we're currently focused on the search input, we don't want to navigate
return;
}
switch (event.which) {

@@ -68,0 +76,0 @@ case 78: // n

@@ -27,2 +27,27 @@ /* eslint-disable */

function onFilterInput() {
const searchValue = document.getElementById('fileSearch').value;
const rows = document.getElementsByTagName('tbody')[0].children;
for (let i = 0; i < rows.length; i++) {
const row = rows[i];
if (
row.textContent
.toLowerCase()
.includes(searchValue.toLowerCase())
) {
row.style.display = '';
} else {
row.style.display = 'none';
}
}
}
// loads the search box
function addSearchBox() {
var template = document.getElementById('filterTemplate');
var templateClone = template.content.cloneNode(true);
templateClone.getElementById('fileSearch').oninput = onFilterInput;
template.parentElement.appendChild(templateClone);
}
// loads all columns

@@ -166,2 +191,3 @@ function loadColumns() {

loadData();
addSearchBox();
addSortIndicators();

@@ -168,0 +194,0 @@ enableUI();

118

Gruntfile.js

@@ -13,2 +13,4 @@ /*eslint-env node */

const c8 = "npx c8 -x Gruntfile.js -x 'test/**'";
module.exports = function (grunt)

@@ -32,2 +34,9 @@ {

apidox: {
input: [ 'index.js', 'events_doc.js' ],
output: 'README.md',
fullSourceDescription: true,
extraHeadingLevels: 1
},
mochaTest: {

@@ -39,3 +48,4 @@ default: {

'test/test_inline_stream.js',
'test/test_http2.js'
'test/test_http2.js',
'test/test_http2_session.js'
],

@@ -54,64 +64,24 @@ options: {

apidox: {
input: [ 'index.js', 'events_doc.js' ],
output: 'README.md',
fullSourceDescription: true,
extraHeadingLevels: 1
},
shell: {
cover: {
command: "./node_modules/.bin/nyc -x Gruntfile.js -x 'test/**' ./node_modules/.bin/grunt test-fast",
options: {
execOptions: {
maxBuffer: Infinity
}
}
},
cover_report: {
command: './node_modules/.bin/nyc report -r lcov'
},
cover_check: {
command: './node_modules/.bin/nyc check-coverage --statements 100 --branches 100 --functions 100 --lines 100'
},
nw_build: {
command: [
'rsync -a node_modules test/fixtures/nw --exclude nw-builder',
'NODE_ENV=test ./node_modules/.bin/babel --config-file ./test/fixtures/nw/.babelrc.json test/fixtures/nw/node_modules/http2-duplex/server.js --out-file test/fixtures/nw/node_modules/http2_duplex_server.js --source-maps',
'cp index.js test/fixtures/nw/node_modules/bpmux.js',
'cp test/test_browser.js test/fixtures/nw/node_modules',
'cp test/test_comms.js test/fixtures/nw/node_modules',
'cp test/fixtures/webpack/bundle.js test/fixtures/nw',
'mkdir -p test/fixtures/nw/node_modules/fixtures',
'touch test/fixtures/nw/node_modules/fixtures/keep',
'mkdir -p test/fixtures/nw/node_modules/certs',
'cp test/certs/server.* test/fixtures/nw/node_modules/certs',
'./node_modules/.bin/nwbuild --quiet -p linux64 test/fixtures/nw'
].join('&&')
},
bpmux_test: {
command: 'export TEST_ERR_FILE=/tmp/test_err_$$; ./build/bpmux-test/linux64/bpmux-test; if [ -f $TEST_ERR_FILE ]; then exit 1; fi',
options: {
execOptions: {
maxBuffer: Infinity
}
}
},
bundle: {
command: './node_modules/.bin/webpack --mode production --config test/webpack.config.js'
},
bundle_example: {
command: './node_modules/.bin/webpack --mode production --config test/webpack.example.config.js'
},
certs: {
command: 'if [ ! -f test/certs/server.crt ]; then ./test/certs/make_ca_cert.sh && ./test/certs/make_server_cert.sh; fi'
}
}
exec: Object.fromEntries(Object.entries({
cover: `${c8} grunt test-fast`,
cover_report: `${c8} report -r lcov`,
cover_check: `${c8} check-coverage --statements 100 --branches 100 --functions 100 --lines 100`,
nw_build: [
'rsync -aL node_modules test/fixtures/nw --exclude nw-builder --exclude nwbuild',
'NODE_ENV=test npx babel --config-file ./test/fixtures/nw/.babelrc.json test/fixtures/nw/node_modules/http2-duplex/server.js --out-file test/fixtures/nw/node_modules/http2_duplex_server.js --source-maps',
'cp index.js test/fixtures/nw/node_modules/bpmux.js',
'cp test/test_browser.js test/fixtures/nw/node_modules',
'cp test/test_comms.js test/fixtures/nw/node_modules',
'cp test/fixtures/webpack/bundle.js test/fixtures/nw',
'mkdir -p test/fixtures/nw/node_modules/fixtures',
'touch test/fixtures/nw/node_modules/fixtures/keep',
'mkdir -p test/fixtures/nw/node_modules/certs',
'cp test/certs/server.* test/fixtures/nw/node_modules/certs',
'npx nwbuild --quiet -p linux64 test/fixtures/nw'
].join('&&'),
bpmux_test: 'export TEST_ERR_FILE=/tmp/test_err_$$; ./build/bpmux-test/linux64/bpmux-test; if [ -f $TEST_ERR_FILE ]; then exit 1; fi',
bundle: 'npx webpack --mode production --config test/webpack.config.js',
bundle_example: 'npx webpack --mode production --config test/webpack.example.config.js',
certs: 'if [ ! -f test/certs/server.crt ]; then ./test/certs/make_ca_cert.sh && ./test/certs/make_server_cert.sh; fi'
}).map(([k, cmd]) => [k, { cmd, stdio: 'inherit' }]))
});

@@ -122,3 +92,3 @@

grunt.loadNpmTasks('grunt-apidox');
grunt.loadNpmTasks('grunt-shell');
grunt.loadNpmTasks('grunt-exec');
grunt.loadNpmTasks('grunt-env');

@@ -128,7 +98,7 @@

grunt.registerTask('test', [
'shell:certs',
'exec:certs',
'mochaTest:default'
]);
grunt.registerTask('test-fast', [
'shell:certs',
'exec:certs',
'env:fast',

@@ -139,3 +109,3 @@ 'mochaTest:default'

grunt.registerTask('test-examples', [
'shell:bundle_example',
'exec:bundle_example',
'mochaTest:examples'

@@ -145,12 +115,12 @@ ]);

'save-primus',
'shell:certs',
'shell:bundle',
'shell:nw_build',
'shell:bpmux_test'
'exec:certs',
'exec:bundle',
'exec:nw_build',
'exec:bpmux_test'
]);
grunt.registerTask('docs', 'apidox');
grunt.registerTask('coverage', [
'shell:cover',
'shell:cover_report',
'shell:cover_check'
'exec:cover',
'exec:cover_report',
'exec:cover_check'
]);

@@ -157,0 +127,0 @@ grunt.registerTask('default', ['lint', 'test']);

@@ -193,2 +193,4 @@ /**

### [multiplex](https://github.com/maxogden/multiplex) library
Multiplexing libraries which don't exert backpressure on individual streams

@@ -291,2 +293,53 @@ suffer from starvation. A stream which doesn't read its data stops other streams

### HTTP/2 sessions
[HTTP/2 sessions](https://nodejs.org/dist/latest-v16.x/docs/api/http2.html#class-http2session)
do exert backpressure on individual streams, as this test shows:
```javascript
const fs = require('fs');
const http2 = require('http2');
const server = http2.createServer();
server.on('stream', (stream, headers) => {
stream.on('data', function (d) {
console.log('data', headers[':path'], d.length);
if (headers[':path'] === '/stream1') {
this.pause();
}
});
});
server.listen(8000);
const client = http2.connect('http://localhost:8000');
const stream1 = client.request({ ':path': '/stream1' }, { endStream: false });
const stream2 = client.request({ ':path': '/stream2' }, { endStream: false });
fs.createReadStream('/dev/urandom').pipe(stream1);
fs.createReadStream('/dev/urandom').pipe(stream2);
```
```
data /stream1 16384
data /stream2 16384
data /stream2 16348
data /stream2 35
data /stream2 16384
data /stream2 16384
data /stream2 1
data /stream2 16384
data /stream2 16366
data /stream2 18
data /stream2 16384
data /stream2 16382
data /stream2 2
data /stream2 16384
...
```
If you pass a pair of sessions (one client, one server) to [`BPMux()`](#bpmuxcarrier-options),
they will be used for multiplexing streams, with no additional overhead. This is useful if
you want to use the bpmux API.
## Errors

@@ -363,3 +416,3 @@

[Istanbul](http://gotwarlost.github.io/istanbul/) results are available [here](http://rawgit.davedoesdev.com/davedoesdev/bpmux/master/coverage/lcov-report/index.html).
[c8](https://github.com/bcoe/c8) results are available [here](http://rawgit.davedoesdev.com/davedoesdev/bpmux/master/coverage/lcov-report/index.html).

@@ -395,2 +448,31 @@ Coveralls page is [here](https://coveralls.io/r/davedoesdev/bpmux).

/**
Class for holding a pair of HTTP/2 sessions.
Pass this to [BPMux()](#bpmuxcarrier-options) and it will use the sessions'
existing support for multiplexing streams. Both [client](https://nodejs.org/dist/latest-v16.x/docs/api/http2.html#class-clienthttp2session) and [server](https://nodejs.org/dist/latest-v16.x/docs/api/http2.html#class-serverhttp2session) sessions
are required because HTTP/2 push streams are unidirectional.
@param {ClientHttp2Session} client Client session
@param {ServerHttp2Session} server Server session
*/
class Http2Sessions
{
constructor(client, server)
{
this._client = client;
this._server = server;
}
get client()
{
return this._client;
}
get server()
{
return this._server;
}
}
function BPDuplex(options, mux, chan)

@@ -406,2 +488,3 @@ {

this._mux = mux;
this.mux = mux;
this._chan = chan;

@@ -441,10 +524,3 @@ this._max_write_size = options.max_write_size;

}
if (this._ended)
{
this._check_remove();
}
// Don't call _check_remove if not ended because the close event may be
// due to a local destroy and so data may still come from the peer
// (but be ignored because we don't push to destroyed streams).
// Duplex will be removed when TYPE_END is received.
this._check_remove();
});

@@ -467,2 +543,6 @@

{
// Don't call _remove if not ended because the duplex may have closed
// due to a local destroy and so data may still come from the peer
// (but be ignored because we don't push to destroyed streams).
// Duplex will be removed when TYPE_END is received.
if (this._finished && this._ended && !this._removed)

@@ -522,3 +602,3 @@ {

@param {Duplex} carrier The `Duplex` stream over which other `Duplex` streams will be multiplexed.
@param {Duplex|Http2Sessions} carrier The `Duplex` stream over which other `Duplex` streams will be multiplexed.

@@ -557,6 +637,121 @@ @param {Object} [options] Configuration options. This is passed down to [`frame-stream`](https://github.com/davedoesdev/frame-stream). It also supports the following additional properties:

this._max_open = options.max_open;
this._max_header_size = options.max_header_size;
this.duplexes = new Map();
this._chan = 0;
this._chan_offset = options.high_channels ? this._max_duplexes : 0;
this._parse_handshake_data = options.parse_handshake_data;
this.carrier = carrier;
if (carrier instanceof Http2Sessions)
{
const http2 = require('http2');
const http2_options = options.http2 || {};
const response_headers = {
...http2_options.headers,
[http2.constants.HTTP2_HEADER_STATUS]: 200,
[http2.constants.HTTP2_HEADER_CONTENT_TYPE]: 'application/octet-stream',
};
carrier.server.on('stream', (duplex, headers) =>
{
if ((this._max_open > 0) && (this.duplexes.size === this._max_open))
{
this.emit('full');
return duplex.respond({
[http2.constants.HTTP2_HEADER_STATUS]: 503
}, {
endStream: true
});
}
const channel = Buffer.from(headers['bpmux-channel'], 'base64').readUint32BE();
if (this.duplexes.has(channel))
{
return duplex.respond({
[http2.constants.HTTP2_HEADER_STATUS]: 409
}, {
endStream: true
});
}
duplex.cork();
this._add_http2_duplex(duplex, channel);
this.emit('peer_multiplex', duplex);
let handshake_delayed = false;
this._parse_http2_handshake(duplex, headers, () =>
{
handshake_delayed = true;
let delayed_handshake;
const uncork = duplex.uncork;
duplex.uncork = () =>
{
duplex.uncork = uncork;
if (!duplex.destroyed) // Node 12 calls uncork on end even if destroyed
{
duplex.respond({
...response_headers,
...this._make_http2_handshake(delayed_handshake)
});
duplex._handshake_sent = true;
this.emit('handshake_sent', duplex, true);
duplex.emit('handshake_sent', true);
duplex.uncork();
}
};
return handshake =>
{
delayed_handshake = handshake;
duplex.uncork();
};
});
if (handshake_delayed)
{
this.emit('pre_handshake_sent', duplex, true);
duplex.emit('pre_handshake_sent', true);
}
else
{
duplex.respond({
...response_headers,
...this._make_http2_handshake()
});
duplex._handshake_sent = true;
this.emit('handshake_sent', duplex, true);
duplex.emit('handshake_sent', true);
duplex.uncork();
}
});
let closed = 0;
for (const session of [carrier.client, carrier.server])
{
session.on('close', () =>
{
// Note: http2 sessions only close once all their
// streams have closed so we don't need to go
// through the duplexes here and close them
if (++closed === 2)
{
this.emit('finish');
this.emit('end');
this.emit('close');
}
});
session.on('error', err =>
{
for (const duplex of this.duplexes.values())
{
if ((duplex.session === session) &&
!duplex.destroyed &&
(duplex.listenerCount('error') > 0))
{
duplex.emit('error', err);
}
}
this.emit('error', err);
});
}
return;
}
this._max_header_size = options.max_header_size;
this._finished = false;

@@ -568,5 +763,3 @@ this._ended = false;

this._peer_multiplex_options = options.peer_multiplex_options;
this._parse_handshake_data = options.parse_handshake_data;
this._coalesce_writes = options.coalesce_writes;
this.carrier = carrier;
this._sending = false;

@@ -601,2 +794,10 @@ this._send_requested = false;

function check_close()
{
if (ths._finished && ths._ended)
{
ths.emit('close');
}
}
function finish()

@@ -620,2 +821,3 @@ {

ths.emit('finish');
check_close();
}

@@ -640,2 +842,3 @@

ths.emit('end');
check_close();
}

@@ -651,7 +854,11 @@

{
// check_remove() is always called when _finished or _ended is set on a duplex,
// so it will have been removed from duplex. Apart from above in end(),
// where the duplex is destroyed, which we check below anyway (check_remove()
// will eventually be called there too via the duplex's 'close' handler).
for (var duplex of ths.duplexes.values())
{
if ((EventEmitter.listenerCount(duplex, 'error') > 0) &&
!(duplex._ended && duplex._finished) &&
!duplex.destroyed)
if (!duplex._removed && // in case destroyed by previous iteration's error handler
!duplex.destroyed &&
(duplex.listenerCount('error') > 0))
{

@@ -784,22 +991,8 @@ duplex.emit('error', err);

chan = buf.readUInt32BE(1, true),
duplex = this.duplexes.get(chan),
handshake_data,
handshake_delayed = false,
dhs,
free,
seq;
duplex = this.duplexes.get(chan);
function delay_handshake()
{
handshake_delayed = true;
return function (handshake_data)
{
duplex._send_handshake(handshake_data);
};
}
function handle_status()
{
free = buf.readUInt32BE(5, true);
seq = buf.length === 13 ? buf.readUInt32BE(9, true) : 0;
let free = buf.readUInt32BE(5, true);
const seq = buf.length === 13 ? buf.readUInt32BE(9, true) : 0;

@@ -884,6 +1077,7 @@ free = duplex._max_write_size > 0 ?

case TYPE_HANDSHAKE:
{
if (!this._check_buffer(buf, 9)) { return; }
if (duplex._seq === 0)
{
free = buf.readUInt32BE(5, true);
const free = buf.readUInt32BE(5, true);
duplex._remote_free = duplex._max_write_size > 0 ?

@@ -894,6 +1088,28 @@ Math.min(free, duplex._max_write_size) : free;

duplex._handshake_received = true;
handshake_data = this._parse_handshake_data ?
this._parse_handshake_data(buf.slice(9)) :
buf.slice(9);
dhs = duplex._handshake_sent ? null : delay_handshake;
let handshake_data = buf.slice(9);
if (this._parse_handshake_data)
{
try
{
handshake_data = this._parse_handshake_data(handshake_data);
}
catch (ex)
{
if (duplex.listenerCount('error') > 0)
{
duplex.emit('error', ex);
}
this.emit('error', ex);
}
}
let handshake_delayed = false;
const delay_handshake = () =>
{
handshake_delayed = true;
return function (handshake_data)
{
duplex._send_handshake(handshake_data);
};
};
const dhs = duplex._handshake_sent ? null : delay_handshake;
this.emit('handshake', duplex, handshake_data, dhs);

@@ -924,2 +1140,3 @@ duplex.emit('handshake', handshake_data, dhs);

break;
}

@@ -1204,10 +1421,25 @@ case TYPE_FINISHED_STATUS:

if (this.carrier._writableState.ending)
if (this.carrier instanceof Http2Sessions)
{
throw new Error('finished');
if (this.carrier.client.closed || this.carrier.client.destroyed)
{
throw new Error('closed');
}
}
else
{
if (this.carrier.destroyed)
{
throw new Error('closed');
}
if (this.carrier._readableState.ended)
{
throw new Error('ended');
if (this.carrier._writableState.ending)
{
throw new Error('finished');
}
if (this.carrier._readableState.ended)
{
throw new Error('ended');
}
}

@@ -1221,2 +1453,43 @@

{
if (ths.carrier instanceof Http2Sessions)
{
const http2 = require('http2');
const chan = Buffer.alloc(4);
chan.writeUInt32BE(channel);
const http2_options = options.http2 || {};
const duplex = ths.carrier.client.request({
[http2.constants.HTTP2_HEADER_PATH]: '/',
[http2.constants.HTTP2_HEADER_METHOD]: 'POST',
...http2_options.headers,
'bpmux-channel': chan.toString('base64'),
...ths._make_http2_handshake(options.handshake_data)
}, {
...http2_options.options,
endStream: false,
waitForTrailers: true
});
ths._add_http2_duplex(duplex, channel);
setImmediate(() =>
{
duplex._handshake_sent = true;
ths.emit('handshake_sent', duplex, true);
duplex.emit('handshake_sent', true);
});
duplex.on('response', headers =>
{
const status = headers[http2.constants.HTTP2_HEADER_STATUS];
if (status !== 200)
{
const msg = `peer returned status ${status} for channel ${channel}`;
const err = new Error(msg);
err.status = status;
err.duplex = duplex;
duplex.destroy(err);
return ths.emit('error', err);
}
ths._parse_http2_handshake(duplex, headers, null);
});
return duplex;
}
var duplex = new BPDuplex(options, ths, channel);

@@ -1237,3 +1510,3 @@

{
return done(options.channel);
return this.duplexes.get(options.channel) || done(options.channel);
}

@@ -1261,2 +1534,74 @@

BPMux.prototype._add_http2_duplex = function (duplex, channel)
{
duplex._mux = this;
duplex.mux = this;
duplex._chan = channel;
duplex.get_channel = () => channel;
duplex._handshake_sent = false;
duplex._handshake_received = false;
duplex._error_end = false;
this.duplexes.set(channel, duplex);
if ((this._max_open > 0) && (this.duplexes.size === this._max_open))
{
this.emit('full');
}
duplex.on('close', () =>
{
this.duplexes.delete(channel);
this.emit('removed', duplex);
});
duplex.peer_error_then_end = function (chunk, encoding, cb)
{
this._error_end = true;
return this.end(chunk, encoding, cb);
};
duplex.on('wantTrailers', function ()
{
this.sendTrailers(
{
'bpmux-error': this._error_end.toString()
});
});
duplex.on('trailers', function (headers)
{
if (headers['bpmux-error'] === 'true')
{
this.emit('error', new Error('peer error'));
}
});
};
BPMux.prototype._make_http2_handshake = function (handshake)
{
return {
'bpmux-handshake': (handshake || Buffer.alloc(0)).toString('base64')
};
};
BPMux.prototype._parse_http2_handshake = function (duplex, headers, delay_handshake)
{
duplex._handshake_received = true;
let handshake_data = Buffer.alloc(0);
try
{
handshake_data = Buffer.from(headers['bpmux-handshake'], 'base64');
if (this._parse_handshake_data)
{
handshake_data = this._parse_handshake_data(handshake_data);
}
}
catch (ex)
{
if (duplex.listenerCount('error') > 0)
{
duplex.emit('error', ex);
}
this.emit('error', ex);
}
this.emit('handshake', duplex, handshake_data, delay_handshake);
duplex.emit('handshake', handshake_data, delay_handshake);
};
exports.BPMux = BPMux;
exports.Http2Sessions = Http2Sessions;
{
"name": "bpmux",
"description": "Node stream multiplexing with back-pressure on each stream",
"version": "8.1.3",
"version": "8.2.0",
"homepage": "https://github.com/davedoesdev/bpmux",

@@ -21,6 +21,3 @@ "author": {

"scripts": {
"test": "grunt lint test-fast",
"ci-coverage": "grunt lint coverage",
"ci-browser": "grunt test-browser",
"ci-inline": "grunt test-inline"
"test": "grunt lint test-fast"
},

@@ -46,30 +43,43 @@ "directories": {

"dependencies": {
"frame-stream": "^2.0.3"
"frame-stream": "^3.0.0"
},
"devDependencies": {
"@babel/cli": "^7.11.6",
"@babel/core": "^7.11.6",
"@babel/plugin-transform-modules-commonjs": "^7.10.4",
"async": "^3.2.0",
"chai": "^4.2.0",
"eslint": "^7.8.1",
"@babel/cli": "^7.17.0",
"@babel/core": "^7.17.2",
"@babel/plugin-transform-modules-commonjs": "^7.16.8",
"async": "^3.2.3",
"c8": "^7.11.0",
"chai": "^4.3.6",
"crypto-browserify": "^3.12.0",
"eslint": "^8.8.0",
"finalhandler": "^1.1.2",
"grunt": "^1.3.0",
"grunt-apidox": "^2.0.10",
"grunt": "^1.4.1",
"grunt-apidox": "^2.0.16",
"grunt-env": "^1.0.1",
"grunt-eslint": "^23.0.0",
"grunt-eslint": "^24.0.0",
"grunt-exec": "^3.0.0",
"grunt-mocha-test": "^0.13.3",
"grunt-shell": "^3.0.1",
"http2-duplex": "^5.0.1",
"mocha": "^8.1.3",
"http2-duplex": "^5.3.1",
"mocha": "^9.2.0",
"nw-builder": "davedoesdev/nw-builder",
"nyc": "^15.1.0",
"primus": "^7.3.5",
"primus-backpressure": "^2.0.3",
"serve-static": "^1.14.1",
"primus": "^8.0.5",
"primus-backpressure": "^2.0.8",
"process": "^0.11.10",
"serve-static": "^1.14.2",
"setimmediate": "^1.0.5",
"stream-browserify": "^3.0.0",
"tmp": "^0.2.1",
"webpack": "^4.44.1",
"webpack-cli": "^3.3.12",
"ws": "^7.3.1"
"util": "^0.12.4",
"webpack": "^5.68.0",
"webpack-cli": "^4.9.2",
"ws": "^8.5.0"
},
"overrides": {
"jsdoctypeparser": {
"lodash": "^4.17.21"
},
"dox": {
"markdown-it": "^12.3.2"
}
}
}

@@ -192,2 +192,4 @@ # bpmux&nbsp;&nbsp;&nbsp;[![Build Status](https://github.com/davedoesdev/bpmux/workflows/ci/badge.svg)](https://github.com/davedoesdev/bpmux/actions) [![Coverage Status](https://coveralls.io/repos/davedoesdev/bpmux/badge.png?branch=master&service=github)](https://coveralls.io/r/davedoesdev/bpmux?branch=master) [![NPM version](https://badge.fury.io/js/bpmux.png)](http://badge.fury.io/js/bpmux)

### [multiplex](https://github.com/maxogden/multiplex) library
Multiplexing libraries which don't exert backpressure on individual streams

@@ -290,2 +292,53 @@ suffer from starvation. A stream which doesn't read its data stops other streams

### HTTP/2 sessions
[HTTP/2 sessions](https://nodejs.org/dist/latest-v16.x/docs/api/http2.html#class-http2session)
do exert backpressure on individual streams, as this test shows:
```javascript
const fs = require('fs');
const http2 = require('http2');
const server = http2.createServer();
server.on('stream', (stream, headers) => {
stream.on('data', function (d) {
console.log('data', headers[':path'], d.length);
if (headers[':path'] === '/stream1') {
this.pause();
}
});
});
server.listen(8000);
const client = http2.connect('http://localhost:8000');
const stream1 = client.request({ ':path': '/stream1' }, { endStream: false });
const stream2 = client.request({ ':path': '/stream2' }, { endStream: false });
fs.createReadStream('/dev/urandom').pipe(stream1);
fs.createReadStream('/dev/urandom').pipe(stream2);
```
```
data /stream1 16384
data /stream2 16384
data /stream2 16348
data /stream2 35
data /stream2 16384
data /stream2 16384
data /stream2 1
data /stream2 16384
data /stream2 16366
data /stream2 18
data /stream2 16384
data /stream2 16382
data /stream2 2
data /stream2 16384
...
```
If you pass a pair of sessions (one client, one server) to [`BPMux()`](#bpmuxcarrier-options),
they will be used for multiplexing streams, with no additional overhead. This is useful if
you want to use the bpmux API.
## Errors

@@ -362,3 +415,3 @@

[Istanbul](http://gotwarlost.github.io/istanbul/) results are available [here](http://rawgit.davedoesdev.com/davedoesdev/bpmux/master/coverage/lcov-report/index.html).
[c8](https://github.com/bcoe/c8) results are available [here](http://rawgit.davedoesdev.com/davedoesdev/bpmux/master/coverage/lcov-report/index.html).

@@ -377,2 +430,3 @@ Coveralls page is [here](https://coveralls.io/r/davedoesdev/bpmux).

- <a name="toc_new-http2sessionsclient-server"></a>[new Http2Sessions](#new-http2sessionsclient-server)
- <a name="toc_bpmuxcarrier-options"></a>[BPMux](#bpmuxcarrier-options)

@@ -390,2 +444,17 @@ - <a name="toc_bpmuxprototypemultiplexoptions"></a><a name="toc_bpmuxprototype"></a>[BPMux.prototype.multiplex](#bpmuxprototypemultiplexoptions)

## new Http2Sessions(client, server)
> Class for holding a pair of HTTP/2 sessions.
Pass this to [BPMux()](#bpmuxcarrier-options) and it will use the sessions'
existing support for multiplexing streams. Both [client](https://nodejs.org/dist/latest-v16.x/docs/api/http2.html#class-clienthttp2session) and [server](https://nodejs.org/dist/latest-v16.x/docs/api/http2.html#class-serverhttp2session) sessions
are required because HTTP/2 push streams are unidirectional.
**Parameters:**
- `{ClientHttp2Session} client` Client session
- `{ServerHttp2Session} server` Server session
<sub>Go: [TOC](#tableofcontents)</sub>
## BPMux(carrier, [options])

@@ -397,3 +466,3 @@

- `{Duplex} carrier` The `Duplex` stream over which other `Duplex` streams will be multiplexed.
- `{Duplex | Http2Sessions} carrier` The `Duplex` stream over which other `Duplex` streams will be multiplexed.
- `{Object} [options]` Configuration options. This is passed down to [`frame-stream`](https://github.com/davedoesdev/frame-stream). It also supports the following additional properties:

@@ -400,0 +469,0 @@ - `{Object} [peer_multiplex_options]` When your `BPMux` object detects a new multiplexed stream from the peer on the carrier, it creates a new `Duplex` and emits a [`peer_multiplex`](#bpmuxeventspeer_multiplexduplex) event. When it creates the `Duplex`, it uses `peer_multiplex_options` to configure it with the following options:

@@ -0,2 +1,3 @@

require('setimmediate');
PrimusDuplex = require('primus-backpressure').PrimusDuplex;
BPMux = require('../../..').BPMux;

@@ -0,1 +1,2 @@

require('setimmediate');
PrimusDuplex = require('primus-backpressure').PrimusDuplex;

@@ -2,0 +3,0 @@ make_client_http2_duplex = require('http2-duplex').default;

@@ -83,2 +83,6 @@ /*eslint-env node */

{
if (conn.destroyed)
{
return cb();
}
conn.on('end', function ()

@@ -98,2 +102,6 @@ {

{
if (conn.destroyed)
{
return cb();
}
conn.on('end', cb);

@@ -149,6 +157,14 @@ conn.end();

{
if (conn.destroyed)
{
return cb();
}
// we have to listen for 'close' separately in case the actual
// connection terminates, in which case we don't get 'end'
// because browser-http2-duplex forgets about it before the
// end message arrives
conn.on('close', cb);
conn.on('end', function ()
{
this.end();
cb();
});

@@ -165,2 +181,6 @@ },

{
if (conn.destroyed)
{
return cb();
}
conn.on('end', cb);

@@ -167,0 +187,0 @@ conn.end();

@@ -76,5 +76,11 @@ /*eslint-env node */

{
if (conn.destroyed)
{
return cb();
}
conn.on('end', function ()
{
this.end();
// note: Http2Stream hardcodes autoDestroy to false so
// we don't get close event here
cb();

@@ -114,2 +120,6 @@ });

{
if (conn.destroyed)
{
return cb();
}
var called = false;

@@ -116,0 +126,0 @@ function cb2()

@@ -495,4 +495,4 @@ /*eslint-env node */

{
expect(count_complete).to.equal(4342);
expect(count_incomplete).to.equal(1);
expect(count_complete).to.be.oneOf([4341, 4342]);
expect(count_incomplete).to.equal(4343 - count_complete);
expect(count_drain).to.equal(0);

@@ -499,0 +499,0 @@ left._write = orig_write;

@@ -33,2 +33,7 @@ /*eslint-env node, mocha */

{
if (conn.destroyed)
{
return cb();
}
conn.on('close', cb);
conn.on('end', function ()

@@ -40,3 +45,2 @@ {

}
cb();
});

@@ -55,2 +59,6 @@ },

{
if (conn.destroyed)
{
return cb();
}
conn.on('close', cb);

@@ -57,0 +65,0 @@ conn.end();

/*eslint-env node */
var path = require('path');
const webpack = require('webpack');
const path = require('path');

@@ -12,3 +13,25 @@ module.exports = {

performance: { hints: false },
optimization: { minimize: false }
optimization: { minimize: false },
resolve: {
fallback: {
util: 'util',
stream: 'stream-browserify',
crypto: 'crypto-browserify',
buffer: 'buffer'
},
alias: {
process: 'process/browser'
}
},
plugins: [
new webpack.ProvidePlugin({
process: 'process'
}),
new webpack.ProvidePlugin({
Buffer: ['buffer', 'Buffer']
}),
new webpack.IgnorePlugin({
resourceRegExp: /^http2$/
})
]
};
/*eslint-env node */
var path = require('path');
const path = require('path');
const cfg = require('./webpack.config.js');
module.exports = {
...cfg,
context: __dirname,

@@ -10,4 +12,3 @@ entry: './fixtures/example/bundler.js',

path: path.join(__dirname, 'fixtures/example')
},
performance: { hints: false }
}
};

Sorry, the diff of this file is not supported yet

Sorry, the diff of this file is not supported yet

Sorry, the diff of this file is not supported yet

Sorry, the diff of this file is not supported yet

Sorry, the diff of this file is too big to display

SocketSocket SOC 2 Logo

Product

  • Package Alerts
  • Integrations
  • Docs
  • Pricing
  • FAQ
  • Roadmap
  • Changelog

Packages

npm

Stay in touch

Get open source security insights delivered straight into your inbox.


  • Terms
  • Privacy
  • Security

Made with ⚡️ by Socket Inc