Comparing version 8.1.3 to 8.2.0
@@ -66,2 +66,10 @@ /* eslint-disable */ | ||
return function jump(event) { | ||
if ( | ||
document.getElementById('fileSearch') === document.activeElement && | ||
document.activeElement != null | ||
) { | ||
// if we're currently focused on the search input, we don't want to navigate | ||
return; | ||
} | ||
switch (event.which) { | ||
@@ -68,0 +76,0 @@ case 78: // n |
@@ -27,2 +27,27 @@ /* eslint-disable */ | ||
function onFilterInput() { | ||
const searchValue = document.getElementById('fileSearch').value; | ||
const rows = document.getElementsByTagName('tbody')[0].children; | ||
for (let i = 0; i < rows.length; i++) { | ||
const row = rows[i]; | ||
if ( | ||
row.textContent | ||
.toLowerCase() | ||
.includes(searchValue.toLowerCase()) | ||
) { | ||
row.style.display = ''; | ||
} else { | ||
row.style.display = 'none'; | ||
} | ||
} | ||
} | ||
// loads the search box | ||
function addSearchBox() { | ||
var template = document.getElementById('filterTemplate'); | ||
var templateClone = template.content.cloneNode(true); | ||
templateClone.getElementById('fileSearch').oninput = onFilterInput; | ||
template.parentElement.appendChild(templateClone); | ||
} | ||
// loads all columns | ||
@@ -166,2 +191,3 @@ function loadColumns() { | ||
loadData(); | ||
addSearchBox(); | ||
addSortIndicators(); | ||
@@ -168,0 +194,0 @@ enableUI(); |
118
Gruntfile.js
@@ -13,2 +13,4 @@ /*eslint-env node */ | ||
const c8 = "npx c8 -x Gruntfile.js -x 'test/**'"; | ||
module.exports = function (grunt) | ||
@@ -32,2 +34,9 @@ { | ||
apidox: { | ||
input: [ 'index.js', 'events_doc.js' ], | ||
output: 'README.md', | ||
fullSourceDescription: true, | ||
extraHeadingLevels: 1 | ||
}, | ||
mochaTest: { | ||
@@ -39,3 +48,4 @@ default: { | ||
'test/test_inline_stream.js', | ||
'test/test_http2.js' | ||
'test/test_http2.js', | ||
'test/test_http2_session.js' | ||
], | ||
@@ -54,64 +64,24 @@ options: { | ||
apidox: { | ||
input: [ 'index.js', 'events_doc.js' ], | ||
output: 'README.md', | ||
fullSourceDescription: true, | ||
extraHeadingLevels: 1 | ||
}, | ||
shell: { | ||
cover: { | ||
command: "./node_modules/.bin/nyc -x Gruntfile.js -x 'test/**' ./node_modules/.bin/grunt test-fast", | ||
options: { | ||
execOptions: { | ||
maxBuffer: Infinity | ||
} | ||
} | ||
}, | ||
cover_report: { | ||
command: './node_modules/.bin/nyc report -r lcov' | ||
}, | ||
cover_check: { | ||
command: './node_modules/.bin/nyc check-coverage --statements 100 --branches 100 --functions 100 --lines 100' | ||
}, | ||
nw_build: { | ||
command: [ | ||
'rsync -a node_modules test/fixtures/nw --exclude nw-builder', | ||
'NODE_ENV=test ./node_modules/.bin/babel --config-file ./test/fixtures/nw/.babelrc.json test/fixtures/nw/node_modules/http2-duplex/server.js --out-file test/fixtures/nw/node_modules/http2_duplex_server.js --source-maps', | ||
'cp index.js test/fixtures/nw/node_modules/bpmux.js', | ||
'cp test/test_browser.js test/fixtures/nw/node_modules', | ||
'cp test/test_comms.js test/fixtures/nw/node_modules', | ||
'cp test/fixtures/webpack/bundle.js test/fixtures/nw', | ||
'mkdir -p test/fixtures/nw/node_modules/fixtures', | ||
'touch test/fixtures/nw/node_modules/fixtures/keep', | ||
'mkdir -p test/fixtures/nw/node_modules/certs', | ||
'cp test/certs/server.* test/fixtures/nw/node_modules/certs', | ||
'./node_modules/.bin/nwbuild --quiet -p linux64 test/fixtures/nw' | ||
].join('&&') | ||
}, | ||
bpmux_test: { | ||
command: 'export TEST_ERR_FILE=/tmp/test_err_$$; ./build/bpmux-test/linux64/bpmux-test; if [ -f $TEST_ERR_FILE ]; then exit 1; fi', | ||
options: { | ||
execOptions: { | ||
maxBuffer: Infinity | ||
} | ||
} | ||
}, | ||
bundle: { | ||
command: './node_modules/.bin/webpack --mode production --config test/webpack.config.js' | ||
}, | ||
bundle_example: { | ||
command: './node_modules/.bin/webpack --mode production --config test/webpack.example.config.js' | ||
}, | ||
certs: { | ||
command: 'if [ ! -f test/certs/server.crt ]; then ./test/certs/make_ca_cert.sh && ./test/certs/make_server_cert.sh; fi' | ||
} | ||
} | ||
exec: Object.fromEntries(Object.entries({ | ||
cover: `${c8} grunt test-fast`, | ||
cover_report: `${c8} report -r lcov`, | ||
cover_check: `${c8} check-coverage --statements 100 --branches 100 --functions 100 --lines 100`, | ||
nw_build: [ | ||
'rsync -aL node_modules test/fixtures/nw --exclude nw-builder --exclude nwbuild', | ||
'NODE_ENV=test npx babel --config-file ./test/fixtures/nw/.babelrc.json test/fixtures/nw/node_modules/http2-duplex/server.js --out-file test/fixtures/nw/node_modules/http2_duplex_server.js --source-maps', | ||
'cp index.js test/fixtures/nw/node_modules/bpmux.js', | ||
'cp test/test_browser.js test/fixtures/nw/node_modules', | ||
'cp test/test_comms.js test/fixtures/nw/node_modules', | ||
'cp test/fixtures/webpack/bundle.js test/fixtures/nw', | ||
'mkdir -p test/fixtures/nw/node_modules/fixtures', | ||
'touch test/fixtures/nw/node_modules/fixtures/keep', | ||
'mkdir -p test/fixtures/nw/node_modules/certs', | ||
'cp test/certs/server.* test/fixtures/nw/node_modules/certs', | ||
'npx nwbuild --quiet -p linux64 test/fixtures/nw' | ||
].join('&&'), | ||
bpmux_test: 'export TEST_ERR_FILE=/tmp/test_err_$$; ./build/bpmux-test/linux64/bpmux-test; if [ -f $TEST_ERR_FILE ]; then exit 1; fi', | ||
bundle: 'npx webpack --mode production --config test/webpack.config.js', | ||
bundle_example: 'npx webpack --mode production --config test/webpack.example.config.js', | ||
certs: 'if [ ! -f test/certs/server.crt ]; then ./test/certs/make_ca_cert.sh && ./test/certs/make_server_cert.sh; fi' | ||
}).map(([k, cmd]) => [k, { cmd, stdio: 'inherit' }])) | ||
}); | ||
@@ -122,3 +92,3 @@ | ||
grunt.loadNpmTasks('grunt-apidox'); | ||
grunt.loadNpmTasks('grunt-shell'); | ||
grunt.loadNpmTasks('grunt-exec'); | ||
grunt.loadNpmTasks('grunt-env'); | ||
@@ -128,7 +98,7 @@ | ||
grunt.registerTask('test', [ | ||
'shell:certs', | ||
'exec:certs', | ||
'mochaTest:default' | ||
]); | ||
grunt.registerTask('test-fast', [ | ||
'shell:certs', | ||
'exec:certs', | ||
'env:fast', | ||
@@ -139,3 +109,3 @@ 'mochaTest:default' | ||
grunt.registerTask('test-examples', [ | ||
'shell:bundle_example', | ||
'exec:bundle_example', | ||
'mochaTest:examples' | ||
@@ -145,12 +115,12 @@ ]); | ||
'save-primus', | ||
'shell:certs', | ||
'shell:bundle', | ||
'shell:nw_build', | ||
'shell:bpmux_test' | ||
'exec:certs', | ||
'exec:bundle', | ||
'exec:nw_build', | ||
'exec:bpmux_test' | ||
]); | ||
grunt.registerTask('docs', 'apidox'); | ||
grunt.registerTask('coverage', [ | ||
'shell:cover', | ||
'shell:cover_report', | ||
'shell:cover_check' | ||
'exec:cover', | ||
'exec:cover_report', | ||
'exec:cover_check' | ||
]); | ||
@@ -157,0 +127,0 @@ grunt.registerTask('default', ['lint', 'test']); |
433
index.js
@@ -193,2 +193,4 @@ /** | ||
### [multiplex](https://github.com/maxogden/multiplex) library | ||
Multiplexing libraries which don't exert backpressure on individual streams | ||
@@ -291,2 +293,53 @@ suffer from starvation. A stream which doesn't read its data stops other streams | ||
### HTTP/2 sessions | ||
[HTTP/2 sessions](https://nodejs.org/dist/latest-v16.x/docs/api/http2.html#class-http2session) | ||
do exert backpressure on individual streams, as this test shows: | ||
```javascript | ||
const fs = require('fs'); | ||
const http2 = require('http2'); | ||
const server = http2.createServer(); | ||
server.on('stream', (stream, headers) => { | ||
stream.on('data', function (d) { | ||
console.log('data', headers[':path'], d.length); | ||
if (headers[':path'] === '/stream1') { | ||
this.pause(); | ||
} | ||
}); | ||
}); | ||
server.listen(8000); | ||
const client = http2.connect('http://localhost:8000'); | ||
const stream1 = client.request({ ':path': '/stream1' }, { endStream: false }); | ||
const stream2 = client.request({ ':path': '/stream2' }, { endStream: false }); | ||
fs.createReadStream('/dev/urandom').pipe(stream1); | ||
fs.createReadStream('/dev/urandom').pipe(stream2); | ||
``` | ||
``` | ||
data /stream1 16384 | ||
data /stream2 16384 | ||
data /stream2 16348 | ||
data /stream2 35 | ||
data /stream2 16384 | ||
data /stream2 16384 | ||
data /stream2 1 | ||
data /stream2 16384 | ||
data /stream2 16366 | ||
data /stream2 18 | ||
data /stream2 16384 | ||
data /stream2 16382 | ||
data /stream2 2 | ||
data /stream2 16384 | ||
... | ||
``` | ||
If you pass a pair of sessions (one client, one server) to [`BPMux()`](#bpmuxcarrier-options), | ||
they will be used for multiplexing streams, with no additional overhead. This is useful if | ||
you want to use the bpmux API. | ||
## Errors | ||
@@ -363,3 +416,3 @@ | ||
[Istanbul](http://gotwarlost.github.io/istanbul/) results are available [here](http://rawgit.davedoesdev.com/davedoesdev/bpmux/master/coverage/lcov-report/index.html). | ||
[c8](https://github.com/bcoe/c8) results are available [here](http://rawgit.davedoesdev.com/davedoesdev/bpmux/master/coverage/lcov-report/index.html). | ||
@@ -395,2 +448,31 @@ Coveralls page is [here](https://coveralls.io/r/davedoesdev/bpmux). | ||
/** | ||
Class for holding a pair of HTTP/2 sessions. | ||
Pass this to [BPMux()](#bpmuxcarrier-options) and it will use the sessions' | ||
existing support for multiplexing streams. Both [client](https://nodejs.org/dist/latest-v16.x/docs/api/http2.html#class-clienthttp2session) and [server](https://nodejs.org/dist/latest-v16.x/docs/api/http2.html#class-serverhttp2session) sessions | ||
are required because HTTP/2 push streams are unidirectional. | ||
@param {ClientHttp2Session} client Client session | ||
@param {ServerHttp2Session} server Server session | ||
*/ | ||
class Http2Sessions | ||
{ | ||
constructor(client, server) | ||
{ | ||
this._client = client; | ||
this._server = server; | ||
} | ||
get client() | ||
{ | ||
return this._client; | ||
} | ||
get server() | ||
{ | ||
return this._server; | ||
} | ||
} | ||
function BPDuplex(options, mux, chan) | ||
@@ -406,2 +488,3 @@ { | ||
this._mux = mux; | ||
this.mux = mux; | ||
this._chan = chan; | ||
@@ -441,10 +524,3 @@ this._max_write_size = options.max_write_size; | ||
} | ||
if (this._ended) | ||
{ | ||
this._check_remove(); | ||
} | ||
// Don't call _check_remove if not ended because the close event may be | ||
// due to a local destroy and so data may still come from the peer | ||
// (but be ignored because we don't push to destroyed streams). | ||
// Duplex will be removed when TYPE_END is received. | ||
this._check_remove(); | ||
}); | ||
@@ -467,2 +543,6 @@ | ||
{ | ||
// Don't call _remove if not ended because the duplex may have closed | ||
// due to a local destroy and so data may still come from the peer | ||
// (but be ignored because we don't push to destroyed streams). | ||
// Duplex will be removed when TYPE_END is received. | ||
if (this._finished && this._ended && !this._removed) | ||
@@ -522,3 +602,3 @@ { | ||
@param {Duplex} carrier The `Duplex` stream over which other `Duplex` streams will be multiplexed. | ||
@param {Duplex|Http2Sessions} carrier The `Duplex` stream over which other `Duplex` streams will be multiplexed. | ||
@@ -557,6 +637,121 @@ @param {Object} [options] Configuration options. This is passed down to [`frame-stream`](https://github.com/davedoesdev/frame-stream). It also supports the following additional properties: | ||
this._max_open = options.max_open; | ||
this._max_header_size = options.max_header_size; | ||
this.duplexes = new Map(); | ||
this._chan = 0; | ||
this._chan_offset = options.high_channels ? this._max_duplexes : 0; | ||
this._parse_handshake_data = options.parse_handshake_data; | ||
this.carrier = carrier; | ||
if (carrier instanceof Http2Sessions) | ||
{ | ||
const http2 = require('http2'); | ||
const http2_options = options.http2 || {}; | ||
const response_headers = { | ||
...http2_options.headers, | ||
[http2.constants.HTTP2_HEADER_STATUS]: 200, | ||
[http2.constants.HTTP2_HEADER_CONTENT_TYPE]: 'application/octet-stream', | ||
}; | ||
carrier.server.on('stream', (duplex, headers) => | ||
{ | ||
if ((this._max_open > 0) && (this.duplexes.size === this._max_open)) | ||
{ | ||
this.emit('full'); | ||
return duplex.respond({ | ||
[http2.constants.HTTP2_HEADER_STATUS]: 503 | ||
}, { | ||
endStream: true | ||
}); | ||
} | ||
const channel = Buffer.from(headers['bpmux-channel'], 'base64').readUint32BE(); | ||
if (this.duplexes.has(channel)) | ||
{ | ||
return duplex.respond({ | ||
[http2.constants.HTTP2_HEADER_STATUS]: 409 | ||
}, { | ||
endStream: true | ||
}); | ||
} | ||
duplex.cork(); | ||
this._add_http2_duplex(duplex, channel); | ||
this.emit('peer_multiplex', duplex); | ||
let handshake_delayed = false; | ||
this._parse_http2_handshake(duplex, headers, () => | ||
{ | ||
handshake_delayed = true; | ||
let delayed_handshake; | ||
const uncork = duplex.uncork; | ||
duplex.uncork = () => | ||
{ | ||
duplex.uncork = uncork; | ||
if (!duplex.destroyed) // Node 12 calls uncork on end even if destroyed | ||
{ | ||
duplex.respond({ | ||
...response_headers, | ||
...this._make_http2_handshake(delayed_handshake) | ||
}); | ||
duplex._handshake_sent = true; | ||
this.emit('handshake_sent', duplex, true); | ||
duplex.emit('handshake_sent', true); | ||
duplex.uncork(); | ||
} | ||
}; | ||
return handshake => | ||
{ | ||
delayed_handshake = handshake; | ||
duplex.uncork(); | ||
}; | ||
}); | ||
if (handshake_delayed) | ||
{ | ||
this.emit('pre_handshake_sent', duplex, true); | ||
duplex.emit('pre_handshake_sent', true); | ||
} | ||
else | ||
{ | ||
duplex.respond({ | ||
...response_headers, | ||
...this._make_http2_handshake() | ||
}); | ||
duplex._handshake_sent = true; | ||
this.emit('handshake_sent', duplex, true); | ||
duplex.emit('handshake_sent', true); | ||
duplex.uncork(); | ||
} | ||
}); | ||
let closed = 0; | ||
for (const session of [carrier.client, carrier.server]) | ||
{ | ||
session.on('close', () => | ||
{ | ||
// Note: http2 sessions only close once all their | ||
// streams have closed so we don't need to go | ||
// through the duplexes here and close them | ||
if (++closed === 2) | ||
{ | ||
this.emit('finish'); | ||
this.emit('end'); | ||
this.emit('close'); | ||
} | ||
}); | ||
session.on('error', err => | ||
{ | ||
for (const duplex of this.duplexes.values()) | ||
{ | ||
if ((duplex.session === session) && | ||
!duplex.destroyed && | ||
(duplex.listenerCount('error') > 0)) | ||
{ | ||
duplex.emit('error', err); | ||
} | ||
} | ||
this.emit('error', err); | ||
}); | ||
} | ||
return; | ||
} | ||
this._max_header_size = options.max_header_size; | ||
this._finished = false; | ||
@@ -568,5 +763,3 @@ this._ended = false; | ||
this._peer_multiplex_options = options.peer_multiplex_options; | ||
this._parse_handshake_data = options.parse_handshake_data; | ||
this._coalesce_writes = options.coalesce_writes; | ||
this.carrier = carrier; | ||
this._sending = false; | ||
@@ -601,2 +794,10 @@ this._send_requested = false; | ||
function check_close() | ||
{ | ||
if (ths._finished && ths._ended) | ||
{ | ||
ths.emit('close'); | ||
} | ||
} | ||
function finish() | ||
@@ -620,2 +821,3 @@ { | ||
ths.emit('finish'); | ||
check_close(); | ||
} | ||
@@ -640,2 +842,3 @@ | ||
ths.emit('end'); | ||
check_close(); | ||
} | ||
@@ -651,7 +854,11 @@ | ||
{ | ||
// check_remove() is always called when _finished or _ended is set on a duplex, | ||
// so it will have been removed from duplex. Apart from above in end(), | ||
// where the duplex is destroyed, which we check below anyway (check_remove() | ||
// will eventually be called there too via the duplex's 'close' handler). | ||
for (var duplex of ths.duplexes.values()) | ||
{ | ||
if ((EventEmitter.listenerCount(duplex, 'error') > 0) && | ||
!(duplex._ended && duplex._finished) && | ||
!duplex.destroyed) | ||
if (!duplex._removed && // in case destroyed by previous iteration's error handler | ||
!duplex.destroyed && | ||
(duplex.listenerCount('error') > 0)) | ||
{ | ||
@@ -784,22 +991,8 @@ duplex.emit('error', err); | ||
chan = buf.readUInt32BE(1, true), | ||
duplex = this.duplexes.get(chan), | ||
handshake_data, | ||
handshake_delayed = false, | ||
dhs, | ||
free, | ||
seq; | ||
duplex = this.duplexes.get(chan); | ||
function delay_handshake() | ||
{ | ||
handshake_delayed = true; | ||
return function (handshake_data) | ||
{ | ||
duplex._send_handshake(handshake_data); | ||
}; | ||
} | ||
function handle_status() | ||
{ | ||
free = buf.readUInt32BE(5, true); | ||
seq = buf.length === 13 ? buf.readUInt32BE(9, true) : 0; | ||
let free = buf.readUInt32BE(5, true); | ||
const seq = buf.length === 13 ? buf.readUInt32BE(9, true) : 0; | ||
@@ -884,6 +1077,7 @@ free = duplex._max_write_size > 0 ? | ||
case TYPE_HANDSHAKE: | ||
{ | ||
if (!this._check_buffer(buf, 9)) { return; } | ||
if (duplex._seq === 0) | ||
{ | ||
free = buf.readUInt32BE(5, true); | ||
const free = buf.readUInt32BE(5, true); | ||
duplex._remote_free = duplex._max_write_size > 0 ? | ||
@@ -894,6 +1088,28 @@ Math.min(free, duplex._max_write_size) : free; | ||
duplex._handshake_received = true; | ||
handshake_data = this._parse_handshake_data ? | ||
this._parse_handshake_data(buf.slice(9)) : | ||
buf.slice(9); | ||
dhs = duplex._handshake_sent ? null : delay_handshake; | ||
let handshake_data = buf.slice(9); | ||
if (this._parse_handshake_data) | ||
{ | ||
try | ||
{ | ||
handshake_data = this._parse_handshake_data(handshake_data); | ||
} | ||
catch (ex) | ||
{ | ||
if (duplex.listenerCount('error') > 0) | ||
{ | ||
duplex.emit('error', ex); | ||
} | ||
this.emit('error', ex); | ||
} | ||
} | ||
let handshake_delayed = false; | ||
const delay_handshake = () => | ||
{ | ||
handshake_delayed = true; | ||
return function (handshake_data) | ||
{ | ||
duplex._send_handshake(handshake_data); | ||
}; | ||
}; | ||
const dhs = duplex._handshake_sent ? null : delay_handshake; | ||
this.emit('handshake', duplex, handshake_data, dhs); | ||
@@ -924,2 +1140,3 @@ duplex.emit('handshake', handshake_data, dhs); | ||
break; | ||
} | ||
@@ -1204,10 +1421,25 @@ case TYPE_FINISHED_STATUS: | ||
if (this.carrier._writableState.ending) | ||
if (this.carrier instanceof Http2Sessions) | ||
{ | ||
throw new Error('finished'); | ||
if (this.carrier.client.closed || this.carrier.client.destroyed) | ||
{ | ||
throw new Error('closed'); | ||
} | ||
} | ||
else | ||
{ | ||
if (this.carrier.destroyed) | ||
{ | ||
throw new Error('closed'); | ||
} | ||
if (this.carrier._readableState.ended) | ||
{ | ||
throw new Error('ended'); | ||
if (this.carrier._writableState.ending) | ||
{ | ||
throw new Error('finished'); | ||
} | ||
if (this.carrier._readableState.ended) | ||
{ | ||
throw new Error('ended'); | ||
} | ||
} | ||
@@ -1221,2 +1453,43 @@ | ||
{ | ||
if (ths.carrier instanceof Http2Sessions) | ||
{ | ||
const http2 = require('http2'); | ||
const chan = Buffer.alloc(4); | ||
chan.writeUInt32BE(channel); | ||
const http2_options = options.http2 || {}; | ||
const duplex = ths.carrier.client.request({ | ||
[http2.constants.HTTP2_HEADER_PATH]: '/', | ||
[http2.constants.HTTP2_HEADER_METHOD]: 'POST', | ||
...http2_options.headers, | ||
'bpmux-channel': chan.toString('base64'), | ||
...ths._make_http2_handshake(options.handshake_data) | ||
}, { | ||
...http2_options.options, | ||
endStream: false, | ||
waitForTrailers: true | ||
}); | ||
ths._add_http2_duplex(duplex, channel); | ||
setImmediate(() => | ||
{ | ||
duplex._handshake_sent = true; | ||
ths.emit('handshake_sent', duplex, true); | ||
duplex.emit('handshake_sent', true); | ||
}); | ||
duplex.on('response', headers => | ||
{ | ||
const status = headers[http2.constants.HTTP2_HEADER_STATUS]; | ||
if (status !== 200) | ||
{ | ||
const msg = `peer returned status ${status} for channel ${channel}`; | ||
const err = new Error(msg); | ||
err.status = status; | ||
err.duplex = duplex; | ||
duplex.destroy(err); | ||
return ths.emit('error', err); | ||
} | ||
ths._parse_http2_handshake(duplex, headers, null); | ||
}); | ||
return duplex; | ||
} | ||
var duplex = new BPDuplex(options, ths, channel); | ||
@@ -1237,3 +1510,3 @@ | ||
{ | ||
return done(options.channel); | ||
return this.duplexes.get(options.channel) || done(options.channel); | ||
} | ||
@@ -1261,2 +1534,74 @@ | ||
BPMux.prototype._add_http2_duplex = function (duplex, channel) | ||
{ | ||
duplex._mux = this; | ||
duplex.mux = this; | ||
duplex._chan = channel; | ||
duplex.get_channel = () => channel; | ||
duplex._handshake_sent = false; | ||
duplex._handshake_received = false; | ||
duplex._error_end = false; | ||
this.duplexes.set(channel, duplex); | ||
if ((this._max_open > 0) && (this.duplexes.size === this._max_open)) | ||
{ | ||
this.emit('full'); | ||
} | ||
duplex.on('close', () => | ||
{ | ||
this.duplexes.delete(channel); | ||
this.emit('removed', duplex); | ||
}); | ||
duplex.peer_error_then_end = function (chunk, encoding, cb) | ||
{ | ||
this._error_end = true; | ||
return this.end(chunk, encoding, cb); | ||
}; | ||
duplex.on('wantTrailers', function () | ||
{ | ||
this.sendTrailers( | ||
{ | ||
'bpmux-error': this._error_end.toString() | ||
}); | ||
}); | ||
duplex.on('trailers', function (headers) | ||
{ | ||
if (headers['bpmux-error'] === 'true') | ||
{ | ||
this.emit('error', new Error('peer error')); | ||
} | ||
}); | ||
}; | ||
BPMux.prototype._make_http2_handshake = function (handshake) | ||
{ | ||
return { | ||
'bpmux-handshake': (handshake || Buffer.alloc(0)).toString('base64') | ||
}; | ||
}; | ||
BPMux.prototype._parse_http2_handshake = function (duplex, headers, delay_handshake) | ||
{ | ||
duplex._handshake_received = true; | ||
let handshake_data = Buffer.alloc(0); | ||
try | ||
{ | ||
handshake_data = Buffer.from(headers['bpmux-handshake'], 'base64'); | ||
if (this._parse_handshake_data) | ||
{ | ||
handshake_data = this._parse_handshake_data(handshake_data); | ||
} | ||
} | ||
catch (ex) | ||
{ | ||
if (duplex.listenerCount('error') > 0) | ||
{ | ||
duplex.emit('error', ex); | ||
} | ||
this.emit('error', ex); | ||
} | ||
this.emit('handshake', duplex, handshake_data, delay_handshake); | ||
duplex.emit('handshake', handshake_data, delay_handshake); | ||
}; | ||
exports.BPMux = BPMux; | ||
exports.Http2Sessions = Http2Sessions; |
{ | ||
"name": "bpmux", | ||
"description": "Node stream multiplexing with back-pressure on each stream", | ||
"version": "8.1.3", | ||
"version": "8.2.0", | ||
"homepage": "https://github.com/davedoesdev/bpmux", | ||
@@ -21,6 +21,3 @@ "author": { | ||
"scripts": { | ||
"test": "grunt lint test-fast", | ||
"ci-coverage": "grunt lint coverage", | ||
"ci-browser": "grunt test-browser", | ||
"ci-inline": "grunt test-inline" | ||
"test": "grunt lint test-fast" | ||
}, | ||
@@ -46,30 +43,43 @@ "directories": { | ||
"dependencies": { | ||
"frame-stream": "^2.0.3" | ||
"frame-stream": "^3.0.0" | ||
}, | ||
"devDependencies": { | ||
"@babel/cli": "^7.11.6", | ||
"@babel/core": "^7.11.6", | ||
"@babel/plugin-transform-modules-commonjs": "^7.10.4", | ||
"async": "^3.2.0", | ||
"chai": "^4.2.0", | ||
"eslint": "^7.8.1", | ||
"@babel/cli": "^7.17.0", | ||
"@babel/core": "^7.17.2", | ||
"@babel/plugin-transform-modules-commonjs": "^7.16.8", | ||
"async": "^3.2.3", | ||
"c8": "^7.11.0", | ||
"chai": "^4.3.6", | ||
"crypto-browserify": "^3.12.0", | ||
"eslint": "^8.8.0", | ||
"finalhandler": "^1.1.2", | ||
"grunt": "^1.3.0", | ||
"grunt-apidox": "^2.0.10", | ||
"grunt": "^1.4.1", | ||
"grunt-apidox": "^2.0.16", | ||
"grunt-env": "^1.0.1", | ||
"grunt-eslint": "^23.0.0", | ||
"grunt-eslint": "^24.0.0", | ||
"grunt-exec": "^3.0.0", | ||
"grunt-mocha-test": "^0.13.3", | ||
"grunt-shell": "^3.0.1", | ||
"http2-duplex": "^5.0.1", | ||
"mocha": "^8.1.3", | ||
"http2-duplex": "^5.3.1", | ||
"mocha": "^9.2.0", | ||
"nw-builder": "davedoesdev/nw-builder", | ||
"nyc": "^15.1.0", | ||
"primus": "^7.3.5", | ||
"primus-backpressure": "^2.0.3", | ||
"serve-static": "^1.14.1", | ||
"primus": "^8.0.5", | ||
"primus-backpressure": "^2.0.8", | ||
"process": "^0.11.10", | ||
"serve-static": "^1.14.2", | ||
"setimmediate": "^1.0.5", | ||
"stream-browserify": "^3.0.0", | ||
"tmp": "^0.2.1", | ||
"webpack": "^4.44.1", | ||
"webpack-cli": "^3.3.12", | ||
"ws": "^7.3.1" | ||
"util": "^0.12.4", | ||
"webpack": "^5.68.0", | ||
"webpack-cli": "^4.9.2", | ||
"ws": "^8.5.0" | ||
}, | ||
"overrides": { | ||
"jsdoctypeparser": { | ||
"lodash": "^4.17.21" | ||
}, | ||
"dox": { | ||
"markdown-it": "^12.3.2" | ||
} | ||
} | ||
} |
@@ -192,2 +192,4 @@ # bpmux [![Build Status](https://github.com/davedoesdev/bpmux/workflows/ci/badge.svg)](https://github.com/davedoesdev/bpmux/actions) [![Coverage Status](https://coveralls.io/repos/davedoesdev/bpmux/badge.png?branch=master&service=github)](https://coveralls.io/r/davedoesdev/bpmux?branch=master) [![NPM version](https://badge.fury.io/js/bpmux.png)](http://badge.fury.io/js/bpmux) | ||
### [multiplex](https://github.com/maxogden/multiplex) library | ||
Multiplexing libraries which don't exert backpressure on individual streams | ||
@@ -290,2 +292,53 @@ suffer from starvation. A stream which doesn't read its data stops other streams | ||
### HTTP/2 sessions | ||
[HTTP/2 sessions](https://nodejs.org/dist/latest-v16.x/docs/api/http2.html#class-http2session) | ||
do exert backpressure on individual streams, as this test shows: | ||
```javascript | ||
const fs = require('fs'); | ||
const http2 = require('http2'); | ||
const server = http2.createServer(); | ||
server.on('stream', (stream, headers) => { | ||
stream.on('data', function (d) { | ||
console.log('data', headers[':path'], d.length); | ||
if (headers[':path'] === '/stream1') { | ||
this.pause(); | ||
} | ||
}); | ||
}); | ||
server.listen(8000); | ||
const client = http2.connect('http://localhost:8000'); | ||
const stream1 = client.request({ ':path': '/stream1' }, { endStream: false }); | ||
const stream2 = client.request({ ':path': '/stream2' }, { endStream: false }); | ||
fs.createReadStream('/dev/urandom').pipe(stream1); | ||
fs.createReadStream('/dev/urandom').pipe(stream2); | ||
``` | ||
``` | ||
data /stream1 16384 | ||
data /stream2 16384 | ||
data /stream2 16348 | ||
data /stream2 35 | ||
data /stream2 16384 | ||
data /stream2 16384 | ||
data /stream2 1 | ||
data /stream2 16384 | ||
data /stream2 16366 | ||
data /stream2 18 | ||
data /stream2 16384 | ||
data /stream2 16382 | ||
data /stream2 2 | ||
data /stream2 16384 | ||
... | ||
``` | ||
If you pass a pair of sessions (one client, one server) to [`BPMux()`](#bpmuxcarrier-options), | ||
they will be used for multiplexing streams, with no additional overhead. This is useful if | ||
you want to use the bpmux API. | ||
## Errors | ||
@@ -362,3 +415,3 @@ | ||
[Istanbul](http://gotwarlost.github.io/istanbul/) results are available [here](http://rawgit.davedoesdev.com/davedoesdev/bpmux/master/coverage/lcov-report/index.html). | ||
[c8](https://github.com/bcoe/c8) results are available [here](http://rawgit.davedoesdev.com/davedoesdev/bpmux/master/coverage/lcov-report/index.html). | ||
@@ -377,2 +430,3 @@ Coveralls page is [here](https://coveralls.io/r/davedoesdev/bpmux). | ||
- <a name="toc_new-http2sessionsclient-server"></a>[new Http2Sessions](#new-http2sessionsclient-server) | ||
- <a name="toc_bpmuxcarrier-options"></a>[BPMux](#bpmuxcarrier-options) | ||
@@ -390,2 +444,17 @@ - <a name="toc_bpmuxprototypemultiplexoptions"></a><a name="toc_bpmuxprototype"></a>[BPMux.prototype.multiplex](#bpmuxprototypemultiplexoptions) | ||
## new Http2Sessions(client, server) | ||
> Class for holding a pair of HTTP/2 sessions. | ||
Pass this to [BPMux()](#bpmuxcarrier-options) and it will use the sessions' | ||
existing support for multiplexing streams. Both [client](https://nodejs.org/dist/latest-v16.x/docs/api/http2.html#class-clienthttp2session) and [server](https://nodejs.org/dist/latest-v16.x/docs/api/http2.html#class-serverhttp2session) sessions | ||
are required because HTTP/2 push streams are unidirectional. | ||
**Parameters:** | ||
- `{ClientHttp2Session} client` Client session | ||
- `{ServerHttp2Session} server` Server session | ||
<sub>Go: [TOC](#tableofcontents)</sub> | ||
## BPMux(carrier, [options]) | ||
@@ -397,3 +466,3 @@ | ||
- `{Duplex} carrier` The `Duplex` stream over which other `Duplex` streams will be multiplexed. | ||
- `{Duplex | Http2Sessions} carrier` The `Duplex` stream over which other `Duplex` streams will be multiplexed. | ||
- `{Object} [options]` Configuration options. This is passed down to [`frame-stream`](https://github.com/davedoesdev/frame-stream). It also supports the following additional properties: | ||
@@ -400,0 +469,0 @@ - `{Object} [peer_multiplex_options]` When your `BPMux` object detects a new multiplexed stream from the peer on the carrier, it creates a new `Duplex` and emits a [`peer_multiplex`](#bpmuxeventspeer_multiplexduplex) event. When it creates the `Duplex`, it uses `peer_multiplex_options` to configure it with the following options: |
@@ -0,2 +1,3 @@ | ||
require('setimmediate'); | ||
PrimusDuplex = require('primus-backpressure').PrimusDuplex; | ||
BPMux = require('../../..').BPMux; |
@@ -0,1 +1,2 @@ | ||
require('setimmediate'); | ||
PrimusDuplex = require('primus-backpressure').PrimusDuplex; | ||
@@ -2,0 +3,0 @@ make_client_http2_duplex = require('http2-duplex').default; |
@@ -83,2 +83,6 @@ /*eslint-env node */ | ||
{ | ||
if (conn.destroyed) | ||
{ | ||
return cb(); | ||
} | ||
conn.on('end', function () | ||
@@ -98,2 +102,6 @@ { | ||
{ | ||
if (conn.destroyed) | ||
{ | ||
return cb(); | ||
} | ||
conn.on('end', cb); | ||
@@ -149,6 +157,14 @@ conn.end(); | ||
{ | ||
if (conn.destroyed) | ||
{ | ||
return cb(); | ||
} | ||
// we have to listen for 'close' separately in case the actual | ||
// connection terminates, in which case we don't get 'end' | ||
// because browser-http2-duplex forgets about it before the | ||
// end message arrives | ||
conn.on('close', cb); | ||
conn.on('end', function () | ||
{ | ||
this.end(); | ||
cb(); | ||
}); | ||
@@ -165,2 +181,6 @@ }, | ||
{ | ||
if (conn.destroyed) | ||
{ | ||
return cb(); | ||
} | ||
conn.on('end', cb); | ||
@@ -167,0 +187,0 @@ conn.end(); |
@@ -76,5 +76,11 @@ /*eslint-env node */ | ||
{ | ||
if (conn.destroyed) | ||
{ | ||
return cb(); | ||
} | ||
conn.on('end', function () | ||
{ | ||
this.end(); | ||
// note: Http2Stream hardcodes autoDestroy to false so | ||
// we don't get close event here | ||
cb(); | ||
@@ -114,2 +120,6 @@ }); | ||
{ | ||
if (conn.destroyed) | ||
{ | ||
return cb(); | ||
} | ||
var called = false; | ||
@@ -116,0 +126,0 @@ function cb2() |
@@ -495,4 +495,4 @@ /*eslint-env node */ | ||
{ | ||
expect(count_complete).to.equal(4342); | ||
expect(count_incomplete).to.equal(1); | ||
expect(count_complete).to.be.oneOf([4341, 4342]); | ||
expect(count_incomplete).to.equal(4343 - count_complete); | ||
expect(count_drain).to.equal(0); | ||
@@ -499,0 +499,0 @@ left._write = orig_write; |
@@ -33,2 +33,7 @@ /*eslint-env node, mocha */ | ||
{ | ||
if (conn.destroyed) | ||
{ | ||
return cb(); | ||
} | ||
conn.on('close', cb); | ||
conn.on('end', function () | ||
@@ -40,3 +45,2 @@ { | ||
} | ||
cb(); | ||
}); | ||
@@ -55,2 +59,6 @@ }, | ||
{ | ||
if (conn.destroyed) | ||
{ | ||
return cb(); | ||
} | ||
conn.on('close', cb); | ||
@@ -57,0 +65,0 @@ conn.end(); |
/*eslint-env node */ | ||
var path = require('path'); | ||
const webpack = require('webpack'); | ||
const path = require('path'); | ||
@@ -12,3 +13,25 @@ module.exports = { | ||
performance: { hints: false }, | ||
optimization: { minimize: false } | ||
optimization: { minimize: false }, | ||
resolve: { | ||
fallback: { | ||
util: 'util', | ||
stream: 'stream-browserify', | ||
crypto: 'crypto-browserify', | ||
buffer: 'buffer' | ||
}, | ||
alias: { | ||
process: 'process/browser' | ||
} | ||
}, | ||
plugins: [ | ||
new webpack.ProvidePlugin({ | ||
process: 'process' | ||
}), | ||
new webpack.ProvidePlugin({ | ||
Buffer: ['buffer', 'Buffer'] | ||
}), | ||
new webpack.IgnorePlugin({ | ||
resourceRegExp: /^http2$/ | ||
}) | ||
] | ||
}; |
/*eslint-env node */ | ||
var path = require('path'); | ||
const path = require('path'); | ||
const cfg = require('./webpack.config.js'); | ||
module.exports = { | ||
...cfg, | ||
context: __dirname, | ||
@@ -10,4 +12,3 @@ entry: './fixtures/example/bundler.js', | ||
path: path.join(__dirname, 'fixtures/example') | ||
}, | ||
performance: { hints: false } | ||
} | ||
}; |
Sorry, the diff of this file is not supported yet
Sorry, the diff of this file is not supported yet
Sorry, the diff of this file is not supported yet
Sorry, the diff of this file is not supported yet
Sorry, the diff of this file is too big to display
6054
615
476909
29
43
13
14
+ Addedframe-stream@3.0.1(transitive)
- Removedframe-stream@2.1.0(transitive)
Updatedframe-stream@^3.0.0