Comparing version 11.0.0-2 to 11.0.0-3
@@ -56,2 +56,38 @@ /// <reference types="node" /> | ||
export declare function normalizeUserAgent<T extends Request, U extends Response>(options?: NormalizeUserAgentOptions): Middleware<T, U>; | ||
export declare class ConnectionManager<T> { | ||
connections: Map<string, T>; | ||
get(key: string): T | undefined; | ||
set(key: string, connection: T): T; | ||
delete(key: string, connection: T): T; | ||
} | ||
export interface ConcurrencyConnectionManagerOptions { | ||
maxConnections?: number; | ||
maxFreeConnections?: number; | ||
} | ||
export interface ConnectionSet<T> { | ||
used?: Set<T>; | ||
free?: Set<T>; | ||
pend?: Array<(connection?: T) => void>; | ||
} | ||
/** | ||
* Manage HTTP connection reuse. | ||
*/ | ||
export declare class ConcurrencyConnectionManager<T> extends ConnectionManager<ConnectionSet<T>> { | ||
protected options: ConcurrencyConnectionManagerOptions; | ||
maxConnections: number; | ||
maxFreeConnections: number; | ||
constructor(options?: ConcurrencyConnectionManagerOptions); | ||
/** | ||
* Create a new connection. | ||
*/ | ||
ready(key: string, onReady: (existingConnection?: T) => void): void; | ||
getUsedConnection(key: string): T | undefined; | ||
getFreeConnection(key: string): T | undefined; | ||
use(key: string, connection: T): void; | ||
freed(key: string, connection: T, discard: () => void): void; | ||
remove(key: string, connection: T): void; | ||
} | ||
/** | ||
* Configure HTTP version negotiation. | ||
*/ | ||
export declare enum NegotiateHttpVersion { | ||
@@ -66,2 +102,4 @@ HTTP1_ONLY = 0, | ||
export interface SendOptions { | ||
keepAlive?: number; | ||
servername?: string; | ||
rejectUnauthorized?: boolean; | ||
@@ -68,0 +106,0 @@ ca?: string | Buffer | Array<string | Buffer>; |
@@ -111,2 +111,117 @@ "use strict"; | ||
exports.normalizeUserAgent = normalizeUserAgent; | ||
class ConnectionManager { | ||
constructor() { | ||
this.connections = new Map(); | ||
} | ||
get(key) { | ||
return this.connections.get(key); | ||
} | ||
set(key, connection) { | ||
if (this.connections.has(key)) | ||
throw new TypeError('Connection exists for key'); | ||
this.connections.set(key, connection); | ||
return connection; | ||
} | ||
delete(key, connection) { | ||
const existing = this.connections.get(key); | ||
if (existing !== connection) | ||
throw new TypeError('Connection for key does not match'); | ||
this.connections.delete(key); | ||
return connection; | ||
} | ||
} | ||
exports.ConnectionManager = ConnectionManager; | ||
/** | ||
* Manage HTTP connection reuse. | ||
*/ | ||
class ConcurrencyConnectionManager extends ConnectionManager { | ||
constructor(options = {}) { | ||
super(); | ||
this.options = options; | ||
this.maxConnections = Infinity; | ||
this.maxFreeConnections = 256; | ||
if (options.maxConnections) | ||
this.maxConnections = options.maxConnections; | ||
if (options.maxFreeConnections) | ||
this.maxFreeConnections = options.maxFreeConnections; | ||
} | ||
/** | ||
* Create a new connection. | ||
*/ | ||
ready(key, onReady) { | ||
const pool = this.get(key) || this.set(key, Object.create(null)); | ||
// Reuse free connections first. | ||
if (pool.free) | ||
return onReady(this.getFreeConnection(key)); | ||
// If no other connections exist, `onReady` immediately. | ||
if (!pool.used) | ||
return onReady(); | ||
// Add to "pending" queue. | ||
if (pool.used.size >= this.maxConnections) { | ||
if (!pool.pend) | ||
pool.pend = []; | ||
pool.pend.push(onReady); | ||
return; | ||
} | ||
return onReady(); | ||
} | ||
getUsedConnection(key) { | ||
const pool = this.get(key); | ||
if (pool && pool.used) | ||
return pool.used.values().next().value; | ||
} | ||
getFreeConnection(key) { | ||
const pool = this.get(key); | ||
if (pool && pool.free) | ||
return pool.free.values().next().value; | ||
} | ||
use(key, connection) { | ||
const pool = this.get(key) || this.set(key, Object.create(null)); | ||
if (pool.free) | ||
pool.free.delete(connection); | ||
if (!pool.used) | ||
pool.used = new Set(); | ||
pool.used.add(connection); | ||
} | ||
freed(key, connection, discard) { | ||
const pool = this.get(key) || this.set(key, Object.create(null)); | ||
// Remove from any possible "used". | ||
if (pool.used) | ||
pool.used.delete(connection); | ||
// Immediately send for connection. | ||
if (pool.pend) { | ||
const onReady = pool.pend.shift(); | ||
onReady(connection); | ||
if (!pool.pend.length) | ||
delete pool.pend; | ||
} | ||
// Add to "free" connections pool. | ||
if (!pool.free) | ||
pool.free = new Set(); | ||
if (pool.free.size >= this.maxFreeConnections) | ||
return discard(); | ||
pool.free.add(connection); | ||
} | ||
remove(key, connection) { | ||
const pool = this.get(key); | ||
if (!pool) | ||
return; | ||
if (pool.used && pool.used.has(connection)) { | ||
pool.used.delete(connection); | ||
if (!pool.used.size) | ||
delete pool.used; | ||
} | ||
if (pool.free && pool.free.has(connection)) { | ||
pool.free.delete(connection); | ||
if (!pool.free.size) | ||
delete pool.free; | ||
} | ||
if (!pool.free && !pool.used && !pool.pend) | ||
this.delete(key, pool); | ||
} | ||
} | ||
exports.ConcurrencyConnectionManager = ConcurrencyConnectionManager; | ||
/** | ||
* Configure HTTP version negotiation. | ||
*/ | ||
var NegotiateHttpVersion; | ||
@@ -135,6 +250,10 @@ (function (NegotiateHttpVersion) { | ||
} | ||
// Global connection caches. | ||
const globalNetConnections = new ConcurrencyConnectionManager(); | ||
const globalTlsConnections = new ConcurrencyConnectionManager(); | ||
const globalHttp2Connections = new ConnectionManager(); | ||
/** | ||
* Execute HTTP request. | ||
*/ | ||
function execHttp(req, protocol, host, port, socket, agent) { | ||
function execHttp1(req, protocol, host, port, keepAlive, socket) { | ||
return new Promise((resolve, reject) => { | ||
@@ -152,3 +271,2 @@ const { url, body, Url } = req; | ||
auth: Url.auth, | ||
agent, | ||
createConnection: () => socket | ||
@@ -158,2 +276,7 @@ }; | ||
const requestStream = new stream_1.PassThrough(); | ||
// Reuse HTTP connections where possible. | ||
if (keepAlive > 0) { | ||
rawRequest.shouldKeepAlive = true; | ||
rawRequest.setHeader('Connection', 'keep-alive'); | ||
} | ||
// Trigger unavailable error when node.js errors before response. | ||
@@ -170,3 +293,3 @@ function onError(err) { | ||
const trailer = new Promise(resolve => { | ||
rawResponse.on('end', () => resolve(servie_1.createHeaders(rawResponse.rawTrailers))); | ||
rawResponse.once('end', () => resolve(servie_1.createHeaders(rawResponse.rawTrailers))); | ||
}); | ||
@@ -186,4 +309,6 @@ // Replace request error listener behaviour. | ||
rawResponse.on('data', (chunk) => res.bytesTransferred += chunk.length); | ||
rawResponse.on('end', () => res.finished = true); | ||
rawResponse.on('close', () => req.closed = true); | ||
rawResponse.once('end', () => { | ||
req.closed = true; | ||
res.finished = true; | ||
}); | ||
return resolve(res); | ||
@@ -195,3 +320,6 @@ } | ||
req.started = true; | ||
req.events.on('abort', () => rawRequest.abort()); | ||
req.events.once('abort', () => { | ||
socket.emit('agentRemove'); // `abort` destroys the connection with no event. | ||
rawRequest.abort(); | ||
}); | ||
// Track request upload progress. | ||
@@ -206,3 +334,3 @@ requestStream.on('data', (chunk) => req.bytesTransferred += chunk.length); | ||
*/ | ||
function execHttp2(req, protocol, host, port, socket) { | ||
function execHttp2(req, protocol, host, port, client) { | ||
return new Promise((resolve, reject) => { | ||
@@ -214,10 +342,10 @@ // HTTP2 formatted headers. | ||
}); | ||
// TODO: Fix node.js types. | ||
const connectOptions = { | ||
createConnection: () => socket | ||
}; | ||
const authority = `${protocol}//${host}:${port}`; | ||
const client = http2_1.connect(authority, connectOptions); | ||
const http2Stream = client.request(headers, { endStream: false }); | ||
const requestStream = new stream_1.PassThrough(); | ||
ref(client.socket); // Request ref tracking. | ||
// Track when stream finishes. | ||
function onClose() { | ||
req.closed = true; | ||
unref(client.socket); | ||
} | ||
// Trigger unavailable error when node.js errors before response. | ||
@@ -228,4 +356,4 @@ function onError(err) { | ||
function onResponse(headers) { | ||
const encrypted = socket.encrypted === true; | ||
const { localAddress, localPort, remoteAddress, remotePort } = socket; | ||
const encrypted = client.socket.encrypted === true; | ||
const { localAddress, localPort, remoteAddress, remotePort } = client.socket; | ||
// Replace request error listener behaviour with proxy. | ||
@@ -246,16 +374,11 @@ http2Stream.removeListener('error', onError); | ||
http2Stream.on('data', (chunk) => res.bytesTransferred += chunk.length); | ||
// Close HTTP2 session when request ends. | ||
http2Stream.on('end', () => { | ||
req.closed = true; | ||
res.finished = true; | ||
client.close(); | ||
}); | ||
http2Stream.once('end', () => res.finished = true); | ||
return resolve(res); | ||
} | ||
client.once('error', onError); | ||
http2Stream.once('error', onError); | ||
http2Stream.once('close', onClose); | ||
http2Stream.once('response', onResponse); | ||
// https://github.com/serviejs/servie#implementers | ||
req.started = true; | ||
req.events.on('abort', () => http2Stream.destroy()); | ||
req.events.once('abort', () => http2Stream.destroy()); | ||
// Track request upload progress. | ||
@@ -271,4 +394,8 @@ requestStream.on('data', (chunk) => req.bytesTransferred += chunk.length); | ||
function send(options) { | ||
// Mirror common browser behaviour by default. | ||
const { negotiateHttpVersion = NegotiateHttpVersion.HTTP2_FOR_HTTPS } = options; | ||
const { keepAlive = 5000, // Default to keeping a connection open briefly. | ||
negotiateHttpVersion = NegotiateHttpVersion.HTTP2_FOR_HTTPS } = options; | ||
// TODO: Allow configuration in options. | ||
const tlsConnections = globalTlsConnections; | ||
const netConnections = globalNetConnections; | ||
const http2Connections = globalHttp2Connections; | ||
return function (req) { | ||
@@ -279,46 +406,85 @@ const { hostname, protocol } = req.Url; | ||
const port = Number(req.Url.port) || 80; | ||
const socketOptions = { host, port }; | ||
const connectionKey = `${host}:${port}:${negotiateHttpVersion}`; | ||
// Use existing HTTP2 session in HTTP2 mode. | ||
if (negotiateHttpVersion === NegotiateHttpVersion.HTTP2_ONLY) { | ||
return execHttp2(req, protocol, host, port, net_1.connect(socketOptions)); | ||
const existingSession = http2Connections.get(connectionKey); | ||
if (existingSession) | ||
return execHttp2(req, protocol, host, port, existingSession); | ||
} | ||
return execHttp(req, protocol, host, port, net_1.connect(socketOptions), options.agent); | ||
return new Promise((resolve) => { | ||
return netConnections.ready(connectionKey, (freeSocket) => { | ||
const socketOptions = { host, port }; | ||
const socket = freeSocket || setupSocket(connectionKey, keepAlive, netConnections, net_1.connect(socketOptions)); | ||
socket.ref(); | ||
netConnections.use(connectionKey, socket); | ||
if (negotiateHttpVersion === NegotiateHttpVersion.HTTP2_ONLY) { | ||
const authority = `${protocol}//${host}:${port}`; | ||
const client = manageHttp2(authority, connectionKey, keepAlive, http2Connections, socket); | ||
return resolve(execHttp2(req, protocol, host, port, client)); | ||
} | ||
return resolve(execHttp1(req, protocol, host, port, keepAlive, socket)); | ||
}); | ||
}); | ||
} | ||
// Optionally negotiate HTTP2 connection. | ||
if (protocol === 'https:') { | ||
const { ca, cert, key, secureProtocol, secureContext } = options; | ||
const port = Number(req.Url.port) || 443; | ||
const servername = options.servername || calculateServerName(host, req.headers.get('host')); | ||
const rejectUnauthorized = options.rejectUnauthorized !== false; | ||
const connectionKey = `${host}:${port}:${negotiateHttpVersion}:${servername}:${rejectUnauthorized}:${ca || ''}:${cert || ''}:${key || ''}:${secureProtocol || ''}`; | ||
// Use an existing TLS session to speed up handshake. | ||
const existingSocket = tlsConnections.getFreeConnection(connectionKey) || tlsConnections.getUsedConnection(connectionKey); | ||
const session = existingSocket ? existingSocket.getSession() : undefined; | ||
const socketOptions = { | ||
host, | ||
port, | ||
servername: calculateServerName(host, req.headers.get('host')), | ||
rejectUnauthorized: options.rejectUnauthorized !== false, | ||
ca: options.ca, | ||
cert: options.cert, | ||
key: options.key, | ||
secureProtocol: options.secureProtocol, | ||
secureContext: options.secureContext | ||
host, port, servername, rejectUnauthorized, ca, cert, key, | ||
session, secureProtocol, secureContext | ||
}; | ||
if (negotiateHttpVersion === NegotiateHttpVersion.HTTP1_ONLY) { | ||
return execHttp(req, protocol, host, port, tls_1.connect(socketOptions), options.agent); | ||
// Use any existing HTTP2 session. | ||
if (negotiateHttpVersion === NegotiateHttpVersion.HTTP2_ONLY || | ||
negotiateHttpVersion === NegotiateHttpVersion.HTTP2_FOR_HTTPS) { | ||
const existingSession = http2Connections.get(connectionKey); | ||
if (existingSession) | ||
return execHttp2(req, protocol, host, port, existingSession); | ||
} | ||
if (negotiateHttpVersion === NegotiateHttpVersion.HTTP2_ONLY) { | ||
socketOptions.ALPNProtocols = ['h2']; // Only requesting HTTP2 support. | ||
return execHttp2(req, protocol, host, port, tls_1.connect(socketOptions)); | ||
} | ||
return new Promise((resolve, reject) => { | ||
socketOptions.ALPNProtocols = ['h2', 'http/1.1']; // Request HTTP2 or HTTP1. | ||
const socket = tls_1.connect(socketOptions); | ||
socket.once('secureConnect', () => { | ||
const alpnProtocol = socket.alpnProtocol; | ||
// Successfully negotiated HTTP2 connection. | ||
if (alpnProtocol === 'h2') { | ||
return resolve(execHttp2(req, protocol, host, port, socket)); | ||
// Set up ALPN protocols for connection negotiation. | ||
if (negotiateHttpVersion === NegotiateHttpVersion.HTTP2_ONLY) { | ||
socketOptions.ALPNProtocols = ['h2']; | ||
} | ||
else if (negotiateHttpVersion === NegotiateHttpVersion.HTTP2_FOR_HTTPS) { | ||
socketOptions.ALPNProtocols = ['h2', 'http/1.1']; | ||
} | ||
return tlsConnections.ready(connectionKey, (freeSocket) => { | ||
const socket = freeSocket || setupSocket(connectionKey, keepAlive, tlsConnections, tls_1.connect(socketOptions)); | ||
socket.ref(); | ||
tlsConnections.use(connectionKey, socket); | ||
if (negotiateHttpVersion === NegotiateHttpVersion.HTTP1_ONLY) { | ||
return resolve(execHttp1(req, protocol, host, port, keepAlive, socket)); | ||
} | ||
if (alpnProtocol === 'http/1.1') { | ||
return resolve(execHttp(req, protocol, host, port, socket, options.agent)); | ||
if (negotiateHttpVersion === NegotiateHttpVersion.HTTP2_ONLY) { | ||
const client = manageHttp2(`${protocol}//${host}:${port}`, connectionKey, keepAlive, http2Connections, socket); | ||
return resolve(execHttp2(req, protocol, host, port, client)); | ||
} | ||
return reject(new error_1.PopsicleError('No ALPN protocol negotiated', 'EALPNPROTOCOL', req)); | ||
socket.once('secureConnect', () => { | ||
const alpnProtocol = socket.alpnProtocol; | ||
// Successfully negotiated HTTP2 connection. | ||
if (alpnProtocol === 'h2') { | ||
const existingClient = http2Connections.get(connectionKey); | ||
if (existingClient) { | ||
socket.destroy(); // Destroy socket in case of TLS connection race. | ||
return resolve(execHttp2(req, protocol, host, port, existingClient)); | ||
} | ||
const client = manageHttp2(`${protocol}//${host}:${port}`, connectionKey, keepAlive, http2Connections, socket); | ||
return resolve(execHttp2(req, protocol, host, port, client)); | ||
} | ||
if (alpnProtocol === 'http/1.1') { | ||
return resolve(execHttp1(req, protocol, host, port, keepAlive, socket)); | ||
} | ||
return reject(new error_1.PopsicleError('No ALPN protocol negotiated', 'EALPNPROTOCOL', req)); | ||
}); | ||
socket.once('error', (err) => { | ||
return reject(new error_1.PopsicleError(`Unable to connect to ${host}:${port}`, 'EUNAVAILABLE', req, err)); | ||
}); | ||
}); | ||
socket.once('error', (err) => { | ||
return reject(new error_1.PopsicleError(`Unable to connect to ${host}:${port}`, 'EUNAVAILABLE', req, err)); | ||
}); | ||
}); | ||
@@ -331,2 +497,65 @@ } | ||
/** | ||
* Setup the socket with the connection manager. | ||
* | ||
* Ref: https://github.com/nodejs/node/blob/531b4bedcac14044f09129ffb65dab71cc2707d9/lib/_http_agent.js#L254 | ||
*/ | ||
function setupSocket(key, keepAlive, manager, socket) { | ||
const onFree = () => { | ||
if (keepAlive > 0) { | ||
socket.setKeepAlive(true, keepAlive); | ||
socket.unref(); | ||
} | ||
manager.freed(key, socket, () => socket.destroy()); | ||
}; | ||
const onClose = () => manager.remove(key, socket); | ||
const onRemove = () => { | ||
socket.removeListener('free', onFree); | ||
socket.removeListener('close', onClose); | ||
manager.remove(key, socket); | ||
}; | ||
socket.on('free', onFree); | ||
socket.once('close', onClose); | ||
socket.once('agentRemove', onRemove); | ||
return socket; | ||
} | ||
/** | ||
* Set up a HTTP2 working session. | ||
*/ | ||
function manageHttp2(authority, key, keepAlive, manager, socket) { | ||
// TODO: Fix node.js types. | ||
const connectOptions = { createConnection: () => socket }; | ||
const client = http2_1.connect(authority, connectOptions); | ||
manager.set(key, client); | ||
client.once('close', () => manager.delete(key, client)); | ||
client.setTimeout(keepAlive, () => client.close()); | ||
return client; | ||
} | ||
/** | ||
* Track socket usage. | ||
*/ | ||
const SOCKET_REFS = new WeakMap(); | ||
/** | ||
* Track socket refs. | ||
*/ | ||
function ref(socket) { | ||
const count = SOCKET_REFS.get(socket) || 0; | ||
if (count === 0) | ||
socket.ref(); | ||
SOCKET_REFS.set(socket, count + 1); | ||
} | ||
/** | ||
* Track socket unrefs and globally unref. | ||
*/ | ||
function unref(socket) { | ||
const count = SOCKET_REFS.get(socket); | ||
if (!count) | ||
return; | ||
if (count === 1) { | ||
socket.unref(); | ||
SOCKET_REFS.delete(socket); | ||
return; | ||
} | ||
SOCKET_REFS.set(socket, count - 1); | ||
} | ||
/** | ||
* Create a request transport using node.js `http` libraries. | ||
@@ -333,0 +562,0 @@ */ |
{ | ||
"name": "popsicle", | ||
"version": "11.0.0-2", | ||
"version": "11.0.0-3", | ||
"description": "Advanced HTTP requests in node.js and browsers, using Servie", | ||
@@ -67,5 +67,5 @@ "main": "dist/universal.js", | ||
"@types/jest": "^22.2.3", | ||
"@types/node": "^8.0.0", | ||
"@types/node": "^10.1.2", | ||
"body-parser": "^1.18.3", | ||
"browserify": "^14.3.0", | ||
"browserify": "^16.2.2", | ||
"envify": "^4.1.0", | ||
@@ -72,0 +72,0 @@ "express": "^4.16.3", |
Sorry, the diff of this file is not supported yet
License Policy Violation
LicenseThis package is not allowed per your license policy. Review the package's license to ensure compliance.
Found 1 instance in 1 package
License Policy Violation
LicenseThis package is not allowed per your license policy. Review the package's license to ensure compliance.
Found 1 instance in 1 package
154839
1345