http2-wrapper
Advanced tools
Comparing version 1.0.0-beta.5.2 to 1.0.0
{ | ||
"name": "http2-wrapper", | ||
"version": "1.0.0-beta.5.2", | ||
"version": "1.0.0", | ||
"description": "HTTP2 client, just with the familiar `https` API", | ||
"main": "source", | ||
"types": "index.d.ts", | ||
"engines": { | ||
"node": ">=10.19.0" | ||
"node": ">=15.10.0" | ||
}, | ||
"scripts": { | ||
"test": "xo && nyc --reporter=lcovonly --reporter=text --reporter=html ava" | ||
"test": "xo && nyc --reporter=lcovonly --reporter=text --reporter=html ava && tsd" | ||
}, | ||
"files": [ | ||
"source" | ||
"source", | ||
"index.d.ts" | ||
], | ||
@@ -36,20 +38,33 @@ "keywords": [ | ||
"devDependencies": { | ||
"@sindresorhus/is": "^3.0.0", | ||
"ava": "^3.10.1", | ||
"@sindresorhus/is": "^4.0.0", | ||
"ava": "^3.15.0", | ||
"benchmark": "^2.1.4", | ||
"get-stream": "^5.1.0", | ||
"got": "^11.5.0", | ||
"http2-proxy": "^5.0.51", | ||
"get-stream": "^6.0.0", | ||
"got": "^11.8.1", | ||
"http2-proxy": "^5.0.53", | ||
"https-proxy-agent": "^5.0.0", | ||
"lolex": "^6.0.0", | ||
"many-keys-map": "^1.0.2", | ||
"many-keys-map": "^1.0.3", | ||
"nyc": "^15.1.0", | ||
"p-event": "^4.2.0", | ||
"tempy": "^0.5.0", | ||
"tempy": "^1.0.0", | ||
"to-readable-stream": "^2.1.0", | ||
"tsd": "^0.13.1", | ||
"xo": "^0.32.1" | ||
"tsd": "^0.14.0", | ||
"websocket-stream": "^5.5.2", | ||
"ws": "^7.4.2", | ||
"xo": "^0.37.1" | ||
}, | ||
"ava": { | ||
"timeout": "2m" | ||
"timeout": "10s" | ||
}, | ||
"nyc": { | ||
"include": [ | ||
"source" | ||
] | ||
}, | ||
"xo": { | ||
"rules": { | ||
"unicorn/no-for-loop": "off" | ||
} | ||
} | ||
} |
202
README.md
@@ -101,3 +101,3 @@ # http2-wrapper | ||
hostname: 'httpbin.org', | ||
protocol: 'http:', // Note the `http:` protocol here | ||
protocol: 'http:', // Try changing this to https: | ||
path: '/post', | ||
@@ -171,3 +171,3 @@ method: 'POST', | ||
The session used to make the actual request. If none provided, it will use `options.agent`. | ||
The session used to make the actual request. If none provided, it will use `options.agent` to get one. | ||
@@ -205,4 +205,4 @@ ### http2.get(url, options, callback) | ||
agent: new MyAgent() | ||
}, res => { | ||
res.on('data', chunk => console.log(`Received chunk of ${chunk.length} bytes`)); | ||
}, response => { | ||
response.on('data', chunk => console.log(`Received chunk of ${chunk.length} bytes`)); | ||
}); | ||
@@ -213,3 +213,3 @@ ``` | ||
Each option is assigned to each `Agent` instance and can be changed later. | ||
Each option is an `Agent` property and can be changed later. | ||
@@ -219,5 +219,5 @@ ##### timeout | ||
Type: `number`<br> | ||
Default: `60000` | ||
Default: `0` | ||
If there's no activity after `timeout` milliseconds, the session will be closed. | ||
If there's no activity after `timeout` milliseconds, the session will be closed. If `0`, no timeout is applied. | ||
@@ -231,3 +231,3 @@ ##### maxSessions | ||
##### maxFreeSessions | ||
##### maxEmptySessions | ||
@@ -237,6 +237,4 @@ Type: `number`<br> | ||
The maximum amount of free sessions in total. This only applies to sessions with no pending requests. | ||
The maximum amount of empty sessions in total. An empty session is a session with no pending requests. | ||
**Note:** It is possible that the amount will be exceeded when sessions have at least 1 pending request. | ||
##### maxCachedTlsSessions | ||
@@ -249,5 +247,6 @@ | ||
#### Agent.normalizeOrigin(url) | ||
#### agent.protocol | ||
Returns a string representing the origin of the URL. | ||
Type: `string`<br> | ||
Default: `https:` | ||
@@ -267,3 +266,3 @@ #### agent.settings | ||
Agent.normalizeOptions({servername: 'example.com'}); | ||
// => ':example.com' | ||
// => ':::::::::::::::::::::::::::::::::::::' | ||
``` | ||
@@ -277,3 +276,3 @@ | ||
An origin used to create new session. | ||
Origin used to create new session. | ||
@@ -284,6 +283,8 @@ ##### [options](https://nodejs.org/api/http2.html#http2_http2_connect_authority_options_listener) | ||
The options used to create new session. | ||
Options used to create new session. | ||
Returns a Promise giving free `Http2Session`. If no free sessions are found, a new one is created. | ||
A session is considered free when pending streams count is less than max concurrent streams settings. | ||
#### agent.getSession([origin](#origin), [options](options-1), listener) | ||
@@ -312,6 +313,11 @@ | ||
#### agent.closeFreeSessions() | ||
#### agent.closeEmptySessions(count) | ||
Makes an attempt to close free sessions. Only sessions with 0 concurrent streams will be closed. | ||
##### count | ||
Type: `number` | ||
Default: `Number.POSITIVE_INFINITY` | ||
Makes an attempt to close empty sessions. Only sessions with 0 concurrent streams will be closed. | ||
#### agent.destroy(reason) | ||
@@ -321,160 +327,56 @@ | ||
#### Event: 'session' | ||
#### agent.emptySessionCount | ||
```js | ||
agent.on('session', session => { | ||
// A new session has been created by the Agent. | ||
}); | ||
``` | ||
Type: `number` | ||
## Proxy support | ||
A number of empty sessions. | ||
An example of a full-featured proxy server can be found [here](examples/proxy/server.js). It supports **mirroring, custom authorities and the CONNECT protocol**. | ||
#### agent.pendingSessionCount | ||
### Mirroring | ||
Type: `number` | ||
To mirror another server we need to use only [`http2-proxy`](https://github.com/nxtedition/node-http2-proxy). We don't need the CONNECT protocol or custom authorities. | ||
A number of pending sessions. | ||
To see the result, just navigate to the server's address. | ||
#### agent.sessionCount | ||
### HTTP/1 over HTTP/2 | ||
Type: `number` | ||
Since we don't care about mirroring, the server needs to support the CONNECT protocol in this case. | ||
A number of all sessions held by the Agent. | ||
The client looks like this: | ||
#### Event: 'session' | ||
```js | ||
const https = require('https'); | ||
const http2 = require('http2'); | ||
const session = http2.connect('https://localhost:8000', { | ||
// For demo purposes only! | ||
rejectUnauthorized: false | ||
agent.on('session', session => { | ||
// A new session has been created by the Agent. | ||
}); | ||
session.ref(); | ||
https.request('https://httpbin.org/anything', { | ||
createConnection: options => { | ||
return session.request({ | ||
':method': 'CONNECT', | ||
':authority': `${options.host}:${options.port}` | ||
}); | ||
} | ||
}, response => { | ||
console.log('statusCode:', response.statusCode); | ||
console.log('headers:', response.headers); | ||
const body = []; | ||
response.on('data', chunk => { | ||
body.push(chunk); | ||
}); | ||
response.on('end', () => { | ||
console.log('body:', Buffer.concat(body).toString()); | ||
session.unref(); | ||
}); | ||
}).end(); | ||
``` | ||
### HTTP/2 over HTTP/2 | ||
## Proxy support | ||
It's a tricky one! We cannot create an HTTP/2 session on top of an HTTP/2 stream. But... we can still specify the `:authority` header, no need to use the CONNECT protocol here. | ||
Currently `http2-wrapper` provides support for these proxies: | ||
The client looks like this: | ||
- `HttpOverHttp2` | ||
- `HttpsOverHttp2` | ||
- `Http2OverHttp2` | ||
- `Http2OverHttp` | ||
- `Http2OverHttps` | ||
```js | ||
const http2 = require('../../source'); | ||
const {Agent} = http2; | ||
Any of the above can be accessed via `http2wrapper.proxies`. Check out the [`examples/proxies`](examples/proxies) directory to learn more. | ||
class ProxyAgent extends Agent { | ||
constructor(url, options) { | ||
super(options); | ||
## Mirroring another server | ||
this.origin = url; | ||
} | ||
See [`examples/proxies/mirror.js`](examples/proxies/mirror.js) for an example. | ||
request(origin, sessionOptions, headers, streamOptions) { | ||
return super.request(this.origin, sessionOptions, { | ||
...headers, | ||
':authority': (new URL(origin)).host | ||
}, streamOptions); | ||
} | ||
} | ||
## [WebSockets over HTTP/2](https://tools.ietf.org/html/rfc8441) | ||
const request = http2.request({ | ||
hostname: 'httpbin.org', | ||
protocol: 'https:', | ||
path: '/anything', | ||
agent: new ProxyAgent('https://localhost:8000'), | ||
// For demo purposes only! | ||
rejectUnauthorized: false | ||
}, response => { | ||
console.log('statusCode:', response.statusCode); | ||
console.log('headers:', response.headers); | ||
See [`examples/ws`](examples/ws) for an example. | ||
const body = []; | ||
response.on('data', chunk => { | ||
body.push(chunk); | ||
}); | ||
response.on('end', () => { | ||
console.log('body:', Buffer.concat(body).toString()); | ||
}); | ||
}); | ||
## Push streams | ||
request.on('error', console.error); | ||
See [`examples/push-stream`](examples/push-stream) for an example. | ||
request.end(); | ||
``` | ||
## Notes | ||
- If you're interested in [WebSockets over HTTP/2](https://tools.ietf.org/html/rfc8441), then [check out this discussion](https://github.com/websockets/ws/issues/1458). | ||
- [HTTP/2 sockets cannot be malformed](https://github.com/nodejs/node/blob/cc8250fab86486632fdeb63892be735d7628cd13/lib/internal/http2/core.js#L725), therefore modifying the socket will have no effect. | ||
- You can make [a custom Agent](examples/push-stream/index.js) to support push streams. | ||
## Benchmarks | ||
CPU: Intel i7-7700k (governor: performance)<br> | ||
Server: H2O v2.2.5 [`h2o.conf`](h2o.conf)<br> | ||
Node: v14.5.0 | ||
Linux: 5.6.18-156.current | ||
`auto` means `http2wrapper.auto`. | ||
``` | ||
http2-wrapper x 12,181 ops/sec ±3.39% (75 runs sampled) | ||
http2-wrapper - preconfigured session x 13,140 ops/sec ±2.51% (79 runs sampled) | ||
http2-wrapper - auto x 11,412 ops/sec ±2.55% (78 runs sampled) | ||
http2 x 16,050 ops/sec ±1.39% (86 runs sampled) | ||
https - auto - keepalive x 12,288 ops/sec ±2.69% (79 runs sampled) | ||
https - keepalive x 12,155 ops/sec ±3.32% (78 runs sampled) | ||
https x 1,604 ops/sec ±2.03% (77 runs sampled) | ||
http x 6,041 ops/sec ±3.82% (76 runs sampled) | ||
Fastest is http2 | ||
``` | ||
`http2-wrapper`: | ||
- 32% **less** performant than `http2` | ||
- as performant as `https - keepalive` | ||
- 100% **more** performant than `http` | ||
`http2-wrapper - preconfigured session`: | ||
- 22% **less** performant than `http2` | ||
- 8% **more** performant than `https - keepalive` | ||
- 118% **more** performant than `http` | ||
`http2-wrapper - auto`: | ||
- 41% **less** performant than `http2` | ||
- 8% **less** performant than `https - keepalive` | ||
- 89% **more** performant than `http` | ||
`https - auto - keepalive`: | ||
- 31% **less** performant than `http2` | ||
- as performant as `https - keepalive` | ||
- 103% **more** performant than `http` | ||
## Related | ||
- [`got`](https://github.com/sindresorhus/got) - Simplified HTTP requests | ||
- [`got`](https://github.com/sindresorhus/got) - Simplified HTTP requests | ||
- [`http2-proxy`](https://github.com/nxtedition/node-http2-proxy) - A simple http/2 & http/1.1 spec compliant proxy helper for Node. | ||
@@ -481,0 +383,0 @@ ## License |
'use strict'; | ||
// See https://github.com/facebook/jest/issues/2549 | ||
// eslint-disable-next-line node/prefer-global/url | ||
const {URL} = require('url'); | ||
const EventEmitter = require('events'); | ||
@@ -6,11 +9,17 @@ const tls = require('tls'); | ||
const QuickLRU = require('quick-lru'); | ||
const delayAsyncDestroy = require('./utils/delay-async-destroy'); | ||
const kCurrentStreamsCount = Symbol('currentStreamsCount'); | ||
const kCurrentStreamCount = Symbol('currentStreamCount'); | ||
const kRequest = Symbol('request'); | ||
const kOriginSet = Symbol('cachedOriginSet'); | ||
const kGracefullyClosing = Symbol('gracefullyClosing'); | ||
const kLength = Symbol('length'); | ||
const nameKeys = [ | ||
// Not an Agent option actually | ||
'createConnection', | ||
// `http2.connect()` options | ||
'maxDeflateDynamicTableSize', | ||
'maxSettings', | ||
'maxSessionMemory', | ||
@@ -22,26 +31,40 @@ 'maxHeaderListPairs', | ||
'paddingStrategy', | ||
'peerMaxConcurrentStreams', | ||
'settings', | ||
// `tls.connect()` options | ||
// `tls.connect()` source options | ||
'family', | ||
'localAddress', | ||
'path', | ||
'rejectUnauthorized', | ||
// `tls.connect()` secure context options | ||
'pskCallback', | ||
'minDHSize', | ||
// `tls.connect()` destination options | ||
// - `servername` is automatically validated, skip it | ||
// - `host` and `port` just describe the destination server, | ||
'path', | ||
'socket', | ||
// `tls.createSecureContext()` options | ||
'ca', | ||
'cert', | ||
'sigalgs', | ||
'ciphers', | ||
'clientCertEngine', | ||
'ciphers', | ||
'crl', | ||
'dhparam', | ||
'ecdhCurve', | ||
'honorCipherOrder', | ||
'key', | ||
'privateKeyEngine', | ||
'privateKeyIdentifier', | ||
'maxVersion', | ||
'minVersion', | ||
'pfx', | ||
'servername', | ||
'minVersion', | ||
'maxVersion', | ||
'secureOptions', | ||
'secureProtocol', | ||
'crl', | ||
'honorCipherOrder', | ||
'ecdhCurve', | ||
'dhparam', | ||
'secureOptions', | ||
'sessionIdContext' | ||
'sessionIdContext', | ||
'ticketKeys' | ||
]; | ||
@@ -56,5 +79,3 @@ | ||
/* istanbul ignore next */ | ||
if (compare(array[mid], value)) { | ||
// This never gets called because we use descending sort. Better to have this anyway. | ||
low = mid + 1; | ||
@@ -78,4 +99,9 @@ } else { | ||
// SHOULD close it once all outstanding requests are satisfied. | ||
for (const coveredSession of where) { | ||
for (let index = 0; index < where.length; index++) { | ||
const coveredSession = where[index]; | ||
if ( | ||
// Unfortunately `.every()` returns true for an empty array | ||
coveredSession[kOriginSet].length > 0 && | ||
// The set is a proper subset when its length is less than the other set. | ||
@@ -88,3 +114,3 @@ coveredSession[kOriginSet].length < session[kOriginSet].length && | ||
// Makes sure that the session can handle all requests from the covered session. | ||
coveredSession[kCurrentStreamsCount] + session[kCurrentStreamsCount] <= session.remoteSettings.maxConcurrentStreams | ||
(coveredSession[kCurrentStreamCount] + session[kCurrentStreamCount]) <= session.remoteSettings.maxConcurrentStreams | ||
) { | ||
@@ -99,32 +125,18 @@ // This allows pending requests to finish and prevents making new requests. | ||
const closeSessionIfCovered = (where, coveredSession) => { | ||
for (const session of where) { | ||
for (let index = 0; index < where.length; index++) { | ||
const session = where[index]; | ||
if ( | ||
coveredSession[kOriginSet].length > 0 && | ||
coveredSession[kOriginSet].length < session[kOriginSet].length && | ||
coveredSession[kOriginSet].every(origin => session[kOriginSet].includes(origin)) && | ||
coveredSession[kCurrentStreamsCount] + session[kCurrentStreamsCount] <= session.remoteSettings.maxConcurrentStreams | ||
(coveredSession[kCurrentStreamCount] + session[kCurrentStreamCount]) <= session.remoteSettings.maxConcurrentStreams | ||
) { | ||
gracefullyClose(coveredSession); | ||
} | ||
} | ||
}; | ||
const getSessions = ({agent, isFree}) => { | ||
const result = {}; | ||
// eslint-disable-next-line guard-for-in | ||
for (const normalizedOptions in agent.sessions) { | ||
const sessions = agent.sessions[normalizedOptions]; | ||
const filtered = sessions.filter(session => { | ||
const result = session[Agent.kCurrentStreamsCount] < session.remoteSettings.maxConcurrentStreams; | ||
return isFree ? result : !result; | ||
}); | ||
if (filtered.length !== 0) { | ||
result[normalizedOptions] = filtered; | ||
return true; | ||
} | ||
} | ||
return result; | ||
return false; | ||
}; | ||
@@ -135,3 +147,3 @@ | ||
if (session[kCurrentStreamsCount] === 0) { | ||
if (session[kCurrentStreamCount] === 0) { | ||
session.close(); | ||
@@ -142,11 +154,5 @@ } | ||
class Agent extends EventEmitter { | ||
constructor({timeout = 60000, maxSessions = Infinity, maxFreeSessions = 10, maxCachedTlsSessions = 100} = {}) { | ||
constructor({timeout = 0, maxSessions = Number.POSITIVE_INFINITY, maxEmptySessions = 10, maxCachedTlsSessions = 100} = {}) { | ||
super(); | ||
// A session is considered busy when its current streams count | ||
// is equal to or greater than the `maxConcurrentStreams` value. | ||
// A session is considered free when its current streams count | ||
// is less than the `maxConcurrentStreams` value. | ||
// SESSIONS[NORMALIZED_OPTIONS] = []; | ||
@@ -158,2 +164,5 @@ this.sessions = {}; | ||
// | ||
// It's faster when there are many origins. If there's only one, then QUEUE[`${options}:${origin}`] is faster. | ||
// I guess object creation / deletion is causing the slowdown. | ||
// | ||
// The entry function has `listeners`, `completed` and `destroyed` properties. | ||
@@ -171,8 +180,7 @@ // `listeners` is an array of objects containing `resolve` and `reject` functions. | ||
// Max free sessions in total | ||
// TODO: decreasing `maxFreeSessions` should close some sessions | ||
this.maxFreeSessions = maxFreeSessions; | ||
// Max empty sessions in total | ||
this.maxEmptySessions = maxEmptySessions; | ||
this._freeSessionsCount = 0; | ||
this._sessionsCount = 0; | ||
this._emptySessionCount = 0; | ||
this._sessionCount = 0; | ||
@@ -188,12 +196,4 @@ // We don't support push streams by default. | ||
static normalizeOrigin(url, servername) { | ||
if (typeof url === 'string') { | ||
url = new URL(url); | ||
} | ||
if (servername && url.hostname !== servername) { | ||
url.hostname = servername; | ||
} | ||
return url.origin; | ||
get protocol() { | ||
return 'https:'; | ||
} | ||
@@ -204,7 +204,9 @@ | ||
if (options) { | ||
for (const key of nameKeys) { | ||
if (options[key]) { | ||
normalized += `:${options[key]}`; | ||
} | ||
for (let index = 0; index < nameKeys.length; index++) { | ||
const key = nameKeys[index]; | ||
normalized += ':'; | ||
if (options && options[key] !== undefined) { | ||
normalized += options[key]; | ||
} | ||
@@ -216,23 +218,50 @@ } | ||
_tryToCreateNewSession(normalizedOptions, normalizedOrigin) { | ||
if (!(normalizedOptions in this.queue) || !(normalizedOrigin in this.queue[normalizedOptions])) { | ||
_processQueue() { | ||
if (this._sessionCount >= this.maxSessions) { | ||
this.closeEmptySessions(this.maxSessions - this._sessionCount + 1); | ||
return; | ||
} | ||
const item = this.queue[normalizedOptions][normalizedOrigin]; | ||
// eslint-disable-next-line guard-for-in | ||
for (const normalizedOptions in this.queue) { | ||
// eslint-disable-next-line guard-for-in | ||
for (const normalizedOrigin in this.queue[normalizedOptions]) { | ||
const item = this.queue[normalizedOptions][normalizedOrigin]; | ||
// The entry function can be run only once. | ||
// BUG: The session may be never created when: | ||
// - the first condition is false AND | ||
// - this function is never called with the same arguments in the future. | ||
if (this._sessionsCount < this.maxSessions && !item.completed) { | ||
item.completed = true; | ||
// The entry function can be run only once. | ||
if (!item.completed) { | ||
item.completed = true; | ||
item(); | ||
item(); | ||
} | ||
} | ||
} | ||
} | ||
_isBetterSession(thisStreamCount, thatStreamCount) { | ||
return thisStreamCount > thatStreamCount; | ||
} | ||
_accept(session, listeners, normalizedOrigin, options) { | ||
let index = 0; | ||
while (index < listeners.length && session[kCurrentStreamCount] < session.remoteSettings.maxConcurrentStreams) { | ||
// We assume `resolve(...)` calls `request(...)` *directly*, | ||
// otherwise the session will get overloaded. | ||
listeners[index].resolve(session); | ||
index++; | ||
} | ||
listeners.splice(0, index); | ||
if (listeners.length > 0) { | ||
this.getSession(normalizedOrigin, options, listeners); | ||
listeners.length = 0; | ||
} | ||
} | ||
getSession(origin, options, listeners) { | ||
return new Promise((resolve, reject) => { | ||
if (Array.isArray(listeners)) { | ||
if (Array.isArray(listeners) && listeners.length > 0) { | ||
listeners = [...listeners]; | ||
@@ -247,9 +276,22 @@ | ||
const normalizedOptions = this.normalizeOptions(options); | ||
const normalizedOrigin = Agent.normalizeOrigin(origin, options && options.servername); | ||
try { | ||
// Parse origin | ||
if (typeof origin === 'string') { | ||
origin = new URL(origin); | ||
} else if (!(origin instanceof URL)) { | ||
throw new TypeError('The `origin` argument needs to be a string or an URL object'); | ||
} | ||
if (normalizedOrigin === undefined) { | ||
for (const {reject} of listeners) { | ||
reject(new TypeError('The `origin` argument needs to be a string or an URL object')); | ||
if (options) { | ||
// Validate servername | ||
const {servername} = options; | ||
const {hostname} = origin; | ||
if (servername && hostname !== servername) { | ||
throw new Error(`Origin ${hostname} differs from servername ${servername}`); | ||
} | ||
} | ||
} catch (error) { | ||
for (let index = 0; index < listeners.length; index++) { | ||
listeners[index].reject(error); | ||
} | ||
@@ -259,2 +301,5 @@ return; | ||
const normalizedOptions = this.normalizeOptions(options); | ||
const normalizedOrigin = origin.origin; | ||
if (normalizedOptions in this.sessions) { | ||
@@ -269,3 +314,15 @@ const sessions = this.sessions[normalizedOptions]; | ||
// Additionally, we are looking for session which has biggest current pending streams count. | ||
for (const session of sessions) { | ||
// | ||
// |------------| |------------| |------------| |------------| | ||
// | Session: A | | Session: B | | Session: C | | Session: D | | ||
// | Pending: 5 |-| Pending: 8 |-| Pending: 9 |-| Pending: 4 | | ||
// | Max: 10 | | Max: 10 | | Max: 9 | | Max: 5 | | ||
// |------------| |------------| |------------| |------------| | ||
// ^ | ||
// | | ||
// pick this one -- | ||
// | ||
for (let index = 0; index < sessions.length; index++) { | ||
const session = sessions[index]; | ||
const sessionMaxConcurrentStreams = session.remoteSettings.maxConcurrentStreams; | ||
@@ -277,45 +334,32 @@ | ||
if (session[kOriginSet].includes(normalizedOrigin)) { | ||
const sessionCurrentStreamsCount = session[kCurrentStreamsCount]; | ||
if (!session[kOriginSet].includes(normalizedOrigin)) { | ||
continue; | ||
} | ||
if ( | ||
sessionCurrentStreamsCount >= sessionMaxConcurrentStreams || | ||
session[kGracefullyClosing] || | ||
// Unfortunately the `close` event isn't called immediately, | ||
// so `session.destroyed` is `true`, but `session.closed` is `false`. | ||
session.destroyed | ||
) { | ||
continue; | ||
} | ||
const sessionCurrentStreamsCount = session[kCurrentStreamCount]; | ||
// We only need set this once. | ||
if (!optimalSession) { | ||
maxConcurrentStreams = sessionMaxConcurrentStreams; | ||
} | ||
if ( | ||
sessionCurrentStreamsCount >= sessionMaxConcurrentStreams || | ||
session[kGracefullyClosing] || | ||
// Unfortunately the `close` event isn't called immediately, | ||
// so `session.destroyed` is `true`, but `session.closed` is `false`. | ||
session.destroyed | ||
) { | ||
continue; | ||
} | ||
// We're looking for the session which has biggest current pending stream count, | ||
// in order to minimalize the amount of active sessions. | ||
if (sessionCurrentStreamsCount > currentStreamsCount) { | ||
optimalSession = session; | ||
currentStreamsCount = sessionCurrentStreamsCount; | ||
} | ||
// We only need set this once. | ||
if (!optimalSession) { | ||
maxConcurrentStreams = sessionMaxConcurrentStreams; | ||
} | ||
// Either get the session which has biggest current stream count or the lowest. | ||
if (this._isBetterSession(sessionCurrentStreamsCount, currentStreamsCount)) { | ||
optimalSession = session; | ||
currentStreamsCount = sessionCurrentStreamsCount; | ||
} | ||
} | ||
if (optimalSession) { | ||
/* istanbul ignore next: safety check */ | ||
if (listeners.length !== 1) { | ||
for (const {reject} of listeners) { | ||
const error = new Error( | ||
`Expected the length of listeners to be 1, got ${listeners.length}.\n` + | ||
'Please report this to https://github.com/szmarczak/http2-wrapper/' | ||
); | ||
reject(error); | ||
} | ||
return; | ||
} | ||
listeners[0].resolve(optimalSession); | ||
this._accept(optimalSession, listeners, normalizedOrigin, options); | ||
return; | ||
@@ -329,10 +373,8 @@ } | ||
this.queue[normalizedOptions][normalizedOrigin].listeners.push(...listeners); | ||
// This shouldn't be executed here. | ||
// See the comment inside _tryToCreateNewSession. | ||
this._tryToCreateNewSession(normalizedOptions, normalizedOrigin); | ||
return; | ||
} | ||
} else { | ||
this.queue[normalizedOptions] = {}; | ||
this.queue[normalizedOptions] = { | ||
[kLength]: 0 | ||
}; | ||
} | ||
@@ -348,3 +390,3 @@ | ||
if (Object.keys(this.queue[normalizedOptions]).length === 0) { | ||
if (--this.queue[normalizedOptions][kLength] === 0) { | ||
delete this.queue[normalizedOptions]; | ||
@@ -356,8 +398,11 @@ } | ||
// The main logic is here | ||
const entry = () => { | ||
const entry = async () => { | ||
this._sessionCount++; | ||
const name = `${normalizedOrigin}:${normalizedOptions}`; | ||
let receivedSettings = false; | ||
let socket; | ||
try { | ||
const session = http2.connect(origin, { | ||
const computedOptions = { | ||
createConnection: this.createConnection, | ||
@@ -367,8 +412,13 @@ settings: this.settings, | ||
...options | ||
}); | ||
session[kCurrentStreamsCount] = 0; | ||
}; | ||
// A hacky workaround to enable async `createConnection` | ||
socket = await computedOptions.createConnection.call(this, origin, computedOptions); | ||
computedOptions.createConnection = () => socket; | ||
const session = http2.connect(origin, computedOptions); | ||
session[kCurrentStreamCount] = 0; | ||
session[kGracefullyClosing] = false; | ||
const isFree = () => session[kCurrentStreamsCount] < session.remoteSettings.maxConcurrentStreams; | ||
let wasFree = true; | ||
const isFree = () => session[kCurrentStreamCount] < session.remoteSettings.maxConcurrentStreams; | ||
@@ -381,4 +431,4 @@ session.socket.once('session', tlsSession => { | ||
// Listeners are empty when the session successfully connected. | ||
for (const {reject} of listeners) { | ||
reject(error); | ||
for (let index = 0; index < listeners.length; index++) { | ||
listeners[index].reject(error); | ||
} | ||
@@ -392,3 +442,2 @@ | ||
// Terminates all streams owned by this session. | ||
// TODO: Maybe the streams should have a "Session timed out" error? | ||
session.destroy(); | ||
@@ -398,35 +447,31 @@ }); | ||
session.once('close', () => { | ||
this._sessionCount--; | ||
if (receivedSettings) { | ||
// 1. If it wasn't free then no need to decrease because | ||
// it has been decreased already in session.request(). | ||
// 2. `stream.once('close')` won't increment the count | ||
// because the session is already closed. | ||
if (wasFree) { | ||
this._freeSessionsCount--; | ||
} | ||
// Assumes session `close` is emitted after request `close` | ||
this._emptySessionCount--; | ||
this._sessionsCount--; | ||
// This cannot be moved to the stream logic, | ||
// because there may be a session that hadn't made a single request. | ||
const where = this.sessions[normalizedOptions]; | ||
where.splice(where.indexOf(session), 1); | ||
if (where.length === 0) { | ||
if (where.length === 1) { | ||
delete this.sessions[normalizedOptions]; | ||
} else { | ||
where.splice(where.indexOf(session), 1); | ||
} | ||
} else { | ||
// Broken connection | ||
removeFromQueue(); | ||
const error = new Error('Session closed without receiving a SETTINGS frame'); | ||
error.code = 'HTTP2WRAPPER_NOSETTINGS'; | ||
for (const {reject} of listeners) { | ||
reject(error); | ||
for (let index = 0; index < listeners.length; index++) { | ||
listeners[index].reject(error); | ||
} | ||
removeFromQueue(); | ||
} | ||
// There may be another session awaiting. | ||
this._tryToCreateNewSession(normalizedOptions, normalizedOrigin); | ||
this._processQueue(); | ||
}); | ||
@@ -436,22 +481,32 @@ | ||
const processListeners = () => { | ||
if (!(normalizedOptions in this.queue) || !isFree()) { | ||
const queue = this.queue[normalizedOptions]; | ||
if (!queue) { | ||
return; | ||
} | ||
for (const origin of session[kOriginSet]) { | ||
if (origin in this.queue[normalizedOptions]) { | ||
const {listeners} = this.queue[normalizedOptions][origin]; | ||
const originSet = session[kOriginSet]; | ||
for (let index = 0; index < originSet.length; index++) { | ||
const origin = originSet[index]; | ||
if (origin in queue) { | ||
const {listeners, completed} = queue[origin]; | ||
let index = 0; | ||
// Prevents session overloading. | ||
while (listeners.length !== 0 && isFree()) { | ||
while (index < listeners.length && isFree()) { | ||
// We assume `resolve(...)` calls `request(...)` *directly*, | ||
// otherwise the session will get overloaded. | ||
listeners.shift().resolve(session); | ||
listeners[index].resolve(session); | ||
index++; | ||
} | ||
const where = this.queue[normalizedOptions]; | ||
if (where[origin].listeners.length === 0) { | ||
delete where[origin]; | ||
queue[origin].listeners.splice(0, index); | ||
if (Object.keys(where).length === 0) { | ||
if (queue[origin].listeners.length === 0 && !completed) { | ||
delete queue[origin]; | ||
if (--queue[kLength] === 0) { | ||
delete this.queue[normalizedOptions]; | ||
@@ -472,6 +527,7 @@ break; | ||
session.on('origin', () => { | ||
session[kOriginSet] = session.originSet; | ||
session[kOriginSet] = session.originSet || []; | ||
session[kGracefullyClosing] = false; | ||
closeSessionIfCovered(this.sessions[normalizedOptions], session); | ||
if (!isFree()) { | ||
// The session is full. | ||
if (session[kGracefullyClosing] || !isFree()) { | ||
return; | ||
@@ -482,2 +538,6 @@ } | ||
if (!isFree()) { | ||
return; | ||
} | ||
// Close covered sessions (if possible). | ||
@@ -488,8 +548,2 @@ closeCoveredSessions(this.sessions[normalizedOptions], session); | ||
session.once('remoteSettings', () => { | ||
// Fix Node.js bug preventing the process from exiting | ||
session.ref(); | ||
session.unref(); | ||
this._sessionsCount++; | ||
// The Agent could have been destroyed already. | ||
@@ -499,4 +553,4 @@ if (entry.destroyed) { | ||
for (const listener of listeners) { | ||
listener.reject(error); | ||
for (let index = 0; index < listeners.length; index++) { | ||
listeners[index].reject(error); | ||
} | ||
@@ -508,4 +562,20 @@ | ||
session[kOriginSet] = session.originSet; | ||
session[kOriginSet] = session.originSet || []; | ||
if (session.socket.encrypted) { | ||
const mainOrigin = session[kOriginSet][0]; | ||
if (mainOrigin !== normalizedOrigin) { | ||
const error = new Error(`Requested origin ${normalizedOrigin} does not match server ${mainOrigin}`); | ||
for (let index = 0; index < listeners.length; index++) { | ||
listeners[index].reject(error); | ||
} | ||
session.destroy(); | ||
return; | ||
} | ||
} | ||
removeFromQueue(); | ||
{ | ||
@@ -522,26 +592,24 @@ const where = this.sessions; | ||
this._freeSessionsCount += 1; | ||
receivedSettings = true; | ||
this._emptySessionCount++; | ||
this.emit('session', session); | ||
this._accept(session, listeners, normalizedOrigin, options); | ||
processListeners(); | ||
removeFromQueue(); | ||
// TODO: Close last recently used (or least used?) session | ||
if (session[kCurrentStreamsCount] === 0 && this._freeSessionsCount > this.maxFreeSessions) { | ||
session.close(); | ||
if (session[kCurrentStreamCount] === 0 && this._emptySessionCount > this.maxEmptySessions) { | ||
this.closeEmptySessions(this._emptySessionCount - this.maxEmptySessions); | ||
} | ||
// Check if we haven't managed to execute all listeners. | ||
if (listeners.length !== 0) { | ||
// Request for a new session with predefined listeners. | ||
this.getSession(normalizedOrigin, options, listeners); | ||
listeners.length = 0; | ||
} | ||
// `session.remoteSettings.maxConcurrentStreams` might get increased | ||
session.on('remoteSettings', () => { | ||
if (!isFree()) { | ||
return; | ||
} | ||
processListeners(); | ||
if (!isFree()) { | ||
return; | ||
} | ||
// In case the Origin Set changes | ||
@@ -564,41 +632,27 @@ closeCoveredSessions(this.sessions[normalizedOptions], session); | ||
++session[kCurrentStreamsCount]; | ||
if (session[kCurrentStreamsCount] === session.remoteSettings.maxConcurrentStreams) { | ||
this._freeSessionsCount--; | ||
if (session[kCurrentStreamCount]++ === 0) { | ||
this._emptySessionCount--; | ||
} | ||
stream.once('close', () => { | ||
wasFree = isFree(); | ||
if (--session[kCurrentStreamCount] === 0) { | ||
this._emptySessionCount++; | ||
session.unref(); | ||
--session[kCurrentStreamsCount]; | ||
if (this._emptySessionCount > this.maxEmptySessions || session[kGracefullyClosing]) { | ||
session.close(); | ||
return; | ||
} | ||
} | ||
if (!session.destroyed && !session.closed) { | ||
closeSessionIfCovered(this.sessions[normalizedOptions], session); | ||
if (session.destroyed || session.closed) { | ||
return; | ||
} | ||
if (isFree() && !session.closed) { | ||
if (!wasFree) { | ||
this._freeSessionsCount++; | ||
if (isFree() && !closeSessionIfCovered(this.sessions[normalizedOptions], session)) { | ||
closeCoveredSessions(this.sessions[normalizedOptions], session); | ||
processListeners(); | ||
wasFree = true; | ||
} | ||
const isEmpty = session[kCurrentStreamsCount] === 0; | ||
if (isEmpty) { | ||
session.unref(); | ||
} | ||
if ( | ||
isEmpty && | ||
( | ||
this._freeSessionsCount > this.maxFreeSessions || | ||
session[kGracefullyClosing] | ||
) | ||
) { | ||
session.close(); | ||
} else { | ||
closeCoveredSessions(this.sessions[normalizedOptions], session); | ||
processListeners(); | ||
} | ||
if (session[kCurrentStreamCount] === 0) { | ||
this._processQueue(); | ||
} | ||
@@ -611,7 +665,8 @@ } | ||
} catch (error) { | ||
for (const listener of listeners) { | ||
listener.reject(error); | ||
removeFromQueue(); | ||
this._sessionCount--; | ||
for (let index = 0; index < listeners.length; index++) { | ||
listeners[index].reject(error); | ||
} | ||
removeFromQueue(); | ||
} | ||
@@ -625,3 +680,4 @@ }; | ||
this.queue[normalizedOptions][normalizedOrigin] = entry; | ||
this._tryToCreateNewSession(normalizedOptions, normalizedOrigin); | ||
this.queue[normalizedOptions][kLength]++; | ||
this._processQueue(); | ||
}); | ||
@@ -636,3 +692,8 @@ } | ||
try { | ||
resolve(session.request(headers, streamOptions)); | ||
const stream = session.request(headers, streamOptions); | ||
// Do not throw before `request(...)` has been awaited | ||
delayAsyncDestroy(stream); | ||
resolve(stream); | ||
} catch (error) { | ||
@@ -646,3 +707,3 @@ reject(error); | ||
createConnection(origin, options) { | ||
async createConnection(origin, options) { | ||
return Agent.connect(origin, options); | ||
@@ -655,3 +716,3 @@ } | ||
const port = origin.port || 443; | ||
const host = origin.hostname || origin.host; | ||
const host = origin.hostname; | ||
@@ -662,25 +723,60 @@ if (typeof options.servername === 'undefined') { | ||
return tls.connect(port, host, options); | ||
const socket = tls.connect(port, host, options); | ||
if (options.socket) { | ||
socket._peername = { | ||
family: undefined, | ||
address: undefined, | ||
port | ||
}; | ||
} | ||
return socket; | ||
} | ||
closeFreeSessions() { | ||
for (const sessions of Object.values(this.sessions)) { | ||
for (const session of sessions) { | ||
if (session[kCurrentStreamsCount] === 0) { | ||
closeEmptySessions(maxCount = Number.POSITIVE_INFINITY) { | ||
let closedCount = 0; | ||
const {sessions} = this; | ||
// eslint-disable-next-line guard-for-in | ||
for (const key in sessions) { | ||
const thisSessions = sessions[key]; | ||
for (let index = 0; index < thisSessions.length; index++) { | ||
const session = thisSessions[index]; | ||
if (session[kCurrentStreamCount] === 0) { | ||
closedCount++; | ||
session.close(); | ||
if (closedCount >= maxCount) { | ||
return closedCount; | ||
} | ||
} | ||
} | ||
} | ||
return closedCount; | ||
} | ||
destroy(reason) { | ||
for (const sessions of Object.values(this.sessions)) { | ||
for (const session of sessions) { | ||
session.destroy(reason); | ||
const {sessions, queue} = this; | ||
// eslint-disable-next-line guard-for-in | ||
for (const key in sessions) { | ||
const thisSessions = sessions[key]; | ||
for (let index = 0; index < thisSessions.length; index++) { | ||
thisSessions[index].destroy(reason); | ||
} | ||
} | ||
for (const entriesOfAuthority of Object.values(this.queue)) { | ||
for (const entry of Object.values(entriesOfAuthority)) { | ||
entry.destroyed = true; | ||
// eslint-disable-next-line guard-for-in | ||
for (const normalizedOptions in queue) { | ||
const entries = queue[normalizedOptions]; | ||
// eslint-disable-next-line guard-for-in | ||
for (const normalizedOrigin in entries) { | ||
entries[normalizedOrigin].destroyed = true; | ||
} | ||
@@ -691,14 +787,19 @@ } | ||
this.queue = {}; | ||
this.tlsSessionCache.clear(); | ||
} | ||
get freeSessions() { | ||
return getSessions({agent: this, isFree: true}); | ||
get emptySessionCount() { | ||
return this._emptySessionCount; | ||
} | ||
get busySessions() { | ||
return getSessions({agent: this, isFree: false}); | ||
get pendingSessionCount() { | ||
return this._sessionCount - this._emptySessionCount; | ||
} | ||
get sessionCount() { | ||
return this._sessionCount; | ||
} | ||
} | ||
Agent.kCurrentStreamsCount = kCurrentStreamsCount; | ||
Agent.kCurrentStreamCount = kCurrentStreamCount; | ||
Agent.kGracefullyClosing = kGracefullyClosing; | ||
@@ -705,0 +806,0 @@ |
'use strict'; | ||
// See https://github.com/facebook/jest/issues/2549 | ||
// eslint-disable-next-line node/prefer-global/url | ||
const {URL, urlToHttpOptions} = require('url'); | ||
const http = require('http'); | ||
@@ -6,5 +9,5 @@ const https = require('https'); | ||
const QuickLRU = require('quick-lru'); | ||
const {Agent, globalAgent} = require('./agent'); | ||
const Http2ClientRequest = require('./client-request'); | ||
const calculateServerName = require('./utils/calculate-server-name'); | ||
const urlToOptions = require('./utils/url-to-options'); | ||
@@ -29,2 +32,15 @@ const cache = new QuickLRU({maxSize: 100}); | ||
const onTimeout = () => { | ||
const {freeSockets} = agent; | ||
for (const sockets of Object.values(freeSockets)) { | ||
if (sockets.includes(socket)) { | ||
socket.destroy(); | ||
return; | ||
} | ||
} | ||
}; | ||
socket.on('timeout', onTimeout); | ||
const onRemove = () => { | ||
@@ -34,2 +50,3 @@ agent.removeSocket(socket, options); | ||
socket.off('free', onFree); | ||
socket.off('timeout', onTimeout); | ||
socket.off('agentRemove', onRemove); | ||
@@ -49,6 +66,6 @@ }; | ||
const result = await queue.get(name); | ||
return result.alpnProtocol; | ||
return {alpnProtocol: result.alpnProtocol}; | ||
} | ||
const {path, agent} = options; | ||
const {path} = options; | ||
options.path = options.socketPath; | ||
@@ -60,33 +77,15 @@ | ||
try { | ||
const {socket, alpnProtocol} = await resultPromise; | ||
cache.set(name, alpnProtocol); | ||
const result = await resultPromise; | ||
cache.set(name, result.alpnProtocol); | ||
queue.delete(name); | ||
options.path = path; | ||
if (alpnProtocol === 'h2') { | ||
// https://github.com/nodejs/node/issues/33343 | ||
socket.destroy(); | ||
} else { | ||
const {globalAgent} = https; | ||
const defaultCreateConnection = https.Agent.prototype.createConnection; | ||
if (agent) { | ||
if (agent.createConnection === defaultCreateConnection) { | ||
installSocket(agent, socket, options); | ||
} else { | ||
socket.destroy(); | ||
} | ||
} else if (globalAgent.createConnection === defaultCreateConnection) { | ||
installSocket(globalAgent, socket, options); | ||
} else { | ||
socket.destroy(); | ||
} | ||
} | ||
queue.delete(name); | ||
return alpnProtocol; | ||
return result; | ||
} catch (error) { | ||
queue.delete(name); | ||
options.path = path; | ||
throw error; | ||
@@ -96,21 +95,24 @@ } | ||
return cache.get(name); | ||
return {alpnProtocol: cache.get(name)}; | ||
}; | ||
module.exports = async (input, options, callback) => { | ||
if (typeof input === 'string' || input instanceof URL) { | ||
input = urlToOptions(new URL(input)); | ||
if (typeof input === 'string') { | ||
input = urlToHttpOptions(new URL(input)); | ||
} else if (input instanceof URL) { | ||
input = urlToHttpOptions(input); | ||
} else { | ||
input = {...input}; | ||
} | ||
if (typeof options === 'function') { | ||
if (typeof options === 'function' || options === undefined) { | ||
// (options, callback) | ||
callback = options; | ||
options = undefined; | ||
options = input; | ||
} else { | ||
// (input, options, callback) | ||
options = Object.assign(input, options); | ||
} | ||
options = { | ||
ALPNProtocols: ['h2', 'http/1.1'], | ||
...input, | ||
...options, | ||
resolveSocket: true | ||
}; | ||
options.ALPNProtocols = options.ALPNProtocols || ['h2', 'http/1.1']; | ||
@@ -130,22 +132,54 @@ if (!Array.isArray(options.ALPNProtocols) || options.ALPNProtocols.length === 0) { | ||
const agents = options.agent; | ||
// Note: We don't support `h2session` here | ||
if (agents) { | ||
if (agents.addRequest) { | ||
throw new Error('The `options.agent` object can contain only `http`, `https` or `http2` properties'); | ||
} | ||
options.agent = agents[isHttps ? 'https' : 'http']; | ||
let {agent} = options; | ||
if (agent !== undefined && agent !== false && agent.constructor.name !== 'Object') { | ||
throw new Error('The `options.agent` can be only an object `http`, `https` or `http2` properties'); | ||
} | ||
if (isHttps) { | ||
const protocol = await resolveProtocol(options); | ||
options.resolveSocket = true; | ||
if (protocol === 'h2') { | ||
if (agents) { | ||
options.agent = agents.http2; | ||
let {socket, alpnProtocol} = await resolveProtocol(options); | ||
// We can't accept custom `createConnection` because the API is different for HTTP/2 | ||
if (socket && options.createConnection) { | ||
socket.destroy(); | ||
socket = undefined; | ||
} | ||
delete options.resolveSocket; | ||
const isHttp2 = alpnProtocol === 'h2'; | ||
if (agent) { | ||
agent = isHttp2 ? agent.http2 : agent.https; | ||
options.agent = agent; | ||
} else if (agent === undefined) { | ||
agent = isHttp2 ? globalAgent : https.globalAgent; | ||
} | ||
if (socket) { | ||
if (agent === false) { | ||
socket.destroy(); | ||
} else { | ||
const defaultCreateConnection = (isHttp2 ? Agent : https.Agent).prototype.createConnection; | ||
if (agent.createConnection === defaultCreateConnection) { | ||
if (isHttp2) { | ||
options._reuseSocket = socket; | ||
} else { | ||
installSocket(agent, socket, options); | ||
} | ||
} else { | ||
socket.destroy(); | ||
} | ||
} | ||
} | ||
if (isHttp2) { | ||
return new Http2ClientRequest(options, callback); | ||
} | ||
} else if (agent) { | ||
options.agent = agent.http; | ||
} | ||
@@ -157,1 +191,2 @@ | ||
module.exports.protocolCache = cache; | ||
module.exports.resolveProtocol = resolveProtocol; |
'use strict'; | ||
// See https://github.com/facebook/jest/issues/2549 | ||
// eslint-disable-next-line node/prefer-global/url | ||
const {URL, urlToHttpOptions} = require('url'); | ||
const http2 = require('http2'); | ||
@@ -6,13 +9,11 @@ const {Writable} = require('stream'); | ||
const IncomingMessage = require('./incoming-message'); | ||
const urlToOptions = require('./utils/url-to-options'); | ||
const proxyEvents = require('./utils/proxy-events'); | ||
const isRequestPseudoHeader = require('./utils/is-request-pseudo-header'); | ||
const { | ||
ERR_INVALID_ARG_TYPE, | ||
ERR_INVALID_PROTOCOL, | ||
ERR_HTTP_HEADERS_SENT, | ||
ERR_INVALID_HTTP_TOKEN, | ||
ERR_HTTP_INVALID_HEADER_VALUE, | ||
ERR_INVALID_CHAR | ||
ERR_HTTP_HEADERS_SENT | ||
} = require('./utils/errors'); | ||
const validateHeaderName = require('./utils/validate-header-name'); | ||
const validateHeaderValue = require('./utils/validate-header-value'); | ||
const proxySocketHandler = require('./utils/proxy-socket-handler'); | ||
@@ -32,6 +33,4 @@ const { | ||
const kJobs = Symbol('jobs'); | ||
const kPendingAgentPromise = Symbol('pendingAgentPromise'); | ||
const isValidHttpToken = /^[\^`\-\w!#$%&*+.|~]+$/; | ||
const isInvalidHeaderValue = /[^\t\u0020-\u007E\u0080-\u00FF]/; | ||
class ClientRequest extends Writable { | ||
@@ -43,5 +42,8 @@ constructor(input, options, callback) { | ||
const hasInput = typeof input === 'string' || input instanceof URL; | ||
if (hasInput) { | ||
input = urlToOptions(input instanceof URL ? input : new URL(input)); | ||
if (typeof input === 'string') { | ||
input = urlToHttpOptions(new URL(input)); | ||
} else if (input instanceof URL) { | ||
input = urlToHttpOptions(input); | ||
} else { | ||
input = {...input}; | ||
} | ||
@@ -52,6 +54,6 @@ | ||
callback = options; | ||
options = hasInput ? input : {...input}; | ||
options = input; | ||
} else { | ||
// (input, options, callback) | ||
options = {...input, ...options}; | ||
options = Object.assign(input, options); | ||
} | ||
@@ -61,29 +63,34 @@ | ||
this[kSession] = options.h2session; | ||
if (this[kSession].destroyed) { | ||
throw new Error('The session has been closed already'); | ||
} | ||
this.protocol = this[kSession].socket.encrypted ? 'https:' : 'http:'; | ||
} else if (options.agent === false) { | ||
this.agent = new Agent({maxFreeSessions: 0}); | ||
this.agent = new Agent({maxEmptySessions: 0}); | ||
} else if (typeof options.agent === 'undefined' || options.agent === null) { | ||
if (typeof options.createConnection === 'function') { | ||
// This is a workaround - we don't have to create the session on our own. | ||
this.agent = new Agent({maxFreeSessions: 0}); | ||
this.agent.createConnection = options.createConnection; | ||
} else { | ||
this.agent = globalAgent; | ||
} | ||
this.agent = globalAgent; | ||
} else if (typeof options.agent.request === 'function') { | ||
this.agent = options.agent; | ||
} else { | ||
throw new ERR_INVALID_ARG_TYPE('options.agent', ['Agent-like Object', 'undefined', 'false'], options.agent); | ||
throw new ERR_INVALID_ARG_TYPE('options.agent', ['http2wrapper.Agent-like Object', 'undefined', 'false'], options.agent); | ||
} | ||
if (options.protocol && options.protocol !== 'https:') { | ||
throw new ERR_INVALID_PROTOCOL(options.protocol, 'https:'); | ||
if (this.agent) { | ||
this.protocol = this.agent.protocol; | ||
} | ||
const port = options.port || options.defaultPort || (this.agent && this.agent.defaultPort) || 443; | ||
const host = options.hostname || options.host || 'localhost'; | ||
if (options.protocol && options.protocol !== this.protocol) { | ||
throw new ERR_INVALID_PROTOCOL(options.protocol, this.protocol); | ||
} | ||
// Don't enforce the origin via options. It may be changed in an Agent. | ||
if (!options.port) { | ||
options.port = options.defaultPort || (this.agent && this.agent.defaultPort) || 443; | ||
} | ||
options.host = options.hostname || options.host || 'localhost'; | ||
// Unused | ||
delete options.hostname; | ||
delete options.host; | ||
delete options.port; | ||
@@ -96,2 +103,4 @@ const {timeout} = options; | ||
this[kPendingAgentPromise] = undefined; | ||
this.socket = null; | ||
@@ -107,5 +116,7 @@ this.connection = null; | ||
if (options.headers) { | ||
for (const [header, value] of Object.entries(options.headers)) { | ||
this.setHeader(header, value); | ||
const {headers} = options; | ||
if (headers) { | ||
// eslint-disable-next-line guard-for-in | ||
for (const header in headers) { | ||
this.setHeader(header, headers[header]); | ||
} | ||
@@ -124,14 +135,16 @@ } | ||
// Clients that generate HTTP/2 requests directly SHOULD use the :authority pseudo-header field instead of the Host header field. | ||
if (port === 443) { | ||
this[kOrigin] = `https://${host}`; | ||
this[kOrigin] = new URL(`${this.protocol}//${options.servername || options.host}:${options.port}`); | ||
if (!(':authority' in this[kHeaders])) { | ||
this[kHeaders][':authority'] = host; | ||
} | ||
} else { | ||
this[kOrigin] = `https://${host}:${port}`; | ||
// A socket is being reused | ||
const reuseSocket = options._reuseSocket; | ||
if (reuseSocket) { | ||
options.createConnection = (...args) => { | ||
if (reuseSocket.destroyed) { | ||
return this.agent.createConnection(...args); | ||
} | ||
if (!(':authority' in this[kHeaders])) { | ||
this[kHeaders][':authority'] = `${host}:${port}`; | ||
} | ||
return reuseSocket; | ||
}; | ||
this.agent.getSession(this[kOrigin], this[kOptions]).catch(() => {}); | ||
} | ||
@@ -170,2 +183,10 @@ | ||
get host() { | ||
return this[kOrigin].hostname; | ||
} | ||
set host(_value) { | ||
// Do nothing as this is read only. | ||
} | ||
get _mustNotHaveABody() { | ||
@@ -194,6 +215,2 @@ return this.method === 'GET' || this.method === 'HEAD' || this.method === 'DELETE'; | ||
_final(callback) { | ||
if (this.destroyed) { | ||
return; | ||
} | ||
this.flushHeaders(); | ||
@@ -232,3 +249,3 @@ | ||
_destroy(error, callback) { | ||
async _destroy(error, callback) { | ||
if (this.res) { | ||
@@ -240,4 +257,16 @@ this.res._dump(); | ||
this._request.destroy(); | ||
} else { | ||
process.nextTick(() => { | ||
this.emit('close'); | ||
}); | ||
} | ||
try { | ||
await this[kPendingAgentPromise]; | ||
} catch (internalError) { | ||
if (this.aborted) { | ||
error = internalError; | ||
} | ||
} | ||
callback(error); | ||
@@ -266,21 +295,22 @@ } | ||
if (!isConnectMethod) { | ||
proxyEvents(stream, this, ['timeout', 'continue', 'close', 'error']); | ||
proxyEvents(stream, this, ['timeout', 'continue', 'close']); | ||
} | ||
// Wait for the `finish` event. We don't want to emit the `response` event | ||
// before `request.end()` is called. | ||
const waitForEnd = fn => { | ||
return (...args) => { | ||
if (!this.writable && !this.destroyed) { | ||
fn(...args); | ||
} else { | ||
this.once('finish', () => { | ||
fn(...args); | ||
}); | ||
} | ||
}; | ||
}; | ||
stream.once('error', error => { | ||
this.destroy(error); | ||
}); | ||
stream.once('aborted', () => { | ||
const {res} = this; | ||
if (res) { | ||
res.aborted = true; | ||
res.emit('aborted'); | ||
res.destroy(); | ||
} else { | ||
this.destroy(new Error('The server aborted the HTTP/2 stream')); | ||
} | ||
}); | ||
// This event tells we are ready to listen for the data. | ||
stream.once('response', waitForEnd((headers, flags, rawHeaders) => { | ||
stream.once('response', (headers, flags, rawHeaders) => { | ||
// If we were to emit raw request stream, it would be as fast as the native approach. | ||
@@ -291,2 +321,5 @@ // Note that wrapping the raw stream in a Proxy instance won't improve the performance (already tested it). | ||
// Undocumented, but it used by `cacheable-request` | ||
response.url = `${this[kOrigin].origin}${this.path}`; | ||
response.req = this; | ||
@@ -298,12 +331,7 @@ response.statusCode = headers[HTTP2_HEADER_STATUS]; | ||
response.once('end', () => { | ||
if (this.aborted) { | ||
response.aborted = true; | ||
response.emit('aborted'); | ||
} else { | ||
response.complete = true; | ||
response.complete = true; | ||
// Has no effect, just be consistent with the Node.js behavior | ||
response.socket = null; | ||
response.connection = null; | ||
} | ||
// Has no effect, just be consistent with the Node.js behavior | ||
response.socket = null; | ||
response.connection = null; | ||
}); | ||
@@ -331,3 +359,5 @@ | ||
stream.once('end', () => { | ||
response.push(null); | ||
if (!this.aborted) { | ||
response.push(null); | ||
} | ||
}); | ||
@@ -340,10 +370,8 @@ | ||
} | ||
})); | ||
}); | ||
// Emits `information` event | ||
stream.once('headers', waitForEnd( | ||
headers => this.emit('information', {statusCode: headers[HTTP2_HEADER_STATUS]}) | ||
)); | ||
stream.once('headers', headers => this.emit('information', {statusCode: headers[HTTP2_HEADER_STATUS]})); | ||
stream.once('trailers', waitForEnd((trailers, flags, rawTrailers) => { | ||
stream.once('trailers', (trailers, flags, rawTrailers) => { | ||
const {res} = this; | ||
@@ -354,8 +382,35 @@ | ||
res.rawTrailers = rawTrailers; | ||
})); | ||
}); | ||
const {socket} = stream.session; | ||
this.socket = socket; | ||
this.connection = socket; | ||
stream.once('close', () => { | ||
const {aborted, res} = this; | ||
if (res) { | ||
if (aborted) { | ||
res.aborted = true; | ||
res.emit('aborted'); | ||
res.destroy(); | ||
} | ||
const finish = () => { | ||
res.emit('close'); | ||
this.destroy(); | ||
this.emit('close'); | ||
}; | ||
if (res.readable) { | ||
res.once('end', finish); | ||
} else { | ||
finish(); | ||
} | ||
return; | ||
} | ||
this.destroy(); | ||
this.emit('close'); | ||
}); | ||
this.socket = new Proxy(stream, proxySocketHandler); | ||
for (const job of this[kJobs]) { | ||
@@ -368,2 +423,6 @@ job(); | ||
if (!(':authority' in this[kHeaders]) && !isConnectMethod) { | ||
this[kHeaders][':authority'] = this[kOrigin].host; | ||
} | ||
// Makes a HTTP2 request | ||
@@ -374,3 +433,3 @@ if (this[kSession]) { | ||
} catch (error) { | ||
this.emit('error', error); | ||
this.destroy(error); | ||
} | ||
@@ -381,5 +440,12 @@ } else { | ||
try { | ||
onStream(await this.agent.request(this[kOrigin], this[kOptions], this[kHeaders])); | ||
const promise = this.agent.request(this[kOrigin], this[kOptions], this[kHeaders]); | ||
this[kPendingAgentPromise] = promise; | ||
onStream(await promise); | ||
this[kPendingAgentPromise] = false; | ||
} catch (error) { | ||
this.emit('error', error); | ||
this[kPendingAgentPromise] = false; | ||
this.destroy(error); | ||
} | ||
@@ -389,2 +455,22 @@ } | ||
get connection() { | ||
return this.socket; | ||
} | ||
set connection(value) { | ||
this.socket = value; | ||
} | ||
getHeaderNames() { | ||
return Object.keys(this[kHeaders]); | ||
} | ||
hasHeader(name) { | ||
if (typeof name !== 'string') { | ||
throw new ERR_INVALID_ARG_TYPE('name', 'string', name); | ||
} | ||
return Boolean(this[kHeaders][name.toLowerCase()]); | ||
} | ||
getHeader(name) { | ||
@@ -419,14 +505,5 @@ if (typeof name !== 'string') { | ||
if (typeof name !== 'string' || (!isValidHttpToken.test(name) && !isRequestPseudoHeader(name))) { | ||
throw new ERR_INVALID_HTTP_TOKEN('Header name', name); | ||
} | ||
validateHeaderName(name); | ||
validateHeaderValue(name, value); | ||
if (typeof value === 'undefined') { | ||
throw new ERR_HTTP_INVALID_HEADER_VALUE(value, name); | ||
} | ||
if (isInvalidHeaderValue.test(value)) { | ||
throw new ERR_INVALID_CHAR('header content', name); | ||
} | ||
this[kHeaders][name.toLowerCase()] = value; | ||
@@ -433,0 +510,0 @@ } |
@@ -7,4 +7,5 @@ 'use strict'; | ||
super({ | ||
highWaterMark, | ||
autoDestroy: false | ||
emitClose: false, | ||
autoDestroy: true, | ||
highWaterMark | ||
}); | ||
@@ -29,3 +30,2 @@ | ||
this.socket = socket; | ||
this.connection = socket; | ||
@@ -35,3 +35,18 @@ this._dumped = false; | ||
_destroy(error) { | ||
get connection() { | ||
return this.socket; | ||
} | ||
set connection(value) { | ||
this.socket = value; | ||
} | ||
_destroy(error, callback) { | ||
if (!this.readableEnded) { | ||
this.aborted = true; | ||
} | ||
// See https://github.com/nodejs/node/issues/35303 | ||
callback(); | ||
this.req._request.destroy(error); | ||
@@ -38,0 +53,0 @@ } |
'use strict'; | ||
const http2 = require('http2'); | ||
const agent = require('./agent'); | ||
const { | ||
Agent, | ||
globalAgent | ||
} = require('./agent'); | ||
const ClientRequest = require('./client-request'); | ||
const IncomingMessage = require('./incoming-message'); | ||
const auto = require('./auto'); | ||
const { | ||
HttpOverHttp2, | ||
HttpsOverHttp2 | ||
} = require('./proxies/h1-over-h2'); | ||
const Http2OverHttp2 = require('./proxies/h2-over-h2'); | ||
const { | ||
Http2OverHttp, | ||
Http2OverHttps | ||
} = require('./proxies/h2-over-h1'); | ||
const validateHeaderName = require('./utils/validate-header-name'); | ||
const validateHeaderValue = require('./utils/validate-header-value'); | ||
@@ -24,6 +38,16 @@ const request = (url, options, callback) => { | ||
IncomingMessage, | ||
...agent, | ||
Agent, | ||
globalAgent, | ||
request, | ||
get, | ||
auto | ||
auto, | ||
proxies: { | ||
HttpOverHttp2, | ||
HttpsOverHttp2, | ||
Http2OverHttp2, | ||
Http2OverHttp, | ||
Http2OverHttps | ||
}, | ||
validateHeaderName, | ||
validateHeaderValue | ||
}; |
@@ -12,7 +12,3 @@ 'use strict'; | ||
const index = hostHeader.indexOf(']'); | ||
if (index === -1) { | ||
servername = hostHeader; | ||
} else { | ||
servername = hostHeader.slice(1, -1); | ||
} | ||
servername = index === -1 ? hostHeader : hostHeader.slice(1, -1); | ||
} else { | ||
@@ -19,0 +15,0 @@ servername = hostHeader.split(':', 1)[0]; |
@@ -46,1 +46,7 @@ 'use strict'; | ||
}); | ||
makeError( | ||
Error, | ||
'ERR_HTTP2_NO_SOCKET_MANIPULATION', | ||
'HTTP/2 sockets should not be directly manipulated (e.g. read and written)' | ||
); |
License Policy Violation
LicenseThis package is not allowed per your license policy. Review the package's license to ensure compliance.
Found 1 instance in 1 package
License Policy Violation
LicenseThis package is not allowed per your license policy. Review the package's license to ensure compliance.
Found 1 instance in 1 package
No v1
QualityPackage is not semver >=1. This means it is not stable and does not support ^ ranges.
Found 1 instance in 1 package
68721
26
1794
0
17
373
13