undici
Advanced tools
Comparing version 1.2.6 to 1.3.0
@@ -6,4 +6,3 @@ 'use strict' | ||
const undici = require('..') | ||
const { kEnqueue, kGetNext } = require('../lib/symbols') | ||
const Request = require('../lib/request') | ||
const { kGetNext } = require('../lib/symbols') | ||
@@ -45,2 +44,4 @@ // # Start the h2o server (in h2o repository) | ||
Benchmark.options.minSamples = 200 | ||
suite | ||
@@ -64,14 +65,2 @@ .add('http - keepalive', { | ||
}) | ||
.add('http - noop', { | ||
defer: true, | ||
fn: deferred => { | ||
http.get(httpOptions, response => { | ||
response | ||
.resume() | ||
.on('end', () => { | ||
deferred.resolve() | ||
}) | ||
}) | ||
} | ||
}) | ||
.add('undici - pipeline', { | ||
@@ -137,3 +126,3 @@ defer: true, | ||
}) | ||
.add('undici - simple', { | ||
.add('undici - dispatch', { | ||
defer: true, | ||
@@ -150,3 +139,3 @@ fn: deferred => { | ||
const client = pool[kGetNext]() | ||
client[kEnqueue](new SimpleRequest(client, undiciOptions, stream)) | ||
client.dispatch(undiciOptions, new SimpleRequest(stream)) | ||
} | ||
@@ -158,3 +147,3 @@ }) | ||
const client = pool[kGetNext]() | ||
client[kEnqueue](new NoopRequest(client, undiciOptions, deferred)) | ||
client.dispatch(undiciOptions, new NoopRequest(deferred)) | ||
} | ||
@@ -167,13 +156,16 @@ }) | ||
class NoopRequest extends Request { | ||
constructor (client, opts, deferred) { | ||
super(opts, client) | ||
class NoopRequest { | ||
constructor (deferred) { | ||
this.deferred = deferred | ||
} | ||
_onHeaders () {} | ||
_onHeaders () { | ||
_onBody () {} | ||
} | ||
_onComplete () { | ||
_onData (chunk) { | ||
return true | ||
} | ||
_onComplete (trailers) { | ||
this.deferred.resolve() | ||
@@ -183,17 +175,13 @@ } | ||
class SimpleRequest extends Request { | ||
constructor (client, opts, dst) { | ||
super(opts, client) | ||
class SimpleRequest { | ||
constructor (dst) { | ||
this.dst = dst | ||
this.dst.on('drain', () => { | ||
this.resume() | ||
}) | ||
} | ||
_onHeaders (statusCode, headers, resume) { | ||
this.resume = resume | ||
this.dst.on('drain', resume) | ||
} | ||
_onBody (chunk, offset, length) { | ||
return this.dst.write(chunk.slice(offset, offset + length)) | ||
_onData (chunk) { | ||
return this.dst.write(chunk) | ||
} | ||
@@ -200,0 +188,0 @@ |
@@ -11,105 +11,68 @@ 'use strict' | ||
InvalidReturnValueError, | ||
NotSupportedError, | ||
RequestAbortedError | ||
} = require('./errors') | ||
const Request = require('./request') | ||
const util = require('./util') | ||
const assert = require('assert') | ||
const { kResume, kEnqueue } = require('./symbols') | ||
const { AsyncResource } = require('async_hooks') | ||
// TODO: Refactor | ||
const kResume = Symbol('resume') | ||
class PipelineRequest extends Request { | ||
constructor (client, opts, callback) { | ||
super(opts, client) | ||
class PipelineRequest extends Readable { | ||
constructor () { | ||
super({ autoDestroy: true }) | ||
this.callback = callback | ||
this.aborted = false | ||
this[kResume] = null | ||
} | ||
_onHeaders (statusCode, headers, resume) { | ||
const { callback } = this | ||
_read () { | ||
const { [kResume]: resume } = this | ||
assert(callback) | ||
this.callback = null | ||
this.res = callback.call(this, null, { | ||
statusCode, | ||
headers, | ||
opaque: this.opaque, | ||
resume | ||
}) | ||
if (resume) { | ||
this[kResume] = null | ||
resume() | ||
} | ||
} | ||
_onBody (chunk, offset, length) { | ||
return this.res(null, chunk.slice(offset, offset + length)) | ||
} | ||
_destroy (err, callback) { | ||
this._read() | ||
_onComplete (trailers) { | ||
// TODO: Trailers? | ||
const res = this.res | ||
this.res = null | ||
res(null, null) | ||
} | ||
_onError (err) { | ||
const { callback, res } = this | ||
if (res) { | ||
this.res = null | ||
res(err, null) | ||
if (!err && !this._readableState.endEmitted) { | ||
// This can happen if the server doesn't care | ||
// about the entire request body. | ||
} | ||
if (callback) { | ||
this.callback = null | ||
callback.call(this, err, null) | ||
} | ||
callback(err) | ||
} | ||
} | ||
module.exports = function (client, opts, handler) { | ||
try { | ||
if (!opts || typeof opts !== 'object') { | ||
throw new InvalidArgumentError('invalid opts') | ||
} | ||
class PipelineResponse extends Readable { | ||
constructor (resume) { | ||
super({ autoDestroy: true, read: resume }) | ||
} | ||
if (typeof handler !== 'function') { | ||
throw new InvalidArgumentError('invalid handler') | ||
} | ||
_destroy (err, callback) { | ||
this._read() | ||
if (opts.method === 'CONNECT') { | ||
throw new NotSupportedError('CONNECT method is not supported') | ||
if (!err && !this._readableState.endEmitted) { | ||
err = new RequestAbortedError() | ||
} | ||
const req = new Readable({ | ||
autoDestroy: true, | ||
read () { | ||
if (this[kResume]) { | ||
const resume = this[kResume] | ||
this[kResume] = null | ||
resume() | ||
} | ||
}, | ||
destroy (err, callback) { | ||
this._read() | ||
callback(err) | ||
} | ||
} | ||
if (err) { | ||
util.destroy(ret, err) | ||
} else if (!this._readableState.endEmitted) { | ||
// This can happen if the server doesn't care | ||
// about the entire request body. | ||
ret.end() | ||
} | ||
class PipelineHandler extends AsyncResource { | ||
constructor (opts, handler) { | ||
super('UNDICI_PIPELINE') | ||
request.runInAsyncScope(callback, null, err, null) | ||
} | ||
}) | ||
this.opaque = opts.opaque || null | ||
this.handler = handler | ||
let res | ||
let body | ||
this.req = new PipelineRequest() | ||
const ret = new Duplex({ | ||
this.ret = new Duplex({ | ||
readableObjectMode: opts.objectMode, | ||
autoDestroy: true, | ||
read () { | ||
read: () => { | ||
const { body } = this | ||
if (body && body.resume) { | ||
@@ -119,4 +82,6 @@ body.resume() | ||
}, | ||
write (chunk, encoding, callback) { | ||
if (req.destroyed || req.push(chunk, encoding)) { | ||
write: (chunk, encoding, callback) => { | ||
const { req } = this | ||
if (req.push(chunk, encoding) || req._readableState.destroyed) { | ||
callback() | ||
@@ -127,6 +92,9 @@ } else { | ||
}, | ||
destroy (err, callback) { | ||
if (!err && !this._readableState.endEmitted) { | ||
destroy: (err, callback) => { | ||
const { body, req, res, ret } = this | ||
if (!err && !ret._readableState.endEmitted) { | ||
err = new RequestAbortedError() | ||
} | ||
util.destroy(body, err) | ||
@@ -136,121 +104,142 @@ util.destroy(req, err) | ||
if (err) { | ||
request.onError(err) | ||
} | ||
request.runInAsyncScope( | ||
callback, | ||
null, | ||
err, | ||
null | ||
) | ||
callback(err) | ||
} | ||
}).on('prefinish', () => { | ||
const { req } = this | ||
// Node < 15 does not call _final in same tick. | ||
req.push(null) | ||
client[kResume]() | ||
}) | ||
// TODO: Avoid copy. | ||
opts = { ...opts, body: req } | ||
this.res = null | ||
} | ||
const request = new PipelineRequest(client, opts, function (err, data) { | ||
if (err) { | ||
util.destroy(ret, err) | ||
return | ||
} | ||
_onHeaders (statusCode, headers, resume) { | ||
const { opaque, handler, ret } = this | ||
const { | ||
if (statusCode < 200) { | ||
return | ||
} | ||
this.res = new PipelineResponse(resume) | ||
let body | ||
try { | ||
this.handler = null | ||
body = this.runInAsyncScope(handler, null, { | ||
statusCode, | ||
headers, | ||
opaque, | ||
resume | ||
} = data | ||
body: this.res | ||
}) | ||
} catch (err) { | ||
this.res.on('error', util.nop) | ||
util.destroy(ret, err) | ||
return | ||
} | ||
const request = this | ||
res = new Readable({ | ||
autoDestroy: true, | ||
read: resume, | ||
destroy (err, callback) { | ||
resume() | ||
if ( | ||
!body || | ||
typeof body.on !== 'function' | ||
) { | ||
util.destroy(ret, new InvalidReturnValueError('expected Readable')) | ||
return | ||
} | ||
if (!err && !this._readableState.endEmitted) { | ||
err = new RequestAbortedError() | ||
} | ||
body | ||
.on('data', (chunk) => { | ||
const { ret, body } = this | ||
if (err) { | ||
util.destroy(ret, err) | ||
} | ||
request.runInAsyncScope( | ||
callback, | ||
null, | ||
err, | ||
null | ||
) | ||
if (!ret.push(chunk) && body.pause) { | ||
body.pause() | ||
} | ||
}) | ||
.on('error', (err) => { | ||
const { ret } = this | ||
try { | ||
body = handler({ | ||
statusCode, | ||
headers, | ||
opaque, | ||
body: res | ||
}) | ||
} catch (err) { | ||
res.on('error', util.nop) | ||
util.destroy(ret, err) | ||
return | ||
} | ||
}) | ||
.on('end', () => { | ||
const { ret } = this | ||
// TODO: Should we allow !body? | ||
if (!body || typeof body.on !== 'function') { | ||
util.destroy(ret, new InvalidReturnValueError('expected Readable')) | ||
return | ||
} | ||
ret.push(null) | ||
}) | ||
.on('close', () => { | ||
const { ret } = this | ||
// TODO: If body === res then avoid intermediate | ||
// and write directly to ret.push? Or should this | ||
// happen when body is null? | ||
if (!ret._readableState.ended) { | ||
util.destroy(ret, new RequestAbortedError()) | ||
} | ||
}) | ||
let ended = false | ||
body | ||
.on('data', function (chunk) { | ||
if (!ret.push(chunk) && this.pause) { | ||
this.pause() | ||
} | ||
}) | ||
.on('error', function (err) { | ||
util.destroy(ret, err) | ||
}) | ||
.on('end', function () { | ||
ended = true | ||
ret.push(null) | ||
}) | ||
.on('close', function () { | ||
if (!ended) { | ||
util.destroy(ret, new RequestAbortedError()) | ||
} | ||
}) | ||
this.body = body | ||
} | ||
if (typeof body.destroy === 'function') { | ||
body.destroy = this.runInAsyncScope.bind(this, body.destroy, body) | ||
} | ||
_onData (chunk) { | ||
const { res } = this | ||
return function (err, chunk) { | ||
if (res.destroyed) { | ||
return null | ||
} else if (err) { | ||
res.destroy(err) | ||
} else { | ||
const ret = res.push(chunk) | ||
return res.destroyed ? null : ret | ||
} | ||
} | ||
}) | ||
if (res._readableState.destroyed) { | ||
return | ||
} | ||
client[kEnqueue](request) | ||
return res.push(chunk) | ||
} | ||
return ret | ||
_onComplete (trailers) { | ||
const { res } = this | ||
if (res._readableState.destroyed) { | ||
return | ||
} | ||
res.push(null) | ||
} | ||
_onError (err) { | ||
const { ret } = this | ||
this.handler = null | ||
util.destroy(ret, err) | ||
} | ||
} | ||
module.exports = function (client, opts, handler) { | ||
try { | ||
if (!opts || typeof opts !== 'object') { | ||
throw new InvalidArgumentError('invalid opts') | ||
} | ||
if (typeof handler !== 'function') { | ||
throw new InvalidArgumentError('invalid handler') | ||
} | ||
if (opts.method === 'CONNECT') { | ||
throw new InvalidArgumentError('invalid method') | ||
} | ||
const pipeline = new PipelineHandler(opts, handler) | ||
const { | ||
path, | ||
method, | ||
headers, | ||
idempotent, | ||
servername, | ||
signal, | ||
requestTimeout | ||
} = opts | ||
client.dispatch({ | ||
path, | ||
method, | ||
body: pipeline.req, | ||
headers, | ||
idempotent, | ||
servername, | ||
signal, | ||
requestTimeout | ||
}, pipeline) | ||
return pipeline.ret | ||
} catch (err) { | ||
@@ -257,0 +246,0 @@ return new PassThrough().destroy(err) |
'use strict' | ||
const { Readable } = require('stream') | ||
const Request = require('./request') | ||
const { | ||
InvalidArgumentError, | ||
RequestAbortedError, | ||
NotSupportedError | ||
RequestAbortedError | ||
} = require('./errors') | ||
const util = require('./util') | ||
const { kEnqueue } = require('./symbols') | ||
const assert = require('assert') | ||
const { AsyncResource } = require('async_hooks') | ||
class RequestRequest extends Request { | ||
constructor (client, opts, callback) { | ||
if (!opts || typeof opts !== 'object') { | ||
throw new InvalidArgumentError('invalid opts') | ||
} | ||
class RequestResponse extends Readable { | ||
constructor (resume) { | ||
super({ autoDestroy: true, read: resume }) | ||
} | ||
if (opts.method === 'CONNECT') { | ||
throw new NotSupportedError('CONNECT method is not supported') | ||
_destroy (err, callback) { | ||
this._read() | ||
if (!err && !this._readableState.endEmitted) { | ||
err = new RequestAbortedError() | ||
} | ||
super(opts, client) | ||
callback(err) | ||
} | ||
} | ||
class RequestHandler extends AsyncResource { | ||
constructor (opts, callback) { | ||
super('UNDICI_REQUEST') | ||
this.opaque = opts.opaque || null | ||
this.callback = callback | ||
@@ -33,45 +39,45 @@ this.res = null | ||
assert(callback) | ||
if (statusCode < 200) { | ||
return | ||
} | ||
const body = new RequestResponse(resume) | ||
this.callback = null | ||
this.res = body | ||
const request = this | ||
this.res = new Readable({ | ||
autoDestroy: true, | ||
read: resume, | ||
destroy (err, callback) { | ||
if (!err && !this._readableState.endEmitted) { | ||
err = new RequestAbortedError() | ||
} | ||
if (err) { | ||
request.onError(err) | ||
} | ||
request.runInAsyncScope(callback, null, err, null) | ||
} | ||
}) | ||
callback(null, { | ||
this.runInAsyncScope(callback, null, null, { | ||
statusCode, | ||
headers, | ||
opaque, | ||
body: this.res | ||
body | ||
}) | ||
} | ||
_onBody (chunk, offset, length) { | ||
return this.res.push(chunk.slice(offset, offset + length)) | ||
_onData (chunk) { | ||
const { res } = this | ||
if (res._readableState.destroyed) { | ||
return | ||
} | ||
return res.push(chunk) | ||
} | ||
_onComplete (trailers) { | ||
this.res.push(null) | ||
const { res } = this | ||
if (res._readableState.destroyed) { | ||
return | ||
} | ||
res.push(null) | ||
} | ||
_onError (err) { | ||
const { res, callback } = this | ||
const { res, callback, opaque } = this | ||
if (callback) { | ||
this.callback = null | ||
process.nextTick(callback, err, null) | ||
this.runInAsyncScope(callback, null, err, { opaque }) | ||
} | ||
@@ -100,3 +106,11 @@ | ||
try { | ||
client[kEnqueue](new RequestRequest(client, opts, callback)) | ||
if (!opts || typeof opts !== 'object') { | ||
throw new InvalidArgumentError('invalid opts') | ||
} | ||
if (opts.method === 'CONNECT') { | ||
throw new InvalidArgumentError('invalid method') | ||
} | ||
client.dispatch(opts, new RequestHandler(opts, callback)) | ||
} catch (err) { | ||
@@ -103,0 +117,0 @@ process.nextTick(callback, err, null) |
'use strict' | ||
const { finished } = require('stream') | ||
const Request = require('./request') | ||
const { | ||
InvalidArgumentError, | ||
InvalidReturnValueError, | ||
NotSupportedError | ||
InvalidReturnValueError | ||
} = require('./errors') | ||
const util = require('./util') | ||
const { kEnqueue } = require('./symbols') | ||
const assert = require('assert') | ||
const { AsyncResource } = require('async_hooks') | ||
class StreamRequest extends Request { | ||
constructor (client, opts, factory, callback) { | ||
if (!opts || typeof opts !== 'object') { | ||
throw new InvalidArgumentError('invalid opts') | ||
} | ||
class StreamHandler extends AsyncResource { | ||
constructor (opts, factory, callback) { | ||
super('UNDICI_STREAM') | ||
if (typeof factory !== 'function') { | ||
throw new InvalidArgumentError('invalid factory') | ||
} | ||
if (opts.method === 'CONNECT') { | ||
throw new NotSupportedError('CONNECT method is not supported') | ||
} | ||
super(opts, client) | ||
this.opaque = opts.opaque || null | ||
this.factory = factory | ||
this.callback = callback | ||
this.res = null | ||
this.trailers = null | ||
} | ||
_onHeaders (statusCode, headers, resume) { | ||
const { factory, opaque } = this | ||
const { factory, callback, opaque } = this | ||
assert(factory) | ||
this.factory = null | ||
if (statusCode < 200) { | ||
return | ||
} | ||
let res | ||
try { | ||
res = factory({ | ||
this.factory = null | ||
res = this.runInAsyncScope(factory, null, { | ||
statusCode, | ||
@@ -48,81 +37,67 @@ headers, | ||
}) | ||
if ( | ||
!res || | ||
typeof res.write !== 'function' || | ||
typeof res.end !== 'function' || | ||
typeof res.on !== 'function' | ||
) { | ||
throw new InvalidReturnValueError('expected Writable') | ||
} | ||
} catch (err) { | ||
this.onError(err) | ||
return | ||
} | ||
const { callback } = this | ||
if (!callback) { | ||
// Aborted inside factory. | ||
return | ||
} | ||
if (!res) { | ||
this.callback = null | ||
callback(null, null) | ||
this.runInAsyncScope(callback, null, err, { opaque }) | ||
return | ||
} | ||
if ( | ||
typeof res.write !== 'function' || | ||
typeof res.end !== 'function' || | ||
typeof res.on !== 'function' | ||
) { | ||
this.onError(new InvalidReturnValueError('expected Writable')) | ||
return | ||
} | ||
res.on('drain', resume) | ||
// TODO: Avoid finished. It registers an unecessary amount of listeners. | ||
finished(res, { readable: false }, (err) => { | ||
if (err) { | ||
this.onError(err) | ||
return | ||
} | ||
const { callback, res, opaque, trailers } = this | ||
const { callback, res } = this | ||
assert(res) | ||
this.res = null | ||
if (!res.readable) { | ||
util.destroy(res) | ||
if (err || !res.readable) { | ||
util.destroy(res, err) | ||
} | ||
assert(callback) | ||
this.callback = null | ||
callback(null, null) | ||
this.runInAsyncScope(callback, null, err || null, { opaque, trailers }) | ||
}) | ||
if (typeof res.destroy === 'function') { | ||
res.destroy = this.runInAsyncScope.bind(this, res.destroy, res) | ||
} | ||
this.res = res | ||
} | ||
_onBody (chunk, offset, length) { | ||
return this.res | ||
? this.res.write(chunk.slice(offset, offset + length)) | ||
: true | ||
_onData (chunk) { | ||
const { res } = this | ||
if (util.isDestroyed(res)) { | ||
return | ||
} | ||
return res.write(chunk) | ||
} | ||
_onComplete (trailers) { | ||
if (this.res) { | ||
this.res.end() | ||
const { res } = this | ||
if (util.isDestroyed(res)) { | ||
return | ||
} | ||
this.trailers = trailers || {} | ||
res.end() | ||
} | ||
_onError (err) { | ||
const { res, callback } = this | ||
const { res, callback, opaque } = this | ||
this.factory = null | ||
assert(callback) | ||
this.callback = null | ||
process.nextTick(callback, err, null) | ||
if (res) { | ||
this.res = null | ||
util.destroy(res, err) | ||
} else if (callback) { | ||
this.callback = null | ||
this.runInAsyncScope(callback, null, err, { opaque }) | ||
} | ||
@@ -146,3 +121,15 @@ } | ||
try { | ||
client[kEnqueue](new StreamRequest(client, opts, factory, callback)) | ||
if (!opts || typeof opts !== 'object') { | ||
throw new InvalidArgumentError('invalid opts') | ||
} | ||
if (typeof factory !== 'function') { | ||
throw new InvalidArgumentError('invalid factory') | ||
} | ||
if (opts.method === 'CONNECT') { | ||
throw new InvalidArgumentError('invalid method') | ||
} | ||
client.dispatch(opts, new StreamHandler(opts, factory, callback)) | ||
} catch (err) { | ||
@@ -149,0 +136,0 @@ process.nextTick(callback, err, null) |
@@ -11,2 +11,3 @@ 'use strict' | ||
const util = require('./util') | ||
const Request = require('./request') | ||
const { | ||
@@ -21,4 +22,3 @@ ContentLengthMismatchError, | ||
SocketError, | ||
InformationalError, | ||
NotSupportedError | ||
InformationalError | ||
} = require('./errors') | ||
@@ -28,3 +28,6 @@ const { | ||
kReset, | ||
kPause, | ||
kResume, | ||
kClient, | ||
kParser, | ||
kConnect, | ||
@@ -35,2 +38,3 @@ kResuming, | ||
kServerName, | ||
kIdleTimeout, | ||
kSocketTimeout, | ||
@@ -51,5 +55,7 @@ kRequestTimeout, | ||
kSocketPath, | ||
kEnqueue, | ||
kKeepAliveTimeout, | ||
kMaxHeadersSize, | ||
kHeadersTimeout | ||
kHeadersTimeout, | ||
kMaxKeepAliveTimeout, | ||
kKeepAliveTimeoutThreshold | ||
} = require('./symbols') | ||
@@ -59,2 +65,4 @@ const makeStream = require('./client-stream') | ||
const makePipeline = require('./client-pipeline') | ||
const makeUpgrade = require('./client-upgrade') | ||
const makeConnect = require('./client-connect') | ||
@@ -72,2 +80,5 @@ const CRLF = Buffer.from('\r\n', 'ascii') | ||
socketTimeout, | ||
idleTimeout, | ||
maxKeepAliveTimeout, | ||
keepAliveTimeoutThreshold, | ||
socketPath, | ||
@@ -120,2 +131,14 @@ requestTimeout, | ||
if (idleTimeout != null && (!Number.isFinite(idleTimeout) || idleTimeout <= 0)) { | ||
throw new InvalidArgumentError('invalid idleTimeout') | ||
} | ||
if (maxKeepAliveTimeout != null && (!Number.isFinite(maxKeepAliveTimeout) || maxKeepAliveTimeout <= 0)) { | ||
throw new InvalidArgumentError('invalid maxKeepAliveTimeout') | ||
} | ||
if (keepAliveTimeoutThreshold != null && !Number.isFinite(keepAliveTimeoutThreshold)) { | ||
throw new InvalidArgumentError('invalid keepAliveTimeoutThreshold') | ||
} | ||
if (requestTimeout != null && !Number.isFinite(requestTimeout)) { | ||
@@ -137,2 +160,6 @@ throw new InvalidArgumentError('invalid requestTimeout') | ||
this[kSocketTimeout] = socketTimeout == null ? 30e3 : socketTimeout | ||
this[kMaxKeepAliveTimeout] = maxKeepAliveTimeout == null ? 600e3 : maxKeepAliveTimeout | ||
this[kIdleTimeout] = idleTimeout == null ? 4e3 : idleTimeout | ||
this[kKeepAliveTimeoutThreshold] = keepAliveTimeoutThreshold == null ? 1e3 : keepAliveTimeoutThreshold | ||
this[kKeepAliveTimeout] = this[kIdleTimeout] | ||
this[kRequestTimeout] = requestTimeout == null ? 30e3 : requestTimeout | ||
@@ -212,7 +239,7 @@ this[kClosed] = false | ||
for (let n = this[kPendingIdx]; n < this[kQueue].length; n++) { | ||
const { idempotent, body, method } = this[kQueue][n] | ||
if (!idempotent) { | ||
const { idempotent, upgrade, body, method } = this[kQueue][n] | ||
if (!idempotent || upgrade) { | ||
return true | ||
} | ||
if (method === 'HEAD') { | ||
if (method === 'HEAD' || method === 'CONNECT') { | ||
return true | ||
@@ -239,6 +266,2 @@ } | ||
[kResume] () { | ||
resume(this) | ||
} | ||
/* istanbul ignore next: only used for test */ | ||
@@ -256,3 +279,4 @@ [kConnect] (cb) { | ||
[kEnqueue] (request) { | ||
dispatch (opts, handler) { | ||
const request = new Request(opts, this, handler) | ||
try { | ||
@@ -370,2 +394,10 @@ if (this[kDestroyed]) { | ||
} | ||
upgrade (opts, callback) { | ||
return makeUpgrade(this, opts, callback) | ||
} | ||
connect (opts, callback) { | ||
return makeConnect(this, opts, callback) | ||
} | ||
} | ||
@@ -391,5 +423,5 @@ | ||
this.socket = socket | ||
this.resumeSocket = () => socket.resume() | ||
this.statusCode = null | ||
this.upgrade = false | ||
this.headers = null | ||
@@ -401,6 +433,7 @@ this.read = 0 | ||
if (this.statusCode) { | ||
// https://github.com/mcollina/undici/issues/268 | ||
return | ||
// https://github.com/nodejs/node/pull/34578 | ||
this.socket._unrefTimer() | ||
} else { | ||
util.destroy(this.socket, new HeadersTimeoutError()) | ||
} | ||
util.destroy(this.socket, new HeadersTimeoutError()) | ||
} | ||
@@ -415,9 +448,57 @@ | ||
util.destroy(this.socket, ret) | ||
} else { | ||
// When the underlying `net.Socket` instance is consumed - no | ||
// `data` events are emitted, and thus `socket.setTimeout` fires the | ||
// callback even if the data is constantly flowing into the socket. | ||
// See, https://github.com/nodejs/node/commit/ec2822adaad76b126b5cccdeaa1addf2376c9aa6 | ||
this.socket._unrefTimer() | ||
return | ||
} | ||
// When the underlying `net.Socket` instance is consumed - no | ||
// `data` events are emitted, and thus `socket.setTimeout` fires the | ||
// callback even if the data is constantly flowing into the socket. | ||
// See, https://github.com/nodejs/node/commit/ec2822adaad76b126b5cccdeaa1addf2376c9aa6 | ||
this.socket._unrefTimer() | ||
assert(Number.isFinite(ret)) | ||
// This logic cannot live in kOnHeadersComplete since we | ||
// have no way of slicing the parsing buffer without knowing | ||
// the offset which is only provided in kOnExecute. | ||
if (this.upgrade && !this.socket.destroyed) { | ||
const { socket, client, headers, statusCode } = this | ||
const request = client[kQueue][client[kRunningIdx]] | ||
assert(!socket.destroyed) | ||
assert(socket === client[kSocket]) | ||
assert(!socket.isPaused()) | ||
assert(request.upgrade || request.method === 'CONNECT') | ||
this.read = 0 | ||
this.headers = null | ||
this.statusCode = null | ||
// _readableState.flowing might be `true` if the socket has been | ||
// explicitly `resume()`:d even if we never registered a 'data' | ||
// listener. | ||
// We need to stop unshift from emitting 'data'. However, we cannot | ||
// call pause() as that will stop socket from automatically resuming | ||
// when 'data' listener is registered. | ||
// Reset socket state to non flowing: | ||
socket._readableState.flowing = null | ||
socket.unshift(this.getCurrentBuffer().slice(ret)) | ||
request.onUpgrade(statusCode, headers, socket) | ||
if (!socket.destroyed) { | ||
detachSocket(socket) | ||
client[kSocket] = null | ||
client[kQueue][client[kRunningIdx]++] = null | ||
client.emit('disconnect', new InformationalError('upgrade')) | ||
this.unconsume() | ||
setImmediate(() => this.close()) | ||
resume(client) | ||
} | ||
} | ||
} | ||
@@ -427,12 +508,23 @@ | ||
url, statusCode, statusMessage, upgrade, shouldKeepAlive) { | ||
const { client, socket, resumeSocket, headers } = this | ||
const { client, socket } = this | ||
const request = client[kQueue][client[kRunningIdx]] | ||
// TODO: Check for content-length mismatch? | ||
// TODO: Check for content-length mismatch from server? | ||
assert(!this.upgrade) | ||
assert(this.statusCode < 200) | ||
this.headers = null | ||
this.headers = util.parseHeaders(rawHeaders, this.headers) | ||
this.statusCode = statusCode | ||
if (upgrade !== request.upgrade) { | ||
util.destroy(socket, new SocketError('bad upgrade')) | ||
return 1 | ||
} | ||
if (upgrade || request.method === 'CONNECT') { | ||
this.upgrade = true | ||
return 2 | ||
} | ||
if (!shouldKeepAlive) { | ||
@@ -442,5 +534,20 @@ client[kReset] = true | ||
if (upgrade) { | ||
util.destroy(socket, new NotSupportedError('upgrade not supported')) | ||
return true | ||
const { headers } = this | ||
this.headers = null | ||
if (headers['keep-alive']) { | ||
const m = headers['keep-alive'].match(/timeout=(\d+)/) | ||
if (m) { | ||
const keepAliveTimeout = Math.min( | ||
Number(m[1]) * 1000 - client[kKeepAliveTimeoutThreshold], | ||
client[kMaxKeepAliveTimeout] | ||
) | ||
if (!keepAliveTimeout || keepAliveTimeout < 1e3) { | ||
client[kReset] = true | ||
} else { | ||
client[kKeepAliveTimeout] = keepAliveTimeout | ||
} | ||
} | ||
} else { | ||
client[kKeepAliveTimeout] = client[kIdleTimeout] | ||
} | ||
@@ -450,9 +557,9 @@ | ||
if (statusCode < 200) { | ||
// request.onInfo(statusCode, util.parseHeaders(rawHeaders, headers)) | ||
if (statusCode === 100) { | ||
// TODO: 100 Continue | ||
} else { | ||
request.onHeaders(statusCode, util.parseHeaders(rawHeaders, headers), resumeSocket) | ||
request.onHeaders(statusCode, headers, statusCode < 200 ? null : socket[kResume]) | ||
} | ||
return request.method === 'HEAD' || statusCode < 200 | ||
return request.method === 'HEAD' || statusCode < 200 ? 1 : 0 | ||
} | ||
@@ -463,7 +570,2 @@ | ||
if (length === 1) { | ||
// https://github.com/mcollina/undici/issues/269 | ||
socket._unrefTimer() | ||
} | ||
assert(statusCode >= 200) | ||
@@ -475,2 +577,5 @@ | ||
// TODO: maxAbortedPayload should count from when aborted | ||
// not from start. | ||
const ret = request.onBody(chunk, offset, length) | ||
@@ -485,8 +590,13 @@ if (ret == null && this.read > client[kMaxAbortedPayload]) { | ||
[HTTPParser.kOnMessageComplete] () { | ||
const { client, socket, statusCode, headers } = this | ||
const { client, socket, statusCode, headers, upgrade } = this | ||
const request = client[kQueue][client[kRunningIdx]] | ||
assert(statusCode >= 100) | ||
const request = client[kQueue][client[kRunningIdx]] | ||
if (upgrade) { | ||
assert(statusCode < 300 || request.method === 'CONNECT') | ||
return | ||
} | ||
this.read = 0 | ||
this.statusCode = null | ||
@@ -502,6 +612,10 @@ this.headers = null | ||
this.read = 0 | ||
client[kQueue][client[kRunningIdx]++] = null | ||
if (client[kReset]) { | ||
if (client[kWriting]) { | ||
// TOOD: keep writing until maxAbortedPayload? | ||
// Response completed before request. | ||
util.destroy(socket, new InformationalError('request reset')) | ||
} else if (client[kReset]) { | ||
// https://tools.ietf.org/html/rfc7231#section-4.3.1 | ||
@@ -518,24 +632,65 @@ // https://tools.ietf.org/html/rfc7231#section-4.3.2 | ||
} else { | ||
socket.resume() | ||
resume(client) | ||
} | ||
socket.resume() | ||
} | ||
} | ||
destroy (err) { | ||
const { client, socket } = this | ||
function onSocketConnect () { | ||
const { [kClient]: client } = this | ||
assert(err) | ||
assert(socket.destroyed) | ||
client[kReset] = false | ||
client[kRetryDelay] = 0 | ||
client.emit('connect') | ||
resume(client) | ||
} | ||
this.unconsume() | ||
function onSocketTimeout () { | ||
util.destroy(this, new SocketTimeoutError()) | ||
} | ||
// Make sure the parser's stack has unwound before deleting the | ||
// corresponding C++ object through .close(). | ||
setImmediate(() => this.close()) | ||
function onSocketError (err) { | ||
const { [kClient]: client, [kServerName]: servername } = this | ||
if (!client.running) { | ||
return | ||
this[kError] = err | ||
if (err.code === 'ERR_TLS_CERT_ALTNAME_INVALID') { | ||
assert(!client.running) | ||
while (client.pending && client[kQueue][client[kPendingIdx]].servername === servername) { | ||
const request = client[kQueue][client[kPendingIdx]++] | ||
request.onError(err) | ||
} | ||
} else if ( | ||
!client.running && | ||
err.code !== 'ECONNRESET' && | ||
err.code !== 'ECONNREFUSED' && | ||
err.code !== 'EHOSTUNREACH' && | ||
err.code !== 'EHOSTDOWN' && | ||
err.code !== 'UND_ERR_SOCKET' && | ||
err.code !== 'UND_ERR_INFO' | ||
) { | ||
assert(client[kPendingIdx] === client[kRunningIdx]) | ||
// Error is not caused by running request and not a recoverable | ||
// socket error. | ||
for (const request of client[kQueue].splice(client[kRunningIdx])) { | ||
request.onError(err) | ||
} | ||
} | ||
} | ||
function onSocketEnd () { | ||
util.destroy(this, new SocketError('other side closed')) | ||
} | ||
function onSocketClose () { | ||
const { [kClient]: client, [kParser]: parser } = this | ||
const err = this[kError] || new SocketError('closed') | ||
client[kSocket] = null | ||
parser.unconsume() | ||
setImmediate(() => parser.close()) | ||
if (client.running > 0) { | ||
// Retry all idempotent requests except for the one | ||
@@ -557,4 +712,24 @@ // at the head of the pipeline. | ||
} | ||
if (!client.destroyed) { | ||
client.emit('disconnect', err) | ||
} | ||
resume(client) | ||
} | ||
function detachSocket (socket) { | ||
socket[kPause] = null | ||
socket[kResume] = null | ||
socket[kClient] = null | ||
socket[kParser] = null | ||
socket[kError] = null | ||
socket[kServerName] = null | ||
socket | ||
.removeListener('timeout', onSocketTimeout) | ||
.removeListener('error', onSocketError) | ||
.removeListener('end', onSocketEnd) | ||
.removeListener('close', onSocketClose) | ||
} | ||
function connect (client) { | ||
@@ -593,78 +768,16 @@ assert(!client[kSocket]) | ||
socket[kPause] = socket.pause.bind(socket) | ||
socket[kResume] = socket.resume.bind(socket) | ||
socket[kError] = null | ||
socket.setTimeout(client[kSocketTimeout], function () { | ||
util.destroy(this, new SocketTimeoutError()) | ||
}) | ||
socket[kParser] = parser | ||
socket[kClient] = client | ||
socket[kServerName] = servername | ||
socket | ||
.setNoDelay(true) | ||
.on(protocol === 'https:' ? 'secureConnect' : 'connect', function () { | ||
client[kReset] = false | ||
client[kRetryDelay] = 0 | ||
client.emit('connect') | ||
resume(client) | ||
}) | ||
.on('data', /* istanbul ignore next */ function () { | ||
/* istanbul ignore next */ | ||
assert(false) | ||
}) | ||
.on('error', function (err) { | ||
this[kError] = err | ||
if (err.code === 'ERR_TLS_CERT_ALTNAME_INVALID') { | ||
assert(!client.running) | ||
while (client.pending && client[kQueue][client[kPendingIdx]].servername === servername) { | ||
const request = client[kQueue][client[kPendingIdx]++] | ||
request.onError(err) | ||
} | ||
} else if ( | ||
!client.running && | ||
err.code !== 'ECONNRESET' && | ||
err.code !== 'ECONNREFUSED' && | ||
err.code !== 'EHOSTUNREACH' && | ||
err.code !== 'EHOSTDOWN' && | ||
err.code !== 'UND_ERR_SOCKET' && | ||
err.code !== 'UND_ERR_INFO' | ||
) { | ||
assert(client[kPendingIdx] === client[kRunningIdx]) | ||
// Error is not caused by running request and not a recoverable | ||
// socket error. | ||
for (const request of client[kQueue].splice(client[kRunningIdx])) { | ||
request.onError(err) | ||
} | ||
} | ||
}) | ||
.on('end', function () { | ||
util.destroy(this, new SocketError('other side closed')) | ||
}) | ||
.on('close', function () { | ||
if (!this[kError]) { | ||
this[kError] = new SocketError('closed') | ||
} | ||
client[kSocket] = null | ||
parser.destroy(this[kError]) | ||
if (client.destroyed) { | ||
resume(client) | ||
return | ||
} | ||
if (client.pending > 0) { | ||
if (client[kRetryDelay]) { | ||
client[kRetryTimeout] = setTimeout(() => { | ||
client[kRetryTimeout] = null | ||
connect(client) | ||
}, client[kRetryDelay]) | ||
client[kRetryDelay] = Math.min(client[kRetryDelay] * 2, client[kSocketTimeout]) | ||
} else { | ||
connect(client) | ||
client[kRetryDelay] = 1e3 | ||
} | ||
} | ||
client.emit('disconnect', this[kError]) | ||
resume(client) | ||
}) | ||
.setTimeout(client[kIdleTimeout]) | ||
.on(protocol === 'https:' ? 'secureConnect' : 'connect', onSocketConnect) | ||
.on('timeout', onSocketTimeout) | ||
.on('error', onSocketError) | ||
.on('end', onSocketEnd) | ||
.on('close', onSocketClose) | ||
} | ||
@@ -687,2 +800,10 @@ | ||
} | ||
if ( | ||
client[kSocket] && | ||
client[kSocket].timeout !== client[kKeepAliveTimeout] | ||
) { | ||
client[kSocket].setTimeout(client[kKeepAliveTimeout]) | ||
} | ||
if (client[kRunningIdx] > 0) { | ||
@@ -693,2 +814,3 @@ client[kQueue].length = 0 | ||
} | ||
return | ||
@@ -733,3 +855,12 @@ } | ||
if (!client[kSocket] && !client[kRetryTimeout]) { | ||
connect(client) | ||
if (client[kRetryDelay]) { | ||
client[kRetryTimeout] = setTimeout(() => { | ||
client[kRetryTimeout] = null | ||
connect(client) | ||
}, client[kRetryDelay]) | ||
client[kRetryDelay] = Math.min(client[kRetryDelay] * 2, client[kSocketTimeout]) | ||
} else { | ||
connect(client) | ||
client[kRetryDelay] = 1e3 | ||
} | ||
return | ||
@@ -781,2 +912,9 @@ } | ||
if (client.running && (request.upgrade || request.method === 'CONNECT')) { | ||
// Don't dispatch an upgrade until all preceeding requests have completed. | ||
// A misbehaving server might upgrade the connection before all pipelined | ||
// request has completed. | ||
return | ||
} | ||
try { | ||
@@ -792,7 +930,7 @@ write(client, request) | ||
function write (client, request) { | ||
const { method } = request | ||
const { body, header, upgrade, method } = request | ||
let contentLength = util.bodyLength(request.body, true) | ||
if (contentLength === undefined) { | ||
if (contentLength === null) { | ||
contentLength = request.contentLength | ||
@@ -814,18 +952,15 @@ } | ||
contentLength = undefined | ||
contentLength = null | ||
} | ||
if (request.contentLength !== undefined && request.contentLength !== contentLength) { | ||
if (request.contentLength !== null && request.contentLength !== contentLength) { | ||
throw new ContentLengthMismatchError() | ||
} | ||
const { body, header } = request | ||
const socket = client[kSocket] | ||
socket.setTimeout(client[kSocketTimeout]) | ||
socket.cork() | ||
socket.write(header) | ||
if (contentLength !== undefined) { | ||
socket.write(`content-length: ${contentLength}\r\n`, 'ascii') | ||
} | ||
// TODO: Expect: 100-continue | ||
@@ -840,2 +975,7 @@ if (method === 'HEAD') { | ||
if (method === 'CONNECT' || upgrade) { | ||
// Block pipeline from dispatching further requests on this connection. | ||
client[kReset] = true | ||
} | ||
// TODO: An HTTP/1.1 user agent MUST NOT preface | ||
@@ -846,12 +986,14 @@ // or follow a request with an extra CRLF. | ||
if (!body) { | ||
socket.write(CRLF) | ||
socket.write(header) | ||
if (contentLength === 0) { | ||
socket.write(`content-length: ${contentLength}\r\n\r\n\r\n`, 'ascii') | ||
} else { | ||
assert(contentLength === null, 'no body must not have content length') | ||
socket.write(CRLF) | ||
} else { | ||
assert(contentLength === undefined, 'no body must not have content length') | ||
} | ||
} else if (body instanceof Uint8Array) { | ||
assert(contentLength !== undefined, 'buffer body must have content length') | ||
assert(contentLength !== null, 'buffer body must have content length') | ||
socket.write(CRLF) | ||
socket.write(header) | ||
socket.write(`content-length: ${contentLength}\r\n\r\n`, 'ascii') | ||
socket.write(body) | ||
@@ -866,2 +1008,5 @@ socket.write(CRLF) | ||
// TODO: Only write once we know we have data? | ||
socket.write(header) | ||
let finished = false | ||
@@ -880,3 +1025,3 @@ let bytesWritten = 0 | ||
// We should defer writing chunks. | ||
if (contentLength !== undefined && bytesWritten + len > contentLength) { | ||
if (contentLength !== null && bytesWritten + len > contentLength) { | ||
util.destroy(this, new ContentLengthMismatchError()) | ||
@@ -887,7 +1032,11 @@ return | ||
if (bytesWritten === 0) { | ||
socket.write(contentLength === undefined ? 'transfer-encoding: chunked\r\n' : '\r\n', 'ascii') | ||
if (contentLength === null) { | ||
socket.write('transfer-encoding: chunked\r\n', 'ascii') | ||
} else { | ||
socket.write(`content-length: ${contentLength}\r\n\r\n`, 'ascii') | ||
} | ||
client[kReset] = !expectsPayload | ||
} | ||
if (contentLength === undefined) { | ||
if (contentLength === null) { | ||
socket.write(`\r\n${len.toString(16)}\r\n`, 'ascii') | ||
@@ -912,2 +1061,5 @@ } | ||
} | ||
const onClose = function () { | ||
onFinished(new SocketError('closed')) | ||
} | ||
const onFinished = function (err) { | ||
@@ -923,7 +1075,3 @@ if (finished) { | ||
if (!err) { | ||
err = socket[kError] | ||
} | ||
if (!err && contentLength !== undefined && bytesWritten !== contentLength) { | ||
if (!err && contentLength !== null && bytesWritten !== contentLength) { | ||
err = new ContentLengthMismatchError() | ||
@@ -935,3 +1083,3 @@ } | ||
.removeListener('error', onFinished) | ||
.removeListener('close', onFinished) | ||
.removeListener('close', onClose) | ||
body | ||
@@ -952,3 +1100,3 @@ .removeListener('data', onData) | ||
if (bytesWritten === 0) { | ||
if (contentLength === undefined && expectsPayload) { | ||
if (contentLength === null && expectsPayload) { | ||
// https://tools.ietf.org/html/rfc7230#section-3.3.2 | ||
@@ -961,3 +1109,3 @@ // A user agent SHOULD send a Content-Length in a request message when | ||
} | ||
} else if (contentLength === undefined) { | ||
} else if (contentLength === null) { | ||
socket.write('\r\n0\r\n', 'ascii') | ||
@@ -964,0 +1112,0 @@ } |
@@ -15,7 +15,3 @@ 'use strict' | ||
connections, | ||
maxAbortedPayload, | ||
socketTimeout, | ||
requestTimeout, | ||
pipelining, | ||
tls | ||
...options | ||
} = {}) { | ||
@@ -28,9 +24,3 @@ if (connections != null && (!Number.isFinite(connections) || connections <= 0)) { | ||
length: connections || 10 | ||
}, () => new Client(url, { | ||
maxAbortedPayload, | ||
socketTimeout, | ||
requestTimeout, | ||
pipelining, | ||
tls | ||
})) | ||
}, () => new Client(url, options)) | ||
} | ||
@@ -44,3 +34,2 @@ | ||
stream (opts, factory, cb) { | ||
// needed because we need the return value from client.stream | ||
if (cb === undefined) { | ||
@@ -62,3 +51,2 @@ return new Promise((resolve, reject) => { | ||
request (opts, cb) { | ||
// needed because we need the return value from client.request | ||
if (cb === undefined) { | ||
@@ -75,2 +63,14 @@ return new Promise((resolve, reject) => { | ||
upgrade (opts, callback) { | ||
return getNext(this).upgrade(opts, callback) | ||
} | ||
connect (opts, callback) { | ||
return getNext(this).connect(opts, callback) | ||
} | ||
dispatch (opts, handler) { | ||
return getNext(this).dispatch(opts, handler) | ||
} | ||
close (cb) { | ||
@@ -77,0 +77,0 @@ const promise = Promise.all(this[kClients].map(c => c.close())) |
'use strict' | ||
const { AsyncResource } = require('async_hooks') | ||
const { | ||
InvalidArgumentError, | ||
RequestAbortedError, | ||
RequestTimeoutError | ||
RequestTimeoutError, | ||
NotSupportedError | ||
} = require('./errors') | ||
@@ -18,4 +18,5 @@ const assert = require('assert') | ||
const kSignal = Symbol('signal') | ||
const kHandler = Symbol('handler') | ||
class Request extends AsyncResource { | ||
class Request { | ||
constructor ({ | ||
@@ -27,3 +28,3 @@ path, | ||
idempotent, | ||
opaque, | ||
upgrade, | ||
servername, | ||
@@ -35,5 +36,3 @@ signal, | ||
[kUrl]: { hostname: defaultHostname } | ||
}) { | ||
super('UNDICI_REQ') | ||
}, handler) { | ||
if (typeof path !== 'string' || path[0] !== '/') { | ||
@@ -47,2 +46,6 @@ throw new InvalidArgumentError('path must be a valid path') | ||
if (upgrade && typeof upgrade !== 'string') { | ||
throw new InvalidArgumentError('upgrade must be a string') | ||
} | ||
if (signal && typeof signal.on !== 'function' && typeof signal.addEventListener !== 'function') { | ||
@@ -60,2 +63,4 @@ throw new InvalidArgumentError('signal must be an EventEmitter or EventTarget') | ||
this[kHandler] = handler | ||
this.method = method | ||
@@ -85,3 +90,3 @@ | ||
this.opaque = opaque | ||
this.upgrade = !!upgrade | ||
@@ -92,9 +97,12 @@ this.idempotent = idempotent == null | ||
this.contentLength = undefined | ||
this.contentLength = null | ||
{ | ||
// TODO (perf): Build directy into buffer instead of | ||
// using an intermediate string. | ||
let header = `${method} ${path} HTTP/1.1\r\n` | ||
let header = `${method} ${path} HTTP/1.1\r\nconnection: keep-alive\r\n` | ||
if (upgrade) { | ||
header += `connection: upgrade\r\nupgrade: ${upgrade}\r\n` | ||
} else { | ||
header += 'connection: keep-alive\r\n' | ||
} | ||
@@ -114,3 +122,3 @@ if (!hostHeader) { | ||
if ( | ||
this.contentLength === undefined && | ||
this.contentLength === null && | ||
key.length === 14 && | ||
@@ -128,2 +136,22 @@ key.toLowerCase() === 'content-length' | ||
throw new InvalidArgumentError('invalid transfer-encoding header') | ||
} else if ( | ||
key.length === 10 && | ||
key.toLowerCase() === 'connection' | ||
) { | ||
throw new InvalidArgumentError('invalid connection header') | ||
} else if ( | ||
key.length === 10 && | ||
key.toLowerCase() === 'keep-alive' | ||
) { | ||
throw new InvalidArgumentError('invalid keep-alive header') | ||
} else if ( | ||
key.length === 7 && | ||
key.toLowerCase() === 'upgrade' | ||
) { | ||
throw new InvalidArgumentError('invalid upgrade header') | ||
} else if ( | ||
key.length === 6 && | ||
key.toLowerCase() === 'expect' | ||
) { | ||
throw new NotSupportedError('expect header not supported') | ||
} else { | ||
@@ -167,5 +195,22 @@ header += `${key}: ${val}\r\n` | ||
: null | ||
this[kResume] = null | ||
} | ||
onUpgrade (statusCode, headers, socket) { | ||
assert(this.upgrade || this.method === 'CONNECT') | ||
if (this.aborted) { | ||
util.destroy(socket, new RequestAbortedError()) | ||
return | ||
} | ||
reset.call(this) | ||
this[kHandler]._onUpgrade(statusCode, headers, socket) | ||
} | ||
onHeaders (statusCode, headers, resume) { | ||
assert(!this.upgrade && this.method !== 'CONNECT') | ||
if (this.aborted) { | ||
@@ -175,2 +220,4 @@ return | ||
this[kResume] = resume | ||
const { | ||
@@ -185,16 +232,18 @@ [kTimeout]: timeout | ||
this[kResume] = resume | ||
this.runInAsyncScope(this._onHeaders, this, statusCode, headers, resume) | ||
this[kHandler]._onHeaders(statusCode, headers, resume) | ||
} | ||
onBody (chunk, offset, length) { | ||
assert(!this.upgrade && this.method !== 'CONNECT') | ||
if (this.aborted) { | ||
return null | ||
return | ||
} | ||
return this.runInAsyncScope(this._onBody, this, chunk, offset, length) | ||
return this[kHandler]._onData(chunk.slice(offset, offset + length)) | ||
} | ||
onComplete (trailers) { | ||
assert(!this.upgrade && this.method !== 'CONNECT') | ||
if (this.aborted) { | ||
@@ -204,22 +253,5 @@ return | ||
const { | ||
body, | ||
[kSignal]: signal | ||
} = this | ||
reset.call(this) | ||
if (body) { | ||
this.body = null | ||
util.destroy(body) | ||
} | ||
if (signal) { | ||
this[kSignal] = null | ||
if ('removeEventListener' in signal) { | ||
signal.removeEventListener('abort', this[kAbort]) | ||
} else { | ||
signal.removeListener('abort', this[kAbort]) | ||
} | ||
} | ||
this.runInAsyncScope(this._onComplete, this, trailers) | ||
return this[kHandler]._onComplete(trailers) | ||
} | ||
@@ -233,36 +265,46 @@ | ||
const { | ||
body, | ||
[kTimeout]: timeout, | ||
[kSignal]: signal, | ||
[kResume]: resume | ||
} = this | ||
reset.call(this, err) | ||
const { [kResume]: resume } = this | ||
// TODO: resume is probably only needed | ||
// when aborted through signal or body? | ||
if (resume) { | ||
this[kResume] = null | ||
resume() | ||
} | ||
if (timeout) { | ||
this[kTimeout] = null | ||
clearTimeout(timeout) | ||
} | ||
process.nextTick((handler, err) => { | ||
handler._onError(err) | ||
}, this[kHandler], err) | ||
} | ||
} | ||
if (body) { | ||
this.body = null | ||
util.destroy(body, err) | ||
} | ||
function reset (err) { | ||
const { | ||
body, | ||
[kTimeout]: timeout, | ||
[kSignal]: signal | ||
} = this | ||
if (signal) { | ||
this[kSignal] = null | ||
if ('removeEventListener' in signal) { | ||
signal.removeEventListener('abort', this[kAbort]) | ||
} else { | ||
signal.removeListener('abort', this[kAbort]) | ||
} | ||
} | ||
if (timeout) { | ||
this[kTimeout] = null | ||
clearTimeout(timeout) | ||
} | ||
this.runInAsyncScope(this._onError, this, err) | ||
if (body) { | ||
this.body = null | ||
util.destroy(body, err) | ||
} | ||
if (signal) { | ||
this[kSignal] = null | ||
if ('removeEventListener' in signal) { | ||
signal.removeEventListener('abort', this[kAbort]) | ||
} else { | ||
signal.removeListener('abort', this[kAbort]) | ||
} | ||
} | ||
} | ||
module.exports = Request |
@@ -7,4 +7,10 @@ module.exports = { | ||
kConnect: Symbol('connect'), | ||
kResume: Symbol('resume'), | ||
kPause: Symbol('pause'), | ||
kSocketTimeout: Symbol('socket timeout'), | ||
kIdleTimeout: Symbol('idle timeout'), | ||
kMaxKeepAliveTimeout: Symbol('max keep alive timeout'), | ||
kKeepAliveTimeoutThreshold: Symbol('keep alive timeouthreshold'), | ||
kRequestTimeout: Symbol('request timeout'), | ||
kKeepAliveTimeout: Symbol('keep alive timeout'), | ||
kServerName: Symbol('server name'), | ||
@@ -20,4 +26,5 @@ kTLSOpts: Symbol('TLS Options'), | ||
kPendingIdx: Symbol('pending index'), | ||
kResume: Symbol('resume'), | ||
kError: Symbol('error'), | ||
kClient: Symbol('client'), | ||
kParser: Symbol('parser'), | ||
kOnDestroyed: Symbol('destroy callbacks'), | ||
@@ -30,4 +37,3 @@ kPipelining: Symbol('pipelinig'), | ||
kRetryTimeout: Symbol('retry timeout'), | ||
kEnqueue: Symbol('enqueue'), | ||
kMaxAbortedPayload: Symbol('max aborted payload') | ||
} |
@@ -21,3 +21,3 @@ 'use strict' | ||
? state.length | ||
: undefined | ||
: null | ||
} | ||
@@ -31,3 +31,3 @@ | ||
function isDestroyed (stream) { | ||
return !!stream && !!(stream.destroyed || stream[kDestroyed]) | ||
return !stream || !!(stream.destroyed || stream[kDestroyed]) | ||
} | ||
@@ -34,0 +34,0 @@ |
{ | ||
"name": "undici", | ||
"version": "1.2.6", | ||
"version": "1.3.0", | ||
"description": "An HTTP/1.1 client, written from scratch for Node.js", | ||
@@ -5,0 +5,0 @@ "main": "index.js", |
270
README.md
@@ -26,6 +26,7 @@ # undici | ||
``` | ||
http - keepalive x 5,521 ops/sec ±3.37% (73 runs sampled) | ||
undici - pipeline x 9,292 ops/sec ±4.28% (79 runs sampled) | ||
undici - request x 11,949 ops/sec ±0.99% (85 runs sampled) | ||
undici - stream x 12,223 ops/sec ±0.76% (85 runs sampled) | ||
http - keepalive x 5,847 ops/sec ±2.69% (276 runs sampled) | ||
undici - pipeline x 8,748 ops/sec ±2.90% (277 runs sampled) | ||
undici - request x 12,166 ops/sec ±0.80% (278 runs sampled) | ||
undici - stream x 12,969 ops/sec ±0.82% (278 runs sampled) | ||
undici - dispatch x 13,736 ops/sec ±0.60% (280 runs sampled) | ||
``` | ||
@@ -48,15 +49,29 @@ | ||
- `socketTimeout`, the timeout after which a socket will time out, in | ||
milliseconds. Monitors time between activity on a connected socket. | ||
- `socketTimeout: Number`, the timeout after which a socket with active requests | ||
will time out. Monitors time between activity on a connected socket. | ||
Use `0` to disable it entirely. Default: `30e3` milliseconds (30s). | ||
- `socketPath`, an IPC endpoint, either Unix domain socket or Windows named pipe. | ||
- `socketPath: String|Null`, an IPC endpoint, either Unix domain socket or Windows named pipe. | ||
Default: `null`. | ||
- `requestTimeout`, the timeout after which a request will time out, in | ||
milliseconds. Monitors time between request being enqueued and receiving | ||
- `idleTimeout: Number`, the timeout after which a socket without active requests | ||
will time out. Monitors time between activity on a connected socket. | ||
This value may be overriden by *keep-alive* hints from the server. | ||
Default: `4e3` milliseconds (4s). | ||
- `maxKeepAliveTimeout: Number`, the maximum allowed `idleTimeout` when overriden by | ||
*keep-alive* hints from the server. | ||
Default: `600e3` milliseconds (10min). | ||
- `keepAliveTimeoutThreshold: Number`, a number subtracted from server *keep-alive* hints | ||
when overriding `idleTimeout` to account for timing inaccuries caused by e.g. | ||
transport latency. | ||
Default: `1e3` milliseconds (1s). | ||
- `requestTimeout: Number`, the timeout after which a request will time out. | ||
Monitors time between request being enqueued and receiving | ||
a response. Use `0` to disable it entirely. | ||
Default: `30e3` milliseconds (30s). | ||
- `maxAbortedPayload`, the maximum number of bytes read after which an | ||
- `maxAbortedPayload: Number`, the maximum number of bytes read after which an | ||
aborted response will close the connection. Closing the connection | ||
@@ -66,3 +81,3 @@ will error other inflight requests in the pipeline. | ||
- `pipelining`, the amount of concurrent requests to be sent over the | ||
- `pipelining: Number`, the amount of concurrent requests to be sent over the | ||
single TCP/TLS connection according to | ||
@@ -72,10 +87,10 @@ [RFC7230](https://tools.ietf.org/html/rfc7230#section-6.3.2). | ||
- `tls`, an options object which in the case of `https` will be passed to | ||
- `tls: Object|Null`, an options object which in the case of `https` will be passed to | ||
[`tls.connect`](https://nodejs.org/api/tls.html#tls_tls_connect_options_callback). | ||
Default: `null`, | ||
Default: `null`. | ||
- `maxHeaderSize`, the maximum length of request headers in bytes. | ||
- `maxHeaderSize: Number`, the maximum length of request headers in bytes. | ||
Default: `16384` (16KiB). | ||
- `headersTimeout`, the amount of time the parser will wait to receive the complete | ||
- `headersTimeout: Number`, the amount of time the parser will wait to receive the complete | ||
HTTP headers (Node 14 and above only). | ||
@@ -85,3 +100,3 @@ Default: `30e3` milliseconds (30s). | ||
<a name='request'></a> | ||
#### `client.request(opts, callback(err, data))` | ||
#### `client.request(opts[, callback(err, data)]): Promise|Void` | ||
@@ -92,12 +107,16 @@ Performs a HTTP request. | ||
* `path` | ||
* `method` | ||
* `body`, it can be a `String`, a `Buffer`, `Uint8Array` or a `stream.Readable`. | ||
* `headers`, an object with header-value pairs. | ||
* `signal`, either an `AbortController` or an `EventEmitter`. | ||
* `requestTimeout`, the timeout after which a request will time out, in | ||
* `path: String` | ||
* `method: String` | ||
* `opaque: Any` | ||
* `body: String|Buffer|Uint8Array|stream.Readable|Null`. | ||
Default: `null`. | ||
* `headers: Object|Null`, an object with header-value pairs. | ||
Default: `null`. | ||
* `signal: AbortController|EventEmitter|Null`. | ||
Default: `null`. | ||
* `requestTimeout: Number`, the timeout after which a request will time out, in | ||
milliseconds. Monitors time between request being enqueued and receiving | ||
a response. Use `0` to disable it entirely. | ||
Default: `30e3` milliseconds (30s). | ||
* `idempotent`, whether the requests can be safely retried or not. | ||
* `idempotent: Boolean`, whether the requests can be safely retried or not. | ||
If `false` the request won't be sent until all preceeding | ||
@@ -123,10 +142,9 @@ requests in the pipeline has completed. | ||
* `statusCode` | ||
* `headers` | ||
* `body`, a `stream.Readable` with the body to read. A user **must** | ||
* `statusCode: Number` | ||
* `opaque: Any` | ||
* `headers: Object`, an object where all keys have been lowercased. | ||
* `body: stream.Readable` response payload. A user **must** | ||
either fully consume or destroy the body unless there is an error, or no further requests | ||
will be processed. | ||
`headers` is an object where all keys have been lowercased. | ||
Returns a promise if no callback is provided. | ||
@@ -221,3 +239,3 @@ | ||
<a name='stream'></a> | ||
#### `client.stream(opts, factory(data), callback(err))` | ||
#### `client.stream(opts, factory(data)[, callback(err)]): Promise|Void` | ||
@@ -235,14 +253,15 @@ A faster version of [`request`][request]. | ||
* ... same as [`client.request(opts, callback)`][request]. | ||
* `opaque`, passed as `opaque` to `factory`. Used | ||
to avoid creating a closure. | ||
* ... same as [`client.request(opts[, callback])`][request]. | ||
The `data` parameter in `factory` is defined as follow: | ||
* `statusCode` | ||
* `headers` | ||
* `opaque` | ||
* `statusCode: Number` | ||
* `headers: Object`, an object where all keys have been lowercased. | ||
* `opaque: Any` | ||
`headers` is an object where all keys have been lowercased. | ||
The `data` parameter in `callback` is defined as follow: | ||
* `opaque: Any` | ||
* `trailers: Object`, an object where all keys have been lowercased. | ||
Returns a promise if no callback is provided. | ||
@@ -293,3 +312,3 @@ | ||
<a name='pipeline'></a> | ||
#### `client.pipeline(opts, handler(data))` | ||
#### `client.pipeline(opts, handler(data)): Duplex` | ||
@@ -301,12 +320,11 @@ For easy use with [`stream.pipeline`](https://nodejs.org/api/stream.html#stream_stream_pipeline_source_transforms_destination_callback). | ||
* ... same as [`client.request(opts, callback)`][request]. | ||
* `objectMode`, `true` if the `handler` will return an object stream. | ||
* `opaque`, passed as `opaque` to `handler`. Used | ||
to avoid creating a closure. | ||
* `objectMode: Boolean`, `true` if the `handler` will return an object stream. | ||
Default: `false` | ||
The `data` parameter in `handler` is defined as follow: | ||
* `statusCode` | ||
* `headers` | ||
* `opaque` | ||
* `body`, a `stream.Readable` with the body to read. A user **must** | ||
* `statusCode: Number` | ||
* `headers: Object`, an object where all keys have been lowercased. | ||
* `opaque: Any` | ||
* `body: stream.Readable` response payload. A user **must** | ||
either fully consume or destroy the body unless there is an error, or no further requests | ||
@@ -320,4 +338,2 @@ will be processed. | ||
`headers` is an object where all keys have been lowercased. | ||
The `handler` should validate the response and save any | ||
@@ -362,4 +378,88 @@ required state. If there is an error it should be thrown. | ||
<a name='upgrade'></a> | ||
#### `client.upgrade(opts[, callback(err, data)]): Promise|Void` | ||
Upgrade to a different protocol. | ||
Options: | ||
* `path: String` | ||
* `opaque: Any` | ||
* `method: String` | ||
Default: `GET` | ||
* `headers: Object|Null`, an object with header-value pairs. | ||
Default: `null` | ||
* `signal: AbortController|EventEmitter|Null`. | ||
Default: `null` | ||
* `requestTimeout: Number`, the timeout after which a request will time out, in | ||
milliseconds. Monitors time between request being enqueued and receiving | ||
a response. Use `0` to disable it entirely. | ||
Default: `30e3` milliseconds (30s). | ||
* `protocol: String`, a string of comma separated protocols, in descending preference order. | ||
Default: `Websocket`. | ||
The `data` parameter in `callback` is defined as follow: | ||
* `headers: Object`, an object where all keys have been lowercased. | ||
* `socket: Duplex` | ||
* `opaque` | ||
Returns a promise if no callback is provided. | ||
<a name='connect'></a> | ||
#### `client.connect(opts[, callback(err, data)]): Promise|Void` | ||
Starts two-way communications with the requested resource. | ||
Options: | ||
* `path: String` | ||
* `opaque: Any` | ||
* `headers: Object|Null`, an object with header-value pairs. | ||
Default: `null` | ||
* `signal: AbortController|EventEmitter|Null`. | ||
Default: `null` | ||
* `requestTimeout: Number`, the timeout after which a request will time out, in | ||
milliseconds. Monitors time between request being enqueued and receiving | ||
a response. Use `0` to disable it entirely. | ||
Default: `30e3` milliseconds (30s). | ||
The `data` parameter in `callback` is defined as follow: | ||
* `statusCode: Number` | ||
* `headers: Object`, an object where all keys have been lowercased. | ||
* `socket: Duplex` | ||
* `opaque: Any` | ||
Returns a promise if no callback is provided. | ||
<a name='dispatch'></a> | ||
#### `client.dispatch(opts, handler): Promise|Void` | ||
This is the low level API which all the preceeding API's are implemented on top of. | ||
Options: | ||
* ... same as [`client.request(opts[, callback])`][request]. | ||
The `handler` parameter is defined as follow: | ||
* `onUpgrade(statusCode, headers, socket): Void`, invoked when request is upgraded either due to a `Upgrade` header or `CONNECT` method. | ||
* `statusCode: Number` | ||
* `headers: Object` | ||
* `socket: Duplex` | ||
* `onHeaders(statusCode, headers, resume): Void`, invoked when statusCode and headers have been received. | ||
May be invoked multiple times due to 1xx informational headers. | ||
* `statusCode: Number` | ||
* `headers: Object` | ||
* `resume(): Void`, resume `onData` after returning `false`. | ||
* `onData(chunk): Null|Boolean`, invoked when response payload data is received. | ||
* `chunk: Buffer` | ||
* `onComplete(trailers): Void`, invoked when response payload and trailers have been received and the request has completed. | ||
* `trailers: Object` | ||
* `onError(err): Void`, invoked when an error has occured. | ||
* `err: Error` | ||
<a name='close'></a> | ||
#### `client.close([callback])` | ||
#### `client.close([callback]): Promise|Void` | ||
@@ -372,3 +472,3 @@ Closes the client and gracefully waits fo enqueued requests to | ||
<a name='destroy'></a> | ||
#### `client.destroy([err][, callback])` | ||
#### `client.destroy([err][, callback]): Promise|Void` | ||
@@ -382,19 +482,19 @@ Destroy the client abruptly with the given `err`. All the pending and running | ||
#### `client.pipelining` | ||
#### `client.pipelining: Number` | ||
Property to get and set the pipelining factor. | ||
#### `client.pending` | ||
#### `client.pending: Number` | ||
Number of queued requests. | ||
#### `client.running` | ||
#### `client.running: Number` | ||
Number of inflight requests. | ||
#### `client.size` | ||
#### `client.size: Number` | ||
Number of pending and running requests. | ||
#### `client.connected` | ||
#### `client.connected: Boolean` | ||
@@ -405,3 +505,3 @@ True if the client has an active connection. The client will lazily | ||
#### `client.busy` | ||
#### `client.busy: Boolean` | ||
@@ -411,7 +511,7 @@ True if pipeline is saturated or blocked. Indicicates whether dispatching | ||
#### `client.closed` | ||
#### `client.closed: Boolean` | ||
True after `client.close()` has been called. | ||
#### `client.destroyed` | ||
#### `client.destroyed: Boolean` | ||
@@ -442,19 +542,31 @@ True after `client.destroyed()` has been called or `client.close()` has been | ||
#### `pool.request(opts, callback)` | ||
#### `pool.request(opts[, callback]): Promise|Void` | ||
Calls [`client.request(opts, callback)`][request] on one of the clients. | ||
#### `pool.stream(opts, factory, callback)` | ||
#### `pool.stream(opts, factory[, callback]): Promise|Void` | ||
Calls [`client.stream(opts, factory, callback)`][stream] on one of the clients. | ||
#### `pool.pipeline(opts, handler)` | ||
#### `pool.pipeline(opts, handler): Duplex` | ||
Calls [`client.pipeline(opts, handler)`][pipeline] on one of the clients. | ||
#### `pool.close([callback])` | ||
#### `pool.upgrade(opts[, callback]): Promise|Void` | ||
Calls [`client.upgrade(opts, callback)`][upgrade] on one of the clients. | ||
#### `pool.connect(opts[, callback]): Promise|Void` | ||
Calls [`client.connect(opts, callback)`][connect] on one of the clients. | ||
#### `pool.dispatch(opts, handler): Void` | ||
Calls [`client.dispatch(opts, handler)`][dispatch] on one of the clients. | ||
#### `pool.close([callback]): Promise|Void` | ||
Calls [`client.close(callback)`](#close) on all the clients. | ||
#### `pool.destroy([err][, callback])` | ||
#### `pool.destroy([err][, callback]): Promise|Void` | ||
@@ -492,7 +604,2 @@ Calls [`client.destroy(err, callback)`](#destroy) on all the clients. | ||
### Informational Responses | ||
Undici does not support 1xx informational responses and will either | ||
ignore or error them. | ||
#### Expect | ||
@@ -506,30 +613,2 @@ | ||
#### Switching Protocols | ||
Undici does not support the the `Upgrade` request header field. A | ||
`101 Switching Protocols` response will cause an `UND_ERR_NOT_SUPPORTED` error. | ||
Refs: https://tools.ietf.org/html/rfc7230#section-6.7 | ||
#### Hints | ||
Undici does not support early hints. A `103 Early Hint` response will | ||
be ignored. | ||
Refs: https://tools.ietf.org/html/rfc8297 | ||
### Trailer | ||
Undici does not support the the `Trailer` response header field. Any response | ||
trailer headers will be ignored. | ||
Refs: https://tools.ietf.org/html/rfc7230#section-4.4 | ||
### CONNECT | ||
Undici doea not support the http `CONNECT` method. Dispatching a `CONNECT` | ||
request will cause an `UND_ERR_NOT_SUPPORTED` error. | ||
Refs: https://tools.ietf.org/html/rfc7231#section-4.3.6 | ||
### Pipelining | ||
@@ -564,1 +643,4 @@ | ||
[pipeline]: #pipeline | ||
[upgrade]: #upgrade | ||
[connect]: #connect | ||
[dispatch]: #dispatch |
@@ -9,3 +9,2 @@ 'use strict' | ||
const { PassThrough } = require('stream') | ||
const { AsyncResource } = require('async_hooks') | ||
@@ -72,3 +71,3 @@ const transactions = new Map() | ||
body.once('data', () => { | ||
t.strictDeepEqual(getCurrentTransaction(), { hello: 'world2' }) | ||
t.pass() | ||
body.resume() | ||
@@ -78,3 +77,3 @@ }) | ||
body.on('end', () => { | ||
t.strictDeepEqual(getCurrentTransaction(), { hello: 'world2' }) | ||
t.pass() | ||
}) | ||
@@ -96,3 +95,3 @@ }) | ||
body.once('data', () => { | ||
t.strictDeepEqual(getCurrentTransaction(), { hello: 'world' }) | ||
t.pass() | ||
body.resume() | ||
@@ -102,3 +101,3 @@ }) | ||
body.on('end', () => { | ||
t.strictDeepEqual(getCurrentTransaction(), { hello: 'world' }) | ||
t.pass() | ||
}) | ||
@@ -120,3 +119,3 @@ }) | ||
body.once('data', () => { | ||
t.strictDeepEqual(getCurrentTransaction(), { hello: 'world' }) | ||
t.pass() | ||
body.resume() | ||
@@ -126,3 +125,3 @@ }) | ||
body.on('end', () => { | ||
t.strictDeepEqual(getCurrentTransaction(), { hello: 'world' }) | ||
t.pass() | ||
}) | ||
@@ -189,41 +188,3 @@ }) | ||
test('async hooks error and close', (t) => { | ||
t.plan(6) | ||
const server = createServer((req, res) => { | ||
res.write('asd') | ||
setImmediate(() => { | ||
res.destroy() | ||
}) | ||
}) | ||
t.tearDown(server.close.bind(server)) | ||
server.listen(0, () => { | ||
const client = new Client(`http://localhost:${server.address().port}`) | ||
t.tearDown(client.close.bind(client)) | ||
client.request({ path: '/', method: 'GET' }, (err, { body }) => { | ||
t.error(err) | ||
body.resume() | ||
body.on('error', (err) => { | ||
t.ok(err) | ||
}) | ||
t.strictDeepEqual(getCurrentTransaction(), null) | ||
setCurrentTransaction({ hello: 'world2' }) | ||
client.request({ path: '/', method: 'GET' }, (err, data) => { | ||
t.error(err) | ||
data.body.on('error', () => { | ||
t.strictDeepEqual(getCurrentTransaction(), { hello: 'world2' }) | ||
}) | ||
data.body.on('close', () => { | ||
t.strictDeepEqual(getCurrentTransaction(), { hello: 'world2' }) | ||
}) | ||
}) | ||
}) | ||
}) | ||
}) | ||
test('async hooks pipeline close', (t) => { | ||
test('async hooks pipeline handler', (t) => { | ||
t.plan(2) | ||
@@ -242,20 +203,13 @@ | ||
const ret = client | ||
client | ||
.pipeline({ path: '/', method: 'GET' }, ({ body }) => { | ||
t.strictDeepEqual(getCurrentTransaction(), { hello: 'world2' }) | ||
return body | ||
}) | ||
.on('close', () => { | ||
t.strictDeepEqual(getCurrentTransaction(), { hello: 'world2' }) | ||
t.pass() | ||
}) | ||
.on('error', (err) => { | ||
t.ok(err) | ||
}) | ||
.resume() | ||
.end() | ||
new AsyncResource('tmp') | ||
.runInAsyncScope(() => { | ||
setCurrentTransaction({ hello: 'world1' }) | ||
ret.destroy() | ||
}) | ||
}) | ||
}) |
@@ -36,5 +36,5 @@ 'use strict' | ||
client.request({ path: '/', method: 'GET', idempotent: false }, (err, data) => { | ||
client.request({ path: '/', method: 'GET', idempotent: false, opaque: 'asd' }, (err, data) => { | ||
t.ok(err instanceof Error) // we are expecting an error | ||
t.strictEqual(null, data) | ||
t.strictEqual(data.opaque, 'asd') | ||
}) | ||
@@ -90,5 +90,5 @@ | ||
for (let i = 0; i < 3; i++) { | ||
client.request({ path: '/', method: 'GET', idempotent: false }, (err, data) => { | ||
client.request({ path: '/', method: 'GET', idempotent: false, opaque: 'asd' }, (err, data) => { | ||
t.ok(err instanceof Error) // we are expecting an error | ||
t.strictEqual(null, data) | ||
t.strictEqual(data.opaque, 'asd') | ||
}) | ||
@@ -153,2 +153,3 @@ } | ||
}, | ||
opaque: 'asd', | ||
body: new Readable({ | ||
@@ -162,3 +163,3 @@ read () { | ||
t.strictEqual(err.message, 'kaboom') | ||
t.strictEqual(data, null) | ||
t.strictEqual(data.opaque, 'asd') | ||
}) | ||
@@ -218,2 +219,3 @@ | ||
method: 'POST', | ||
opaque: 'asd', | ||
body: new Readable({ | ||
@@ -227,3 +229,3 @@ read () { | ||
t.strictEqual(err.message, 'kaboom') | ||
t.strictEqual(data, null) | ||
t.strictEqual(data.opaque, 'asd') | ||
}) | ||
@@ -719,7 +721,8 @@ | ||
path: '/', | ||
method: 'GET' | ||
method: 'GET', | ||
opaque: 'asd' | ||
}, (err, data) => { | ||
requestErrored = true | ||
t.ok(err) | ||
t.strictEqual(data, null) | ||
t.strictEqual(data.opaque, 'asd') | ||
}) | ||
@@ -726,0 +729,0 @@ }) |
@@ -5,2 +5,3 @@ 'use strict' | ||
const { Client, errors } = require('..') | ||
const EE = require('events') | ||
const { createServer } = require('http') | ||
@@ -529,3 +530,3 @@ const { | ||
} else { | ||
t.strictEqual(err.code, 'ERR_STREAM_PREMATURE_CLOSE') | ||
t.strictEqual(err.code, 'UND_ERR_ABORTED') | ||
} | ||
@@ -863,3 +864,3 @@ }) | ||
}).on('error', (err) => { | ||
t.ok(err instanceof errors.NotSupportedError) | ||
t.ok(err instanceof errors.InvalidArgumentError) | ||
}) | ||
@@ -899,1 +900,98 @@ client.on('disconnect', () => { | ||
}) | ||
test('pipeline ignore 1xx', (t) => { | ||
t.plan(1) | ||
const server = createServer((req, res) => { | ||
res.writeProcessing() | ||
res.end('hello') | ||
}) | ||
t.tearDown(server.close.bind(server)) | ||
server.listen(0, () => { | ||
const client = new Client(`http://localhost:${server.address().port}`) | ||
t.tearDown(client.close.bind(client)) | ||
let buf = '' | ||
client.pipeline({ | ||
path: '/', | ||
method: 'GET' | ||
}, ({ body }) => body) | ||
.on('data', (chunk) => { | ||
buf += chunk | ||
}) | ||
.on('end', () => { | ||
t.strictEqual(buf, 'hello') | ||
}) | ||
.end() | ||
}) | ||
}) | ||
test('pipeline backpressure', (t) => { | ||
t.plan(1) | ||
const expected = Buffer.alloc(1e6).toString() | ||
const server = createServer((req, res) => { | ||
res.writeProcessing() | ||
res.end(expected) | ||
}) | ||
t.tearDown(server.close.bind(server)) | ||
server.listen(0, () => { | ||
const client = new Client(`http://localhost:${server.address().port}`) | ||
t.tearDown(client.close.bind(client)) | ||
let buf = '' | ||
client.pipeline({ | ||
path: '/', | ||
method: 'GET' | ||
}, ({ body }) => body) | ||
.end() | ||
.pipe(new Transform({ | ||
highWaterMark: 1, | ||
transform (chunk, encoding, callback) { | ||
setImmediate(() => { | ||
callback(null, chunk) | ||
}) | ||
} | ||
})) | ||
.on('data', chunk => { | ||
buf += chunk | ||
}) | ||
.on('end', () => { | ||
t.strictEqual(buf, expected) | ||
}) | ||
}) | ||
}) | ||
test('pipeline abort after headers', (t) => { | ||
t.plan(1) | ||
const server = createServer((req, res) => { | ||
res.writeProcessing() | ||
res.write('asd') | ||
}) | ||
t.tearDown(server.close.bind(server)) | ||
server.listen(0, () => { | ||
const client = new Client(`http://localhost:${server.address().port}`) | ||
t.tearDown(client.destroy.bind(client)) | ||
const signal = new EE() | ||
client.pipeline({ | ||
path: '/', | ||
method: 'GET', | ||
signal | ||
}, ({ body }) => { | ||
process.nextTick(() => { | ||
signal.emit('abort') | ||
}) | ||
return body | ||
}) | ||
.end() | ||
.on('error', (err) => { | ||
t.ok(err instanceof errors.RequestAbortedError) | ||
}) | ||
}) | ||
}) |
@@ -261,2 +261,3 @@ 'use strict' | ||
method: 'POST', | ||
opaque: 'asd', | ||
body: new Readable({ | ||
@@ -273,3 +274,3 @@ read () { | ||
t.ok(err) | ||
t.strictEqual(data, null) | ||
t.strictEqual(data.opaque, 'asd') | ||
}) | ||
@@ -276,0 +277,0 @@ client.close((err) => { |
@@ -29,8 +29,10 @@ 'use strict' | ||
client.on('disconnect', () => { | ||
if (n++ === 1) { | ||
if (++n === 1) { | ||
t.pass() | ||
server.listen(5555) | ||
} | ||
clock.tick(1000) | ||
process.nextTick(() => { | ||
clock.tick(1000) | ||
}) | ||
}) | ||
}) |
@@ -6,3 +6,3 @@ 'use strict' | ||
const { createServer } = require('http') | ||
const { PassThrough } = require('stream') | ||
const { PassThrough, Writable } = require('stream') | ||
const EE = require('events') | ||
@@ -47,4 +47,4 @@ | ||
test('stream get skip body', (t) => { | ||
t.plan(12) | ||
test('stream promise get', (t) => { | ||
t.plan(6) | ||
@@ -60,29 +60,22 @@ const server = createServer((req, res) => { | ||
server.listen(0, () => { | ||
server.listen(0, async () => { | ||
const client = new Client(`http://localhost:${server.address().port}`) | ||
t.tearDown(client.close.bind(client)) | ||
client.stream({ | ||
await client.stream({ | ||
path: '/', | ||
method: 'GET' | ||
}, ({ statusCode, headers }) => { | ||
method: 'GET', | ||
opaque: new PassThrough() | ||
}, ({ statusCode, headers, opaque: pt }) => { | ||
t.strictEqual(statusCode, 200) | ||
t.strictEqual(headers['content-type'], 'text/plain') | ||
// Don't return writable. Skip the body. | ||
}, (err) => { | ||
t.error(err) | ||
const bufs = [] | ||
pt.on('data', (buf) => { | ||
bufs.push(buf) | ||
}) | ||
pt.on('end', () => { | ||
t.strictEqual('hello', Buffer.concat(bufs).toString('utf8')) | ||
}) | ||
return pt | ||
}) | ||
client.stream({ | ||
path: '/', | ||
method: 'GET' | ||
}, ({ statusCode, headers }) => { | ||
t.strictEqual(statusCode, 200) | ||
t.strictEqual(headers['content-type'], 'text/plain') | ||
// Don't return writable. Skip the body. | ||
}).then(() => { | ||
t.pass() | ||
}) | ||
}) | ||
@@ -205,3 +198,5 @@ }) | ||
server.listen(0, () => { | ||
const client = new Client(`http://localhost:${server.address().port}`) | ||
const client = new Client(`http://localhost:${server.address().port}`, { | ||
maxAbortedPayload: 1e5 | ||
}) | ||
t.tearDown(client.close.bind(client)) | ||
@@ -212,4 +207,3 @@ | ||
path: '/', | ||
method: 'GET', | ||
maxAbortedPayload: 1e5 | ||
method: 'GET' | ||
}, () => { | ||
@@ -227,5 +221,8 @@ pt.on('data', () => { | ||
client.on('disconnect', (err) => { | ||
client.once('disconnect', (err) => { | ||
t.ok(err) | ||
t.pass() | ||
client.on('disconnect', () => { | ||
t.fail() | ||
}) | ||
}) | ||
@@ -490,3 +487,3 @@ | ||
}, (err) => { | ||
t.ok(err instanceof errors.NotSupportedError) | ||
t.ok(err instanceof errors.InvalidArgumentError) | ||
}) | ||
@@ -528,1 +525,86 @@ client.on('disconnect', () => { | ||
}) | ||
test('trailers', (t) => { | ||
t.plan(2) | ||
const server = createServer((req, res) => { | ||
res.writeHead(200, { Trailer: 'Content-MD5' }) | ||
res.addTrailers({ 'Content-MD5': 'test' }) | ||
res.end() | ||
}) | ||
t.tearDown(server.close.bind(server)) | ||
server.listen(0, () => { | ||
const client = new Client(`http://localhost:${server.address().port}`) | ||
t.tearDown(client.close.bind(client)) | ||
client.stream({ | ||
path: '/', | ||
method: 'GET' | ||
}, () => new PassThrough(), (err, data) => { | ||
t.strictDeepEqual({ 'content-md5': 'test' }, data.trailers) | ||
t.error(err) | ||
}) | ||
}) | ||
}) | ||
test('stream ignore 1xx', (t) => { | ||
t.plan(2) | ||
const server = createServer((req, res) => { | ||
res.writeProcessing() | ||
res.end('hello') | ||
}) | ||
t.tearDown(server.close.bind(server)) | ||
server.listen(0, () => { | ||
const client = new Client(`http://localhost:${server.address().port}`) | ||
t.tearDown(client.close.bind(client)) | ||
let buf = '' | ||
client.stream({ | ||
path: '/', | ||
method: 'GET' | ||
}, () => new Writable({ | ||
write (chunk, encoding, callback) { | ||
buf += chunk | ||
callback() | ||
} | ||
}), (err, data) => { | ||
t.error(err) | ||
t.strictEqual(buf, 'hello') | ||
}) | ||
}) | ||
}) | ||
test('stream backpressure', (t) => { | ||
t.plan(2) | ||
const expected = Buffer.alloc(1e6).toString() | ||
const server = createServer((req, res) => { | ||
res.writeProcessing() | ||
res.end(expected) | ||
}) | ||
t.tearDown(server.close.bind(server)) | ||
server.listen(0, () => { | ||
const client = new Client(`http://localhost:${server.address().port}`) | ||
t.tearDown(client.close.bind(client)) | ||
let buf = '' | ||
client.stream({ | ||
path: '/', | ||
method: 'GET' | ||
}, () => new Writable({ | ||
highWaterMark: 1, | ||
write (chunk, encoding, callback) { | ||
buf += chunk | ||
process.nextTick(callback) | ||
} | ||
}), (err, data) => { | ||
t.error(err) | ||
t.strictEqual(buf, expected) | ||
}) | ||
}) | ||
}) |
'use strict' | ||
const { test } = require('tap') | ||
const { Client, errors } = require('..') | ||
const { Client } = require('..') | ||
const { createServer } = require('http') | ||
@@ -12,2 +12,3 @@ const net = require('net') | ||
const server = createServer((req, res) => { | ||
res.writeProcessing() | ||
req.pipe(res) | ||
@@ -23,5 +24,2 @@ }) | ||
method: 'POST', | ||
headers: { | ||
Expect: '100-continue' | ||
}, | ||
body: 'hello' | ||
@@ -41,32 +39,2 @@ }, (err, response) => { | ||
test('error 101', (t) => { | ||
t.plan(2) | ||
const server = net.createServer((socket) => { | ||
socket.write('HTTP/1.1 101 Switching Protocols\r\n') | ||
socket.write('Upgrade: TLS/1.0, HTTP/1.1\r\n') | ||
socket.write('Connection: Upgrade\r\n') | ||
socket.write('\r\n') | ||
}) | ||
t.teardown(server.close.bind(server)) | ||
server.listen(0, () => { | ||
const client = new Client(`http://localhost:${server.address().port}`) | ||
t.teardown(client.destroy.bind(client)) | ||
client.request({ | ||
path: '/', | ||
method: 'GET', | ||
headers: { | ||
Connection: 'upgrade', | ||
Upgrade: 'example/1, foo/2' | ||
} | ||
}, (err) => { | ||
t.ok(err instanceof errors.NotSupportedError) | ||
}) | ||
client.on('disconnect', () => { | ||
t.pass() | ||
}) | ||
}) | ||
}) | ||
test('error 103 body', (t) => { | ||
@@ -73,0 +41,0 @@ t.plan(2) |
@@ -7,3 +7,3 @@ 'use strict' | ||
test('invalid headers', (t) => { | ||
t.plan(4) | ||
t.plan(9) | ||
@@ -36,2 +36,22 @@ const client = new Client('http://localhost:3000') | ||
headers: { | ||
connection: 'close' | ||
} | ||
}, (err, data) => { | ||
t.ok(err instanceof errors.InvalidArgumentError) | ||
}) | ||
client.request({ | ||
path: '/', | ||
method: 'GET', | ||
headers: { | ||
'keep-alive': 'timeout=5' | ||
} | ||
}, (err, data) => { | ||
t.ok(err instanceof errors.InvalidArgumentError) | ||
}) | ||
client.request({ | ||
path: '/', | ||
method: 'GET', | ||
headers: { | ||
foo: {} | ||
@@ -52,2 +72,32 @@ } | ||
}) | ||
client.request({ | ||
path: '/', | ||
method: 'GET', | ||
headers: { | ||
expect: '100-continue' | ||
} | ||
}, (err, data) => { | ||
t.ok(err instanceof errors.NotSupportedError) | ||
}) | ||
client.request({ | ||
path: '/', | ||
method: 'GET', | ||
headers: { | ||
Expect: '100-continue' | ||
} | ||
}, (err, data) => { | ||
t.ok(err instanceof errors.NotSupportedError) | ||
}) | ||
client.request({ | ||
path: '/', | ||
method: 'GET', | ||
headers: { | ||
expect: 'asd' | ||
} | ||
}, (err, data) => { | ||
t.ok(err instanceof errors.NotSupportedError) | ||
}) | ||
}) |
@@ -9,3 +9,3 @@ 'use strict' | ||
test('pipeline pipelining', (t) => { | ||
t.plan(6) | ||
t.plan(10) | ||
@@ -35,2 +35,5 @@ const server = createServer((req, res) => { | ||
t.strictEqual(client.busy, false) | ||
t.strictDeepEqual(client.running, 0) | ||
t.strictDeepEqual(client.pending, 1) | ||
client.pipeline({ | ||
@@ -41,3 +44,7 @@ method: 'GET', | ||
t.strictEqual(client.busy, true) | ||
t.strictEqual(client.running, 2) | ||
t.strictDeepEqual(client.running, 0) | ||
t.strictDeepEqual(client.pending, 2) | ||
process.nextTick(() => { | ||
t.strictEqual(client.running, 2) | ||
}) | ||
}) | ||
@@ -48,3 +55,3 @@ }) | ||
test('pipeline pipelining retry', (t) => { | ||
t.plan(6) | ||
t.plan(13) | ||
@@ -78,9 +85,9 @@ let count = 0 | ||
path: '/' | ||
}, ({ body }) => body) | ||
}, ({ body }) => body).end().resume() | ||
.on('error', (err) => { | ||
t.ok(err) | ||
}) | ||
.end() | ||
.resume() | ||
t.strictDeepEqual(client.running, 1) | ||
t.strictEqual(client.busy, false) | ||
t.strictDeepEqual(client.running, 0) | ||
t.strictDeepEqual(client.pending, 1) | ||
@@ -91,3 +98,5 @@ client.pipeline({ | ||
}, ({ body }) => body).end().resume() | ||
t.strictDeepEqual(client.running, 2) | ||
t.strictEqual(client.busy, false) | ||
t.strictDeepEqual(client.running, 0) | ||
t.strictDeepEqual(client.pending, 2) | ||
@@ -98,4 +107,10 @@ client.pipeline({ | ||
}, ({ body }) => body).end().resume() | ||
t.strictDeepEqual(client.running, 3) | ||
t.strictEqual(client.busy, true) | ||
t.strictDeepEqual(client.running, 0) | ||
t.strictDeepEqual(client.pending, 3) | ||
process.nextTick(() => { | ||
t.strictEqual(client.running, 3) | ||
}) | ||
client.close(() => { | ||
@@ -102,0 +117,0 @@ t.pass() |
@@ -10,2 +10,3 @@ 'use strict' | ||
const { promisify } = require('util') | ||
const { PassThrough } = require('stream') | ||
const eos = require('stream').finished | ||
@@ -118,2 +119,3 @@ | ||
t.strictEqual(headers['content-type'], 'text/plain') | ||
return new PassThrough() | ||
}) | ||
@@ -120,0 +122,0 @@ }) |
@@ -32,5 +32,5 @@ 'use strict' | ||
client.request({ path: '/', method: 'GET' }, (err, data) => { | ||
client.request({ path: '/', method: 'GET', opaque: 'asd' }, (err, data) => { | ||
t.ok(err instanceof errors.SocketTimeoutError) // we are expecting an error | ||
t.strictEqual(null, data) | ||
t.strictEqual(data.opaque, 'asd') | ||
}) | ||
@@ -37,0 +37,0 @@ |
113
test/unix.js
'use strict' | ||
const { test } = require('tap') | ||
const { Client } = require('..') | ||
const { Client, Pool } = require('..') | ||
const http = require('http') | ||
// const https = require('http') | ||
// const pem = require('https-pem') | ||
const https = require('https') | ||
const pem = require('https-pem') | ||
@@ -47,40 +47,77 @@ if (process.platform !== 'win32') { | ||
// test('https get with tls opts', (t) => { | ||
// t.plan(6) | ||
test('http unix get pool', (t) => { | ||
t.plan(7) | ||
// const server = https.createServer(pem, (req, res) => { | ||
// t.strictEqual('/', req.url) | ||
// t.strictEqual('GET', req.method) | ||
// res.setHeader('content-type', 'text/plain') | ||
// res.end('hello') | ||
// }) | ||
// t.tearDown(server.close.bind(server)) | ||
const server = http.createServer((req, res) => { | ||
t.strictEqual('/', req.url) | ||
t.strictEqual('GET', req.method) | ||
t.strictEqual('localhost', req.headers.host) | ||
res.setHeader('Content-Type', 'text/plain') | ||
res.end('hello') | ||
}) | ||
t.tearDown(server.close.bind(server)) | ||
// server.listen('/var/tmp/test8.sock', () => { | ||
// const client = new Client({ | ||
// hostname: 'localhost', | ||
// protocol: 'https:' | ||
// }, { | ||
// socketPath: '/var/tmp/test8.sock', | ||
// tls: { | ||
// rejectUnauthorized: false | ||
// } | ||
// }) | ||
// t.tearDown(client.close.bind(client)) | ||
server.listen('/var/tmp/test3.sock', () => { | ||
const client = new Pool({ | ||
hostname: 'localhost', | ||
protocol: 'http:' | ||
}, { | ||
socketPath: '/var/tmp/test3.sock' | ||
}) | ||
t.tearDown(client.close.bind(client)) | ||
// client.request({ path: '/', method: 'GET' }, (err, data) => { | ||
// t.error(err) | ||
// const { statusCode, headers, body } = data | ||
// t.strictEqual(statusCode, 200) | ||
// t.strictEqual(headers['content-type'], 'text/plain') | ||
// const bufs = [] | ||
// body.on('data', (buf) => { | ||
// bufs.push(buf) | ||
// }) | ||
// body.on('end', () => { | ||
// t.strictEqual('hello', Buffer.concat(bufs).toString('utf8')) | ||
// }) | ||
// }) | ||
// }) | ||
// }) | ||
client.request({ path: '/', method: 'GET' }, (err, data) => { | ||
t.error(err) | ||
const { statusCode, headers, body } = data | ||
t.strictEqual(statusCode, 200) | ||
t.strictEqual(headers['content-type'], 'text/plain') | ||
const bufs = [] | ||
body.on('data', (buf) => { | ||
bufs.push(buf) | ||
}) | ||
body.on('end', () => { | ||
t.strictEqual('hello', Buffer.concat(bufs).toString('utf8')) | ||
}) | ||
}) | ||
}) | ||
}) | ||
test('https get with tls opts', (t) => { | ||
t.plan(6) | ||
const server = https.createServer(pem, (req, res) => { | ||
t.strictEqual('/', req.url) | ||
t.strictEqual('GET', req.method) | ||
res.setHeader('content-type', 'text/plain') | ||
res.end('hello') | ||
}) | ||
t.tearDown(server.close.bind(server)) | ||
server.listen('/var/tmp/test8.sock', () => { | ||
const client = new Client({ | ||
hostname: 'localhost', | ||
protocol: 'https:' | ||
}, { | ||
socketPath: '/var/tmp/test8.sock', | ||
tls: { | ||
rejectUnauthorized: false | ||
} | ||
}) | ||
t.tearDown(client.close.bind(client)) | ||
client.request({ path: '/', method: 'GET' }, (err, data) => { | ||
t.error(err) | ||
const { statusCode, headers, body } = data | ||
t.strictEqual(statusCode, 200) | ||
t.strictEqual(headers['content-type'], 'text/plain') | ||
const bufs = [] | ||
body.on('data', (buf) => { | ||
bufs.push(buf) | ||
}) | ||
body.on('end', () => { | ||
t.strictEqual('hello', Buffer.concat(bufs).toString('utf8')) | ||
}) | ||
}) | ||
}) | ||
}) | ||
} |
280478
52
9100
626
9
42