Comparing version 1.2.2 to 1.2.3
@@ -6,2 +6,4 @@ 'use strict' | ||
const undici = require('..') | ||
const { kEnqueue, kGetNext } = require('../lib/symbols') | ||
const Request = require('../lib/request') | ||
@@ -44,3 +46,3 @@ // # Start the h2o server (in h2o repository) | ||
suite | ||
.add('http - keepalive - pipe', { | ||
.add('http - keepalive', { | ||
defer: true, | ||
@@ -62,5 +64,17 @@ fn: deferred => { | ||
}) | ||
.add('undici - pipeline - pipe', { | ||
.add('http - noop', { | ||
defer: true, | ||
fn: deferred => { | ||
http.get(httpOptions, response => { | ||
response | ||
.resume() | ||
.on('end', () => { | ||
deferred.resolve() | ||
}) | ||
}) | ||
} | ||
}) | ||
.add('undici - pipeline', { | ||
defer: true, | ||
fn: deferred => { | ||
pool | ||
@@ -81,3 +95,3 @@ .pipeline(undiciOptions, data => { | ||
}) | ||
.add('undici - request - pipe', { | ||
.add('undici - request', { | ||
defer: true, | ||
@@ -103,3 +117,3 @@ fn: deferred => { | ||
}) | ||
.add('undici - stream - pipe', { | ||
.add('undici - stream', { | ||
defer: true, | ||
@@ -125,2 +139,24 @@ fn: deferred => { | ||
}) | ||
.add('undici - simple', { | ||
defer: true, | ||
fn: deferred => { | ||
const stream = new Writable({ | ||
write (chunk, encoding, callback) { | ||
callback() | ||
} | ||
}) | ||
stream.once('finish', () => { | ||
deferred.resolve() | ||
}) | ||
const client = pool[kGetNext]() | ||
client[kEnqueue](new SimpleRequest(client, undiciOptions, stream)) | ||
} | ||
}) | ||
.add('undici - noop', { | ||
defer: true, | ||
fn: deferred => { | ||
const client = pool[kGetNext]() | ||
client[kEnqueue](new NoopRequest(client, undiciOptions, deferred)) | ||
} | ||
}) | ||
.on('cycle', event => { | ||
@@ -130,1 +166,38 @@ console.log(String(event.target)) | ||
.run() | ||
class NoopRequest extends Request { | ||
constructor (client, opts, deferred) { | ||
super(opts, client) | ||
this.deferred = deferred | ||
} | ||
_onHeaders () {} | ||
_onBody () {} | ||
_onComplete () { | ||
this.deferred.resolve() | ||
} | ||
} | ||
class SimpleRequest extends Request { | ||
constructor (client, opts, dst) { | ||
super(opts, client) | ||
this.dst = dst | ||
this.dst.on('drain', () => { | ||
this.resume() | ||
}) | ||
} | ||
_onHeaders (statusCode, headers, resume) { | ||
this.resume = resume | ||
} | ||
_onBody (chunk, offset, length) { | ||
return this.dst.write(chunk.slice(offset, offset + length)) | ||
} | ||
_onComplete () { | ||
this.dst.end() | ||
} | ||
} |
1114
lib/client.js
@@ -0,25 +1,272 @@ | ||
'use strict' | ||
const { URL } = require('url') | ||
const net = require('net') | ||
const tls = require('tls') | ||
// TODO: This is not really allowed by Node but it works for now. | ||
const { HTTPParser } = process.binding('http_parser') // eslint-disable-line | ||
const EventEmitter = require('events') | ||
const assert = require('assert') | ||
const util = require('./util') | ||
const { | ||
Readable, | ||
Duplex, | ||
PassThrough, | ||
finished | ||
} = require('stream') | ||
const { | ||
ContentLengthMismatchError, | ||
SocketTimeoutError, | ||
InvalidArgumentError, | ||
InvalidReturnValueError, | ||
RequestAbortedError | ||
RequestAbortedError, | ||
ClientDestroyedError, | ||
ClientClosedError, | ||
HeadersTimeoutError, | ||
SocketError, | ||
InformationalError, | ||
NotSupportedError | ||
} = require('./errors') | ||
const { | ||
kUrl, | ||
kReset, | ||
kResume, | ||
kConnect, | ||
kResuming, | ||
kWriting, | ||
kQueue, | ||
kServerName, | ||
kSocketTimeout, | ||
kRequestTimeout, | ||
kTLSOpts, | ||
kClosed, | ||
kDestroyed, | ||
kPendingIdx, | ||
kRunningIdx, | ||
kError, | ||
kOnDestroyed, | ||
kPipelining, | ||
kRetryDelay, | ||
kRetryTimeout, | ||
kMaxAbortedPayload, | ||
kSocket, | ||
kSocketPath, | ||
kEnqueue, | ||
kResume | ||
kMaxHeadersSize, | ||
kHeadersTimeout | ||
} = require('./symbols') | ||
const ClientBase = require('./client-base') | ||
const assert = require('assert') | ||
const util = require('./util') | ||
const makeStream = require('./client-stream') | ||
const makeRequest = require('./client-request') | ||
const makePipeline = require('./client-pipeline') | ||
class Client extends ClientBase { | ||
request (opts, callback) { | ||
const CRLF = Buffer.from('\r\n', 'ascii') | ||
const nodeMajorVersion = parseInt(process.version.split('.')[0].slice(1)) | ||
const insecureHTTPParser = process.execArgv.includes('--insecure-http-parser') | ||
class Client extends EventEmitter { | ||
constructor (url, { | ||
maxAbortedPayload, | ||
maxHeaderSize, | ||
headersTimeout, | ||
socketTimeout, | ||
socketPath, | ||
requestTimeout, | ||
pipelining, | ||
tls | ||
} = {}) { | ||
super() | ||
if (typeof url === 'string') { | ||
url = new URL(url) | ||
} | ||
if (!url || typeof url !== 'object') { | ||
throw new InvalidArgumentError('invalid url') | ||
} | ||
if (url.port != null && url.port !== '' && !Number.isFinite(parseInt(url.port))) { | ||
throw new InvalidArgumentError('invalid port') | ||
} | ||
if (socketPath != null && typeof socketPath !== 'string') { | ||
throw new InvalidArgumentError('invalid socketPath') | ||
} | ||
if (url.hostname != null && typeof url.hostname !== 'string') { | ||
throw new InvalidArgumentError('invalid hostname') | ||
} | ||
if (!/https?/.test(url.protocol)) { | ||
throw new InvalidArgumentError('invalid protocol') | ||
} | ||
if (/\/.+/.test(url.pathname) || url.search || url.hash) { | ||
throw new InvalidArgumentError('invalid url') | ||
} | ||
if (maxAbortedPayload != null && !Number.isFinite(maxAbortedPayload)) { | ||
throw new InvalidArgumentError('invalid maxAbortedPayload') | ||
} | ||
if (maxHeaderSize != null && !Number.isFinite(maxHeaderSize)) { | ||
throw new InvalidArgumentError('invalid maxHeaderSize') | ||
} | ||
if (socketTimeout != null && !Number.isFinite(socketTimeout)) { | ||
throw new InvalidArgumentError('invalid socketTimeout') | ||
} | ||
if (requestTimeout != null && !Number.isFinite(requestTimeout)) { | ||
throw new InvalidArgumentError('invalid requestTimeout') | ||
} | ||
if (headersTimeout != null && !Number.isFinite(headersTimeout)) { | ||
throw new InvalidArgumentError('invalid headersTimeout') | ||
} | ||
this[kSocket] = null | ||
this[kReset] = false | ||
this[kPipelining] = pipelining || 1 | ||
this[kMaxHeadersSize] = maxHeaderSize || 16384 | ||
this[kHeadersTimeout] = headersTimeout == null ? 30e3 : headersTimeout | ||
this[kUrl] = url | ||
this[kSocketPath] = socketPath | ||
this[kSocketTimeout] = socketTimeout == null ? 30e3 : socketTimeout | ||
this[kRequestTimeout] = requestTimeout == null ? 30e3 : requestTimeout | ||
this[kClosed] = false | ||
this[kDestroyed] = false | ||
this[kServerName] = null | ||
this[kTLSOpts] = tls | ||
this[kRetryDelay] = 0 | ||
this[kRetryTimeout] = null | ||
this[kOnDestroyed] = [] | ||
this[kWriting] = false | ||
this[kResuming] = false | ||
this[kMaxAbortedPayload] = maxAbortedPayload || 1048576 | ||
// kQueue is built up of 3 sections separated by | ||
// the kRunningIdx and kPendingIdx indices. | ||
// | complete | running | pending | | ||
// ^ kRunningIdx ^ kPendingIdx ^ kQueue.length | ||
// kRunningIdx points to the first running element. | ||
// kPendingIdx points to the first pending element. | ||
// This implements a fast queue with an amortized | ||
// time of O(1). | ||
this[kQueue] = [] | ||
this[kRunningIdx] = 0 | ||
this[kPendingIdx] = 0 | ||
} | ||
get pipelining () { | ||
return this[kPipelining] | ||
} | ||
set pipelining (value) { | ||
this[kPipelining] = value | ||
resume(this) | ||
} | ||
get connected () { | ||
return ( | ||
this[kSocket] && | ||
this[kSocket].connecting !== true && | ||
// Older versions of Node don't set secureConnecting to false. | ||
(this[kSocket].authorized !== false || | ||
this[kSocket].authorizationError | ||
) && | ||
!this[kSocket].destroyed | ||
) | ||
} | ||
get pending () { | ||
return this[kQueue].length - this[kPendingIdx] | ||
} | ||
get running () { | ||
return this[kPendingIdx] - this[kRunningIdx] | ||
} | ||
get size () { | ||
return this[kQueue].length - this[kRunningIdx] | ||
} | ||
get busy () { | ||
if (this.running >= this[kPipelining]) { | ||
return true | ||
} | ||
if (this.size >= this[kPipelining]) { | ||
return true | ||
} | ||
if (this[kReset] || this[kWriting]) { | ||
return true | ||
} | ||
if (this[kResuming]) { | ||
for (let n = this[kPendingIdx]; n < this[kQueue].length; n++) { | ||
const { idempotent, body, method } = this[kQueue][n] | ||
if (!idempotent) { | ||
return true | ||
} | ||
if (method === 'HEAD') { | ||
return true | ||
} | ||
if (util.isStream(body) && util.bodyLength(body) !== 0) { | ||
return true | ||
} | ||
} | ||
} else if (this.pending > 0) { | ||
return true | ||
} | ||
return false | ||
} | ||
get destroyed () { | ||
return this[kDestroyed] | ||
} | ||
get closed () { | ||
return this[kClosed] | ||
} | ||
[kResume] () { | ||
resume(this) | ||
} | ||
[kConnect] (cb) { | ||
connect(this) | ||
if (cb) { | ||
if (this.connected) { | ||
process.nextTick(cb) | ||
} else { | ||
this.once('connect', cb) | ||
} | ||
} | ||
} | ||
[kEnqueue] (request) { | ||
try { | ||
if (this[kDestroyed]) { | ||
throw new ClientDestroyedError() | ||
} | ||
if (this[kClosed]) { | ||
throw new ClientClosedError() | ||
} | ||
this[kQueue].push(request) | ||
if (this[kResuming]) { | ||
// Do nothing. | ||
} else if (util.isStream(request.body)) { | ||
this[kResuming] = true | ||
process.nextTick(resume, this) | ||
} else { | ||
resume(this) | ||
} | ||
} catch (err) { | ||
request.onError(err) | ||
} | ||
} | ||
close (callback) { | ||
if (callback === undefined) { | ||
return new Promise((resolve, reject) => { | ||
this.request(opts, (err, data) => { | ||
this.close((err, data) => { | ||
return err ? reject(err) : resolve(data) | ||
@@ -34,318 +281,665 @@ }) | ||
if (!opts || typeof opts !== 'object') { | ||
process.nextTick(callback, new InvalidArgumentError('invalid opts'), null) | ||
if (this[kDestroyed]) { | ||
process.nextTick(callback, new ClientDestroyedError(), null) | ||
return | ||
} | ||
this[kEnqueue](opts, function (err, data) { | ||
if (err) { | ||
process.nextTick(callback, err, null) | ||
return | ||
} | ||
this[kClosed] = true | ||
const { | ||
statusCode, | ||
headers, | ||
opaque, | ||
resume | ||
} = data | ||
if (this.size === 0) { | ||
this.destroy(callback) | ||
} else { | ||
this[kOnDestroyed].push(callback) | ||
} | ||
} | ||
const body = new Readable({ | ||
autoDestroy: true, | ||
read: resume, | ||
destroy (err, callback) { | ||
if (!err && !this._readableState.endEmitted) { | ||
err = new RequestAbortedError() | ||
} | ||
if (err) { | ||
resume() | ||
} | ||
callback(err, null) | ||
} | ||
}) | ||
body.destroy = this.wrap(body, body.destroy) | ||
destroy (err, callback) { | ||
if (typeof err === 'function') { | ||
callback = err | ||
err = null | ||
} | ||
callback(null, { | ||
statusCode, | ||
headers, | ||
opaque, | ||
body | ||
if (callback === undefined) { | ||
return new Promise((resolve, reject) => { | ||
this.destroy(err, (err, data) => { | ||
return err ? /* istanbul ignore next: should never error */ reject(err) : resolve(data) | ||
}) | ||
}) | ||
} | ||
return this.wrap(body, function (err, chunk) { | ||
if (this.destroyed) { | ||
return null | ||
} else if (err) { | ||
this.destroy(err) | ||
} else { | ||
const ret = this.push(chunk) | ||
return this.destroyed ? null : ret | ||
} | ||
}) | ||
}) | ||
if (typeof callback !== 'function') { | ||
throw new InvalidArgumentError('invalid callback') | ||
} | ||
if (this[kDestroyed]) { | ||
if (this[kOnDestroyed]) { | ||
this[kOnDestroyed].push(callback) | ||
} else { | ||
process.nextTick(callback, null, null) | ||
} | ||
return | ||
} | ||
clearTimeout(this[kRetryTimeout]) | ||
this[kRetryTimeout] = null | ||
this[kClosed] = true | ||
this[kDestroyed] = true | ||
this[kOnDestroyed].push(callback) | ||
const onDestroyed = () => { | ||
const callbacks = this[kOnDestroyed] | ||
this[kOnDestroyed] = null | ||
for (const callback of callbacks) { | ||
callback(null, null) | ||
} | ||
} | ||
if (!this[kSocket]) { | ||
process.nextTick(onDestroyed) | ||
} else { | ||
util.destroy(this[kSocket].on('close', onDestroyed), err) | ||
} | ||
resume(this) | ||
} | ||
request (opts, callback) { | ||
return makeRequest(this, opts, callback) | ||
} | ||
stream (opts, factory, callback) { | ||
return makeStream(this, opts, factory, callback) | ||
} | ||
pipeline (opts, handler) { | ||
if (!opts || typeof opts !== 'object') { | ||
return new PassThrough().destroy(new InvalidArgumentError('invalid opts')) | ||
return makePipeline(this, opts, handler) | ||
} | ||
} | ||
class Parser extends HTTPParser { | ||
constructor (client, socket) { | ||
/* istanbul ignore next */ | ||
if (nodeMajorVersion >= 12) { | ||
super() | ||
this.initialize( | ||
HTTPParser.RESPONSE, | ||
{}, | ||
client[kMaxHeadersSize], | ||
insecureHTTPParser, | ||
client[kHeadersTimeout] | ||
) | ||
} else { | ||
super(HTTPParser.RESPONSE, false) | ||
} | ||
if (typeof handler !== 'function') { | ||
return new PassThrough().destroy(new InvalidArgumentError('invalid handler')) | ||
this.client = client | ||
this.socket = socket | ||
this.resumeSocket = () => socket.resume() | ||
this.statusCode = null | ||
this.headers = null | ||
this.read = 0 | ||
} | ||
[HTTPParser.kOnTimeout] () { | ||
util.destroy(this.socket, new HeadersTimeoutError()) | ||
} | ||
[HTTPParser.kOnHeaders] (rawHeaders) { | ||
this.headers = util.parseHeaders(rawHeaders, this.headers) | ||
} | ||
[HTTPParser.kOnExecute] (ret) { | ||
if (ret instanceof Error) { | ||
const err = ret | ||
if (typeof err.reason === 'string') { | ||
err.message = `Parse Error: ${err.reason}` | ||
} | ||
util.destroy(this.socket, err) | ||
} else { | ||
// When the underlying `net.Socket` instance is consumed - no | ||
// `data` events are emitted, and thus `socket.setTimeout` fires the | ||
// callback even if the data is constantly flowing into the socket. | ||
// See, https://github.com/nodejs/node/commit/ec2822adaad76b126b5cccdeaa1addf2376c9aa6 | ||
this.socket._unrefTimer() | ||
} | ||
} | ||
const req = new Readable({ | ||
autoDestroy: true, | ||
read () { | ||
if (this[kResume]) { | ||
const resume = this[kResume] | ||
this[kResume] = null | ||
resume() | ||
[HTTPParser.kOnHeadersComplete] (versionMajor, versionMinor, rawHeaders, method, | ||
url, statusCode, statusMessage, upgrade, shouldKeepAlive) { | ||
const { client, socket, resumeSocket, headers } = this | ||
const request = client[kQueue][client[kRunningIdx]] | ||
// TODO: Check for content-length mismatch? | ||
assert(this.statusCode < 200) | ||
this.headers = null | ||
this.statusCode = statusCode | ||
if (!shouldKeepAlive) { | ||
client[kReset] = true | ||
} | ||
if (upgrade) { | ||
util.destroy(socket, new NotSupportedError('upgrade not supported')) | ||
return true | ||
} | ||
// TODO: More statusCode validation? | ||
if (statusCode < 200) { | ||
request.onInfo(statusCode, util.parseHeaders(rawHeaders, headers)) | ||
} else { | ||
request.onHeaders(statusCode, util.parseHeaders(rawHeaders, headers), resumeSocket) | ||
} | ||
return request.method === 'HEAD' || statusCode < 200 | ||
} | ||
[HTTPParser.kOnBody] (chunk, offset, length) { | ||
const { client, socket, statusCode } = this | ||
if (statusCode < 200) { | ||
util.destroy(socket, new SocketError('unexpected request body')) | ||
return | ||
} | ||
const request = client[kQueue][client[kRunningIdx]] | ||
this.read += length | ||
const ret = request.onBody(chunk, offset, length) | ||
if (ret == null && this.read > client[kMaxAbortedPayload]) { | ||
util.destroy(socket, new InformationalError('max aborted payload')) | ||
} else if (ret === false) { | ||
socket.pause() | ||
} | ||
} | ||
[HTTPParser.kOnMessageComplete] () { | ||
const { client, socket, statusCode, headers, resumeSocket } = this | ||
const request = client[kQueue][client[kRunningIdx]] | ||
assert(statusCode >= 100) | ||
if (statusCode < 200) { | ||
return | ||
} | ||
this.statusCode = null | ||
this.headers = null | ||
request.onComplete(headers) | ||
this.read = 0 | ||
client[kQueue][client[kRunningIdx]++] = null | ||
if (client[kReset]) { | ||
// https://tools.ietf.org/html/rfc7231#section-4.3.1 | ||
// https://tools.ietf.org/html/rfc7231#section-4.3.2 | ||
// https://tools.ietf.org/html/rfc7231#section-4.3.5 | ||
// Sending a payload body on a request that does not | ||
// expect it can cause undefined behavior on some | ||
// servers and corrupt connection state. Do not | ||
// re-use the connection for further requests. | ||
util.destroy(socket, new InformationalError('request reset')) | ||
} else { | ||
resume(client) | ||
} | ||
resumeSocket() | ||
} | ||
destroy (err) { | ||
const { client, socket } = this | ||
assert(err) | ||
assert(socket.destroyed) | ||
this.unconsume() | ||
// Make sure the parser's stack has unwound before deleting the | ||
// corresponding C++ object through .close(). | ||
setImmediate(() => this.close()) | ||
if (!client.running) { | ||
return | ||
} | ||
// Retry all idempotent requests except for the one | ||
// at the head of the pipeline. | ||
client[kQueue][client[kRunningIdx]++].onError(err) | ||
const retryRequests = [] | ||
for (const request of client[kQueue].slice(client[kRunningIdx], client[kPendingIdx])) { | ||
const { idempotent, body } = request | ||
assert(idempotent && !util.isStream(body)) | ||
retryRequests.push(request) | ||
} | ||
client[kQueue].splice(0, client[kPendingIdx], ...retryRequests) | ||
client[kPendingIdx] = 0 | ||
client[kRunningIdx] = 0 | ||
} | ||
} | ||
function connect (client) { | ||
assert(!client[kSocket]) | ||
assert(!client[kRetryTimeout]) | ||
const { protocol, port, hostname } = client[kUrl] | ||
const servername = client[kServerName] || (client[kTLSOpts] && client[kTLSOpts].servername) | ||
let socket | ||
if (protocol === 'https:') { | ||
const tlsOpts = { ...client[kTLSOpts], servername } | ||
socket = client[kSocketPath] | ||
? tls.connect(client[kSocketPath], tlsOpts) | ||
: tls.connect(port || /* istanbul ignore next */ 443, hostname, tlsOpts) | ||
} else { | ||
socket = client[kSocketPath] | ||
? net.connect(client[kSocketPath]) | ||
: net.connect(port || /* istanbul ignore next */ 80, hostname) | ||
} | ||
client[kSocket] = socket | ||
const parser = new Parser(client, socket) | ||
/* istanbul ignore next */ | ||
if (nodeMajorVersion >= 12) { | ||
assert(socket._handle) | ||
parser.consume(socket._handle) | ||
} else { | ||
assert(socket._handle && socket._handle._externalStream) | ||
parser.consume(socket._handle._externalStream) | ||
} | ||
socket[kError] = null | ||
socket.setTimeout(client[kSocketTimeout], function () { | ||
util.destroy(this, new SocketTimeoutError()) | ||
}) | ||
socket | ||
.setNoDelay(true) | ||
.on(protocol === 'https:' ? 'secureConnect' : 'connect', function () { | ||
client[kReset] = false | ||
client[kRetryDelay] = 0 | ||
client.emit('connect') | ||
resume(client) | ||
}) | ||
.on('data', function () { | ||
/* istanbul ignore next */ | ||
assert(false) | ||
}) | ||
.on('error', function (err) { | ||
this[kError] = err | ||
if (err.code === 'ERR_TLS_CERT_ALTNAME_INVALID') { | ||
assert(!client.running) | ||
while (client.pending && client[kQueue][client[kPendingIdx]].servername === servername) { | ||
const request = client[kQueue][client[kPendingIdx]++] | ||
request.onError(err) | ||
} | ||
}, | ||
destroy (err, callback) { | ||
if (err) { | ||
if (this[kResume]) { | ||
const resume = this[kResume] | ||
this[kResume] = null | ||
resume(err) | ||
} else { | ||
// Stop ret from scheduling more writes. | ||
util.destroy(ret, err) | ||
} | ||
} else { | ||
if (!this._readableState.endEmitted) { | ||
// This can happen if the server doesn't care | ||
// about the entire request body. | ||
// TODO: Is this fine to ignore? | ||
} | ||
} else if ( | ||
!client.running && | ||
err.code !== 'ECONNRESET' && | ||
err.code !== 'ECONNREFUSED' && | ||
err.code !== 'EHOSTUNREACH' && | ||
err.code !== 'EHOSTDOWN' && | ||
err.code !== 'UND_ERR_SOCKET' && | ||
err.code !== 'UND_ERR_INFO' | ||
) { | ||
assert(client[kPendingIdx] === client[kRunningIdx]) | ||
// Error is not caused by running request and not a recoverable | ||
// socket error. | ||
for (const request of client[kQueue].splice(client[kRunningIdx])) { | ||
request.onError(err) | ||
} | ||
callback(err) | ||
} | ||
}) | ||
let res | ||
let body | ||
.on('end', function () { | ||
util.destroy(this, new SocketError('other side closed')) | ||
}) | ||
.on('close', function () { | ||
if (!this[kError]) { | ||
this[kError] = new SocketError('closed') | ||
} | ||
const ret = new Duplex({ | ||
readableObjectMode: opts.objectMode, | ||
autoDestroy: true, | ||
read () { | ||
if (body && body.resume) { | ||
body.resume() | ||
} | ||
}, | ||
write (chunk, encoding, callback) { | ||
assert(!req.destroyed) | ||
if (req.push(chunk, encoding)) { | ||
callback() | ||
client[kSocket] = null | ||
parser.destroy(this[kError]) | ||
if (client.destroyed) { | ||
resume(client) | ||
return | ||
} | ||
if (client.pending > 0) { | ||
if (client[kRetryDelay]) { | ||
client[kRetryTimeout] = setTimeout(() => { | ||
client[kRetryTimeout] = null | ||
connect(client) | ||
}, client[kRetryDelay]) | ||
client[kRetryDelay] = Math.min(client[kRetryDelay] * 2, client[kSocketTimeout]) | ||
} else { | ||
req[kResume] = callback | ||
connect(client) | ||
client[kRetryDelay] = 1e3 | ||
} | ||
}, | ||
destroy (err, callback) { | ||
if (!err && !this._readableState.endEmitted) { | ||
err = new RequestAbortedError() | ||
} | ||
util.destroy(req, err) | ||
util.destroy(res, err) | ||
callback(err) | ||
} | ||
}).on('prefinish', () => { | ||
// Node < 15 does not call _final in same tick. | ||
req.push(null) | ||
this[kResume]() | ||
client.emit('disconnect', this[kError]) | ||
resume(client) | ||
}) | ||
} | ||
// TODO: Avoid copy. | ||
opts = { ...opts, body: req } | ||
function resume (client) { | ||
client[kResuming] = false | ||
while (true) { | ||
if (client[kDestroyed]) { | ||
const err = new ClientDestroyedError() | ||
for (const request of client[kQueue].splice(client[kPendingIdx])) { | ||
request.onError(err) | ||
} | ||
return | ||
} | ||
const request = this[kEnqueue](opts, function (err, data) { | ||
if (err) { | ||
util.destroy(ret, err) | ||
return | ||
if (client.size === 0) { | ||
if (client[kClosed]) { | ||
client.destroy(util.nop) | ||
} | ||
if (client[kRunningIdx] > 0) { | ||
client[kQueue].length = 0 | ||
client[kPendingIdx] = 0 | ||
client[kRunningIdx] = 0 | ||
} | ||
return | ||
} | ||
const { | ||
statusCode, | ||
headers, | ||
opaque, | ||
resume | ||
} = data | ||
if (client[kRunningIdx] > 256) { | ||
client[kQueue].splice(0, client[kRunningIdx]) | ||
client[kPendingIdx] -= client[kRunningIdx] | ||
client[kRunningIdx] = 0 | ||
} | ||
res = new Readable({ | ||
autoDestroy: true, | ||
read: resume, | ||
destroy (err, callback) { | ||
if (!err && !this._readableState.endEmitted) { | ||
err = new RequestAbortedError() | ||
} | ||
if (err) { | ||
util.destroy(ret, err) | ||
resume() | ||
} | ||
callback(err, null) | ||
} | ||
}) | ||
res.destroy = this.wrap(res, res.destroy) | ||
if (client.running >= client[kPipelining]) { | ||
return | ||
} | ||
try { | ||
body = handler({ | ||
statusCode, | ||
headers, | ||
opaque, | ||
body: res | ||
}) | ||
} catch (err) { | ||
res.on('error', util.nop) | ||
if (!ret.destroyed) { | ||
ret.destroy(err) | ||
} | ||
if (!client.pending) { | ||
return | ||
} | ||
const request = client[kQueue][client[kPendingIdx]] | ||
if (request.aborted) { | ||
// Request was aborted. | ||
// TODO: Avoid splice one by one. | ||
client[kQueue].splice(client[kPendingIdx], 1) | ||
continue | ||
} | ||
if (client[kServerName] !== request.servername) { | ||
if (client.running) { | ||
return | ||
} | ||
// TODO: Should we allow !body? | ||
if (!body || typeof body.on !== 'function') { | ||
util.destroy(ret, new InvalidReturnValueError('expected Readable')) | ||
client[kServerName] = request.servername | ||
if (client[kSocket]) { | ||
util.destroy(client[kSocket], new InformationalError('servername changed')) | ||
return | ||
} | ||
} | ||
// TODO: If body === res then avoid intermediate | ||
// and write directly to ret.push? Or should this | ||
// happen when body is null? | ||
if (!client[kSocket] && !client[kRetryTimeout]) { | ||
connect(client) | ||
return | ||
} | ||
// TODO: body.destroy? | ||
if (!client.connected) { | ||
return | ||
} | ||
let ended = false | ||
body | ||
.on('data', function (chunk) { | ||
if (!ret.push(chunk) && this.pause) { | ||
this.pause() | ||
} | ||
if (client[kReset]) { | ||
return | ||
} | ||
if (client[kWriting]) { | ||
return | ||
} | ||
if (client.running && !request.idempotent) { | ||
// Non-idempotent request cannot be retried. | ||
// Ensure that no other requests are inflight and | ||
// could cause failure. | ||
return | ||
} | ||
if (util.isStream(request.body) && util.bodyLength(request.body) === 0) { | ||
request.body | ||
.on('data', function () { | ||
assert(false) | ||
}) | ||
.on('error', function (err) { | ||
util.destroy(ret, err) | ||
util.destroy(this) | ||
request.onError(err) | ||
}) | ||
.on('end', function () { | ||
ended = true | ||
ret.push(null) | ||
util.destroy(this) | ||
}) | ||
.on('close', function () { | ||
if (!ended) { | ||
util.destroy(ret, new RequestAbortedError()) | ||
} | ||
}) | ||
request.body = null | ||
} | ||
return this.wrap(res, function (err, chunk) { | ||
if (this.destroyed) { | ||
return null | ||
} else if (err) { | ||
this.destroy(err) | ||
} else { | ||
const ret = this.push(chunk) | ||
return this.destroyed ? null : ret | ||
} | ||
}) | ||
}) | ||
if (client.running && util.isStream(request.body)) { | ||
// Request with stream body can error while other requests | ||
// are inflight and indirectly error those as well. | ||
// Ensure this doesn't happen by waiting for inflight | ||
// to complete before dispatching. | ||
ret.destroy = request.wrap(ret, ret.destroy) | ||
// Request with stream body cannot be retried. | ||
// Ensure that no other requests are inflight and | ||
// could cause failure. | ||
return | ||
} | ||
return ret | ||
try { | ||
write(client, request) | ||
client[kPendingIdx]++ | ||
} catch (err) { | ||
request.onError(err) | ||
} | ||
} | ||
} | ||
stream (opts, factory, callback) { | ||
if (callback === undefined) { | ||
return new Promise((resolve, reject) => { | ||
this.stream(opts, factory, (err, data) => { | ||
return err ? reject(err) : resolve(data) | ||
}) | ||
}) | ||
} | ||
function write (client, request) { | ||
const { method } = request | ||
if (typeof callback !== 'function') { | ||
throw new InvalidArgumentError('invalid callback') | ||
} | ||
let contentLength = util.bodyLength(request.body, true) | ||
if (!opts || typeof opts !== 'object') { | ||
process.nextTick(callback, new InvalidArgumentError('invalid opts'), null) | ||
return | ||
} | ||
if (contentLength === undefined) { | ||
contentLength = request.contentLength | ||
} | ||
if (typeof factory !== 'function') { | ||
process.nextTick(callback, new InvalidArgumentError('invalid factory'), null) | ||
return | ||
// TODO: What other methods expect a payload? | ||
const expectsPayload = ( | ||
method === 'PUT' || | ||
method === 'POST' || | ||
method === 'PATCH' | ||
) | ||
if (contentLength === 0 && !expectsPayload) { | ||
// https://tools.ietf.org/html/rfc7230#section-3.3.2 | ||
// A user agent SHOULD NOT send a Content-Length header field when | ||
// the request message does not contain a payload body and the method | ||
// semantics do not anticipate such a body. | ||
contentLength = undefined | ||
} | ||
if (request.contentLength !== undefined && request.contentLength !== contentLength) { | ||
throw new ContentLengthMismatchError() | ||
} | ||
const { body, header } = request | ||
const socket = client[kSocket] | ||
socket.cork() | ||
socket.write(header) | ||
if (contentLength !== undefined) { | ||
socket.write(`content-length: ${contentLength}\r\n`, 'ascii') | ||
} | ||
if (method === 'HEAD') { | ||
// https://github.com/mcollina/undici/issues/258 | ||
// Close after a HEAD request to interop with misbehaving servers | ||
// that may send a body in the response. | ||
client[kReset] = true | ||
} | ||
// TODO: An HTTP/1.1 user agent MUST NOT preface | ||
// or follow a request with an extra CRLF. | ||
// https://tools.ietf.org/html/rfc7230#section-3.5 | ||
if (!body) { | ||
socket.write(CRLF) | ||
if (contentLength === 0) { | ||
socket.write(CRLF) | ||
} else { | ||
assert(contentLength === undefined, 'no body must not have content length') | ||
} | ||
} else if (body instanceof Uint8Array) { | ||
assert(contentLength !== undefined, 'buffer body must have content length') | ||
this[kEnqueue](opts, function (err, data) { | ||
if (err) { | ||
process.nextTick(callback, err, null) | ||
socket.write(CRLF) | ||
socket.write(body) | ||
socket.write(CRLF) | ||
request.body = null | ||
client[kReset] = !expectsPayload | ||
} else if (util.isStream(body)) { | ||
assert(contentLength !== 0 || !client.running, 'stream body cannot be pipelined') | ||
let finished = false | ||
let bytesWritten = 0 | ||
const onData = function (chunk) { | ||
assert(!finished) | ||
const len = Buffer.byteLength(chunk) | ||
if (!len) { | ||
return | ||
} | ||
const { | ||
statusCode, | ||
headers, | ||
opaque, | ||
resume | ||
} = data | ||
let body | ||
try { | ||
body = factory({ | ||
statusCode, | ||
headers, | ||
opaque | ||
}) | ||
} catch (err) { | ||
callback(err, null) | ||
// TODO: What if not ended and bytesWritten === contentLength? | ||
// We should defer writing chunks. | ||
if (contentLength !== undefined && bytesWritten + len > contentLength) { | ||
util.destroy(this, new ContentLengthMismatchError()) | ||
return | ||
} | ||
if (!body) { | ||
callback(null, null) | ||
if (bytesWritten === 0) { | ||
socket.write(contentLength === undefined ? 'transfer-encoding: chunked\r\n' : '\r\n', 'ascii') | ||
client[kReset] = !expectsPayload | ||
} | ||
if (contentLength === undefined) { | ||
socket.write(`\r\n${len.toString(16)}\r\n`, 'ascii') | ||
} | ||
bytesWritten += len | ||
if (!socket.write(chunk) && this.pause) { | ||
this.pause() | ||
} | ||
} | ||
const onDrain = function () { | ||
assert(!finished) | ||
if (body.resume) { | ||
body.resume() | ||
} | ||
} | ||
const onAbort = function () { | ||
onFinished(new RequestAbortedError()) | ||
} | ||
const onFinished = function (err) { | ||
if (finished) { | ||
return | ||
} | ||
if ( | ||
typeof body.write !== 'function' || | ||
typeof body.end !== 'function' || | ||
typeof body.on !== 'function' | ||
) { | ||
callback(new InvalidReturnValueError('expected Writable'), null) | ||
finished = true | ||
assert(client[kWriting] && client.running <= 1) | ||
client[kWriting] = false | ||
if (!err) { | ||
err = socket[kError] | ||
} | ||
if (!err && contentLength !== undefined && bytesWritten !== contentLength) { | ||
err = new ContentLengthMismatchError() | ||
} | ||
socket | ||
.removeListener('drain', onDrain) | ||
.removeListener('error', onFinished) | ||
.removeListener('close', onFinished) | ||
body | ||
.removeListener('data', onData) | ||
.removeListener('end', onFinished) | ||
.removeListener('error', onFinished) | ||
.removeListener('close', onAbort) | ||
request.body = null | ||
util.destroy(body, err) | ||
if (err) { | ||
util.destroy(socket, err) | ||
return | ||
} | ||
body.on('drain', resume) | ||
// TODO: Avoid finished. It registers an unecessary amount of listeners. | ||
finished(body, { readable: false }, (err) => { | ||
body.removeListener('drain', resume) | ||
if (err) { | ||
util.destroy(body, err) | ||
resume() | ||
} else { | ||
// TODO: destroy if body is not Readable? | ||
if (bytesWritten === 0) { | ||
if (contentLength === undefined && expectsPayload) { | ||
// https://tools.ietf.org/html/rfc7230#section-3.3.2 | ||
// A user agent SHOULD send a Content-Length in a request message when | ||
// no Transfer-Encoding is sent and the request method defines a meaning | ||
// for an enclosed payload body. | ||
socket.write('content-length: 0\r\n\r\n', 'ascii') | ||
} | ||
callback(err, null) | ||
}) | ||
if (typeof body.destroy === 'function') { | ||
body.destroy = this.wrap(body, body.destroy) | ||
} else if (contentLength === undefined) { | ||
socket.write('\r\n0\r\n', 'ascii') | ||
} | ||
return this.wrap(body, function (err, chunk) { | ||
if (util.isDestroyed(this)) { | ||
return null | ||
} else if (err) { | ||
util.destroy(this, err) | ||
} else if (chunk == null) { | ||
this.end() | ||
} else { | ||
const ret = this.write(chunk) | ||
return util.isDestroyed(this) ? null : ret | ||
} | ||
}) | ||
}) | ||
socket.write(CRLF) | ||
resume(client) | ||
} | ||
body | ||
.on('data', onData) | ||
.on('end', onFinished) | ||
.on('error', onFinished) | ||
.on('close', onAbort) | ||
socket | ||
.on('drain', onDrain) | ||
.on('error', onFinished) | ||
.on('close', onFinished) | ||
client[kWriting] = true | ||
} else { | ||
/* istanbul ignore next */ | ||
assert(false) | ||
} | ||
socket.uncork() | ||
} | ||
module.exports = Client |
@@ -8,3 +8,4 @@ 'use strict' | ||
const { | ||
kClients | ||
kClients, | ||
kGetNext | ||
} = require('./symbols') | ||
@@ -36,2 +37,7 @@ | ||
/* istanbul ignore next: use by benchmark */ | ||
[kGetNext] () { | ||
return getNext(this) | ||
} | ||
stream (opts, factory, cb) { | ||
@@ -38,0 +44,0 @@ // needed because we need the return value from client.stream |
@@ -12,3 +12,9 @@ 'use strict' | ||
const util = require('./util') | ||
const { kRequestTimeout, kUrl } = require('./symbols') | ||
const kAbort = Symbol('abort') | ||
const kTimeout = Symbol('timeout') | ||
const kResume = Symbol('resume') | ||
const kSignal = Symbol('signal') | ||
class Request extends AsyncResource { | ||
@@ -25,8 +31,8 @@ constructor ({ | ||
requestTimeout | ||
}, hostname, callback) { | ||
}, { | ||
[kRequestTimeout]: defaultRequestTimeout, | ||
[kUrl]: { hostname: defaultHostname } | ||
}) { | ||
super('UNDICI_REQ') | ||
assert(typeof hostname === 'string') | ||
assert(typeof callback === 'function') | ||
if (typeof path !== 'string' || path[0] !== '/') { | ||
@@ -40,2 +46,10 @@ throw new InvalidArgumentError('path must be a valid path') | ||
if (signal && typeof signal.on !== 'function' && typeof signal.addEventListener !== 'function') { | ||
throw new InvalidArgumentError('signal must be an EventEmitter or EventTarget') | ||
} | ||
requestTimeout = requestTimeout == null && defaultRequestTimeout | ||
? defaultRequestTimeout | ||
: requestTimeout | ||
if (requestTimeout != null && (!Number.isInteger(requestTimeout) || requestTimeout < 0)) { | ||
@@ -45,6 +59,2 @@ throw new InvalidArgumentError('requestTimeout must be a positive integer or zero') | ||
if (signal && typeof signal.on !== 'function' && typeof signal.addEventListener !== 'function') { | ||
throw new InvalidArgumentError('signal must implement .on(name, callback)') | ||
} | ||
this.method = method | ||
@@ -54,8 +64,4 @@ | ||
this.body = null | ||
} else if (typeof body.on === 'function') { | ||
this.body = body.on('error', (err) => { | ||
// TODO: Ignore error if body has ended? | ||
this.onError(err) | ||
}) | ||
assert(this.body === body) | ||
} else if (util.isStream(body)) { | ||
this.body = body | ||
} else if (body instanceof Uint8Array) { | ||
@@ -72,11 +78,9 @@ this.body = body.length ? body : null | ||
this.servername = servername || hostHeader || hostname | ||
if (net.isIP(this.servername) || this.servername.startsWith('[')) { | ||
this.servername = servername || hostHeader || null | ||
if (net.isIP(this.servername) || /^\[/.test(this.servername)) { | ||
this.servername = null | ||
} | ||
this.callback = callback | ||
this.aborted = false | ||
this.finished = false | ||
this.opaque = opaque | ||
@@ -96,2 +100,6 @@ | ||
if (!hostHeader) { | ||
header += `host: ${defaultHostname}\r\n` | ||
} | ||
if (headers) { | ||
@@ -106,3 +114,3 @@ for (const [key, val] of Object.entries(headers)) { | ||
if ( | ||
this.contentLength == null && | ||
this.contentLength === undefined && | ||
key.length === 14 && | ||
@@ -126,11 +134,16 @@ key.toLowerCase() === 'content-length' | ||
if (!hostHeader) { | ||
header += `host: ${hostname}\r\n` | ||
} | ||
this.header = Buffer.from(header, 'ascii') | ||
} | ||
if (util.isStream(this.body)) { | ||
// TODO: Cleanup listeners? | ||
this.body.on('error', (err) => { | ||
// TODO: Ignore error if body has ended? | ||
this.onError(err) | ||
}) | ||
} | ||
if (signal) { | ||
const onAbort = () => { | ||
this[kSignal] = signal | ||
this[kAbort] = () => { | ||
this.onError(new RequestAbortedError()) | ||
@@ -140,9 +153,12 @@ } | ||
if ('addEventListener' in signal) { | ||
signal.addEventListener('abort', onAbort) | ||
signal.addEventListener('abort', this[kAbort]) | ||
} else { | ||
signal.once('abort', onAbort) | ||
signal.addListener('abort', this[kAbort]) | ||
} | ||
} else { | ||
this[kSignal] = null | ||
this[kAbort] = null | ||
} | ||
this.timeout = requestTimeout | ||
this[kTimeout] = requestTimeout | ||
? setTimeout((self) => { | ||
@@ -154,69 +170,98 @@ self.onError(new RequestTimeoutError()) | ||
wrap (that, cb) { | ||
return this.runInAsyncScope.bind(this, cb, that) | ||
onInfo (statusCode, headers) { | ||
if (this.aborted) { | ||
return | ||
} | ||
if (this._onInfo) { | ||
this.runInAsyncScope(this._onInfo, this, statusCode, headers) | ||
} | ||
} | ||
onHeaders (statusCode, headers, resume) { | ||
if (statusCode < 200) { | ||
// TODO: Informational response. | ||
if (this.aborted) { | ||
return | ||
} | ||
if (this.finished) { | ||
return | ||
const { [kTimeout]: timeout } = this | ||
if (timeout) { | ||
this[kTimeout] = null | ||
clearTimeout(timeout) | ||
} | ||
this.finished = true | ||
clearTimeout(this.timeout) | ||
this.timeout = null | ||
this[kResume] = resume | ||
this.res = this.runInAsyncScope(this.callback, this, null, { | ||
statusCode, | ||
headers, | ||
opaque: this.opaque, | ||
resume | ||
}) | ||
assert(!this.res || typeof this.res === 'function') | ||
this.runInAsyncScope(this._onHeaders, this, statusCode, headers, resume) | ||
} | ||
onBody (chunk, offset, length) { | ||
if (this.res) { | ||
return this.res(null, chunk.slice(offset, offset + length)) | ||
if (this.aborted) { | ||
return null | ||
} | ||
return this.runInAsyncScope(this._onBody, this, chunk, offset, length) | ||
} | ||
onComplete (trailers) { | ||
// TODO: Trailers? | ||
if (this.res) { | ||
const res = this.res | ||
this.res = null | ||
res(null, null) | ||
if (this.aborted) { | ||
return | ||
} | ||
} | ||
onError (err, sync) { | ||
if (util.isStream(this.body)) { | ||
// TODO: If this.body.destroy doesn't exists or doesn't emit 'error' or | ||
// 'close', it can halt execution in client. | ||
const body = this.body | ||
const { [kSignal]: signal, body } = this | ||
if (body) { | ||
this.body = null | ||
util.destroy(body, err) | ||
util.destroy(body) | ||
} | ||
if (this.res) { | ||
const res = this.res | ||
this.res = null | ||
res(err, null) | ||
if (signal) { | ||
this[kSignal] = null | ||
if ('removeEventListener' in signal) { | ||
signal.removeEventListener('abort', this[kAbort]) | ||
} else { | ||
signal.removeListener('abort', this[kAbort]) | ||
} | ||
} | ||
if (this.finished) { | ||
this.runInAsyncScope(this._onComplete, this, trailers) | ||
} | ||
onError (err) { | ||
if (this.aborted) { | ||
return | ||
} | ||
this.finished = true | ||
this.aborted = true | ||
clearTimeout(this.timeout) | ||
this.timeout = null | ||
const { | ||
body, | ||
[kTimeout]: timeout, | ||
[kSignal]: signal, | ||
[kResume]: resume | ||
} = this | ||
this.runInAsyncScope(this.callback, this, err, null) | ||
if (resume) { | ||
resume() | ||
} | ||
if (timeout) { | ||
this[kTimeout] = null | ||
clearTimeout(timeout) | ||
} | ||
if (body) { | ||
this.body = null | ||
util.destroy(body, err) | ||
} | ||
if (signal) { | ||
this[kSignal] = null | ||
if ('removeEventListener' in signal) { | ||
signal.removeEventListener('abort', this[kAbort]) | ||
} else { | ||
signal.removeListener('abort', this[kAbort]) | ||
} | ||
} | ||
this.runInAsyncScope(this._onError, this, err) | ||
} | ||
@@ -223,0 +268,0 @@ } |
module.exports = { | ||
kUrl: Symbol('url'), | ||
kWriting: Symbol('writing'), | ||
kResuming: Symbol('resuming'), | ||
kQueue: Symbol('queue'), | ||
kConnect: Symbol('connect'), | ||
kSocketTimeout: Symbol('socket timeout'), | ||
@@ -11,2 +13,3 @@ kRequestTimeout: Symbol('request timeout'), | ||
kReset: Symbol('reset'), | ||
kGetNext: Symbol('getNext'), | ||
kDestroyed: Symbol('destroyed'), | ||
@@ -18,3 +21,2 @@ kMaxHeadersSize: Symbol('maxHeaderSize'), | ||
kResume: Symbol('resume'), | ||
kEnd: Symbol('end'), | ||
kError: Symbol('error'), | ||
@@ -21,0 +23,0 @@ kOnDestroyed: Symbol('destroy callbacks'), |
@@ -18,3 +18,5 @@ 'use strict' | ||
const state = body._readableState | ||
return state && state.ended ? state.length : undefined | ||
return state && state.ended === true && Number.isFinite(state.length) | ||
? state.length | ||
: undefined | ||
} | ||
@@ -32,12 +34,6 @@ | ||
function destroy (stream, err) { | ||
if (!stream) { | ||
return stream | ||
if (!isStream(stream) || isDestroyed(stream)) { | ||
return | ||
} | ||
assert(typeof stream.on === 'function') | ||
if (isDestroyed(stream)) { | ||
return stream | ||
} | ||
if (typeof stream.destroy === 'function') { | ||
@@ -54,4 +50,2 @@ stream.destroy(err) | ||
} | ||
return stream | ||
} | ||
@@ -58,0 +52,0 @@ |
{ | ||
"name": "undici", | ||
"version": "1.2.2", | ||
"version": "1.2.3", | ||
"description": "An HTTP/1.1 client, written from scratch for Node.js", | ||
@@ -9,3 +9,4 @@ "main": "index.js", | ||
"test": "tap test/*.js --no-coverage", | ||
"coverage": "standard | snazzy && tap test/*.js" | ||
"coverage": "standard | snazzy && tap test/*.js", | ||
"bench": "npx concurrently -k -s first \"node benchmarks/server.js\" \"node -e 'setTimeout(() => {}, 1000)' && node benchmarks\"" | ||
}, | ||
@@ -33,2 +34,3 @@ "repository": { | ||
"benchmark": "^2.1.4", | ||
"concurrently": "^5.2.0", | ||
"https-pem": "^2.0.0", | ||
@@ -35,0 +37,0 @@ "pre-commit": "^1.2.2", |
@@ -26,6 +26,6 @@ # undici | ||
``` | ||
http - keepalive - pipe x 5,521 ops/sec ±3.37% (73 runs sampled) | ||
undici - pipeline - pipe x 9,292 ops/sec ±4.28% (79 runs sampled) | ||
undici - request - pipe x 11,949 ops/sec ±0.99% (85 runs sampled) | ||
undici - stream - pipe x 12,223 ops/sec ±0.76% (85 runs sampled) | ||
http - keepalive x 5,521 ops/sec ±3.37% (73 runs sampled) | ||
undici - pipeline x 9,292 ops/sec ±4.28% (79 runs sampled) | ||
undici - request x 11,949 ops/sec ±0.99% (85 runs sampled) | ||
undici - stream x 12,223 ops/sec ±0.76% (85 runs sampled) | ||
``` | ||
@@ -32,0 +32,0 @@ |
@@ -203,1 +203,27 @@ 'use strict' | ||
writeBodyStartedWithBody(new Uint8Array([42]), 'Uint8Array') | ||
test('cleanup listener', (t) => { | ||
t.plan(4) | ||
const abortController = new AbortController() | ||
abortController.signal.addEventListener = (name) => t.strictEqual(name, 'abort') | ||
abortController.signal.removeEventListener = (name) => t.strictEqual(name, 'abort') | ||
const server = createServer((req, res) => { | ||
res.writeHead(200, { 'content-type': 'text/plain' }) | ||
res.write('hello') | ||
res.end('world') | ||
}) | ||
t.teardown(server.close.bind(server)) | ||
server.listen(0, () => { | ||
const client = new Client(`http://localhost:${server.address().port}`) | ||
t.teardown(client.destroy.bind(client)) | ||
client.request({ path: '/', method: 'GET', signal: abortController.signal }, (err, response) => { | ||
t.error(err) | ||
response.body.on('end', () => { | ||
t.pass() | ||
}).resume() | ||
}) | ||
}) | ||
}) |
@@ -214,1 +214,27 @@ 'use strict' | ||
writeBodyStartedWithBody(new Uint8Array([42]), 'Uint8Array') | ||
test('cleanup listener', (t) => { | ||
t.plan(4) | ||
const ee = new EventEmitter() | ||
ee.addListener = (name) => t.strictEqual(name, 'abort') | ||
ee.removeListener = (name) => t.strictEqual(name, 'abort') | ||
const server = createServer((req, res) => { | ||
res.writeHead(200, { 'content-type': 'text/plain' }) | ||
res.write('hello') | ||
res.end('world') | ||
}) | ||
t.teardown(server.close.bind(server)) | ||
server.listen(0, () => { | ||
const client = new Client(`http://localhost:${server.address().port}`) | ||
t.teardown(client.destroy.bind(client)) | ||
client.request({ path: '/', method: 'GET', signal: ee }, (err, response) => { | ||
t.error(err) | ||
response.body.on('end', () => { | ||
t.pass() | ||
}).resume() | ||
}) | ||
}) | ||
}) |
@@ -9,6 +9,4 @@ 'use strict' | ||
const { | ||
kSocket, | ||
kEnqueue | ||
} = require('../lib/symbols') | ||
const { kSocket } = require('../lib/symbols') | ||
const { InvalidArgumentError } = require('../lib/errors') | ||
@@ -769,3 +767,3 @@ test('GET errors and reconnect with pipelining 1', (t) => { | ||
test('invalid opts', (t) => { | ||
t.plan(6) | ||
t.plan(2) | ||
@@ -779,16 +777,2 @@ const client = new Client('http://localhost:5000') | ||
}) | ||
client[kEnqueue](null, (err) => { | ||
t.ok(err instanceof errors.InvalidArgumentError) | ||
}) | ||
client[kEnqueue]({ path: '/', method: 'GET', signal: 1 }, (err) => { | ||
t.ok(err instanceof errors.InvalidArgumentError) | ||
}) | ||
client[kEnqueue]({ path: '/', method: 'GET', signal: {} }, (err) => { | ||
t.ok(err instanceof errors.InvalidArgumentError) | ||
}) | ||
try { | ||
client[kEnqueue]({ path: '/', method: 'GET', signal: {} }, null) | ||
} catch (err) { | ||
t.ok(err instanceof errors.InvalidArgumentError) | ||
} | ||
}) | ||
@@ -862,1 +846,19 @@ | ||
}) | ||
test('invalid signal', (t) => { | ||
t.plan(3) | ||
const client = new Client('http://localhost:3333') | ||
t.teardown(client.destroy.bind(client)) | ||
client.request({ path: '/', method: 'GET', signal: {} }, (err) => { | ||
t.ok(err instanceof InvalidArgumentError) | ||
}) | ||
client.pipeline({ path: '/', method: 'GET', signal: {} }, () => {}) | ||
.on('error', (err) => { | ||
t.ok(err instanceof InvalidArgumentError) | ||
}) | ||
client.stream({ path: '/', method: 'GET', signal: {} }, () => {}, (err) => { | ||
t.ok(err instanceof InvalidArgumentError) | ||
}) | ||
}) |
@@ -817,1 +817,26 @@ 'use strict' | ||
}) | ||
test('pipeline invalid opts', (t) => { | ||
t.plan(2) | ||
const server = createServer((req, res) => { | ||
res.end(JSON.stringify({ asd: 1 })) | ||
}) | ||
t.tearDown(server.close.bind(server)) | ||
server.listen(0, () => { | ||
const client = new Client(`http://localhost:${server.address().port}`) | ||
t.tearDown(client.destroy.bind(client)) | ||
client.close((err) => { | ||
t.error(err) | ||
}) | ||
client | ||
.pipeline({ path: '/', method: 'GET', objectMode: true }, ({ body }) => { | ||
t.fail() | ||
}) | ||
.on('error', (err) => { | ||
t.ok(err) | ||
}) | ||
}) | ||
}) |
@@ -66,59 +66,2 @@ 'use strict' | ||
test('20 times HEAD with pipelining 10', (t) => { | ||
const num = 20 | ||
t.plan(3 * num + 1) | ||
let count = 0 | ||
let countGreaterThanOne = false | ||
const server = createServer((req, res) => { | ||
count++ | ||
setTimeout(function () { | ||
countGreaterThanOne = countGreaterThanOne || count > 1 | ||
res.end(req.url) | ||
}, 10) | ||
}) | ||
t.tearDown(server.close.bind(server)) | ||
// needed to check for a warning on the maxListeners on the socket | ||
process.on('warning', t.fail) | ||
t.tearDown(() => { | ||
process.removeListener('warning', t.fail) | ||
}) | ||
server.listen(0, () => { | ||
const client = new Client(`http://localhost:${server.address().port}`, { | ||
pipelining: 10 | ||
}) | ||
t.tearDown(client.close.bind(client)) | ||
for (let i = 0; i < num; i++) { | ||
makeRequest(i) | ||
} | ||
function makeRequest (i) { | ||
makeHeadRequestAndExpectUrl(client, i, t, () => { | ||
count-- | ||
if (i === num - 1) { | ||
t.ok(countGreaterThanOne, 'seen more than one parallel request') | ||
} | ||
}) | ||
return !client.busy | ||
} | ||
}) | ||
}) | ||
function makeHeadRequestAndExpectUrl (client, i, t, cb) { | ||
return client.request({ path: '/' + i, method: 'HEAD' }, (err, { statusCode, headers, body }) => { | ||
cb() | ||
t.error(err) | ||
t.strictEqual(statusCode, 200) | ||
body | ||
.resume() | ||
.on('end', () => { | ||
t.pass() | ||
}) | ||
}) | ||
} | ||
test('A client should enqueue as much as twice its pipelining factor', (t) => { | ||
@@ -125,0 +68,0 @@ const num = 10 |
@@ -7,2 +7,3 @@ 'use strict' | ||
const { PassThrough } = require('stream') | ||
const EE = require('events') | ||
@@ -190,3 +191,3 @@ test('stream get', (t) => { | ||
test('stream response resume back pressure and non standard error', (t) => { | ||
t.plan(5) | ||
t.plan(6) | ||
@@ -206,2 +207,3 @@ const server = createServer((req, res) => { | ||
const pt = new PassThrough() | ||
client.stream({ | ||
@@ -212,7 +214,6 @@ path: '/', | ||
}, () => { | ||
const pt = new PassThrough() | ||
pt.on('data', () => { | ||
pt.emit('error', new Error('kaboom')) | ||
}).once('error', (err) => { | ||
t.ok(err) | ||
t.strictEqual(err.message, 'kaboom') | ||
}) | ||
@@ -222,2 +223,3 @@ return pt | ||
t.ok(err) | ||
t.strictEqual(pt.destroyed, true) | ||
}) | ||
@@ -255,3 +257,3 @@ | ||
const pt = new PassThrough() | ||
const pt = new PassThrough({ autoDestroy: false }) | ||
client.stream({ | ||
@@ -305,4 +307,4 @@ path: '/', | ||
test('stream waits only for writable side', (t) => { | ||
t.plan(1) | ||
test('stream destroy if not readable', (t) => { | ||
t.plan(2) | ||
@@ -314,2 +316,4 @@ const server = createServer((req, res) => { | ||
const pt = new PassThrough() | ||
pt.readable = false | ||
server.listen(0, () => { | ||
@@ -323,5 +327,6 @@ const client = new Client(`http://localhost:${server.address().port}`) | ||
}, () => { | ||
throw new Error('kaboom') | ||
return pt | ||
}, (err) => { | ||
t.strictEqual(err.message, 'kaboom') | ||
t.error(err) | ||
t.strictEqual(pt.destroyed, true) | ||
}) | ||
@@ -402,1 +407,94 @@ }) | ||
}) | ||
test('stream factory abort', (t) => { | ||
t.plan(1) | ||
const server = createServer((req, res) => { | ||
res.end('asd') | ||
}) | ||
t.tearDown(server.close.bind(server)) | ||
server.listen(0, () => { | ||
const client = new Client(`http://localhost:${server.address().port}`) | ||
t.tearDown(client.destroy.bind(client)) | ||
const signal = new EE() | ||
client.stream({ | ||
path: '/', | ||
method: 'GET', | ||
signal | ||
}, () => { | ||
signal.emit('abort') | ||
return new PassThrough() | ||
}, (err) => { | ||
t.ok(err instanceof errors.RequestAbortedError) | ||
}) | ||
}) | ||
}) | ||
test('stream factory throw', (t) => { | ||
t.plan(3) | ||
const server = createServer((req, res) => { | ||
res.end('asd') | ||
}) | ||
t.tearDown(server.close.bind(server)) | ||
server.listen(0, () => { | ||
const client = new Client(`http://localhost:${server.address().port}`) | ||
t.tearDown(client.destroy.bind(client)) | ||
client.stream({ | ||
path: '/', | ||
method: 'GET' | ||
}, () => { | ||
throw new Error('asd') | ||
}, (err) => { | ||
t.strictEqual(err.message, 'asd') | ||
}) | ||
client.stream({ | ||
path: '/', | ||
method: 'GET' | ||
}, () => { | ||
throw new Error('asd') | ||
}, (err) => { | ||
t.strictEqual(err.message, 'asd') | ||
}) | ||
client.stream({ | ||
path: '/', | ||
method: 'GET' | ||
}, () => { | ||
return new PassThrough() | ||
}, (err) => { | ||
t.error(err) | ||
}) | ||
client.on('disconnect', () => { | ||
t.fail() | ||
}) | ||
}) | ||
}) | ||
test('stream CONNECT throw', (t) => { | ||
t.plan(1) | ||
const server = createServer((req, res) => { | ||
res.end('asd') | ||
}) | ||
t.tearDown(server.close.bind(server)) | ||
server.listen(0, () => { | ||
const client = new Client(`http://localhost:${server.address().port}`) | ||
t.tearDown(client.destroy.bind(client)) | ||
client.stream({ | ||
path: '/', | ||
method: 'CONNECT' | ||
}, () => { | ||
}, (err) => { | ||
t.ok(err instanceof errors.NotSupportedError) | ||
}) | ||
client.on('disconnect', () => { | ||
t.fail() | ||
}) | ||
}) | ||
}) |
@@ -39,4 +39,5 @@ 'use strict' | ||
headers: reqHeaders | ||
}, (err, { statusCode, headers, body }) => { | ||
}, (err, data) => { | ||
t.error(err) | ||
const { statusCode, headers, body } = data | ||
t.strictEqual(statusCode, 200) | ||
@@ -43,0 +44,0 @@ t.strictEqual(headers['content-type'], 'text/plain') |
@@ -223,6 +223,6 @@ 'use strict' | ||
}) | ||
client.request({}, (err) => { | ||
client.request({ path: '/', method: 'GET' }, (err) => { | ||
t.ok(err instanceof errors.ClientClosedError) | ||
client.destroy() | ||
client.request({}, (err) => { | ||
client.request({ path: '/', method: 'GET' }, (err) => { | ||
t.ok(err instanceof errors.ClientDestroyedError) | ||
@@ -229,0 +229,0 @@ }) |
@@ -7,2 +7,3 @@ 'use strict' | ||
const { Readable } = require('stream') | ||
const { kConnect } = require('../lib/symbols') | ||
@@ -96,1 +97,49 @@ test('GET and HEAD with body should reset connection', (t) => { | ||
}) | ||
test('HEAD should reset connection', (t) => { | ||
t.plan(9) | ||
const server = createServer((req, res) => { | ||
res.end('asd') | ||
}) | ||
t.teardown(server.close.bind(server)) | ||
server.listen(0, () => { | ||
const client = new Client(`http://localhost:${server.address().port}`) | ||
t.teardown(client.destroy.bind(client)) | ||
client.on('disconnect', () => { | ||
t.pass() | ||
}) | ||
client.request({ | ||
path: '/', | ||
method: 'HEAD' | ||
}, (err, data) => { | ||
t.error(err) | ||
data.body.resume() | ||
}) | ||
t.strictEqual(client.busy, true) | ||
client.request({ | ||
path: '/', | ||
method: 'HEAD' | ||
}, (err, data) => { | ||
t.error(err) | ||
data.body.resume() | ||
client.on('disconnect', () => { | ||
client[kConnect](() => { | ||
client.request({ | ||
path: '/', | ||
method: 'HEAD' | ||
}, (err, data) => { | ||
t.error(err) | ||
data.body.resume() | ||
}) | ||
t.strictEqual(client.busy, true) | ||
}) | ||
}) | ||
}) | ||
t.strictEqual(client.busy, true) | ||
}) | ||
}) |
@@ -43,3 +43,6 @@ 'use strict' | ||
const server = net.createServer((socket) => { | ||
socket.write('HTTP/1.1 101 Switching Protocols\r\n\r\n') | ||
socket.write('HTTP/1.1 101 Switching Protocols\r\n') | ||
socket.write('Upgrade: TLS/1.0, HTTP/1.1\r\n') | ||
socket.write('Connection: Upgrade\r\n') | ||
socket.write('\r\n') | ||
}) | ||
@@ -53,3 +56,7 @@ t.teardown(server.close.bind(server)) | ||
path: '/', | ||
method: 'GET' | ||
method: 'GET', | ||
headers: { | ||
Connection: 'upgrade', | ||
Upgrade: 'example/1, foo/2' | ||
} | ||
}, (err) => { | ||
@@ -56,0 +63,0 @@ t.ok(err instanceof errors.NotSupportedError) |
@@ -6,5 +6,6 @@ 'use strict' | ||
const { createServer } = require('http') | ||
const { kConnect } = require('../lib/symbols') | ||
test('pipeline pipelining', (t) => { | ||
t.plan(7) | ||
t.plan(6) | ||
@@ -27,23 +28,74 @@ const server = createServer((req, res) => { | ||
client.pipeline({ | ||
method: 'GET', | ||
path: '/' | ||
}, ({ body }) => body) | ||
.end() | ||
.resume() | ||
.on('end', () => { | ||
t.strictEqual(client.running, 0) | ||
client.pipeline({ | ||
method: 'GET', | ||
path: '/' | ||
}, ({ body }) => body).end().resume() | ||
t.strictEqual(client.busy, false) | ||
client.pipeline({ | ||
method: 'GET', | ||
path: '/' | ||
}, ({ body }) => body).end().resume() | ||
t.strictEqual(client.busy, true) | ||
t.strictEqual(client.running, 2) | ||
client[kConnect](() => { | ||
t.strictEqual(client.running, 0) | ||
client.pipeline({ | ||
method: 'GET', | ||
path: '/' | ||
}, ({ body }) => body).end().resume() | ||
t.strictEqual(client.busy, false) | ||
client.pipeline({ | ||
method: 'GET', | ||
path: '/' | ||
}, ({ body }) => body).end().resume() | ||
t.strictEqual(client.busy, true) | ||
t.strictEqual(client.running, 2) | ||
}) | ||
}) | ||
}) | ||
test('pipeline pipelining retry', (t) => { | ||
t.plan(6) | ||
let count = 0 | ||
const server = createServer((req, res) => { | ||
if (count++ === 0) { | ||
res.destroy() | ||
} else { | ||
res.end() | ||
} | ||
}) | ||
t.teardown(server.close.bind(server)) | ||
server.listen(0, () => { | ||
const client = new Client(`http://localhost:${server.address().port}`, { | ||
pipelining: 3 | ||
}) | ||
t.teardown(client.destroy.bind(client)) | ||
client.once('disconnect', () => { | ||
t.pass() | ||
client.on('disconnect', () => { | ||
t.fail() | ||
}) | ||
}) | ||
client[kConnect](() => { | ||
client.pipeline({ | ||
method: 'GET', | ||
path: '/' | ||
}, ({ body }) => body) | ||
.on('error', (err) => { | ||
t.ok(err) | ||
}) | ||
.end() | ||
.resume() | ||
t.strictDeepEqual(client.running, 1) | ||
client.pipeline({ | ||
method: 'GET', | ||
path: '/' | ||
}, ({ body }) => body).end().resume() | ||
t.strictDeepEqual(client.running, 2) | ||
client.pipeline({ | ||
method: 'GET', | ||
path: '/' | ||
}, ({ body }) => body).end().resume() | ||
t.strictDeepEqual(client.running, 3) | ||
client.close(() => { | ||
t.pass() | ||
}) | ||
}) | ||
}) | ||
}) |
License Policy Violation
LicenseThis package is not allowed per your license policy. Review the package's license to ensure compliance.
Found 1 instance in 1 package
License Policy Violation
LicenseThis package is not allowed per your license policy. Review the package's license to ensure compliance.
Found 1 instance in 1 package
244871
46
7904
10
35