http2-wrapper
Advanced tools
Comparing version 1.0.0-alpha.0 to 1.0.0-beta.0
{ | ||
"name": "http2-wrapper", | ||
"version": "1.0.0-alpha.0", | ||
"version": "1.0.0-beta.0", | ||
"description": "HTTP2 client, just with the familiar `https` API", | ||
"main": "source", | ||
"engines": { | ||
"node": ">=8.16.0" | ||
"node": ">=10" | ||
}, | ||
"scripts": { | ||
"test": "echo XO $(xo --version) && echo AVA $(ava --version) && xo && nyc ava", | ||
"test": "echo XO $(xo --version) && echo AVA $(ava --version) && xo && nyc --reporter=html --reporter=text ava", | ||
"coveralls": "nyc report --reporter=text-lcov | coveralls" | ||
@@ -38,3 +38,3 @@ }, | ||
"@sindresorhus/is": "^1.0.0", | ||
"ava": "^2.2.0", | ||
"ava": "^2.4.0", | ||
"benchmark": "^2.1.4", | ||
@@ -44,2 +44,5 @@ "coveralls": "^3.0.5", | ||
"get-stream": "^5.1.0", | ||
"got": "^9.6.0", | ||
"lolex": "^4.2.0", | ||
"many-keys-map": "^1.0.2", | ||
"nyc": "^14.1.1", | ||
@@ -49,2 +52,3 @@ "p-event": "^4.1.0", | ||
"to-readable-stream": "^2.1.0", | ||
"tsd": "^0.8.0", | ||
"xo": "^0.24.0" | ||
@@ -51,0 +55,0 @@ }, |
139
README.md
@@ -19,5 +19,6 @@ # http2-wrapper | ||
It's best to run `http2-wrapper` under [**the latest**](https://nodejs.org/en/download/current/) version of Node. It provides the best stability. | ||
## Usage | ||
```js | ||
'use strict'; | ||
const http2 = require('http2-wrapper'); | ||
@@ -54,15 +55,18 @@ | ||
// statusCode: 200 | ||
// headers: { ':status': 200, | ||
// date: 'Sat, 11 Aug 2018 09:37:41 GMT', | ||
// headers: [Object: null prototype] { | ||
// ':status': 200, | ||
// date: 'Fri, 27 Sep 2019 19:45:46 GMT', | ||
// 'content-type': 'application/json', | ||
// 'content-length': '264', | ||
// 'access-control-allow-origin': '*', | ||
// 'access-control-allow-credentials': 'true', | ||
// 'x-backend-header-rtt': '0.002997', | ||
// 'content-length': '239', | ||
// 'x-backend-header-rtt': '0.002516', | ||
// 'strict-transport-security': 'max-age=31536000', | ||
// server: 'nghttpx', | ||
// via: '1.1 nghttpx', | ||
// 'alt-svc': 'h3-23=":4433"; ma=3600', | ||
// 'x-frame-options': 'SAMEORIGIN', | ||
// 'x-xss-protection': '1; mode=block', | ||
// 'x-content-type-options': 'nosniff' } | ||
// 'x-content-type-options': 'nosniff' | ||
// } | ||
// body: { | ||
@@ -75,8 +79,7 @@ // "args": {}, | ||
// "Content-Length": "6", | ||
// "Host": "nghttp2.org:443", | ||
// "Via": "2 nghttpx" | ||
// "Host": "nghttp2.org" | ||
// }, | ||
// "json": 123456, | ||
// "origin": "xxx.xxx.xxx.xxx", | ||
// "url": "https://nghttp2.org:443/httpbin/post" | ||
// "url": "https://nghttp2.org/httpbin/post" | ||
// } | ||
@@ -97,3 +100,2 @@ ``` | ||
```js | ||
'use strict'; | ||
const http2 = require('http2-wrapper'); | ||
@@ -169,7 +171,7 @@ | ||
Resolves ALPN using HTTP options. | ||
Returns a Promise giving the best ALPN protocol possible. It can be either `h2` or `http/1.1`. | ||
### http2.auto.protocolCache | ||
An instance of [`quick-lru`](https://github.com/sindresorhus/quick-lru) used for caching ALPN. | ||
An instance of [`quick-lru`](https://github.com/sindresorhus/quick-lru) used for ALPN cache. | ||
@@ -182,2 +184,9 @@ There is a maximum of 100 entries. You can modify the limit through `protocolCache.maxSize` - note that the change will be visible globally. | ||
##### options.preconnect | ||
Type: `boolean`<br> | ||
Default: `true` | ||
If set to `true`, it will try to connect to the server before sending the request. | ||
### http2.get(url, options, callback) | ||
@@ -202,3 +211,2 @@ | ||
```js | ||
'use strict'; | ||
const http2 = require('http2-wrapper'); | ||
@@ -230,3 +238,3 @@ | ||
If there's no activity in given time (milliseconds), the session is closed. | ||
If there's no activity after `timeout` milliseconds, the session will be closed. | ||
@@ -238,3 +246,3 @@ ##### maxSessions | ||
Max sessions per origin. | ||
The maximum amount of sessions per origin. | ||
@@ -246,29 +254,72 @@ ##### maxFreeSessions | ||
Max free sessions per origin. | ||
The maximum amount of free sessions per origin. | ||
#### agent.getName(authority, options) | ||
##### maxCachedTlsSessions | ||
Returns a `string` containing a proper name for sessions created with these options. | ||
Type: `number`<br> | ||
Default: `100` | ||
The maximum amount of cached TLS sessions. | ||
#### Agent.normalizeAuthority([authority](#authority), servername) | ||
Normalizes the authority URL. | ||
```js | ||
Agent.normalizeAuthority('https://example.com:443'); | ||
// => 'https://example.com' | ||
``` | ||
#### Agent.normalizeOptions([options](https://github.com/szmarczak/http2-wrapper/blob/master/source/agent.js)) | ||
Returns a string containing normalized options. | ||
```js | ||
Agent.normalizeOptions({servername: 'example.com'}); | ||
// => ':example.com' | ||
``` | ||
#### agent.settings | ||
Type: `object`<br> | ||
Default: `{enablePush: false}` | ||
[Settings](https://nodejs.org/api/http2.html#http2_settings_object) used by the current agent instance. | ||
#### agent.getSession(authority, options) | ||
Returns a Promise giving free `Http2Session`. If no free sessions are found, a new one is created. | ||
##### [authority](https://nodejs.org/api/http2.html#http2_http2_connect_authority_options_listener) | ||
##### authority | ||
Type: `string` `URL` `object` | ||
Type: `string` `URL` `Object` | ||
Authority used to create a new session. | ||
##### options | ||
##### [options](https://nodejs.org/api/http2.html#http2_http2_connect_authority_options_listener) | ||
Type: `Object` | ||
Type: `object` | ||
Options used to create a new session. | ||
#### agent.request(authority, options, headers) | ||
Returns a Promise giving free `Http2Session`. If no free sessions are found, a new one is created. | ||
#### agent.getSession([authority](#authority), [options](options-1), listener) | ||
##### listener | ||
Type: `object` | ||
``` | ||
{ | ||
reject: error => void, | ||
resolve: session => void | ||
} | ||
``` | ||
If the `listener` argument is present, the Promise will resolve immediately. It will use the `resolve` function to pass the session. | ||
#### agent.request([authority](#authority), [options](#options-1), [headers](https://nodejs.org/api/http2.html#http2_headers_object)) | ||
Returns a Promise giving `Http2Stream`. | ||
#### agent.createConnection(authority, options) | ||
#### agent.createConnection([authority](#authority), [options](#options-1)) | ||
@@ -279,3 +330,3 @@ Returns a new `TLSSocket`. It defaults to `Agent.connect(authority, options)`. | ||
Makes an attempt to close free sessions. Only sessions with no concurrent streams are closed. | ||
Makes an attempt to close free sessions. Only sessions with 0 concurrent streams are closed. | ||
@@ -288,5 +339,5 @@ #### agent.destroy(reason) | ||
- [WebSockets over HTTP2 is not supported yet](https://github.com/nodejs/node/issues/15230), although there is [a proposal](https://tools.ietf.org/html/rfc8441) already. | ||
- If you're interested in [WebSockets over HTTP2](https://tools.ietf.org/html/rfc8441), then [check out this discussion](https://github.com/websockets/ws/issues/1458). | ||
- [HTTP2 sockets cannot be malformed](https://github.com/nodejs/node/blob/cc8250fab86486632fdeb63892be735d7628cd13/lib/internal/http2/core.js#L725), therefore modifying the socket will have no effect. | ||
- HTTP2 is a binary protocol. Headers are sent without any validation. | ||
- You can make [a custom Agent](examples/push-stream/index.js) to support push streams. | ||
@@ -297,11 +348,11 @@ ## Benchmarks | ||
Server: H2O 2.2.5 [`h2o.conf`](h2o.conf)<br> | ||
Node: v12.6.0 | ||
Node: v12.10.0 | ||
``` | ||
http2-wrapper x 11,886 ops/sec ±1.90% (84 runs sampled) | ||
http2-wrapper - preconfigured session x 14,815 ops/sec ±1.58% (87 runs sampled) | ||
http2 x 18,272 ops/sec ±1.76% (80 runs sampled) | ||
http2 - using PassThrough proxies x 15,215 ops/sec ±2.18% (85 runs sampled) | ||
https x 1,613 ops/sec ±4.56% (75 runs sampled) | ||
http x 6,676 ops/sec ±5.17% (78 runs sampled) | ||
http2-wrapper x 9,954 ops/sec ±3.72% (81 runs sampled) | ||
http2-wrapper - preconfigured session x 12,309 ops/sec ±1.48% (87 runs sampled) | ||
http2 x 14,664 ops/sec ±1.63% (78 runs sampled) | ||
http2 - using PassThrough proxies x 11,884 ops/sec ±2.43% (82 runs sampled) | ||
https x 1,586 ops/sec ±4.05% (79 runs sampled) | ||
http x 5,886 ops/sec ±2.73% (76 runs sampled) | ||
Fastest is http2 | ||
@@ -312,13 +363,13 @@ ``` | ||
- It's `1.537x` slower than `http2`. | ||
- It's `1.280x` slower than `http2` with `PassThrough`. | ||
- It's `7.369x` faster than `https`. | ||
- It's `1.780x` faster than `http`. | ||
- It's `1.473x` slower than `http2`. | ||
- It's `1.194x` slower than `http2` with `2xPassThrough`. | ||
- It's `6.276x` faster than `https`. | ||
- It's `1.691x` faster than `http`. | ||
`http2-wrapper - preconfigured session`: | ||
- It's `1.233x` slower than `http2`. | ||
- It's `1.027x` slower than `http2` with `PassThrough`. | ||
- It's `9.185x` faster than `https`. | ||
- It's `2.219x` faster than `http`. | ||
- It's `1.191x` slower than `http2`. | ||
- It's `1.036x` faster than `http2` with `2xPassThrough`. | ||
- It's `7.761x` faster than `https`. | ||
- It's `2.091x` faster than `http`. | ||
@@ -325,0 +376,0 @@ ## Related |
'use strict'; | ||
const {URL} = require('url'); | ||
const EventEmitter = require('events'); | ||
const tls = require('tls'); | ||
const http2 = require('http2'); | ||
const QuickLRU = require('quick-lru'); | ||
@@ -19,8 +18,5 @@ const kCurrentStreamsCount = Symbol('currentStreamsCount'); | ||
'paddingStrategy', | ||
'peerMaxConcurrentStreams', | ||
'settings', | ||
// `tls.connect()` options | ||
'localAddress', | ||
'family', | ||
'path', | ||
@@ -67,16 +63,100 @@ 'rejectUnauthorized', | ||
class Agent extends EventEmitter { | ||
constructor({timeout = 60000, maxSessions = Infinity, maxFreeSessions = 1} = {}) { | ||
super(); | ||
const addSession = (where, name, session) => { | ||
if (Reflect.has(where, name)) { | ||
where[name].push(session); | ||
} else { | ||
where[name] = [session]; | ||
} | ||
}; | ||
const getSessions = (where, name, normalizedAuthority) => { | ||
if (Reflect.has(where, name)) { | ||
return where[name].filter(session => { | ||
return !session.closed && !session.destroyed && session.originSet.includes(normalizedAuthority); | ||
}); | ||
} | ||
return []; | ||
}; | ||
// See https://tools.ietf.org/html/rfc8336 | ||
const closeCoveredSessions = (where, name, session) => { | ||
if (!Reflect.has(where, name)) { | ||
return; | ||
} | ||
// Clients SHOULD NOT emit new requests on any connection whose Origin | ||
// Set is a proper subset of another connection's Origin Set, and they | ||
// SHOULD close it once all outstanding requests are satisfied. | ||
for (const coveredSession of where[name]) { | ||
if ( | ||
// The set is a proper subset when its length is less than the other set. | ||
coveredSession.originSet.length < session.originSet.length && | ||
// And the other set includes all elements of the subset. | ||
coveredSession.originSet.every(origin => session.originSet.includes(origin)) && | ||
// Makes sure that the session can handle all requests from the covered session. | ||
// TODO: can the session become uncovered when a stream is closed after checking this condition? | ||
coveredSession[kCurrentStreamsCount] + session[kCurrentStreamsCount] <= session.remoteSettings.maxConcurrentStreams | ||
) { | ||
// This allows pending requests to finish and prevents making new requests. | ||
coveredSession.close(); | ||
} | ||
} | ||
}; | ||
// This is basically inverted `closeCoveredSessions(...)`. | ||
const closeSessionIfCovered = (where, name, coveredSession) => { | ||
for (const session of where[name]) { | ||
if ( | ||
coveredSession.originSet.length < session.originSet.length && | ||
coveredSession.originSet.every(origin => session.originSet.includes(origin)) && | ||
coveredSession[kCurrentStreamsCount] + session[kCurrentStreamsCount] <= session.remoteSettings.maxConcurrentStreams | ||
) { | ||
coveredSession.close(); | ||
} | ||
} | ||
}; | ||
class Agent { | ||
constructor({timeout = 60000, maxSessions = Infinity, maxFreeSessions = 1, maxCachedTlsSessions = 100} = {}) { | ||
// A session is considered busy when its current streams count | ||
// is equal to or greater than the `maxConcurrentStreams` value. | ||
this.busySessions = {}; | ||
// A session is considered free when its current streams count | ||
// is less than the `maxConcurrentStreams` value. | ||
this.freeSessions = {}; | ||
// The queue for creating new sessions. It looks like this: | ||
// QUEUE[NORMALIZED_OPTIONS][NORMALIZED_AUTHORITY] = ENTRY_FUNCTION | ||
// | ||
// The entry function has `listeners`, `completed` and `destroyed` properties. | ||
// `listeners` is an array of objects containing `resolve` and `reject` functions. | ||
// `completed` is a boolean. It's set to true after ENTRY_FUNCTION is executed. | ||
// `destroyed` is a boolean. If it's set to true, the session will be destroyed if hasn't connected yet. | ||
this.queue = {}; | ||
// Each session will use this timeout value. | ||
this.timeout = timeout; | ||
// Max sessions per origin. | ||
this.maxSessions = maxSessions; | ||
// Max free sessions per origin. | ||
// TODO: decreasing `maxFreeSessions` should close some sessions | ||
// TODO: should `maxFreeSessions` be related only to sessions with 0 pending streams? | ||
this.maxFreeSessions = maxFreeSessions; | ||
// We don't support push streams by default. | ||
this.settings = { | ||
enablePush: false | ||
}; | ||
// Reusing TLS sessions increases performance. | ||
this.tlsSessionCache = new QuickLRU({maxSize: maxCachedTlsSessions}); | ||
} | ||
getName(authority, options = {}) { | ||
static normalizeAuthority(authority, servername) { | ||
if (typeof authority === 'string') { | ||
@@ -86,14 +166,19 @@ authority = new URL(authority); | ||
const host = servername || authority.hostname || authority.host || 'localhost'; | ||
const port = authority.port || 443; | ||
const host = authority.hostname || authority.host || 'localhost'; | ||
let name = `${host}:${port}`; | ||
if (port === 443) { | ||
return `https://${host}`; | ||
} | ||
// TODO: this should ignore defaults too | ||
for (const key of nameKeys) { | ||
if (Reflect.has(options, key)) { | ||
if (typeof options[key] === 'object') { | ||
name += `:${JSON.stringify(options[key])}`; | ||
} else { | ||
name += `:${options[key]}`; | ||
return `https://${host}:${port}`; | ||
} | ||
static normalizeOptions(options) { | ||
let normalized = ''; | ||
if (options) { | ||
for (const key of nameKeys) { | ||
if (options[key]) { | ||
normalized += `:${options[key]}`; | ||
} | ||
@@ -103,49 +188,105 @@ } | ||
return name; | ||
return normalized; | ||
} | ||
_processQueue(name) { | ||
const busyLength = Reflect.has(this.busySessions, name) ? this.busySessions[name].length : 0; | ||
_tryToCreateNewSession(normalizedOptions, normalizedAuthority) { | ||
if (!Reflect.has(this.queue, normalizedOptions) || !Reflect.has(this.queue[normalizedOptions], normalizedAuthority)) { | ||
return; | ||
} | ||
if (busyLength < this.maxSessions && Reflect.has(this.queue, name) && !this.queue[name].completed) { | ||
this.queue[name].completed = true; | ||
// We need the busy sessions length to check if a session can be created. | ||
const busyLength = getSessions(this.busySessions, normalizedOptions, normalizedAuthority).length; | ||
const item = this.queue[normalizedOptions][normalizedAuthority]; | ||
this.queue[name](); | ||
// The entry function can be run only once. | ||
if (busyLength < this.maxSessions && !item.completed) { | ||
item.completed = true; | ||
item(); | ||
} | ||
} | ||
async getSession(authority, options) { | ||
_closeCoveredSessions(normalizedOptions, session) { | ||
closeCoveredSessions(this.freeSessions, normalizedOptions, session); | ||
closeCoveredSessions(this.busySessions, normalizedOptions, session); | ||
} | ||
getSession(authority, options, listeners) { | ||
return new Promise((resolve, reject) => { | ||
const name = this.getName(authority, options); | ||
const detached = {resolve, reject}; | ||
if (Array.isArray(listeners)) { | ||
listeners = [...listeners]; | ||
if (Reflect.has(this.freeSessions, name)) { | ||
resolve(this.freeSessions[name][0]); | ||
// Resolve the current promise ASAP, we're just moving the listeners. | ||
// They will be executed at a different time. | ||
resolve(); | ||
} else { | ||
listeners = [{resolve, reject}]; | ||
} | ||
return; | ||
const normalizedOptions = Agent.normalizeOptions(options); | ||
const normalizedAuthority = Agent.normalizeAuthority(authority, options && options.servername); | ||
if (Reflect.has(this.freeSessions, normalizedOptions)) { | ||
// Look for all available free sessions. | ||
const freeSessions = getSessions(this.freeSessions, normalizedOptions, normalizedAuthority); | ||
if (freeSessions.length !== 0) { | ||
// Use session which has the biggest stream capacity in order to use the smallest number of sessions possible. | ||
const session = freeSessions.reduce((previousSession, nextSession) => { | ||
if ( | ||
nextSession.remoteSettings.maxConcurrentStreams >= previousSession.remoteSettings.maxConcurrentStreams && | ||
nextSession[kCurrentStreamsCount] > previousSession[kCurrentStreamsCount] | ||
) { | ||
return nextSession; | ||
} | ||
return previousSession; | ||
}); | ||
for (const listener of listeners) { | ||
listener.resolve(session); | ||
} | ||
return; | ||
} | ||
} | ||
if (Reflect.has(this.queue, name)) { | ||
// TODO: limit the maximum amount of listeners | ||
this.queue[name].listeners.push(detached); | ||
if (Reflect.has(this.queue, normalizedOptions)) { | ||
if (Reflect.has(this.queue[normalizedOptions], normalizedAuthority)) { | ||
// There's already an item in the queue, just attach ourselves to it. | ||
this.queue[normalizedOptions][normalizedAuthority].listeners.push(...listeners); | ||
return; | ||
return; | ||
} | ||
} else { | ||
this.queue[normalizedOptions] = {}; | ||
} | ||
const listeners = [detached]; | ||
// The entry must be removed from the queue IMMEDIATELY when: | ||
// 1. a session connects successfully, | ||
// 2. an error occurs. | ||
const removeFromQueue = () => { | ||
// Our entry can be replaced. We cannot remove the new one. | ||
if (Reflect.has(this.queue, normalizedOptions) && this.queue[normalizedOptions][normalizedAuthority] === entry) { | ||
delete this.queue[normalizedOptions][normalizedAuthority]; | ||
const free = () => { | ||
// If our entry is replaced,`completed` will be `false`. | ||
// Or the entry will be `undefined` if all seats are taken. | ||
if (Reflect.has(this.queue, name) && this.queue[name].completed) { | ||
delete this.queue[name]; | ||
if (Object.keys(this.queue[normalizedOptions]).length === 0) { | ||
delete this.queue[normalizedOptions]; | ||
} | ||
} | ||
}; | ||
this.queue[name] = () => { | ||
// The main logic is here | ||
const entry = () => { | ||
const name = `${normalizedAuthority}:${normalizedOptions}`; | ||
let receivedSettings = false; | ||
let servername; | ||
try { | ||
let receivedSettings = false; | ||
const tlsSessionCache = this.tlsSessionCache.get(name); | ||
const session = http2.connect(authority, { | ||
createConnection: this.createConnection, | ||
settings: this.settings, | ||
session: tlsSessionCache ? tlsSessionCache.session : undefined, | ||
...options | ||
@@ -155,44 +296,185 @@ }); | ||
session.setTimeout(this.timeout, () => { | ||
// `.close()` would wait until all streams all closed | ||
session.destroy(); | ||
// Tries to free the session. | ||
const freeSession = () => { | ||
// Fetch the smallest amount of free sessions of any origin we have. | ||
const freeSessionsCount = session.originSet.reduce((accumulator, origin) => { | ||
return Math.min(accumulator, getSessions(this.freeSessions, normalizedOptions, origin).length); | ||
}, Infinity); | ||
// Check the limit. | ||
if (freeSessionsCount < this.maxFreeSessions) { | ||
addSession(this.freeSessions, normalizedOptions, session); | ||
return true; | ||
} | ||
return false; | ||
}; | ||
const isFree = () => session[kCurrentStreamsCount] < session.remoteSettings.maxConcurrentStreams; | ||
session.socket.once('session', tlsSession => { | ||
// We need to cache the servername due to a bug in OpenSSL. | ||
setImmediate(() => { | ||
this.tlsSessionCache.set(name, { | ||
session: tlsSession, | ||
servername | ||
}); | ||
}); | ||
}); | ||
session.once('error', error => { | ||
session.destroy(); | ||
// OpenSSL bug workaround. | ||
// See https://github.com/nodejs/node/issues/28985 | ||
session.socket.once('secureConnect', () => { | ||
servername = session.socket.servername; | ||
for (const listener of listeners) { | ||
listener.reject(error); | ||
if (servername === false && typeof tlsSessionCache !== 'undefined' && typeof tlsSessionCache.servername !== 'undefined') { | ||
session.socket.servername = tlsSessionCache.servername; | ||
} | ||
}); | ||
session.once('error', error => { | ||
// `receivedSettings` is true when the session has successfully connected. | ||
if (!receivedSettings) { | ||
for (const listener of listeners) { | ||
listener.reject(error); | ||
} | ||
} | ||
// The connection got broken, purge the cache. | ||
this.tlsSessionCache.delete(name); | ||
}); | ||
session.setTimeout(this.timeout, () => { | ||
// Terminates all streams owned by this session. | ||
session.destroy(); | ||
}); | ||
session.once('close', () => { | ||
if (!receivedSettings) { | ||
// Broken connection | ||
const error = new Error('Session closed without receiving a SETTINGS frame'); | ||
for (const listener of listeners) { | ||
listener.reject(new Error('Session closed without receiving a SETTINGS frame')); | ||
listener.reject(error); | ||
} | ||
} | ||
free(); | ||
removeFromQueue(); | ||
removeSession(this.freeSessions, name, session); | ||
this._processQueue(name); | ||
// This cannot be moved to the stream logic, | ||
// because there may be a session that hadn't made a single request. | ||
removeSession(this.freeSessions, normalizedOptions, session); | ||
// There may be another session awaiting. | ||
this._tryToCreateNewSession(normalizedOptions, normalizedAuthority); | ||
}); | ||
// Iterates over the queue and processes listeners. | ||
const processListeners = () => { | ||
if (!Reflect.has(this.queue, normalizedOptions)) { | ||
return; | ||
} | ||
for (const origin of session.originSet) { | ||
if (Reflect.has(this.queue[normalizedOptions], origin)) { | ||
const {listeners} = this.queue[normalizedOptions][origin]; | ||
// Prevents session overloading. | ||
while (listeners.length !== 0 && isFree()) { | ||
// We assume `resolve(...)` calls `request(...)` *directly*, | ||
// otherwise the session will get overloaded. | ||
listeners.shift().resolve(session); | ||
} | ||
if (this.queue[normalizedOptions][origin].listeners.length === 0) { | ||
delete this.queue[normalizedOptions][origin]; | ||
if (Object.keys(this.queue[normalizedOptions]).length === 0) { | ||
delete this.queue[normalizedOptions]; | ||
break; | ||
} | ||
} | ||
// We're no longer free, no point in continuing. | ||
if (!isFree()) { | ||
break; | ||
} | ||
} | ||
} | ||
}; | ||
// The Origin Set cannot shrink. No need to check if it suddenly became covered by another one. | ||
session.once('origin', () => { | ||
if (!isFree()) { | ||
// The session is full. | ||
return; | ||
} | ||
// Close covered sessions (if possible). | ||
this._closeCoveredSessions(normalizedOptions, session); | ||
processListeners(); | ||
// `session.remoteSettings.maxConcurrentStreams` might get increased | ||
session.on('remoteSettings', () => { | ||
this._closeCoveredSessions(normalizedOptions, session); | ||
}); | ||
}); | ||
session.once('remoteSettings', () => { | ||
free(); | ||
// The Agent could have been destroyed already. | ||
if (entry.destroyed) { | ||
const error = new Error('Agent has been destroyed'); | ||
if (Reflect.has(this.freeSessions, name)) { | ||
this.freeSessions[name].push(session); | ||
for (const listener of listeners) { | ||
listener.reject(error); | ||
} | ||
session.destroy(); | ||
return; | ||
} | ||
if (freeSession()) { | ||
// Process listeners, we're free. | ||
processListeners(); | ||
} else if (this.maxFreeSessions === 0) { | ||
processListeners(); | ||
// We're closing ASAP, when all possible requests have been made for this event loop tick. | ||
setImmediate(() => { | ||
session.close(); | ||
}); | ||
} else { | ||
this.freeSessions[name] = [session]; | ||
// Too late, another free session took these listeners. | ||
session.close(); | ||
} | ||
for (const listener of listeners) { | ||
listener.resolve(session); | ||
removeFromQueue(); | ||
// Check if we haven't managed to execute all listeners. | ||
if (listeners.length !== 0) { | ||
// Request for a new session with predefined listeners. | ||
this.getSession(normalizedAuthority, options, listeners); | ||
listeners.length = 0; | ||
} | ||
receivedSettings = true; | ||
// `session.remoteSettings.maxConcurrentStreams` might get increased | ||
session.on('remoteSettings', () => { | ||
// Check if we're eligible to become a free session | ||
if (isFree() && removeSession(this.busySessions, normalizedOptions, session)) { | ||
// Check for free seats | ||
if (freeSession()) { | ||
processListeners(); | ||
} else { | ||
// Assume it's still a busy session | ||
addSession(this.busySessions, normalizedOptions, session); | ||
} | ||
} | ||
}); | ||
}); | ||
// Shim `session.request()` in order to catch all streams | ||
session[kRequest] = session.request; | ||
@@ -204,27 +486,27 @@ session.request = headers => { | ||
// The process won't exit until the session is closed. | ||
session.ref(); | ||
if (++session[kCurrentStreamsCount] >= session.remoteSettings.maxConcurrentStreams) { | ||
removeSession(this.freeSessions, name, session); | ||
++session[kCurrentStreamsCount]; | ||
if (Reflect.has(this.busySessions, name)) { | ||
this.busySessions[name].push(session); | ||
} else { | ||
this.busySessions[name] = [session]; | ||
} | ||
// Check if we became busy | ||
if (!isFree() && removeSession(this.freeSessions, normalizedOptions, session)) { | ||
addSession(this.busySessions, normalizedOptions, session); | ||
} | ||
stream.once('close', () => { | ||
if (--session[kCurrentStreamsCount] < session.remoteSettings.maxConcurrentStreams) { | ||
--session[kCurrentStreamsCount]; | ||
if (isFree()) { | ||
if (session[kCurrentStreamsCount] === 0) { | ||
// All requests are finished, the process may exit now. | ||
session.unref(); | ||
} | ||
if (removeSession(this.busySessions, name, session) && !session.destroyed) { | ||
if ((this.freeSessions[name] || []).length < this.maxFreeSessions) { | ||
if (Reflect.has(this.freeSessions, name)) { | ||
this.freeSessions[name].push(session); | ||
} else { | ||
this.freeSessions[name] = [session]; | ||
} | ||
// Check if we are no longer busy and the session is not broken. | ||
if (removeSession(this.busySessions, normalizedOptions, session) && !session.destroyed && !session.closed) { | ||
// Check the sessions count of this authority and compare it to `maxSessionsCount`. | ||
if (freeSession()) { | ||
this._closeCoveredSessions(normalizedOptions, session); | ||
processListeners(); | ||
} else { | ||
@@ -235,2 +517,6 @@ session.close(); | ||
} | ||
if (!session.destroyed && !session.closed) { | ||
closeSessionIfCovered(this.freeSessions, normalizedOptions, session); | ||
} | ||
}); | ||
@@ -245,17 +531,24 @@ | ||
delete this.queue[name]; | ||
removeFromQueue(); | ||
} | ||
}; | ||
this.queue[name].listeners = listeners; | ||
this.queue[name].completed = false; | ||
this._processQueue(name); | ||
entry.listeners = listeners; | ||
entry.completed = false; | ||
entry.destroyed = false; | ||
this.queue[normalizedOptions][normalizedAuthority] = entry; | ||
this._tryToCreateNewSession(normalizedOptions, normalizedAuthority); | ||
}); | ||
} | ||
async request(authority, options, headers) { | ||
const session = await this.getSession(authority, options); | ||
const stream = session.request(headers); | ||
return stream; | ||
request(authority, options, headers) { | ||
return new Promise((resolve, reject) => { | ||
this.getSession(authority, options, [{ | ||
reject, | ||
resolve: session => { | ||
resolve(session.request(headers)); | ||
} | ||
}]); | ||
}); | ||
} | ||
@@ -270,9 +563,9 @@ | ||
const port = authority.port || 443; | ||
const host = authority.hostname || authority.host; | ||
if (typeof options.servername === 'undefined') { | ||
options.servername = authority.host; | ||
options.servername = host; | ||
} | ||
const port = authority.port || 443; | ||
const host = authority.hostname || authority.host; | ||
return tls.connect(port, host, options); | ||
@@ -303,2 +596,11 @@ } | ||
} | ||
for (const entriesOfAuthority of Object.values(this.queue)) { | ||
for (const entry of Object.values(entriesOfAuthority)) { | ||
entry.destroyed = true; | ||
} | ||
} | ||
// New requests should NOT attach to destroyed sessions | ||
this.queue = {}; | ||
} | ||
@@ -305,0 +607,0 @@ } |
'use strict'; | ||
const {URL} = require('url'); | ||
const http = require('http'); | ||
const https = require('https'); | ||
const QuickLRU = require('quick-lru'); | ||
const isCompatible = require('./utils/is-compatible'); | ||
const Http2ClientRequest = isCompatible ? require('./client-request') : undefined; | ||
const Http2ClientRequest = require('./client-request'); | ||
const httpResolveALPN = require('./utils/http-resolve-alpn'); | ||
@@ -17,2 +15,3 @@ const urlToOptions = require('./utils/url-to-options'); | ||
const port = options.port || 443; | ||
const ALPNProtocols = options.ALPNProtocols || ['h2', 'http/1.1']; | ||
@@ -28,3 +27,3 @@ const name = `${host}:${port}:${ALPNProtocols.sort()}`; | ||
if (alpnProtocol === 'h2' && isCompatible) { | ||
if (alpnProtocol === 'h2') { | ||
return (options, callback) => { | ||
@@ -31,0 +30,0 @@ if (options.agent && options.agent.http2) { |
'use strict'; | ||
const {URL} = require('url'); | ||
const http2 = require('http2'); | ||
@@ -12,3 +11,6 @@ const {Writable} = require('stream'); | ||
ERR_INVALID_PROTOCOL, | ||
ERR_HTTP_HEADERS_SENT | ||
ERR_HTTP_HEADERS_SENT, | ||
ERR_INVALID_HTTP_TOKEN, | ||
ERR_HTTP_INVALID_HEADER_VALUE, | ||
ERR_INVALID_CHAR | ||
} = require('./utils/errors'); | ||
@@ -30,2 +32,5 @@ | ||
const isValidHttpToken = /^[\^_`a-zA-Z\-0-9!#$%&'*+.|~]+$/; | ||
const isInvalidHeaderValue = /[^\t\u0020-\u007e\u0080-\u00ff]/; | ||
class ClientRequest extends Writable { | ||
@@ -35,4 +40,5 @@ constructor(input, options, callback) { | ||
if (typeof input === 'string' || input instanceof URL) { | ||
input = urlToOptions(new URL(input)); | ||
const hasInput = typeof input === 'string' || input instanceof URL; | ||
if (hasInput) { | ||
input = urlToOptions(input instanceof URL ? input : new URL(input)); | ||
} | ||
@@ -43,3 +49,3 @@ | ||
callback = options; | ||
options = input; | ||
options = hasInput ? input : {...input}; | ||
} else { | ||
@@ -50,13 +56,7 @@ // (input, options, callback) | ||
if (options.agent) { | ||
if (typeof options.agent.request !== 'function') { | ||
throw new ERR_INVALID_ARG_TYPE('options.agent', ['Agent-like Object', 'undefined', 'false'], options.agent); | ||
} | ||
this.agent = options.agent; | ||
} else if (options.session) { | ||
if (options.session) { | ||
this[kSession] = options.session; | ||
} else if (options.agent === false) { | ||
this.agent = new Agent({maxFreeSessions: 0}); | ||
} else if (options.agent === null || typeof options.agent === 'undefined') { | ||
} else if (typeof options.agent === 'undefined' || options.agent === null) { | ||
if (typeof options.createConnection === 'function') { | ||
@@ -69,2 +69,6 @@ // This is a workaround - we don't have to create the session on our own. | ||
} | ||
} else if (typeof options.agent.request === 'function') { | ||
this.agent = options.agent; | ||
} else { | ||
throw new ERR_INVALID_ARG_TYPE('options.agent', ['Agent-like Object', 'undefined', 'false'], options.agent); | ||
} | ||
@@ -110,5 +114,9 @@ | ||
this[kOptions] = options; | ||
this[kAuthority] = options.authority || new URL(`https://${options.hostname || options.host}:${options.port}`); | ||
this[kAuthority] = Agent.normalizeAuthority(options, options.servername); | ||
if (this.agent && (options.preconnect || typeof options.preconnect === 'undefined')) { | ||
if (!Reflect.has(this[kHeaders], ':authority')) { | ||
this[kHeaders][':authority'] = this[kAuthority].slice(8); | ||
} | ||
if (this.agent && options.preconnect !== false) { | ||
this.agent.getSession(this[kAuthority], options).catch(() => {}); | ||
@@ -160,2 +168,6 @@ } | ||
_final(callback) { | ||
if (this.destroyed || this.aborted) { | ||
return; | ||
} | ||
this.flushHeaders(); | ||
@@ -196,3 +208,3 @@ | ||
flushHeaders() { | ||
if (this[kFlushedHeaders] && !this.destroyed && !this.aborted) { | ||
if (this[kFlushedHeaders] || this.destroyed || this.aborted) { | ||
return; | ||
@@ -209,65 +221,66 @@ } | ||
if (!this.destroyed && !this.aborted) { | ||
// Forwards `timeout`, `continue`, `close` and `error` events to this instance. | ||
if (!isConnectMethod) { | ||
proxyEvents(this._request, this, ['timeout', 'continue', 'close', 'error']); | ||
} | ||
if (this.destroyed || this.aborted) { | ||
this._request.close(NGHTTP2_CANCEL); | ||
return; | ||
} | ||
// This event tells we are ready to listen for the data. | ||
this._request.once('response', (headers, flags, rawHeaders) => { | ||
this.res = new IncomingMessage(this.socket); | ||
this.res.req = this; | ||
this.res.statusCode = headers[HTTP2_HEADER_STATUS]; | ||
this.res.headers = headers; | ||
this.res.rawHeaders = rawHeaders; | ||
// Forwards `timeout`, `continue`, `close` and `error` events to this instance. | ||
if (!isConnectMethod) { | ||
proxyEvents(this._request, this, ['timeout', 'continue', 'close', 'error']); | ||
} | ||
this.res.once('end', () => { | ||
if (this.aborted) { | ||
this.res.aborted = true; | ||
this.res.emit('aborted'); | ||
} else { | ||
this.res.complete = true; | ||
} | ||
}); | ||
// This event tells we are ready to listen for the data. | ||
this._request.once('response', (headers, flags, rawHeaders) => { | ||
this.res = new IncomingMessage(this.socket); | ||
this.res.req = this; | ||
this.res.statusCode = headers[HTTP2_HEADER_STATUS]; | ||
this.res.headers = headers; | ||
this.res.rawHeaders = rawHeaders; | ||
if (isConnectMethod) { | ||
this.res.upgrade = true; | ||
this.res.once('end', () => { | ||
if (this.aborted) { | ||
this.res.aborted = true; | ||
this.res.emit('aborted'); | ||
} else { | ||
this.res.complete = true; | ||
} | ||
}); | ||
// The HTTP1 API says the socket is detached here, | ||
// but we can't do that so we pass the original HTTP2 request. | ||
if (this.emit('connect', this.res, this._request, Buffer.alloc(0))) { | ||
this.emit('close'); | ||
} else { | ||
// No listeners attached, destroy the original request. | ||
this._request.destroy(); | ||
} | ||
if (isConnectMethod) { | ||
this.res.upgrade = true; | ||
// The HTTP1 API says the socket is detached here, | ||
// but we can't do that so we pass the original HTTP2 request. | ||
if (this.emit('connect', this.res, this._request, Buffer.alloc(0))) { | ||
this.emit('close'); | ||
} else { | ||
// Forwards data | ||
this._request.pipe(this.res); | ||
// No listeners attached, destroy the original request. | ||
this._request.destroy(); | ||
} | ||
} else { | ||
// Forwards data | ||
this._request.pipe(this.res); | ||
if (!this.emit('response', this.res)) { | ||
// No listeners attached, dump the response. | ||
this.res._dump(); | ||
} | ||
if (!this.emit('response', this.res)) { | ||
// No listeners attached, dump the response. | ||
this.res._dump(); | ||
} | ||
}); | ||
} | ||
}); | ||
// Emits `information` event | ||
this._request.once('headers', headers => this.emit('information', {statusCode: headers[HTTP2_HEADER_STATUS]})); | ||
// Emits `information` event | ||
this._request.once('headers', headers => this.emit('information', {statusCode: headers[HTTP2_HEADER_STATUS]})); | ||
this._request.once('trailers', (trailers, flags, rawTrailers) => { | ||
// Assigns trailers to the response object. | ||
this.res.trailers = trailers; | ||
this.res.rawTrailers = rawTrailers; | ||
}); | ||
this._request.once('trailers', (trailers, flags, rawTrailers) => { | ||
// Assigns trailers to the response object. | ||
this.res.trailers = trailers; | ||
this.res.rawTrailers = rawTrailers; | ||
}); | ||
this.socket = this._request.session.socket; | ||
this.connection = this._request.session.socket; | ||
this.socket = this._request.session.socket; | ||
this.connection = this._request.session.socket; | ||
process.nextTick(() => { | ||
this.emit('socket', this._request.session.socket); | ||
}); | ||
} else { | ||
this._request.close(NGHTTP2_CANCEL); | ||
} | ||
process.nextTick(() => { | ||
this.emit('socket', this._request.session.socket); | ||
}); | ||
}; | ||
@@ -293,2 +306,6 @@ | ||
getHeader(name) { | ||
if (typeof name !== 'string') { | ||
throw new ERR_INVALID_ARG_TYPE('name', 'string', name); | ||
} | ||
return this[kHeaders][name.toLowerCase()]; | ||
@@ -302,2 +319,6 @@ } | ||
removeHeader(name) { | ||
if (typeof name !== 'string') { | ||
throw new ERR_INVALID_ARG_TYPE('name', 'string', name); | ||
} | ||
if (this.headersSent) { | ||
@@ -315,2 +336,14 @@ throw new ERR_HTTP_HEADERS_SENT('remove'); | ||
if (typeof name !== 'string' || !isValidHttpToken.test(name)) { | ||
throw new ERR_INVALID_HTTP_TOKEN('Header name', name); | ||
} | ||
if (typeof value === 'undefined') { | ||
throw new ERR_HTTP_INVALID_HEADER_VALUE(value, name); | ||
} | ||
if (isInvalidHeaderValue.test(value)) { | ||
throw new ERR_INVALID_CHAR('header content', name); | ||
} | ||
this[kHeaders][name.toLowerCase()] = value; | ||
@@ -317,0 +350,0 @@ } |
'use strict'; | ||
const isCompatible = require('./utils/is-compatible'); | ||
const http2 = require('http2'); | ||
const agent = require('./agent'); | ||
const ClientRequest = require('./client-request'); | ||
const IncomingMessage = require('./incoming-message'); | ||
const auto = require('./auto'); | ||
if (isCompatible) { | ||
const http2 = require('http2'); | ||
const agent = require('./agent'); | ||
const ClientRequest = require('./client-request'); | ||
const IncomingMessage = require('./incoming-message'); | ||
const get = (url, options, callback) => { | ||
const req = ClientRequest.request(url, options, callback); | ||
req.end(); | ||
const get = (url, options, callback) => { | ||
const req = ClientRequest.request(url, options, callback); | ||
req.end(); | ||
return req; | ||
}; | ||
return req; | ||
}; | ||
module.exports = { | ||
...http2, | ||
...agent, | ||
auto, | ||
request: ClientRequest.request, | ||
get, | ||
ClientRequest, | ||
IncomingMessage | ||
}; | ||
} else { | ||
module.exports = {auto}; | ||
} | ||
module.exports = { | ||
...http2, | ||
...agent, | ||
auto, | ||
request: ClientRequest.request, | ||
get, | ||
ClientRequest, | ||
IncomingMessage | ||
}; |
'use strict'; | ||
/* istanbul ignore file: https://github.com/nodejs/node/blob/d4c91f28148af8a6c1a95392e5c88cb93d4b61c6/lib/_http_agent.js */ | ||
const net = require('net'); | ||
/* istanbul ignore file: https://github.com/nodejs/node/blob/v12.10.0/lib/_http_agent.js */ | ||
@@ -21,3 +22,7 @@ module.exports = (options, headers) => { | ||
if (net.isIP(servername)) { | ||
return ''; | ||
} | ||
return servername; | ||
}; |
@@ -34,1 +34,13 @@ 'use strict'; | ||
}); | ||
makeError(TypeError, 'ERR_INVALID_HTTP_TOKEN', args => { | ||
return `${args[0]} must be a valid HTTP token [${args[1]}]`; | ||
}); | ||
makeError(TypeError, 'ERR_HTTP_INVALID_HEADER_VALUE', args => { | ||
return `Invalid value "${args[0]} for header "${args[1]}"`; | ||
}); | ||
makeError(TypeError, 'ERR_INVALID_CHAR', args => { | ||
return `Invalid character in ${args[0]} [${args[1]}]`; | ||
}); |
'use strict'; | ||
const resolveALPN = require('resolve-alpn'); | ||
const calculateServerName = require('./calculate-server-name'); | ||
const isCompatible = require('./is-compatible'); | ||
const ALPNProtocols = isCompatible ? ['h2', 'http/1.1'] : ['http/1.1']; | ||
const ALPNProtocols = ['h2', 'http/1.1']; | ||
@@ -8,0 +7,0 @@ // Transforms HTTP options into Socket options and resolves ALPN. |
License Policy Violation
LicenseThis package is not allowed per your license policy. Review the package's license to ensure compliance.
Found 1 instance in 1 package
Network access
Supply chain riskThis module accesses the network.
Found 1 instance in 1 package
License Policy Violation
LicenseThis package is not allowed per your license policy. Review the package's license to ensure compliance.
Found 1 instance in 1 package
46341
1020
369
15
13
7