Comparing version 1.3.1 to 2.0.0
@@ -6,3 +6,2 @@ 'use strict' | ||
const undici = require('..') | ||
const { kGetNext } = require('../lib/symbols') | ||
@@ -33,3 +32,4 @@ // # Start the h2o server (in h2o repository) | ||
path: '/', | ||
method: 'GET' | ||
method: 'GET', | ||
requestTimeout: 0 | ||
} | ||
@@ -39,4 +39,3 @@ | ||
connections: 100, | ||
pipelining: 10, | ||
requestTimeout: 0 | ||
pipelining: 10 | ||
}) | ||
@@ -137,4 +136,3 @@ | ||
}) | ||
const client = pool[kGetNext]() | ||
client.dispatch(undiciOptions, new SimpleRequest(stream)) | ||
pool.dispatch(undiciOptions, new SimpleRequest(stream)) | ||
} | ||
@@ -145,4 +143,3 @@ }) | ||
fn: deferred => { | ||
const client = pool[kGetNext]() | ||
client.dispatch(undiciOptions, new NoopRequest(deferred)) | ||
pool.dispatch(undiciOptions, new NoopRequest(deferred)) | ||
} | ||
@@ -160,6 +157,10 @@ }) | ||
onHeaders () { | ||
onConnect (abort) { | ||
} | ||
onHeaders (statusCode, headers, resume) { | ||
} | ||
onData (chunk) { | ||
@@ -179,2 +180,5 @@ return true | ||
onConnect (abort) { | ||
} | ||
onHeaders (statusCode, headers, resume) { | ||
@@ -181,0 +185,0 @@ this.dst.on('drain', resume) |
16
index.js
'use strict' | ||
const Client = require('./lib/client') | ||
const Client = require('./lib/core/client') | ||
const errors = require('./lib/core/errors') | ||
const Pool = require('./lib/pool') | ||
const errors = require('./lib/errors') | ||
Client.prototype.request = require('./lib/client-request') | ||
Client.prototype.stream = require('./lib/client-stream') | ||
Client.prototype.pipeline = require('./lib/client-pipeline') | ||
Client.prototype.upgrade = require('./lib/client-upgrade') | ||
Client.prototype.connect = require('./lib/client-connect') | ||
Pool.prototype.request = require('./lib/client-request') | ||
Pool.prototype.stream = require('./lib/client-stream') | ||
Pool.prototype.pipeline = require('./lib/client-pipeline') | ||
Pool.prototype.upgrade = require('./lib/client-upgrade') | ||
Pool.prototype.connect = require('./lib/client-connect') | ||
function undici (url, opts) { | ||
@@ -8,0 +20,0 @@ return new Pool(url, opts) |
'use strict' | ||
const { | ||
InvalidArgumentError | ||
} = require('./errors') | ||
InvalidArgumentError, | ||
RequestAbortedError | ||
} = require('./core/errors') | ||
const { AsyncResource } = require('async_hooks') | ||
const util = require('./core/util') | ||
class ConnectRequest { | ||
class ConnectHandler extends AsyncResource { | ||
constructor (opts, callback) { | ||
this.opaque = opts.opaque || null | ||
if (!opts || typeof opts !== 'object') { | ||
throw new InvalidArgumentError('invalid opts') | ||
} | ||
const { signal, opaque } = opts | ||
if (signal && typeof signal.on !== 'function' && typeof signal.addEventListener !== 'function') { | ||
throw new InvalidArgumentError('signal must be an EventEmitter or EventTarget') | ||
} | ||
super('UNDICI_CONNECT') | ||
this.opaque = opaque || null | ||
this.callback = callback | ||
this.abort = null | ||
if (signal) { | ||
util.addListener(signal, () => { | ||
if (this.abort) { | ||
this.abort() | ||
} else { | ||
this.onError(new RequestAbortedError()) | ||
} | ||
}) | ||
} | ||
} | ||
onConnect (abort) { | ||
if (!this.callback) { | ||
abort() | ||
} else { | ||
this.abort = abort | ||
} | ||
} | ||
onUpgrade (statusCode, headers, socket) { | ||
@@ -17,5 +51,5 @@ const { callback, opaque } = this | ||
this.callback = null | ||
callback(null, { | ||
this.runInAsyncScope(callback, null, null, { | ||
statusCode, | ||
headers, | ||
headers: util.parseHeaders(headers), | ||
socket, | ||
@@ -29,11 +63,15 @@ opaque | ||
this.callback = null | ||
callback(err, { opaque }) | ||
if (callback) { | ||
this.callback = null | ||
process.nextTick((self, callback, err, opaque) => { | ||
self.runInAsyncScope(callback, null, err, { opaque }) | ||
}, this, callback, err, opaque) | ||
} | ||
} | ||
} | ||
module.exports = function connect (client, opts, callback) { | ||
function connect (opts, callback) { | ||
if (callback === undefined) { | ||
return new Promise((resolve, reject) => { | ||
connect(client, opts, (err, data) => { | ||
connect.call(this, opts, (err, data) => { | ||
return err ? reject(err) : resolve(data) | ||
@@ -49,6 +87,3 @@ }) | ||
try { | ||
if (!opts || typeof opts !== 'object') { | ||
throw new InvalidArgumentError('invalid opts') | ||
} | ||
const connectHandler = new ConnectHandler(opts, callback) | ||
const { | ||
@@ -61,3 +96,3 @@ path, | ||
} = opts | ||
client.dispatch({ | ||
this.dispatch({ | ||
path, | ||
@@ -69,6 +104,8 @@ method: 'CONNECT', | ||
requestTimeout | ||
}, new ConnectRequest(opts, callback)) | ||
}, connectHandler) | ||
} catch (err) { | ||
process.nextTick(callback, err, null) | ||
process.nextTick(callback, err, { opaque: opts && opts.opaque }) | ||
} | ||
} | ||
module.exports = connect |
@@ -12,5 +12,6 @@ 'use strict' | ||
RequestAbortedError | ||
} = require('./errors') | ||
const util = require('./util') | ||
} = require('./core/errors') | ||
const util = require('./core/util') | ||
const { AsyncResource } = require('async_hooks') | ||
const { assert } = require('console') | ||
@@ -38,6 +39,3 @@ const kResume = Symbol('resume') | ||
if (!err && !this._readableState.endEmitted) { | ||
// This can happen if the server doesn't care | ||
// about the entire request body. | ||
} | ||
assert(err || this._readableState.endEmitted) | ||
@@ -50,8 +48,11 @@ callback(err) | ||
constructor (resume) { | ||
super({ autoDestroy: true, read: resume }) | ||
super({ autoDestroy: true }) | ||
this[kResume] = resume | ||
} | ||
_read () { | ||
this[kResume]() | ||
} | ||
_destroy (err, callback) { | ||
this._read() | ||
if (!err && !this._readableState.endEmitted) { | ||
@@ -67,9 +68,34 @@ err = new RequestAbortedError() | ||
constructor (opts, handler) { | ||
if (!opts || typeof opts !== 'object') { | ||
throw new InvalidArgumentError('invalid opts') | ||
} | ||
if (typeof handler !== 'function') { | ||
throw new InvalidArgumentError('invalid handler') | ||
} | ||
const { signal, method, opaque } = opts | ||
if (signal && typeof signal.on !== 'function' && typeof signal.addEventListener !== 'function') { | ||
throw new InvalidArgumentError('signal must be an EventEmitter or EventTarget') | ||
} | ||
if (method === 'CONNECT') { | ||
throw new InvalidArgumentError('invalid method') | ||
} | ||
super('UNDICI_PIPELINE') | ||
this.opaque = opts.opaque || null | ||
this.opaque = opaque || null | ||
this.handler = handler | ||
this.abort = null | ||
this.req = new PipelineRequest() | ||
if (signal) { | ||
util.addListener(signal, () => { | ||
util.destroy(this.ret, new RequestAbortedError()) | ||
}) | ||
} | ||
this.req = new PipelineRequest().on('error', util.nop) | ||
this.ret = new Duplex({ | ||
@@ -95,3 +121,3 @@ readableObjectMode: opts.objectMode, | ||
destroy: (err, callback) => { | ||
const { body, req, res, ret } = this | ||
const { body, req, res, ret, abort } = this | ||
@@ -102,2 +128,6 @@ if (!err && !ret._readableState.endEmitted) { | ||
if (abort && err) { | ||
abort() | ||
} | ||
util.destroy(body, err) | ||
@@ -119,4 +149,14 @@ util.destroy(req, err) | ||
onConnect (abort) { | ||
const { ret } = this | ||
if (ret.destroyed) { | ||
abort() | ||
} else { | ||
this.abort = abort | ||
} | ||
} | ||
onHeaders (statusCode, headers, resume) { | ||
const { opaque, handler, ret } = this | ||
const { opaque, handler } = this | ||
@@ -134,3 +174,3 @@ if (statusCode < 200) { | ||
statusCode, | ||
headers, | ||
headers: util.parseHeaders(headers), | ||
opaque, | ||
@@ -141,12 +181,7 @@ body: this.res | ||
this.res.on('error', util.nop) | ||
util.destroy(ret, err) | ||
return | ||
throw err | ||
} | ||
if ( | ||
!body || | ||
typeof body.on !== 'function' | ||
) { | ||
util.destroy(ret, new InvalidReturnValueError('expected Readable')) | ||
return | ||
if (!body || typeof body.on !== 'function') { | ||
throw new InvalidReturnValueError('expected Readable') | ||
} | ||
@@ -185,7 +220,2 @@ | ||
const { res } = this | ||
if (res._readableState.destroyed) { | ||
return | ||
} | ||
return res.push(chunk) | ||
@@ -196,7 +226,2 @@ } | ||
const { res } = this | ||
if (res._readableState.destroyed) { | ||
return | ||
} | ||
res.push(null) | ||
@@ -207,5 +232,3 @@ } | ||
const { ret } = this | ||
this.handler = null | ||
util.destroy(ret, err) | ||
@@ -215,18 +238,5 @@ } | ||
module.exports = function (client, opts, handler) { | ||
function pipeline (opts, handler) { | ||
try { | ||
if (!opts || typeof opts !== 'object') { | ||
throw new InvalidArgumentError('invalid opts') | ||
} | ||
if (typeof handler !== 'function') { | ||
throw new InvalidArgumentError('invalid handler') | ||
} | ||
if (opts.method === 'CONNECT') { | ||
throw new InvalidArgumentError('invalid method') | ||
} | ||
const pipeline = new PipelineHandler(opts, handler) | ||
const pipelineHandler = new PipelineHandler(opts, handler) | ||
const { | ||
@@ -241,7 +251,6 @@ path, | ||
} = opts | ||
client.dispatch({ | ||
this.dispatch({ | ||
path, | ||
method, | ||
body: pipeline.req, | ||
body: pipelineHandler.req, | ||
headers, | ||
@@ -252,5 +261,4 @@ idempotent, | ||
requestTimeout | ||
}, pipeline) | ||
return pipeline.ret | ||
}, pipelineHandler) | ||
return pipelineHandler.ret | ||
} catch (err) { | ||
@@ -260,1 +268,3 @@ return new PassThrough().destroy(err) | ||
} | ||
module.exports = pipeline |
@@ -7,14 +7,15 @@ 'use strict' | ||
RequestAbortedError | ||
} = require('./errors') | ||
const util = require('./util') | ||
} = require('./core/errors') | ||
const util = require('./core/util') | ||
const { AsyncResource } = require('async_hooks') | ||
const kAbort = Symbol('abort') | ||
class RequestResponse extends Readable { | ||
constructor (resume) { | ||
constructor (resume, abort) { | ||
super({ autoDestroy: true, read: resume }) | ||
this[kAbort] = abort | ||
} | ||
_destroy (err, callback) { | ||
this._read() | ||
if (!err && !this._readableState.endEmitted) { | ||
@@ -24,2 +25,6 @@ err = new RequestAbortedError() | ||
if (err) { | ||
this[kAbort]() | ||
} | ||
callback(err) | ||
@@ -31,11 +36,62 @@ } | ||
constructor (opts, callback) { | ||
super('UNDICI_REQUEST') | ||
if (!opts || typeof opts !== 'object') { | ||
throw new InvalidArgumentError('invalid opts') | ||
} | ||
this.opaque = opts.opaque || null | ||
const { signal, method, opaque, body } = opts | ||
try { | ||
if (typeof callback !== 'function') { | ||
throw new InvalidArgumentError('invalid callback') | ||
} | ||
if (signal && typeof signal.on !== 'function' && typeof signal.addEventListener !== 'function') { | ||
throw new InvalidArgumentError('signal must be an EventEmitter or EventTarget') | ||
} | ||
if (method === 'CONNECT') { | ||
throw new InvalidArgumentError('invalid method') | ||
} | ||
super('UNDICI_REQUEST') | ||
} catch (err) { | ||
if (util.isStream(body)) { | ||
util.destroy(body.on('error', util.nop), err) | ||
} | ||
throw err | ||
} | ||
this.opaque = opaque || null | ||
this.callback = callback | ||
this.res = null | ||
this.abort = null | ||
this.body = body | ||
if (util.isStream(body)) { | ||
body.on('error', (err) => { | ||
this.onError(err) | ||
}) | ||
} | ||
if (signal) { | ||
util.addListener(signal, () => { | ||
if (this.abort) { | ||
this.abort() | ||
} else { | ||
this.onError(new RequestAbortedError()) | ||
} | ||
}) | ||
} | ||
} | ||
onConnect (abort) { | ||
if (!this.callback) { | ||
abort() | ||
} else { | ||
this.abort = abort | ||
} | ||
} | ||
onHeaders (statusCode, headers, resume) { | ||
const { callback, opaque } = this | ||
const { callback, opaque, abort } = this | ||
@@ -46,3 +102,3 @@ if (statusCode < 200) { | ||
const body = new RequestResponse(resume) | ||
const body = new RequestResponse(resume, abort) | ||
@@ -54,3 +110,3 @@ this.callback = null | ||
statusCode, | ||
headers, | ||
headers: util.parseHeaders(headers), | ||
opaque, | ||
@@ -63,7 +119,2 @@ body | ||
const { res } = this | ||
if (res._readableState.destroyed) { | ||
return | ||
} | ||
return res.push(chunk) | ||
@@ -74,7 +125,2 @@ } | ||
const { res } = this | ||
if (res._readableState.destroyed) { | ||
return | ||
} | ||
res.push(null) | ||
@@ -84,7 +130,9 @@ } | ||
onError (err) { | ||
const { res, callback, opaque } = this | ||
const { res, callback, body, opaque } = this | ||
if (callback) { | ||
this.callback = null | ||
this.runInAsyncScope(callback, null, err, { opaque }) | ||
process.nextTick((self, callback, err, opaque) => { | ||
self.runInAsyncScope(callback, null, err, { opaque }) | ||
}, this, callback, err, opaque) | ||
} | ||
@@ -96,9 +144,14 @@ | ||
} | ||
if (body) { | ||
this.body = null | ||
util.destroy(body, err) | ||
} | ||
} | ||
} | ||
module.exports = function request (client, opts, callback) { | ||
function request (opts, callback) { | ||
if (callback === undefined) { | ||
return new Promise((resolve, reject) => { | ||
request(client, opts, (err, data) => { | ||
request.call(this, opts, (err, data) => { | ||
return err ? reject(err) : resolve(data) | ||
@@ -109,19 +162,13 @@ }) | ||
if (typeof callback !== 'function') { | ||
throw new InvalidArgumentError('invalid callback') | ||
} | ||
try { | ||
if (!opts || typeof opts !== 'object') { | ||
throw new InvalidArgumentError('invalid opts') | ||
this.dispatch(opts, new RequestHandler(opts, callback)) | ||
} catch (err) { | ||
if (typeof callback === 'function') { | ||
process.nextTick(callback, err, { opaque: opts && opts.opaque }) | ||
} else { | ||
throw err | ||
} | ||
if (opts.method === 'CONNECT') { | ||
throw new InvalidArgumentError('invalid method') | ||
} | ||
client.dispatch(opts, new RequestHandler(opts, callback)) | ||
} catch (err) { | ||
process.nextTick(callback, err, null) | ||
} | ||
} | ||
module.exports = request |
@@ -6,5 +6,6 @@ 'use strict' | ||
InvalidArgumentError, | ||
InvalidReturnValueError | ||
} = require('./errors') | ||
const util = require('./util') | ||
InvalidReturnValueError, | ||
RequestAbortedError | ||
} = require('./core/errors') | ||
const util = require('./core/util') | ||
const { AsyncResource } = require('async_hooks') | ||
@@ -14,13 +15,68 @@ | ||
constructor (opts, factory, callback) { | ||
super('UNDICI_STREAM') | ||
if (!opts || typeof opts !== 'object') { | ||
throw new InvalidArgumentError('invalid opts') | ||
} | ||
this.opaque = opts.opaque || null | ||
const { signal, method, opaque, body } = opts | ||
try { | ||
if (typeof callback !== 'function') { | ||
throw new InvalidArgumentError('invalid callback') | ||
} | ||
if (typeof factory !== 'function') { | ||
throw new InvalidArgumentError('invalid factory') | ||
} | ||
if (signal && typeof signal.on !== 'function' && typeof signal.addEventListener !== 'function') { | ||
throw new InvalidArgumentError('signal must be an EventEmitter or EventTarget') | ||
} | ||
if (method === 'CONNECT') { | ||
throw new InvalidArgumentError('invalid method') | ||
} | ||
super('UNDICI_STREAM') | ||
} catch (err) { | ||
if (util.isStream(body)) { | ||
util.destroy(body.on('error', util.nop), err) | ||
} | ||
throw err | ||
} | ||
this.opaque = opaque || null | ||
this.factory = factory | ||
this.callback = callback | ||
this.res = null | ||
this.abort = null | ||
this.trailers = null | ||
this.body = body | ||
if (util.isStream(body)) { | ||
body.on('error', (err) => { | ||
this.onError(err) | ||
}) | ||
} | ||
if (signal) { | ||
util.addListener(signal, () => { | ||
if (this.abort) { | ||
this.abort() | ||
} else { | ||
this.onError(new RequestAbortedError()) | ||
} | ||
}) | ||
} | ||
} | ||
onConnect (abort) { | ||
if (!this.callback) { | ||
abort() | ||
} else { | ||
this.abort = abort | ||
} | ||
} | ||
onHeaders (statusCode, headers, resume) { | ||
const { factory, callback, opaque } = this | ||
const { factory, opaque } = this | ||
@@ -31,23 +87,16 @@ if (statusCode < 200) { | ||
let res | ||
try { | ||
this.factory = null | ||
res = this.runInAsyncScope(factory, null, { | ||
statusCode, | ||
headers, | ||
opaque | ||
}) | ||
this.factory = null | ||
const res = this.runInAsyncScope(factory, null, { | ||
statusCode, | ||
headers: util.parseHeaders(headers), | ||
opaque | ||
}) | ||
if ( | ||
!res || | ||
typeof res.write !== 'function' || | ||
typeof res.end !== 'function' || | ||
typeof res.on !== 'function' | ||
) { | ||
throw new InvalidReturnValueError('expected Writable') | ||
} | ||
} catch (err) { | ||
this.callback = null | ||
this.runInAsyncScope(callback, null, err, { opaque }) | ||
return | ||
if ( | ||
!res || | ||
typeof res.write !== 'function' || | ||
typeof res.end !== 'function' || | ||
typeof res.on !== 'function' | ||
) { | ||
throw new InvalidReturnValueError('expected Writable') | ||
} | ||
@@ -58,3 +107,3 @@ | ||
finished(res, { readable: false }, (err) => { | ||
const { callback, res, opaque, trailers } = this | ||
const { callback, res, opaque, trailers, abort } = this | ||
@@ -68,2 +117,6 @@ this.res = null | ||
this.runInAsyncScope(callback, null, err || null, { opaque, trailers }) | ||
if (err) { | ||
abort() | ||
} | ||
}) | ||
@@ -76,7 +129,2 @@ | ||
const { res } = this | ||
if (util.isDestroyed(res)) { | ||
return | ||
} | ||
return res.write(chunk) | ||
@@ -88,8 +136,4 @@ } | ||
if (util.isDestroyed(res)) { | ||
return | ||
} | ||
this.trailers = trailers ? util.parseHeaders(trailers) : {} | ||
this.trailers = trailers || {} | ||
res.end() | ||
@@ -99,3 +143,3 @@ } | ||
onError (err) { | ||
const { res, callback, opaque } = this | ||
const { res, callback, opaque, body } = this | ||
@@ -109,11 +153,18 @@ this.factory = null | ||
this.callback = null | ||
this.runInAsyncScope(callback, null, err, { opaque }) | ||
process.nextTick((self, callback, err, opaque) => { | ||
self.runInAsyncScope(callback, null, err, { opaque }) | ||
}, this, callback, err, opaque) | ||
} | ||
if (body) { | ||
this.body = null | ||
util.destroy(body, err) | ||
} | ||
} | ||
} | ||
module.exports = function stream (client, opts, factory, callback) { | ||
function stream (opts, factory, callback) { | ||
if (callback === undefined) { | ||
return new Promise((resolve, reject) => { | ||
stream(client, opts, factory, (err, data) => { | ||
stream.call(this, opts, factory, (err, data) => { | ||
return err ? reject(err) : resolve(data) | ||
@@ -124,23 +175,13 @@ }) | ||
if (typeof callback !== 'function') { | ||
throw new InvalidArgumentError('invalid callback') | ||
} | ||
try { | ||
if (!opts || typeof opts !== 'object') { | ||
throw new InvalidArgumentError('invalid opts') | ||
this.dispatch(opts, new StreamHandler(opts, factory, callback)) | ||
} catch (err) { | ||
if (typeof callback === 'function') { | ||
process.nextTick(callback, err, { opaque: opts && opts.opaque }) | ||
} else { | ||
throw err | ||
} | ||
if (typeof factory !== 'function') { | ||
throw new InvalidArgumentError('invalid factory') | ||
} | ||
if (opts.method === 'CONNECT') { | ||
throw new InvalidArgumentError('invalid method') | ||
} | ||
client.dispatch(opts, new StreamHandler(opts, factory, callback)) | ||
} catch (err) { | ||
process.nextTick(callback, err, null) | ||
} | ||
} | ||
module.exports = stream |
'use strict' | ||
const { | ||
InvalidArgumentError | ||
} = require('./errors') | ||
InvalidArgumentError, | ||
RequestAbortedError | ||
} = require('./core/errors') | ||
const { AsyncResource } = require('async_hooks') | ||
const util = require('./core/util') | ||
class UpgradeHandler { | ||
class UpgradeHandler extends AsyncResource { | ||
constructor (opts, callback) { | ||
this.opaque = opts.opaque || null | ||
if (!opts || typeof opts !== 'object') { | ||
throw new InvalidArgumentError('invalid opts') | ||
} | ||
const { signal, opaque } = opts | ||
if (signal && typeof signal.on !== 'function' && typeof signal.addEventListener !== 'function') { | ||
throw new InvalidArgumentError('signal must be an EventEmitter or EventTarget') | ||
} | ||
super('UNDICI_UPGRADE') | ||
this.opaque = opaque || null | ||
this.callback = callback | ||
this.abort = null | ||
if (signal) { | ||
util.addListener(signal, () => { | ||
if (this.abort) { | ||
this.abort() | ||
} else { | ||
this.onError(new RequestAbortedError()) | ||
} | ||
}) | ||
} | ||
} | ||
onConnect (abort) { | ||
if (!this.callback) { | ||
abort() | ||
} else { | ||
this.abort = abort | ||
} | ||
} | ||
onUpgrade (statusCode, headers, socket) { | ||
@@ -17,4 +51,4 @@ const { callback, opaque } = this | ||
this.callback = null | ||
callback(null, { | ||
headers, | ||
this.runInAsyncScope(callback, null, null, { | ||
headers: util.parseHeaders(headers), | ||
socket, | ||
@@ -28,11 +62,15 @@ opaque | ||
this.callback = null | ||
callback(err, { opaque }) | ||
if (callback) { | ||
this.callback = null | ||
process.nextTick((self, callback, err, opaque) => { | ||
self.runInAsyncScope(callback, null, err, { opaque }) | ||
}, this, callback, err, opaque) | ||
} | ||
} | ||
} | ||
module.exports = function upgrade (client, opts, callback) { | ||
function upgrade (opts, callback) { | ||
if (callback === undefined) { | ||
return new Promise((resolve, reject) => { | ||
upgrade(client, opts, (err, data) => { | ||
upgrade.call(this, opts, (err, data) => { | ||
return err ? reject(err) : resolve(data) | ||
@@ -48,6 +86,3 @@ }) | ||
try { | ||
if (!opts || typeof opts !== 'object') { | ||
throw new InvalidArgumentError('invalid opts') | ||
} | ||
const upgradeHandler = new UpgradeHandler(opts, callback) | ||
const { | ||
@@ -62,3 +97,3 @@ path, | ||
} = opts | ||
client.dispatch({ | ||
this.dispatch({ | ||
path, | ||
@@ -71,6 +106,8 @@ method: method || 'GET', | ||
upgrade: protocol || 'Websocket' | ||
}, new UpgradeHandler(opts, callback)) | ||
}, upgradeHandler) | ||
} catch (err) { | ||
process.nextTick(callback, err, null) | ||
process.nextTick(callback, err, { opaque: opts && opts.opaque }) | ||
} | ||
} | ||
module.exports = upgrade |
162
lib/pool.js
'use strict' | ||
const Client = require('./client') | ||
const Client = require('./core/client') | ||
const { | ||
InvalidArgumentError | ||
} = require('./errors') | ||
const { | ||
kClients, | ||
kGetNext | ||
} = require('./symbols') | ||
ClientClosedError, | ||
InvalidArgumentError, | ||
ClientDestroyedError | ||
} = require('./core/errors') | ||
const FixedQueue = require('./core/node/fixed-queue') | ||
const kClients = Symbol('clients') | ||
const kQueue = Symbol('queue') | ||
const kDestroyed = Symbol('destroyed') | ||
const kClosedPromise = Symbol('closed promise') | ||
const kClosedResolve = Symbol('closed resolve') | ||
class Pool { | ||
@@ -21,58 +26,82 @@ constructor (url, { | ||
this[kQueue] = new FixedQueue() | ||
this[kClosedPromise] = null | ||
this[kClosedResolve] = null | ||
this[kDestroyed] = false | ||
this[kClients] = Array.from({ | ||
length: connections || 10 | ||
}, () => new Client(url, options)) | ||
} | ||
/* istanbul ignore next: use by benchmark */ | ||
[kGetNext] () { | ||
return getNext(this) | ||
} | ||
const pool = this | ||
function onDrain () { | ||
const queue = pool[kQueue] | ||
stream (opts, factory, cb) { | ||
if (cb === undefined) { | ||
return new Promise((resolve, reject) => { | ||
this.stream(opts, factory, (err, data) => { | ||
return err ? reject(err) : resolve(data) | ||
}) | ||
}) | ||
while (!this.busy) { | ||
const item = queue.shift() | ||
if (!item) { | ||
break | ||
} | ||
this.dispatch(item.opts, item.handler) | ||
} | ||
if (pool[kClosedResolve] && queue.isEmpty()) { | ||
Promise | ||
.all(pool[kClients].map(c => c.close())) | ||
.then(pool[kClosedResolve]) | ||
} | ||
} | ||
getNext(this).stream(opts, factory, cb) | ||
for (const client of this[kClients]) { | ||
client.on('drain', onDrain) | ||
} | ||
} | ||
pipeline (opts, handler) { | ||
return getNext(this).pipeline(opts, handler) | ||
} | ||
dispatch (opts, handler) { | ||
try { | ||
if (this[kDestroyed]) { | ||
throw new ClientDestroyedError() | ||
} | ||
request (opts, cb) { | ||
if (cb === undefined) { | ||
return new Promise((resolve, reject) => { | ||
this.request(opts, (err, data) => { | ||
return err ? reject(err) : resolve(data) | ||
}) | ||
}) | ||
if (this[kClosedPromise]) { | ||
throw new ClientClosedError() | ||
} | ||
const client = this[kClients].find(client => !client.busy) | ||
if (!client) { | ||
this[kQueue].push({ opts, handler }) | ||
} else { | ||
client.dispatch(opts, handler) | ||
} | ||
} catch (err) { | ||
handler.onError(err) | ||
} | ||
getNext(this).request(opts, cb) | ||
} | ||
upgrade (opts, callback) { | ||
return getNext(this).upgrade(opts, callback) | ||
} | ||
close (cb) { | ||
try { | ||
if (this[kDestroyed]) { | ||
throw new ClientDestroyedError() | ||
} | ||
connect (opts, callback) { | ||
return getNext(this).connect(opts, callback) | ||
} | ||
if (!this[kClosedPromise]) { | ||
if (this[kQueue].isEmpty()) { | ||
this[kClosedPromise] = Promise.all(this[kClients].map(c => c.close())) | ||
} else { | ||
this[kClosedPromise] = new Promise((resolve) => { | ||
this[kClosedResolve] = resolve | ||
}) | ||
} | ||
} | ||
dispatch (opts, handler) { | ||
return getNext(this).dispatch(opts, handler) | ||
} | ||
close (cb) { | ||
const promise = Promise.all(this[kClients].map(c => c.close())) | ||
if (cb) { | ||
promise.then(() => cb(null, null), (err) => cb(err, null)) | ||
} else { | ||
return promise | ||
if (cb) { | ||
this[kClosedPromise].then(() => cb(null, null)) | ||
} else { | ||
return this[kClosedPromise] | ||
} | ||
} catch (err) { | ||
if (cb) { | ||
cb(err) | ||
} else { | ||
return Promise.reject(err) | ||
} | ||
} | ||
@@ -82,2 +111,4 @@ } | ||
destroy (err, cb) { | ||
this[kDestroyed] = true | ||
if (typeof err === 'function') { | ||
@@ -88,2 +119,14 @@ cb = err | ||
if (!err) { | ||
err = new ClientDestroyedError() | ||
} | ||
while (true) { | ||
const item = this[kQueue].shift() | ||
if (!item) { | ||
break | ||
} | ||
item.handler.onError(err) | ||
} | ||
const promise = Promise.all(this[kClients].map(c => c.destroy(err))) | ||
@@ -98,25 +141,2 @@ if (cb) { | ||
function getNext (pool) { | ||
let next | ||
for (const client of pool[kClients]) { | ||
if (client.busy) { | ||
continue | ||
} | ||
if (!next) { | ||
next = client | ||
} | ||
if (client.connected) { | ||
return client | ||
} | ||
} | ||
if (next) { | ||
return next | ||
} | ||
return pool[kClients][Math.floor(Math.random() * pool[kClients].length)] | ||
} | ||
module.exports = Pool |
{ | ||
"name": "undici", | ||
"version": "1.3.1", | ||
"version": "2.0.0", | ||
"description": "An HTTP/1.1 client, written from scratch for Node.js", | ||
"main": "index.js", | ||
"module": "wrapper.mjs", | ||
"scripts": { | ||
"lint": "standard | snazzy", | ||
"test": "tap test/*.js --no-coverage", | ||
"test": "tap test/*.{mjs,js} --no-coverage && jest test/jest/test", | ||
"coverage": "standard | snazzy && tap test/*.js", | ||
@@ -14,3 +15,3 @@ "bench": "npx concurrently -k -s first \"node benchmarks/server.js\" \"node -e 'setTimeout(() => {}, 1000)' && node benchmarks\"" | ||
"type": "git", | ||
"url": "git+https://github.com/mcollina/undici.git" | ||
"url": "git+https://github.com/nodejs/undici.git" | ||
}, | ||
@@ -27,5 +28,5 @@ "author": "Matteo Collina <hello@matteocollina.com>", | ||
"bugs": { | ||
"url": "https://github.com/mcollina/undici/issues" | ||
"url": "https://github.com/nodejs/undici/issues" | ||
}, | ||
"homepage": "https://github.com/mcollina/undici#readme", | ||
"homepage": "https://github.com/nodejs/undici#readme", | ||
"devDependencies": { | ||
@@ -37,2 +38,3 @@ "@sinonjs/fake-timers": "^6.0.1", | ||
"https-pem": "^2.0.0", | ||
"jest": "^26.4.0", | ||
"pre-commit": "^1.2.2", | ||
@@ -39,0 +41,0 @@ "proxyquire": "^2.0.1", |
# undici | ||
![Node CI](https://github.com/mcollina/undici/workflows/Node%20CI/badge.svg) [![js-standard-style](https://img.shields.io/badge/code%20style-standard-brightgreen.svg?style=flat)](http://standardjs.com/) | ||
![Node CI](https://github.com/mcollina/undici/workflows/Node%20CI/badge.svg) [![js-standard-style](https://img.shields.io/badge/code%20style-standard-brightgreen.svg?style=flat)](http://standardjs.com/) [![npm version](https://badge.fury.io/js/undici.svg)](https://badge.fury.io/js/undici) | ||
@@ -26,7 +26,7 @@ A HTTP/1.1 client, written from scratch for Node.js. | ||
``` | ||
http - keepalive x 5,847 ops/sec ±2.69% (276 runs sampled) | ||
undici - pipeline x 8,748 ops/sec ±2.90% (277 runs sampled) | ||
undici - request x 12,166 ops/sec ±0.80% (278 runs sampled) | ||
undici - stream x 12,969 ops/sec ±0.82% (278 runs sampled) | ||
undici - dispatch x 13,736 ops/sec ±0.60% (280 runs sampled) | ||
http - keepalive x 5,882 ops/sec ±1.87% (274 runs sampled) | ||
undici - pipeline x 9,189 ops/sec ±2.02% (272 runs sampled) | ||
undici - request x 12,623 ops/sec ±0.89% (277 runs sampled) | ||
undici - stream x 14,136 ops/sec ±0.61% (280 runs sampled) | ||
undici - dispatch x 14,883 ops/sec ±0.44% (281 runs sampled) | ||
``` | ||
@@ -61,3 +61,6 @@ | ||
- `maxKeepAliveTimeout: Number`, the maximum allowed `idleTimeout` when overriden by | ||
- `keepAlive: Boolean`, enable or disable keep alive connections. | ||
Default: `true`. | ||
- `keepAliveMaxTimeout: Number`, the maximum allowed `idleTimeout` when overriden by | ||
*keep-alive* hints from the server. | ||
@@ -71,12 +74,2 @@ Default: `600e3` milliseconds (10min). | ||
- `requestTimeout: Number`, the timeout after which a request will time out. | ||
Monitors time between request being enqueued and receiving | ||
a response. Use `0` to disable it entirely. | ||
Default: `30e3` milliseconds (30s). | ||
- `maxAbortedPayload: Number`, the maximum number of bytes read after which an | ||
aborted response will close the connection. Closing the connection | ||
will error other inflight requests in the pipeline. | ||
Default: `1048576` bytes (1MiB). | ||
- `pipelining: Number`, the amount of concurrent requests to be sent over the | ||
@@ -91,3 +84,3 @@ single TCP/TLS connection according to | ||
- `maxHeaderSize: Number`, the maximum length of request headers in bytes. | ||
- `maxHeaderSize: Number`, the maximum length of request headers in bytes. | ||
Default: `16384` (16KiB). | ||
@@ -109,7 +102,7 @@ | ||
* `opaque: Any` | ||
* `body: String|Buffer|Uint8Array|stream.Readable|Null`. | ||
* `body: String|Buffer|Uint8Array|stream.Readable|Null` | ||
Default: `null`. | ||
* `headers: Object|Null`, an object with header-value pairs. | ||
Default: `null`. | ||
* `signal: AbortController|EventEmitter|Null`. | ||
* `signal: AbortController|EventEmitter|Null` | ||
Default: `null`. | ||
@@ -431,26 +424,49 @@ * `requestTimeout: Number`, the timeout after which a request will time out, in | ||
This is the low level API which all the preceeding API's are implemented on top of. | ||
This is the low level API which all the preceeding APIs are implemented on top of. | ||
This API is expected to evolve through semver-major versions and is less stable | ||
than the preceeding higher level APIs. It is primarily intended for library developers | ||
who implement higher level APIs on top of this. | ||
Options: | ||
* ... same as [`client.request(opts[, callback])`][request]. | ||
* `path: String` | ||
* `method: String` | ||
* `body: String|Buffer|Uint8Array|stream.Readable|Null` | ||
Default: `null`. | ||
* `headers: Object|Null`, an object with header-value pairs. | ||
Default: `null`. | ||
* `requestTimeout: Number`, the timeout after which a request will time out, in | ||
milliseconds. Monitors time between request being enqueued and receiving | ||
a response. Use `0` to disable it entirely. | ||
Default: `30e3` milliseconds (30s). | ||
* `idempotent: Boolean`, whether the requests can be safely retried or not. | ||
If `false` the request won't be sent until all preceeding | ||
requests in the pipeline has completed. | ||
Default: `true` if `method` is `HEAD` or `GET`. | ||
The `handler` parameter is defined as follow: | ||
* `onConnect(abort)`, invoked before request is dispatched on socket. | ||
May be invoked multiple times when a request is retried when the request at the head of the pipeline fails. | ||
* `abort(): Void`, abort request. | ||
* `onUpgrade(statusCode, headers, socket): Void`, invoked when request is upgraded either due to a `Upgrade` header or `CONNECT` method. | ||
* `statusCode: Number` | ||
* `headers: Object` | ||
* `headers: Array|Null` | ||
* `socket: Duplex` | ||
* `onHeaders(statusCode, headers, resume): Void`, invoked when statusCode and headers have been received. | ||
* `onHeaders(statusCode, headers, resume): Void`, invoked when statusCode and headers have been received. | ||
May be invoked multiple times due to 1xx informational headers. | ||
* `statusCode: Number` | ||
* `headers: Object` | ||
* `headers: Array|Null`, an array of key-value pairs. Keys are not automatically lowercased. | ||
* `resume(): Void`, resume `onData` after returning `false`. | ||
* `onData(chunk): Null|Boolean`, invoked when response payload data is received. | ||
* `onData(chunk): Boolean`, invoked when response payload data is received. | ||
* `chunk: Buffer` | ||
* `onComplete(trailers): Void`, invoked when response payload and trailers have been received and the request has completed. | ||
* `trailers: Object` | ||
* `trailers: Array|Null` | ||
* `onError(err): Void`, invoked when an error has occured. | ||
* `err: Error` | ||
The caller is responsible for handling the `body` argument, in terms of `'error'` events and `destroy()`:ing up until | ||
the `onConnect` handler has been invoked. | ||
<a name='close'></a> | ||
@@ -512,2 +528,5 @@ #### `client.close([callback]): Promise|Void` | ||
* `'drain'`, emitted when pipeline is no longer fully | ||
saturated. | ||
* `'connect'`, emitted when a socket has been created and | ||
@@ -518,3 +537,3 @@ connected. The client will connect once `client.size > 0`. | ||
first argument of the event is the error which caused the | ||
socket to disconnect. The client will reconnect if or once | ||
socket to disconnect. The client will reconnect if or once | ||
`client.size > 0`. | ||
@@ -531,4 +550,7 @@ | ||
* `connections`, the number of clients to create. | ||
Default `100`. | ||
Default `10`. | ||
`Pool` does not guarantee that requests are dispatched in | ||
order of invocation. | ||
#### `pool.request(opts[, callback]): Promise|Void` | ||
@@ -592,3 +614,3 @@ | ||
This section documents parts of the HTTP/1.1 specification which Undici does | ||
This section documents parts of the HTTP/1.1 specification which Undici does | ||
not support or does not fully implement. | ||
@@ -599,3 +621,3 @@ | ||
Undici does not support the `Expect` request header field. The request | ||
body is always immediately sent and the `100 Continue` response will be | ||
body is always immediately sent and the `100 Continue` response will be | ||
ignored. | ||
@@ -607,8 +629,8 @@ | ||
Uncidi will only use pipelining if configured with a `pipelining` factor | ||
Uncidi will only use pipelining if configured with a `pipelining` factor | ||
greater than `1`. | ||
Undici always assumes that connections are persistent and will immediatly | ||
Undici always assumes that connections are persistent and will immediatly | ||
pipeline requests, without checking whether the connection is persistent. | ||
Hence, automatic fallback to HTTP/1.0 or HTTP/1.1 without pipelining is | ||
Hence, automatic fallback to HTTP/1.0 or HTTP/1.1 without pipelining is | ||
not supported. | ||
@@ -615,0 +637,0 @@ |
@@ -203,27 +203,1 @@ 'use strict' | ||
writeBodyStartedWithBody(new Uint8Array([42]), 'Uint8Array') | ||
test('cleanup listener', (t) => { | ||
t.plan(4) | ||
const abortController = new AbortController() | ||
abortController.signal.addEventListener = (name) => t.strictEqual(name, 'abort') | ||
abortController.signal.removeEventListener = (name) => t.strictEqual(name, 'abort') | ||
const server = createServer((req, res) => { | ||
res.writeHead(200, { 'content-type': 'text/plain' }) | ||
res.write('hello') | ||
res.end('world') | ||
}) | ||
t.teardown(server.close.bind(server)) | ||
server.listen(0, () => { | ||
const client = new Client(`http://localhost:${server.address().port}`) | ||
t.teardown(client.destroy.bind(client)) | ||
client.request({ path: '/', method: 'GET', signal: abortController.signal }, (err, response) => { | ||
t.error(err) | ||
response.body.on('end', () => { | ||
t.pass() | ||
}).resume() | ||
}) | ||
}) | ||
}) |
@@ -214,27 +214,1 @@ 'use strict' | ||
writeBodyStartedWithBody(new Uint8Array([42]), 'Uint8Array') | ||
test('cleanup listener', (t) => { | ||
t.plan(4) | ||
const ee = new EventEmitter() | ||
ee.addListener = (name) => t.strictEqual(name, 'abort') | ||
ee.removeListener = (name) => t.strictEqual(name, 'abort') | ||
const server = createServer((req, res) => { | ||
res.writeHead(200, { 'content-type': 'text/plain' }) | ||
res.write('hello') | ||
res.end('world') | ||
}) | ||
t.teardown(server.close.bind(server)) | ||
server.listen(0, () => { | ||
const client = new Client(`http://localhost:${server.address().port}`) | ||
t.teardown(client.destroy.bind(client)) | ||
client.request({ path: '/', method: 'GET', signal: ee }, (err, response) => { | ||
t.error(err) | ||
response.body.on('end', () => { | ||
t.pass() | ||
}).resume() | ||
}) | ||
}) | ||
}) |
@@ -37,154 +37,2 @@ 'use strict' | ||
test('aborted GET maxAbortedPayload reset', (t) => { | ||
t.plan(5) | ||
const server = createServer((req, res) => { | ||
res.end(Buffer.alloc(1e6 - 1)) | ||
}) | ||
t.tearDown(server.close.bind(server)) | ||
server.listen(0, () => { | ||
const client = new Client(`http://localhost:${server.address().port}`, { | ||
pipelining: 2, | ||
maxAbortedPayload: 1e6 | ||
}) | ||
t.tearDown(client.close.bind(client)) | ||
client.on('disconnect', () => { | ||
t.fail() | ||
}) | ||
client.request({ | ||
path: '/', | ||
method: 'GET' | ||
}, (err, { body }) => { | ||
t.error(err) | ||
body.once('data', (chunk) => { | ||
body.destroy() | ||
}).once('error', (err) => { | ||
t.ok(err) | ||
}).on('end', () => { | ||
t.fail() | ||
}) | ||
}) | ||
// Make sure read counter is reset. | ||
client.request({ | ||
path: '/', | ||
method: 'GET' | ||
}, (err, { body }) => { | ||
t.error(err) | ||
body.once('data', (chunk) => { | ||
body.destroy() | ||
}).on('error', (err) => { | ||
t.ok(err) | ||
}).on('end', () => { | ||
t.fail() | ||
}) | ||
}) | ||
client.request({ | ||
path: '/', | ||
method: 'GET' | ||
}, (err, { body }) => { | ||
t.error(err) | ||
body.resume() | ||
}) | ||
}) | ||
}) | ||
test('aborted GET maxAbortedPayload', (t) => { | ||
t.plan(5) | ||
const server = createServer((req, res) => { | ||
res.end(Buffer.alloc(100000 + 1, 'a')) | ||
}) | ||
t.tearDown(server.close.bind(server)) | ||
server.listen(0, () => { | ||
const client = new Client(`http://localhost:${server.address().port}`, { | ||
pipelining: 1, | ||
maxAbortedPayload: 100000 | ||
}) | ||
t.tearDown(client.destroy.bind(client)) | ||
client.request({ | ||
path: '/', | ||
method: 'GET' | ||
}, (err, { body }) => { | ||
t.error(err) | ||
body.once('data', () => { | ||
body.destroy() | ||
}).once('error', (err) => { | ||
t.ok(err) | ||
}) | ||
// old Readable emits error twice | ||
.on('error', () => {}) | ||
}) | ||
client.on('disconnect', () => { | ||
t.pass() | ||
}) | ||
client.request({ | ||
path: '/', | ||
method: 'GET' | ||
}, (err, { body }) => { | ||
t.error(err) | ||
body.resume() | ||
}) | ||
client.close((err) => { | ||
t.error(err) | ||
}) | ||
}) | ||
}) | ||
test('aborted GET maxAbortedPayload less than HWM', (t) => { | ||
t.plan(4) | ||
const server = createServer((req, res) => { | ||
res.end(Buffer.alloc(4 + 1, 'a')) | ||
}) | ||
t.tearDown(server.close.bind(server)) | ||
server.listen(0, () => { | ||
const client = new Client(`http://localhost:${server.address().port}`, { | ||
pipelining: 1, | ||
maxAbortedPayload: 4 | ||
}) | ||
t.tearDown(client.destroy.bind(client)) | ||
client.request({ | ||
path: '/', | ||
method: 'GET' | ||
}, (err, { body }) => { | ||
t.error(err) | ||
body.once('data', () => { | ||
body.destroy() | ||
}).once('error', (err) => { | ||
t.ok(err) | ||
}) | ||
// old Readable emits error twice | ||
.on('error', () => {}) | ||
}) | ||
client.on('disconnect', () => { | ||
t.fail() | ||
}) | ||
client.request({ | ||
path: '/', | ||
method: 'GET' | ||
}, (err, { body }) => { | ||
t.error(err) | ||
body.resume() | ||
}) | ||
client.close((err) => { | ||
t.error(err) | ||
}) | ||
}) | ||
}) | ||
test('aborted req', (t) => { | ||
@@ -191,0 +39,0 @@ t.plan(1) |
@@ -192,3 +192,3 @@ 'use strict' | ||
test('connect aborted', (t) => { | ||
t.plan(3) | ||
t.plan(4) | ||
@@ -203,8 +203,12 @@ const server = http.createServer((req, res) => { | ||
server.listen(0, async () => { | ||
server.listen(0, () => { | ||
const client = new Client(`http://localhost:${server.address().port}`, { | ||
pipelining: 3 | ||
}) | ||
t.tearDown(client.close.bind(client)) | ||
t.tearDown(client.destroy.bind(client)) | ||
client.on('disconnect', () => { | ||
t.fail() | ||
}) | ||
const signal = new EE() | ||
@@ -221,3 +225,108 @@ client.connect({ | ||
signal.emit('abort') | ||
client.close(() => { | ||
t.pass() | ||
}) | ||
}) | ||
}) | ||
test('basic connect error', (t) => { | ||
t.plan(2) | ||
const server = http.createServer((c) => { | ||
t.fail() | ||
}) | ||
server.on('connect', (req, socket, firstBodyChunk) => { | ||
socket.write('HTTP/1.1 200 Connection established\r\n\r\n') | ||
let data = firstBodyChunk.toString() | ||
socket.on('data', (buf) => { | ||
data += buf.toString() | ||
}) | ||
socket.on('end', () => { | ||
socket.end(data) | ||
}) | ||
}) | ||
t.tearDown(server.close.bind(server)) | ||
server.listen(0, async () => { | ||
const client = new Client(`http://localhost:${server.address().port}`) | ||
t.tearDown(client.close.bind(client)) | ||
const _err = new Error() | ||
client.connect({ | ||
path: '/' | ||
}, (err, { socket }) => { | ||
t.error(err) | ||
socket.on('error', (err) => { | ||
t.strictEqual(err, _err) | ||
}) | ||
throw _err | ||
}) | ||
}) | ||
}) | ||
test('connect invalid signal', (t) => { | ||
t.plan(2) | ||
const server = http.createServer((req, res) => { | ||
t.fail() | ||
}) | ||
server.on('connect', (req, c, firstBodyChunk) => { | ||
t.fail() | ||
}) | ||
t.tearDown(server.close.bind(server)) | ||
server.listen(0, () => { | ||
const client = new Client(`http://localhost:${server.address().port}`) | ||
t.tearDown(client.destroy.bind(client)) | ||
client.on('disconnect', () => { | ||
t.fail() | ||
}) | ||
client.connect({ | ||
path: '/', | ||
signal: 'error', | ||
opaque: 'asd' | ||
}, (err, { opaque }) => { | ||
t.strictEqual(opaque, 'asd') | ||
t.ok(err instanceof errors.InvalidArgumentError) | ||
}) | ||
}) | ||
}) | ||
test('connect aborted after connect', (t) => { | ||
t.plan(3) | ||
const signal = new EE() | ||
const server = http.createServer((req, res) => { | ||
t.fail() | ||
}) | ||
server.on('connect', (req, c, firstBodyChunk) => { | ||
signal.emit('abort') | ||
}) | ||
t.tearDown(server.close.bind(server)) | ||
server.listen(0, () => { | ||
const client = new Client(`http://localhost:${server.address().port}`, { | ||
pipelining: 3 | ||
}) | ||
t.tearDown(client.destroy.bind(client)) | ||
client.on('disconnect', () => { | ||
t.fail() | ||
}) | ||
client.connect({ | ||
path: '/', | ||
signal, | ||
opaque: 'asd' | ||
}, (err, { opaque }) => { | ||
t.strictEqual(opaque, 'asd') | ||
t.ok(err instanceof errors.RequestAbortedError) | ||
}) | ||
t.strictEqual(client.busy, true) | ||
}) | ||
}) |
@@ -5,2 +5,3 @@ 'use strict' | ||
const { Client, errors } = require('..') | ||
const http = require('http') | ||
@@ -12,11 +13,320 @@ test('dispatch invalid opts', (t) => { | ||
try { | ||
client.dispatch({ | ||
path: '/', | ||
method: 'GET', | ||
upgrade: 1 | ||
}, { | ||
onError (err) { | ||
t.ok(err instanceof errors.InvalidArgumentError) | ||
} | ||
}) | ||
}) | ||
test('basic dispatch get', (t) => { | ||
t.plan(10) | ||
const server = http.createServer((req, res) => { | ||
t.strictEqual('/', req.url) | ||
t.strictEqual('GET', req.method) | ||
t.strictEqual('localhost', req.headers.host) | ||
t.strictEqual(undefined, req.headers.foo) | ||
t.strictEqual('bar', req.headers.bar) | ||
t.strictEqual(undefined, req.headers['content-length']) | ||
res.end('hello') | ||
}) | ||
t.tearDown(server.close.bind(server)) | ||
const reqHeaders = { | ||
foo: undefined, | ||
bar: 'bar' | ||
} | ||
server.listen(0, () => { | ||
const client = new Client(`http://localhost:${server.address().port}`) | ||
t.tearDown(client.close.bind(client)) | ||
const bufs = [] | ||
client.dispatch({ | ||
path: '/', | ||
method: 'GET', | ||
upgrade: 1 | ||
headers: reqHeaders | ||
}, { | ||
onConnect () { | ||
}, | ||
onHeaders (statusCode, headers) { | ||
t.strictEqual(statusCode, 200) | ||
t.strictEqual(Array.isArray(headers), true) | ||
}, | ||
onData (buf) { | ||
bufs.push(buf) | ||
}, | ||
onComplete (trailers) { | ||
t.strictEqual(trailers, null) | ||
t.strictEqual('hello', Buffer.concat(bufs).toString('utf8')) | ||
}, | ||
onError () { | ||
t.fail() | ||
} | ||
}) | ||
} catch (err) { | ||
t.ok(err instanceof errors.InvalidArgumentError) | ||
}) | ||
}) | ||
test('trailers dispatch get', (t) => { | ||
t.plan(12) | ||
const server = http.createServer((req, res) => { | ||
t.strictEqual('/', req.url) | ||
t.strictEqual('GET', req.method) | ||
t.strictEqual('localhost', req.headers.host) | ||
t.strictEqual(undefined, req.headers.foo) | ||
t.strictEqual('bar', req.headers.bar) | ||
t.strictEqual(undefined, req.headers['content-length']) | ||
res.addTrailers({ 'Content-MD5': 'test' }) | ||
res.setHeader('Content-Type', 'text/plain') | ||
res.setHeader('Trailer', 'Content-MD5') | ||
res.end('hello') | ||
}) | ||
t.tearDown(server.close.bind(server)) | ||
const reqHeaders = { | ||
foo: undefined, | ||
bar: 'bar' | ||
} | ||
server.listen(0, () => { | ||
const client = new Client(`http://localhost:${server.address().port}`) | ||
t.tearDown(client.close.bind(client)) | ||
const bufs = [] | ||
client.dispatch({ | ||
path: '/', | ||
method: 'GET', | ||
headers: reqHeaders | ||
}, { | ||
onConnect () { | ||
}, | ||
onHeaders (statusCode, headers) { | ||
t.strictEqual(statusCode, 200) | ||
t.strictEqual(Array.isArray(headers), true) | ||
{ | ||
const contentTypeIdx = headers.findIndex(x => x === 'Content-Type') | ||
t.strictEqual(headers[contentTypeIdx + 1], 'text/plain') | ||
} | ||
}, | ||
onData (buf) { | ||
bufs.push(buf) | ||
}, | ||
onComplete (trailers) { | ||
t.strictEqual(Array.isArray(trailers), true) | ||
{ | ||
const contentMD5Idx = trailers.findIndex(x => x === 'Content-MD5') | ||
t.strictEqual(trailers[contentMD5Idx + 1], 'test') | ||
} | ||
t.strictEqual('hello', Buffer.concat(bufs).toString('utf8')) | ||
}, | ||
onError () { | ||
t.fail() | ||
} | ||
}) | ||
}) | ||
}) | ||
test('dispatch onHeaders error', (t) => { | ||
t.plan(1) | ||
const server = http.createServer((req, res) => { | ||
res.end() | ||
}) | ||
t.tearDown(server.close.bind(server)) | ||
server.listen(0, () => { | ||
const client = new Client(`http://localhost:${server.address().port}`) | ||
t.tearDown(client.close.bind(client)) | ||
const _err = new Error() | ||
client.dispatch({ | ||
path: '/', | ||
method: 'GET' | ||
}, { | ||
onConnect () { | ||
}, | ||
onHeaders (statusCode, headers) { | ||
throw _err | ||
}, | ||
onData (buf) { | ||
t.fail() | ||
}, | ||
onComplete (trailers) { | ||
t.fail() | ||
}, | ||
onError (err) { | ||
t.strictEqual(err, _err) | ||
} | ||
}) | ||
}) | ||
}) | ||
test('dispatch onComplete error', (t) => { | ||
t.plan(2) | ||
const server = http.createServer((req, res) => { | ||
res.end() | ||
}) | ||
t.tearDown(server.close.bind(server)) | ||
server.listen(0, () => { | ||
const client = new Client(`http://localhost:${server.address().port}`) | ||
t.tearDown(client.close.bind(client)) | ||
const _err = new Error() | ||
client.dispatch({ | ||
path: '/', | ||
method: 'GET' | ||
}, { | ||
onConnect () { | ||
}, | ||
onHeaders (statusCode, headers) { | ||
t.pass() | ||
}, | ||
onData (buf) { | ||
t.fail() | ||
}, | ||
onComplete (trailers) { | ||
throw _err | ||
}, | ||
onError (err) { | ||
t.strictEqual(err, _err) | ||
} | ||
}) | ||
}) | ||
}) | ||
test('dispatch onData error', (t) => { | ||
t.plan(2) | ||
const server = http.createServer((req, res) => { | ||
res.end('ad') | ||
}) | ||
t.tearDown(server.close.bind(server)) | ||
server.listen(0, () => { | ||
const client = new Client(`http://localhost:${server.address().port}`) | ||
t.tearDown(client.close.bind(client)) | ||
const _err = new Error() | ||
client.dispatch({ | ||
path: '/', | ||
method: 'GET' | ||
}, { | ||
onConnect () { | ||
}, | ||
onHeaders (statusCode, headers) { | ||
t.pass() | ||
}, | ||
onData (buf) { | ||
throw _err | ||
}, | ||
onComplete (trailers) { | ||
t.fail() | ||
}, | ||
onError (err) { | ||
t.strictEqual(err, _err) | ||
} | ||
}) | ||
}) | ||
}) | ||
test('dispatch onConnect error', (t) => { | ||
t.plan(1) | ||
const server = http.createServer((req, res) => { | ||
res.end('ad') | ||
}) | ||
t.tearDown(server.close.bind(server)) | ||
server.listen(0, () => { | ||
const client = new Client(`http://localhost:${server.address().port}`) | ||
t.tearDown(client.close.bind(client)) | ||
const _err = new Error() | ||
client.dispatch({ | ||
path: '/', | ||
method: 'GET' | ||
}, { | ||
onConnect () { | ||
throw _err | ||
}, | ||
onHeaders (statusCode, headers) { | ||
t.fail() | ||
}, | ||
onData (buf) { | ||
t.fail() | ||
}, | ||
onComplete (trailers) { | ||
t.fail() | ||
}, | ||
onError (err) { | ||
t.strictEqual(err, _err) | ||
} | ||
}) | ||
}) | ||
}) | ||
test('connect call onUpgrade once', (t) => { | ||
t.plan(2) | ||
const server = http.createServer((c) => { | ||
t.fail() | ||
}) | ||
server.on('connect', (req, socket, firstBodyChunk) => { | ||
socket.write('HTTP/1.1 200 Connection established\r\n\r\n') | ||
let data = firstBodyChunk.toString() | ||
socket.on('data', (buf) => { | ||
data += buf.toString() | ||
}) | ||
socket.on('end', () => { | ||
socket.end(data) | ||
}) | ||
}) | ||
t.tearDown(server.close.bind(server)) | ||
server.listen(0, async () => { | ||
const client = new Client(`http://localhost:${server.address().port}`) | ||
t.tearDown(client.close.bind(client)) | ||
let recvData = '' | ||
let count = 0 | ||
client.dispatch({ | ||
method: 'CONNECT', | ||
path: '/' | ||
}, { | ||
onConnect () { | ||
}, | ||
onUpgrade (statusCode, headers, socket) { | ||
t.strictEqual(count++, 0) | ||
socket.on('data', (d) => { | ||
recvData += d | ||
}) | ||
socket.on('end', () => { | ||
t.strictEqual(recvData.toString(), 'Body') | ||
}) | ||
socket.write('Body') | ||
socket.end() | ||
}, | ||
onData (buf) { | ||
t.fail() | ||
}, | ||
onComplete (trailers) { | ||
t.fail() | ||
}, | ||
onError () { | ||
t.fail() | ||
} | ||
}) | ||
}) | ||
}) |
@@ -9,4 +9,3 @@ 'use strict' | ||
const { kSocket } = require('../lib/symbols') | ||
const { InvalidArgumentError } = require('../lib/errors') | ||
const { kSocket } = require('../lib/core/symbols') | ||
@@ -245,4 +244,2 @@ test('GET errors and reconnect with pipelining 1', (t) => { | ||
test('invalid options throws', (t) => { | ||
t.plan(38) | ||
try { | ||
@@ -278,11 +275,2 @@ new Client({ port: 'foobar' }) // eslint-disable-line | ||
new Client(new URL('http://localhost:200'), { // eslint-disable-line | ||
maxAbortedPayload: 'asd' | ||
}) | ||
} catch (err) { | ||
t.ok(err instanceof errors.InvalidArgumentError) | ||
t.strictEqual(err.message, 'invalid maxAbortedPayload') | ||
} | ||
try { | ||
new Client(new URL('http://localhost:200'), { // eslint-disable-line | ||
socketPath: 1 | ||
@@ -315,7 +303,7 @@ }) | ||
new Client(new URL('http://localhost:200'), { // eslint-disable-line | ||
maxKeepAliveTimeout: 'asd' | ||
keepAliveMaxTimeout: 'asd' | ||
}) // eslint-disable-line | ||
} catch (err) { | ||
t.ok(err instanceof errors.InvalidArgumentError) | ||
t.strictEqual(err.message, 'invalid maxKeepAliveTimeout') | ||
t.strictEqual(err.message, 'invalid keepAliveMaxTimeout') | ||
} | ||
@@ -325,7 +313,7 @@ | ||
new Client(new URL('http://localhost:200'), { // eslint-disable-line | ||
maxKeepAliveTimeout: 0 | ||
keepAliveMaxTimeout: 0 | ||
}) // eslint-disable-line | ||
} catch (err) { | ||
t.ok(err instanceof errors.InvalidArgumentError) | ||
t.strictEqual(err.message, 'invalid maxKeepAliveTimeout') | ||
t.strictEqual(err.message, 'invalid keepAliveMaxTimeout') | ||
} | ||
@@ -352,11 +340,2 @@ | ||
try { | ||
new Client(new URL('http://localhost:200'), { // eslint-disable-line | ||
requestTimeout: 'asd' | ||
}) | ||
} catch (err) { | ||
t.ok(err instanceof errors.InvalidArgumentError) | ||
t.strictEqual(err.message, 'invalid requestTimeout') | ||
} | ||
try { | ||
new Client({ // eslint-disable-line | ||
@@ -410,2 +389,11 @@ protocol: 'asd' | ||
} | ||
try { | ||
new Client(new URL('http://localhost:200'), { keepAlive: 'true' }) // eslint-disable-line | ||
} catch (err) { | ||
t.ok(err instanceof errors.InvalidArgumentError) | ||
t.strictEqual(err.message, 'invalid keepAlive') | ||
} | ||
t.end() | ||
}) | ||
@@ -697,3 +685,3 @@ | ||
test('queued request should not fail on socket destroy', (t) => { | ||
t.plan(2) | ||
t.plan(4) | ||
@@ -717,3 +705,5 @@ const server = createServer() | ||
t.error(err) | ||
data.body.resume() | ||
data.body.resume().on('error', () => { | ||
t.pass() | ||
}) | ||
client[kSocket].destroy() | ||
@@ -725,3 +715,5 @@ client.request({ | ||
t.error(err) | ||
data.body.resume() | ||
data.body.resume().on('end', () => { | ||
t.pass() | ||
}) | ||
}) | ||
@@ -733,3 +725,3 @@ }) | ||
test('queued request should fail on client destroy', (t) => { | ||
t.plan(5) | ||
t.plan(6) | ||
@@ -755,2 +747,5 @@ const server = createServer() | ||
data.body.resume() | ||
.on('error', () => { | ||
t.pass() | ||
}) | ||
client.destroy((err) => { | ||
@@ -786,3 +781,3 @@ t.error(err) | ||
}) | ||
t.tearDown(client.destroy.bind(client)) | ||
t.tearDown(client.close.bind(client)) | ||
@@ -897,3 +892,3 @@ client.request({ | ||
test('invalid signal', (t) => { | ||
t.plan(3) | ||
t.plan(8) | ||
@@ -903,12 +898,19 @@ const client = new Client('http://localhost:3333') | ||
client.request({ path: '/', method: 'GET', signal: {} }, (err) => { | ||
t.ok(err instanceof InvalidArgumentError) | ||
let ticked = false | ||
client.request({ path: '/', method: 'GET', signal: {}, opaque: 'asd' }, (err, { opaque }) => { | ||
t.strictEqual(ticked, true) | ||
t.strictEqual(opaque, 'asd') | ||
t.ok(err instanceof errors.InvalidArgumentError) | ||
}) | ||
client.pipeline({ path: '/', method: 'GET', signal: {} }, () => {}) | ||
.on('error', (err) => { | ||
t.ok(err instanceof InvalidArgumentError) | ||
t.strictEqual(ticked, true) | ||
t.ok(err instanceof errors.InvalidArgumentError) | ||
}) | ||
client.stream({ path: '/', method: 'GET', signal: {} }, () => {}, (err) => { | ||
t.ok(err instanceof InvalidArgumentError) | ||
client.stream({ path: '/', method: 'GET', signal: {}, opaque: 'asd' }, () => {}, (err, { opaque }) => { | ||
t.strictEqual(ticked, true) | ||
t.strictEqual(opaque, 'asd') | ||
t.ok(err instanceof errors.InvalidArgumentError) | ||
}) | ||
ticked = true | ||
}) |
@@ -5,3 +5,5 @@ 'use strict' | ||
const { Client } = require('..') | ||
const { kConnect } = require('../lib/core/symbols') | ||
const { createServer } = require('net') | ||
const http = require('http') | ||
@@ -164,3 +166,3 @@ test('keep-alive header', (t) => { | ||
idleTimeout: 30e3, | ||
maxKeepAliveTimeout: 1e3 | ||
keepAliveMaxTimeout: 1e3 | ||
}) | ||
@@ -186,1 +188,97 @@ t.teardown(client.destroy.bind(client)) | ||
}) | ||
test('connection close', (t) => { | ||
t.plan(4) | ||
let close = false | ||
const server = createServer((socket) => { | ||
if (close) { | ||
return | ||
} | ||
close = true | ||
socket.write('HTTP/1.1 200 OK\r\n') | ||
socket.write('Content-Length: 0\r\n') | ||
socket.write('Connection: close\r\n') | ||
socket.write('\r\n\r\n') | ||
}) | ||
t.teardown(server.close.bind(server)) | ||
server.listen(0, () => { | ||
const client = new Client(`http://localhost:${server.address().port}`, { | ||
pipelining: 2 | ||
}) | ||
t.teardown(client.destroy.bind(client)) | ||
client[kConnect](() => { | ||
client.request({ | ||
path: '/', | ||
method: 'GET' | ||
}, (err, { body }) => { | ||
t.error(err) | ||
body.on('end', () => { | ||
const timeout = setTimeout(() => { | ||
t.fail() | ||
}, 3e3) | ||
client.once('disconnect', () => { | ||
close = false | ||
t.pass() | ||
clearTimeout(timeout) | ||
}) | ||
}).resume() | ||
}) | ||
client.request({ | ||
path: '/', | ||
method: 'GET' | ||
}, (err, { body }) => { | ||
t.error(err) | ||
body.on('end', () => { | ||
const timeout = setTimeout(() => { | ||
t.fail() | ||
}, 3e3) | ||
client.once('disconnect', () => { | ||
t.pass() | ||
clearTimeout(timeout) | ||
}) | ||
}).resume() | ||
}) | ||
}) | ||
}) | ||
}) | ||
test('Disable keep alive', (t) => { | ||
t.plan(7) | ||
const ports = [] | ||
const server = http.createServer((req, res) => { | ||
t.false(ports.includes(req.socket.remotePort)) | ||
ports.push(req.socket.remotePort) | ||
t.match(req.headers, { connection: 'close' }) | ||
res.writeHead(200, { connection: 'close' }) | ||
res.end() | ||
}) | ||
t.teardown(server.close.bind(server)) | ||
server.listen(0, () => { | ||
const client = new Client(`http://localhost:${server.address().port}`, { keepAlive: false }) | ||
t.teardown(client.destroy.bind(client)) | ||
client.request({ | ||
path: '/', | ||
method: 'GET' | ||
}, (err, { body }) => { | ||
t.error(err) | ||
body.on('end', () => { | ||
client.request({ | ||
path: '/', | ||
method: 'GET' | ||
}, (err, { body }) => { | ||
t.error(err) | ||
body.on('end', () => { | ||
t.pass() | ||
}).resume() | ||
}) | ||
}).resume() | ||
}) | ||
}) | ||
}) |
@@ -415,6 +415,8 @@ 'use strict' | ||
test('pipeline abort res', (t) => { | ||
t.plan(1) | ||
t.plan(2) | ||
let _res | ||
const server = createServer((req, res) => { | ||
res.write('asd') | ||
_res = res | ||
}) | ||
@@ -433,2 +435,10 @@ t.tearDown(server.close.bind(server)) | ||
body.destroy() | ||
_res.write('asdasdadasd') | ||
const timeout = setTimeout(() => { | ||
t.fail() | ||
}, 100) | ||
client.on('disconnect', () => { | ||
clearTimeout(timeout) | ||
t.pass() | ||
}) | ||
}) | ||
@@ -750,3 +760,3 @@ return body | ||
const client = new Client(`http://localhost:${server.address().port}`) | ||
t.tearDown(client.close.bind(client)) | ||
t.tearDown(client.destroy.bind(client)) | ||
@@ -973,2 +983,5 @@ client | ||
res.write('asd') | ||
setImmediate(() => { | ||
res.write('asd') | ||
}) | ||
}) | ||
@@ -975,0 +988,0 @@ t.tearDown(server.close.bind(server)) |
@@ -7,2 +7,4 @@ 'use strict' | ||
const { finished, Readable } = require('stream') | ||
const { kConnect } = require('../lib/core/symbols') | ||
const EE = require('events') | ||
@@ -388,3 +390,3 @@ test('20 times GET with pipelining 10', (t) => { | ||
test('pipelining HEAD busy', (t) => { | ||
t.plan(6) | ||
t.plan(7) | ||
@@ -403,10 +405,87 @@ const server = createServer() | ||
{ | ||
client[kConnect](() => { | ||
let ended = false | ||
client.once('disconnect', () => { | ||
t.strictEqual(ended, true) | ||
}) | ||
{ | ||
const body = new Readable({ | ||
read () { } | ||
}) | ||
client.request({ | ||
path: '/', | ||
method: 'GET', | ||
body | ||
}, (err, data) => { | ||
t.error(err) | ||
data.body | ||
.resume() | ||
.on('end', () => { | ||
t.pass() | ||
}) | ||
}) | ||
body.push(null) | ||
t.strictEqual(client.busy, true) | ||
} | ||
{ | ||
const body = new Readable({ | ||
read () { } | ||
}) | ||
client.request({ | ||
path: '/', | ||
method: 'HEAD', | ||
body | ||
}, (err, data) => { | ||
t.error(err) | ||
data.body | ||
.resume() | ||
.on('end', () => { | ||
ended = true | ||
t.pass() | ||
}) | ||
}) | ||
body.push(null) | ||
t.strictEqual(client.busy, true) | ||
} | ||
}) | ||
}) | ||
}) | ||
test('pipelining empty pipeline before reset', (t) => { | ||
t.plan(7) | ||
let c = 0 | ||
const server = createServer() | ||
server.on('request', (req, res) => { | ||
if (c++ === 0) { | ||
res.end('asd') | ||
} else { | ||
setTimeout(() => { | ||
res.end('asd') | ||
}, 100) | ||
} | ||
}) | ||
t.tearDown(server.close.bind(server)) | ||
server.listen(0, () => { | ||
const client = new Client(`http://localhost:${server.address().port}`, { | ||
pipelining: 10 | ||
}) | ||
t.tearDown(client.close.bind(client)) | ||
client[kConnect](() => { | ||
let ended = false | ||
client.once('disconnect', () => { | ||
t.strictEqual(ended, true) | ||
}) | ||
const body = new Readable({ | ||
read () { } | ||
}) | ||
client.request({ | ||
path: '/', | ||
method: 'GET', | ||
body | ||
method: 'GET' | ||
}, (err, data) => { | ||
@@ -418,16 +497,11 @@ t.error(err) | ||
t.pass() | ||
body.push(null) | ||
}) | ||
}) | ||
body.push(null) | ||
t.strictEqual(client.busy, false) | ||
} | ||
{ | ||
const body = new Readable({ | ||
read () { } | ||
}) | ||
client.request({ | ||
path: '/', | ||
method: 'HEAD', | ||
body | ||
body: 'asd' | ||
}, (err, data) => { | ||
@@ -438,8 +512,9 @@ t.error(err) | ||
.on('end', () => { | ||
ended = true | ||
t.pass() | ||
}) | ||
}) | ||
body.push(null) | ||
t.strictEqual(client.busy, true) | ||
} | ||
t.strictEqual(client.running, 2) | ||
}) | ||
}) | ||
@@ -449,3 +524,3 @@ }) | ||
test('pipelining idempotent busy', (t) => { | ||
t.plan(6) | ||
t.plan(12) | ||
@@ -481,26 +556,66 @@ const server = createServer() | ||
body.push(null) | ||
t.strictEqual(client.busy, false) | ||
t.strictEqual(client.busy, true) | ||
} | ||
{ | ||
const body = new Readable({ | ||
read () { } | ||
}) | ||
client.request({ | ||
path: '/', | ||
method: 'GET', | ||
idempotent: false, | ||
body | ||
}, (err, data) => { | ||
t.error(err) | ||
data.body | ||
.resume() | ||
.on('end', () => { | ||
t.pass() | ||
}) | ||
}) | ||
body.push(null) | ||
t.strictEqual(client.busy, true) | ||
} | ||
client[kConnect](() => { | ||
{ | ||
const body = new Readable({ | ||
read () { } | ||
}) | ||
client.request({ | ||
path: '/', | ||
method: 'GET', | ||
body | ||
}, (err, data) => { | ||
t.error(err) | ||
data.body | ||
.resume() | ||
.on('end', () => { | ||
t.pass() | ||
}) | ||
}) | ||
body.push(null) | ||
t.strictEqual(client.busy, true) | ||
} | ||
{ | ||
const signal = new EE() | ||
const body = new Readable({ | ||
read () { } | ||
}) | ||
client.request({ | ||
path: '/', | ||
method: 'GET', | ||
body, | ||
signal | ||
}, (err, data) => { | ||
t.ok(err) | ||
}) | ||
t.strictEqual(client.busy, true) | ||
signal.emit('abort') | ||
t.strictEqual(client.busy, true) | ||
} | ||
{ | ||
const body = new Readable({ | ||
read () { } | ||
}) | ||
client.request({ | ||
path: '/', | ||
method: 'GET', | ||
idempotent: false, | ||
body | ||
}, (err, data) => { | ||
t.error(err) | ||
data.body | ||
.resume() | ||
.on('end', () => { | ||
t.pass() | ||
}) | ||
}) | ||
body.push(null) | ||
t.strictEqual(client.busy, true) | ||
} | ||
}) | ||
}) | ||
}) |
@@ -9,3 +9,3 @@ 'use strict' | ||
test('multiple reconnect', (t) => { | ||
t.plan(2) | ||
t.plan(3) | ||
@@ -25,3 +25,7 @@ const clock = FakeTimers.install() | ||
t.error(err) | ||
data.body.resume() | ||
data.body | ||
.resume() | ||
.on('end', () => { | ||
t.pass() | ||
}) | ||
}) | ||
@@ -28,0 +32,0 @@ |
@@ -7,3 +7,4 @@ 'use strict' | ||
const EE = require('events') | ||
const { kConnect } = require('../lib/symbols') | ||
const { kConnect } = require('../lib/core/symbols') | ||
const { Readable } = require('stream') | ||
@@ -22,3 +23,3 @@ test('request abort before headers', (t) => { | ||
const client = new Client(`http://localhost:${server.address().port}`) | ||
t.tearDown(client.close.bind(client)) | ||
t.tearDown(client.destroy.bind(client)) | ||
@@ -43,1 +44,27 @@ client[kConnect](() => { | ||
}) | ||
test('request body destroyed on invalid callback', (t) => { | ||
t.plan(1) | ||
const server = createServer((req, res) => { | ||
}) | ||
t.tearDown(server.close.bind(server)) | ||
server.listen(0, () => { | ||
const client = new Client(`http://localhost:${server.address().port}`) | ||
t.tearDown(client.destroy.bind(client)) | ||
const body = new Readable({ | ||
read () {} | ||
}) | ||
try { | ||
client.request({ | ||
path: '/', | ||
method: 'GET', | ||
body | ||
}, null) | ||
} catch (err) { | ||
t.strictEqual(body.destroyed, true) | ||
} | ||
}) | ||
}) |
@@ -6,3 +6,3 @@ 'use strict' | ||
const { createServer } = require('http') | ||
const { PassThrough, Writable } = require('stream') | ||
const { PassThrough, Writable, Readable } = require('stream') | ||
const EE = require('events') | ||
@@ -196,5 +196,3 @@ | ||
server.listen(0, () => { | ||
const client = new Client(`http://localhost:${server.address().port}`, { | ||
maxAbortedPayload: 1e5 | ||
}) | ||
const client = new Client(`http://localhost:${server.address().port}`) | ||
t.tearDown(client.close.bind(client)) | ||
@@ -460,5 +458,2 @@ | ||
}) | ||
client.on('disconnect', () => { | ||
t.fail() | ||
}) | ||
}) | ||
@@ -522,2 +517,32 @@ }) | ||
test('stream abort before dispatch', (t) => { | ||
t.plan(1) | ||
const server = createServer((req, res) => { | ||
res.end('asd') | ||
}) | ||
t.tearDown(server.close.bind(server)) | ||
server.listen(0, () => { | ||
const client = new Client(`http://localhost:${server.address().port}`) | ||
t.tearDown(client.destroy.bind(client)) | ||
const pt = new PassThrough() | ||
const signal = new EE() | ||
client.stream({ | ||
path: '/', | ||
method: 'GET', | ||
signal | ||
}, () => { | ||
return pt | ||
}, (err) => { | ||
t.ok(err instanceof errors.RequestAbortedError) | ||
}) | ||
signal.emit('abort') | ||
client.on('disconnect', () => { | ||
t.fail() | ||
}) | ||
}) | ||
}) | ||
test('trailers', (t) => { | ||
@@ -607,1 +632,27 @@ t.plan(2) | ||
}) | ||
test('stream body destroyed on invalid callback', (t) => { | ||
t.plan(1) | ||
const server = createServer((req, res) => { | ||
}) | ||
t.tearDown(server.close.bind(server)) | ||
server.listen(0, () => { | ||
const client = new Client(`http://localhost:${server.address().port}`) | ||
t.tearDown(client.destroy.bind(client)) | ||
const body = new Readable({ | ||
read () {} | ||
}) | ||
try { | ||
client.stream({ | ||
path: '/', | ||
method: 'GET', | ||
body | ||
}, () => {}, null) | ||
} catch (err) { | ||
t.strictEqual(body.destroyed, true) | ||
} | ||
}) | ||
}) |
@@ -118,5 +118,5 @@ 'use strict' | ||
}) | ||
c.on('end', () => { | ||
c.end() | ||
c.on('error', () => { | ||
// Whether we get an error, end or close is undefined. | ||
// Ignore error. | ||
}) | ||
@@ -273,3 +273,3 @@ }) | ||
test('upgrade aborted', (t) => { | ||
t.plan(3) | ||
t.plan(4) | ||
@@ -288,3 +288,3 @@ const server = http.createServer((req, res) => { | ||
}) | ||
t.tearDown(client.close.bind(client)) | ||
t.tearDown(client.destroy.bind(client)) | ||
@@ -302,2 +302,6 @@ const signal = new EE() | ||
signal.emit('abort') | ||
client.close(() => { | ||
t.pass() | ||
}) | ||
}) | ||
@@ -319,2 +323,5 @@ }) | ||
c.end() | ||
c.on('error', () => { | ||
}) | ||
signal.emit('abort') | ||
@@ -338,1 +345,67 @@ }) | ||
}) | ||
test('basic upgrade error', (t) => { | ||
t.plan(2) | ||
const server = net.createServer((c) => { | ||
c.on('data', (d) => { | ||
c.write('HTTP/1.1 101\r\n') | ||
c.write('hello: world\r\n') | ||
c.write('connection: upgrade\r\n') | ||
c.write('upgrade: websocket\r\n') | ||
c.write('\r\n') | ||
c.write('Body') | ||
}) | ||
c.on('error', () => { | ||
}) | ||
}) | ||
t.tearDown(server.close.bind(server)) | ||
server.listen(0, () => { | ||
const client = new Client(`http://localhost:${server.address().port}`) | ||
t.tearDown(client.close.bind(client)) | ||
const _err = new Error() | ||
client.upgrade({ | ||
path: '/', | ||
method: 'GET', | ||
protocol: 'Websocket' | ||
}, (err, data) => { | ||
t.error(err) | ||
data.socket.on('error', (err) => { | ||
t.strictEqual(err, _err) | ||
}) | ||
throw _err | ||
}) | ||
}) | ||
}) | ||
test('upgrade invalid signal', (t) => { | ||
t.plan(2) | ||
const server = net.createServer(() => { | ||
t.fail() | ||
}) | ||
t.tearDown(server.close.bind(server)) | ||
server.listen(0, () => { | ||
const client = new Client(`http://localhost:${server.address().port}`) | ||
t.tearDown(client.destroy.bind(client)) | ||
client.on('disconnect', () => { | ||
t.fail() | ||
}) | ||
client.upgrade({ | ||
path: '/', | ||
method: 'GET', | ||
protocol: 'Websocket', | ||
signal: 'error', | ||
opaque: 'asd' | ||
}, (err, { opaque }) => { | ||
t.strictEqual(opaque, 'asd') | ||
t.ok(err instanceof errors.InvalidArgumentError) | ||
}) | ||
}) | ||
}) |
@@ -8,3 +8,3 @@ 'use strict' | ||
const { Readable } = require('stream') | ||
const { kSocket } = require('../lib/symbols') | ||
const { kSocket } = require('../lib/core/symbols') | ||
const EE = require('events') | ||
@@ -76,3 +76,3 @@ | ||
const server = createServer((req, res) => { | ||
t.strictEqual('/', req.url) | ||
t.strictEqual('/123', req.url) | ||
t.strictEqual('HEAD', req.method) | ||
@@ -89,3 +89,3 @@ t.strictEqual('localhost', req.headers.host) | ||
client.request({ path: '/', method: 'HEAD' }, (err, { statusCode, headers, body }) => { | ||
client.request({ path: '/123', method: 'HEAD' }, (err, { statusCode, headers, body }) => { | ||
t.error(err) | ||
@@ -101,3 +101,3 @@ t.strictEqual(statusCode, 200) | ||
client.request({ path: '/', method: 'HEAD' }, (err, { statusCode, headers, body }) => { | ||
client.request({ path: '/123', method: 'HEAD' }, (err, { statusCode, headers, body }) => { | ||
t.error(err) | ||
@@ -627,3 +627,3 @@ t.strictEqual(statusCode, 200) | ||
test('multiple destroy callback', (t) => { | ||
t.plan(3) | ||
t.plan(4) | ||
@@ -645,3 +645,7 @@ const server = createServer((req, res) => { | ||
t.error(err) | ||
data.body.resume() | ||
data.body | ||
.resume() | ||
.on('error', () => { | ||
t.pass() | ||
}) | ||
client.destroy(new Error(), (err) => { | ||
@@ -658,3 +662,3 @@ t.error(err) | ||
test('only one streaming req at a time', (t) => { | ||
t.plan(6) | ||
t.plan(7) | ||
@@ -703,3 +707,7 @@ const server = createServer((req, res) => { | ||
t.error(err) | ||
data.body.resume() | ||
data.body | ||
.resume() | ||
.on('end', () => { | ||
t.pass() | ||
}) | ||
}) | ||
@@ -712,3 +720,3 @@ t.strictEqual(client.busy, true) | ||
test('300 requests succeed', (t) => { | ||
t.plan(300 * 2) | ||
t.plan(300 * 3) | ||
@@ -732,2 +740,4 @@ const server = createServer((req, res) => { | ||
t.strictEqual(chunk.toString(), 'asd') | ||
}).on('end', () => { | ||
t.pass() | ||
}) | ||
@@ -734,0 +744,0 @@ }) |
@@ -6,3 +6,3 @@ 'use strict' | ||
const { createServer } = require('http') | ||
const { kSocket } = require('../lib/symbols') | ||
const { kSocket } = require('../lib/core/symbols') | ||
@@ -9,0 +9,0 @@ test('close waits for queued requests to finish', (t) => { |
@@ -7,3 +7,3 @@ 'use strict' | ||
const { Readable } = require('stream') | ||
const { kConnect } = require('../lib/symbols') | ||
const { kConnect } = require('../lib/core/symbols') | ||
@@ -77,22 +77,22 @@ test('GET and HEAD with body should reset connection', (t) => { | ||
test('GET with body should work when target parses body as request', (t) => { | ||
t.plan(4) | ||
// TODO: Avoid external dependency. | ||
// test('GET with body should work when target parses body as request', (t) => { | ||
// t.plan(4) | ||
// This URL will send double responses when receiving a | ||
// GET request with body. | ||
// TODO: Avoid external dependency. | ||
const client = new Client('http://feeds.bbci.co.uk') | ||
t.teardown(client.close.bind(client)) | ||
// // This URL will send double responses when receiving a | ||
// // GET request with body. | ||
// const client = new Client('http://feeds.bbci.co.uk') | ||
// t.teardown(client.close.bind(client)) | ||
client.request({ method: 'GET', path: '/news/rss.xml', body: 'asd' }, (err, data) => { | ||
t.error(err) | ||
t.strictEqual(data.statusCode, 200) | ||
data.body.resume() | ||
}) | ||
client.request({ method: 'GET', path: '/news/rss.xml', body: 'asd' }, (err, data) => { | ||
t.error(err) | ||
t.strictEqual(data.statusCode, 200) | ||
data.body.resume() | ||
}) | ||
}) | ||
// client.request({ method: 'GET', path: '/news/rss.xml', body: 'asd' }, (err, data) => { | ||
// t.error(err) | ||
// t.strictEqual(data.statusCode, 200) | ||
// data.body.resume() | ||
// }) | ||
// client.request({ method: 'GET', path: '/news/rss.xml', body: 'asd' }, (err, data) => { | ||
// t.error(err) | ||
// t.strictEqual(data.statusCode, 200) | ||
// data.body.resume() | ||
// }) | ||
// }) | ||
@@ -99,0 +99,0 @@ test('HEAD should reset connection', (t) => { |
@@ -6,3 +6,3 @@ 'use strict' | ||
const { createServer } = require('http') | ||
const { kConnect } = require('../lib/symbols') | ||
const { kConnect } = require('../lib/core/symbols') | ||
@@ -34,3 +34,3 @@ test('pipeline pipelining', (t) => { | ||
}, ({ body }) => body).end().resume() | ||
t.strictEqual(client.busy, false) | ||
t.strictEqual(client.busy, true) | ||
t.strictDeepEqual(client.running, 0) | ||
@@ -87,3 +87,3 @@ t.strictDeepEqual(client.pending, 1) | ||
}) | ||
t.strictEqual(client.busy, false) | ||
t.strictEqual(client.busy, true) | ||
t.strictDeepEqual(client.running, 0) | ||
@@ -96,3 +96,3 @@ t.strictDeepEqual(client.pending, 1) | ||
}, ({ body }) => body).end().resume() | ||
t.strictEqual(client.busy, false) | ||
t.strictEqual(client.busy, true) | ||
t.strictDeepEqual(client.running, 0) | ||
@@ -99,0 +99,0 @@ t.strictDeepEqual(client.pending, 2) |
523
test/pool.js
@@ -10,5 +10,6 @@ 'use strict' | ||
const { promisify } = require('util') | ||
const { PassThrough } = require('stream') | ||
const { PassThrough, Readable } = require('stream') | ||
const eos = require('stream').finished | ||
const net = require('net') | ||
const EE = require('events') | ||
@@ -196,3 +197,3 @@ test('basic get', (t) => { | ||
request (req, cb) { | ||
dispatch (req, cb) { | ||
seen.push({ req, cb, client: this, id: this.id }) | ||
@@ -203,3 +204,3 @@ } | ||
const Pool = proxyquire('../lib/pool', { | ||
'./client': FakeClient | ||
'./core/client': FakeClient | ||
}) | ||
@@ -211,4 +212,4 @@ | ||
pool.request({}, noop) | ||
pool.request({}, noop) | ||
pool.dispatch({}, noop) | ||
pool.dispatch({}, noop) | ||
@@ -222,7 +223,7 @@ const d1 = seen.shift() // d1 = c0 | ||
pool.request({}, noop) // d3 = c0 | ||
pool.dispatch({}, noop) // d3 = c0 | ||
d1.client._busy = true | ||
pool.request({}, noop) // d4 = c1 | ||
pool.dispatch({}, noop) // d4 = c1 | ||
@@ -237,7 +238,7 @@ const d3 = seen.shift() | ||
pool.request({}, noop) // d5 = c1 | ||
pool.dispatch({}, noop) // d5 = c1 | ||
d1.client._busy = false | ||
pool.request({}, noop) // d6 = c0 | ||
pool.dispatch({}, noop) // d6 = c0 | ||
@@ -423,2 +424,4 @@ const d5 = seen.shift() | ||
}, { | ||
onConnect () { | ||
}, | ||
onHeaders (statusCode, headers) { | ||
@@ -436,1 +439,503 @@ t.strictEqual(statusCode, 200) | ||
}) | ||
test('pool pipeline args validation', (t) => { | ||
t.plan(2) | ||
const client = new Pool('http://localhost:5000') | ||
const ret = client.pipeline(null, () => {}) | ||
ret.on('error', (err) => { | ||
t.ok(/opts/.test(err.message)) | ||
t.ok(err instanceof errors.InvalidArgumentError) | ||
}) | ||
}) | ||
test('300 requests succeed', (t) => { | ||
t.plan(300 * 3) | ||
const server = createServer((req, res) => { | ||
res.end('asd') | ||
}) | ||
t.tearDown(server.close.bind(server)) | ||
server.listen(0, () => { | ||
const client = new Pool(`http://localhost:${server.address().port}`, { | ||
connections: 1 | ||
}) | ||
t.tearDown(client.destroy.bind(client)) | ||
for (let n = 0; n < 300; ++n) { | ||
client.request({ | ||
path: '/', | ||
method: 'GET' | ||
}, (err, data) => { | ||
t.error(err) | ||
data.body.on('data', (chunk) => { | ||
t.strictEqual(chunk.toString(), 'asd') | ||
}).on('end', () => { | ||
t.pass() | ||
}) | ||
}) | ||
} | ||
}) | ||
}) | ||
test('pool connect error', (t) => { | ||
t.plan(1) | ||
const server = createServer((c) => { | ||
t.fail() | ||
}) | ||
server.on('connect', (req, socket, firstBodyChunk) => { | ||
socket.destroy() | ||
}) | ||
t.tearDown(server.close.bind(server)) | ||
server.listen(0, async () => { | ||
const client = new Pool(`http://localhost:${server.address().port}`) | ||
t.tearDown(client.close.bind(client)) | ||
try { | ||
await client.connect({ | ||
path: '/' | ||
}) | ||
} catch (err) { | ||
t.ok(err) | ||
} | ||
}) | ||
}) | ||
test('pool upgrade error', (t) => { | ||
t.plan(1) | ||
const server = net.createServer((c) => { | ||
c.on('data', (d) => { | ||
c.write('HTTP/1.1 101\r\n') | ||
c.write('hello: world\r\n') | ||
c.write('connection: upgrade\r\n') | ||
c.write('\r\n') | ||
c.write('Body') | ||
}) | ||
c.on('error', () => { | ||
// Whether we get an error, end or close is undefined. | ||
// Ignore error. | ||
}) | ||
}) | ||
t.tearDown(server.close.bind(server)) | ||
server.listen(0, async () => { | ||
const client = new Pool(`http://localhost:${server.address().port}`) | ||
t.tearDown(client.close.bind(client)) | ||
try { | ||
await client.upgrade({ | ||
path: '/', | ||
method: 'GET', | ||
protocol: 'Websocket' | ||
}) | ||
} catch (err) { | ||
t.ok(err) | ||
} | ||
}) | ||
}) | ||
test('pool dispatch error', (t) => { | ||
t.plan(3) | ||
const server = createServer((req, res) => { | ||
res.end('asd') | ||
}) | ||
t.tearDown(server.close.bind(server)) | ||
server.listen(0, async () => { | ||
const client = new Pool(`http://localhost:${server.address().port}`, { | ||
connections: 1, | ||
pipelining: 1 | ||
}) | ||
t.tearDown(client.close.bind(client)) | ||
client.dispatch({ | ||
path: '/', | ||
method: 'GET' | ||
}, { | ||
onConnect () { | ||
}, | ||
onHeaders (statusCode, headers) { | ||
t.strictEqual(statusCode, 200) | ||
}, | ||
onData (chunk) { | ||
}, | ||
onComplete () { | ||
t.pass() | ||
} | ||
}) | ||
client.dispatch({ | ||
path: '/', | ||
method: 'GET', | ||
headers: { | ||
'transfer-encoding': 'fail' | ||
} | ||
}, { | ||
onConnect () { | ||
t.fail() | ||
}, | ||
onHeaders (statusCode, headers) { | ||
t.fail() | ||
}, | ||
onData (chunk) { | ||
t.fail() | ||
}, | ||
onError (err) { | ||
t.strictEqual(err.code, 'UND_ERR_INVALID_ARG') | ||
} | ||
}) | ||
}) | ||
}) | ||
test('pool request abort in queue', (t) => { | ||
t.plan(3) | ||
const server = createServer((req, res) => { | ||
res.end('asd') | ||
}) | ||
t.tearDown(server.close.bind(server)) | ||
server.listen(0, async () => { | ||
const client = new Pool(`http://localhost:${server.address().port}`, { | ||
connections: 1, | ||
pipelining: 1 | ||
}) | ||
t.tearDown(client.close.bind(client)) | ||
client.dispatch({ | ||
path: '/', | ||
method: 'GET' | ||
}, { | ||
onConnect () { | ||
}, | ||
onHeaders (statusCode, headers) { | ||
t.strictEqual(statusCode, 200) | ||
}, | ||
onData (chunk) { | ||
}, | ||
onComplete () { | ||
t.pass() | ||
} | ||
}) | ||
const signal = new EE() | ||
client.request({ | ||
path: '/', | ||
method: 'GET', | ||
signal | ||
}, (err) => { | ||
t.strictEqual(err.code, 'UND_ERR_ABORTED') | ||
}) | ||
signal.emit('abort') | ||
}) | ||
}) | ||
test('pool stream abort in queue', (t) => { | ||
t.plan(3) | ||
const server = createServer((req, res) => { | ||
res.end('asd') | ||
}) | ||
t.tearDown(server.close.bind(server)) | ||
server.listen(0, async () => { | ||
const client = new Pool(`http://localhost:${server.address().port}`, { | ||
connections: 1, | ||
pipelining: 1 | ||
}) | ||
t.tearDown(client.close.bind(client)) | ||
client.dispatch({ | ||
path: '/', | ||
method: 'GET' | ||
}, { | ||
onConnect () { | ||
}, | ||
onHeaders (statusCode, headers) { | ||
t.strictEqual(statusCode, 200) | ||
}, | ||
onData (chunk) { | ||
}, | ||
onComplete () { | ||
t.pass() | ||
} | ||
}) | ||
const signal = new EE() | ||
client.stream({ | ||
path: '/', | ||
method: 'GET', | ||
signal | ||
}, ({ body }) => body, (err) => { | ||
t.strictEqual(err.code, 'UND_ERR_ABORTED') | ||
}) | ||
signal.emit('abort') | ||
}) | ||
}) | ||
test('pool pipeline abort in queue', (t) => { | ||
t.plan(3) | ||
const server = createServer((req, res) => { | ||
res.end('asd') | ||
}) | ||
t.tearDown(server.close.bind(server)) | ||
server.listen(0, async () => { | ||
const client = new Pool(`http://localhost:${server.address().port}`, { | ||
connections: 1, | ||
pipelining: 1 | ||
}) | ||
t.tearDown(client.close.bind(client)) | ||
client.dispatch({ | ||
path: '/', | ||
method: 'GET' | ||
}, { | ||
onConnect () { | ||
}, | ||
onHeaders (statusCode, headers) { | ||
t.strictEqual(statusCode, 200) | ||
}, | ||
onData (chunk) { | ||
}, | ||
onComplete () { | ||
t.pass() | ||
} | ||
}) | ||
const signal = new EE() | ||
client.pipeline({ | ||
path: '/', | ||
method: 'GET', | ||
signal | ||
}, ({ body }) => body).end().on('error', (err) => { | ||
t.strictEqual(err.code, 'UND_ERR_ABORTED') | ||
}) | ||
signal.emit('abort') | ||
}) | ||
}) | ||
test('pool stream constructor error destroy body', (t) => { | ||
t.plan(4) | ||
const server = createServer((req, res) => { | ||
res.end('asd') | ||
}) | ||
t.tearDown(server.close.bind(server)) | ||
server.listen(0, async () => { | ||
const client = new Pool(`http://localhost:${server.address().port}`, { | ||
connections: 1, | ||
pipelining: 1 | ||
}) | ||
t.tearDown(client.close.bind(client)) | ||
{ | ||
const body = new Readable({ | ||
read () { | ||
} | ||
}) | ||
client.stream({ | ||
path: '/', | ||
method: 'GET', | ||
body, | ||
headers: { | ||
'transfer-encoding': 'fail' | ||
} | ||
}, () => { | ||
t.fail() | ||
}, (err) => { | ||
t.strictEqual(err.code, 'UND_ERR_INVALID_ARG') | ||
t.strictEqual(body.destroyed, true) | ||
}) | ||
} | ||
{ | ||
const body = new Readable({ | ||
read () { | ||
} | ||
}) | ||
client.stream({ | ||
path: '/', | ||
method: 'CONNECT', | ||
body | ||
}, () => { | ||
t.fail() | ||
}, (err) => { | ||
t.strictEqual(err.code, 'UND_ERR_INVALID_ARG') | ||
t.strictEqual(body.destroyed, true) | ||
}) | ||
} | ||
}) | ||
}) | ||
test('pool request constructor error destroy body', (t) => { | ||
t.plan(4) | ||
const server = createServer((req, res) => { | ||
res.end('asd') | ||
}) | ||
t.tearDown(server.close.bind(server)) | ||
server.listen(0, async () => { | ||
const client = new Pool(`http://localhost:${server.address().port}`, { | ||
connections: 1, | ||
pipelining: 1 | ||
}) | ||
t.tearDown(client.close.bind(client)) | ||
{ | ||
const body = new Readable({ | ||
read () { | ||
} | ||
}) | ||
client.request({ | ||
path: '/', | ||
method: 'GET', | ||
body, | ||
headers: { | ||
'transfer-encoding': 'fail' | ||
} | ||
}, (err) => { | ||
t.strictEqual(err.code, 'UND_ERR_INVALID_ARG') | ||
t.strictEqual(body.destroyed, true) | ||
}) | ||
} | ||
{ | ||
const body = new Readable({ | ||
read () { | ||
} | ||
}) | ||
client.request({ | ||
path: '/', | ||
method: 'CONNECT', | ||
body | ||
}, (err) => { | ||
t.strictEqual(err.code, 'UND_ERR_INVALID_ARG') | ||
t.strictEqual(body.destroyed, true) | ||
}) | ||
} | ||
}) | ||
}) | ||
test('pool close waits for all requests', (t) => { | ||
t.plan(5) | ||
const server = createServer((req, res) => { | ||
res.end('asd') | ||
}) | ||
t.tearDown(server.close.bind(server)) | ||
server.listen(0, () => { | ||
const client = new Pool(`http://localhost:${server.address().port}`, { | ||
connections: 1, | ||
pipelining: 1 | ||
}) | ||
t.tearDown(client.destroy.bind(client)) | ||
client.request({ | ||
path: '/', | ||
method: 'GET' | ||
}, (err) => { | ||
t.error(err) | ||
}) | ||
client.request({ | ||
path: '/', | ||
method: 'GET' | ||
}, (err) => { | ||
t.error(err) | ||
}) | ||
client.close(() => { | ||
t.pass() | ||
}) | ||
client.close(() => { | ||
t.pass() | ||
}) | ||
client.request({ | ||
path: '/', | ||
method: 'GET' | ||
}, (err) => { | ||
t.ok(err instanceof errors.ClientClosedError) | ||
}) | ||
}) | ||
}) | ||
test('pool destroyed', (t) => { | ||
t.plan(1) | ||
const server = createServer((req, res) => { | ||
res.end('asd') | ||
}) | ||
t.tearDown(server.close.bind(server)) | ||
server.listen(0, () => { | ||
const client = new Pool(`http://localhost:${server.address().port}`, { | ||
connections: 1, | ||
pipelining: 1 | ||
}) | ||
t.tearDown(client.destroy.bind(client)) | ||
client.destroy() | ||
client.request({ | ||
path: '/', | ||
method: 'GET' | ||
}, (err) => { | ||
t.ok(err instanceof errors.ClientDestroyedError) | ||
}) | ||
}) | ||
}) | ||
test('pool destroy fails queued requests', (t) => { | ||
t.plan(4) | ||
const server = createServer((req, res) => { | ||
res.end('asd') | ||
}) | ||
t.tearDown(server.close.bind(server)) | ||
server.listen(0, async () => { | ||
const client = new Pool(`http://localhost:${server.address().port}`, { | ||
connections: 1, | ||
pipelining: 1 | ||
}) | ||
t.tearDown(client.destroy.bind(client)) | ||
const _err = new Error() | ||
client.request({ | ||
path: '/', | ||
method: 'GET' | ||
}, (err) => { | ||
t.strictEqual(err, _err) | ||
}) | ||
client.request({ | ||
path: '/', | ||
method: 'GET' | ||
}, (err) => { | ||
t.strictEqual(err, _err) | ||
}) | ||
client.destroy(_err, () => { | ||
t.pass() | ||
}) | ||
client.request({ | ||
path: '/', | ||
method: 'GET' | ||
}, (err) => { | ||
t.ok(err instanceof errors.ClientDestroyedError) | ||
}) | ||
}) | ||
}) |
@@ -5,2 +5,3 @@ 'use strict' | ||
const { Client, errors } = require('..') | ||
const { kConnect } = require('../lib/core/symbols') | ||
const { createServer } = require('http') | ||
@@ -43,4 +44,4 @@ const EventEmitter = require('events') | ||
test('request timeout immutable opts', (t) => { | ||
t.plan(2) | ||
test('Subsequent request starves', (t) => { | ||
t.plan(3) | ||
@@ -54,3 +55,3 @@ const clock = FakeTimers.install() | ||
}, 100) | ||
clock.tick(100) | ||
clock.tick(50) | ||
}) | ||
@@ -61,36 +62,13 @@ t.teardown(server.close.bind(server)) | ||
const client = new Client(`http://localhost:${server.address().port}`, { | ||
requestTimeout: 50 | ||
pipelining: 2 | ||
}) | ||
t.teardown(client.destroy.bind(client)) | ||
const opts = { path: '/', method: 'GET' } | ||
client.request(opts, (err, response) => { | ||
t.ok(err instanceof errors.RequestTimeoutError) | ||
t.strictEqual(opts.requestTimeout, undefined) | ||
}) | ||
clock.tick(50) | ||
}) | ||
}) | ||
test('Subsequent request starves', (t) => { | ||
t.plan(2) | ||
const clock = FakeTimers.install() | ||
t.teardown(clock.uninstall.bind(clock)) | ||
const server = createServer((req, res) => { | ||
setTimeout(() => { | ||
res.end('hello') | ||
}, 100) | ||
clock.tick(50) | ||
}) | ||
t.teardown(server.close.bind(server)) | ||
server.listen(0, () => { | ||
const client = new Client(`http://localhost:${server.address().port}`) | ||
t.teardown(client.destroy.bind(client)) | ||
client.request({ path: '/', method: 'GET' }, (err, response) => { | ||
t.error(err) | ||
response.body | ||
.resume() | ||
.on('end', () => { | ||
t.pass() | ||
}) | ||
}) | ||
@@ -267,37 +245,2 @@ | ||
test('If a request starves, the server should never receive the request', (t) => { | ||
t.plan(4) | ||
const clock = FakeTimers.install() | ||
t.teardown(clock.uninstall.bind(clock)) | ||
let count = 0 | ||
const server = createServer((req, res) => { | ||
count += 1 | ||
t.is(count, 1) | ||
setTimeout(() => { | ||
res.end('hello') | ||
}, 100) | ||
clock.tick(50) | ||
}) | ||
t.teardown(server.close.bind(server)) | ||
server.listen(0, () => { | ||
const client = new Client(`http://localhost:${server.address().port}`) | ||
t.teardown(client.destroy.bind(client)) | ||
client.request({ path: '/', method: 'GET', requestTimeout: 50 }, (err, response) => { | ||
t.ok(err instanceof errors.RequestTimeoutError) | ||
}) | ||
client.request({ path: '/', method: 'GET', requestTimeout: 50 }, (err, response) => { | ||
t.ok(err instanceof errors.RequestTimeoutError) | ||
}) | ||
client.request({ path: '/', method: 'GET', requestTimeout: 50 }, (err, response) => { | ||
t.ok(err instanceof errors.RequestTimeoutError) | ||
}) | ||
}) | ||
}) | ||
test('Timeout with pipelining', (t) => { | ||
@@ -350,6 +293,6 @@ t.plan(3) | ||
server.listen(0, () => { | ||
const client = new Client(`http://localhost:${server.address().port}`, { requestTimeout: 50 }) | ||
const client = new Client(`http://localhost:${server.address().port}`) | ||
t.teardown(client.destroy.bind(client)) | ||
client.request({ path: '/', method: 'GET' }, (err, response) => { | ||
client.request({ path: '/', method: 'GET', requestTimeout: 50 }, (err, response) => { | ||
t.ok(err instanceof errors.RequestTimeoutError) | ||
@@ -377,3 +320,3 @@ }) | ||
server.listen(0, () => { | ||
const client = new Client(`http://localhost:${server.address().port}`, { requestTimeout: 100 }) | ||
const client = new Client(`http://localhost:${server.address().port}`) | ||
t.teardown(client.destroy.bind(client)) | ||
@@ -432,3 +375,7 @@ | ||
clock.tick(100) | ||
client.on('connect', () => { | ||
process.nextTick(() => { | ||
clock.tick(100) | ||
}) | ||
}) | ||
}) | ||
@@ -463,6 +410,6 @@ }) | ||
server.listen(0, () => { | ||
const client = new Client(`http://localhost:${server.address().port}`, { requestTimeout: 0 }) | ||
const client = new Client(`http://localhost:${server.address().port}`) | ||
t.teardown(client.destroy.bind(client)) | ||
client.request({ path: '/', method: 'GET' }, (err, response) => { | ||
client.request({ path: '/', method: 'GET', requestTimeout: 0 }, (err, response) => { | ||
t.error(err) | ||
@@ -560,3 +507,3 @@ const bufs = [] | ||
server.listen(0, () => { | ||
const client = new Client(`http://localhost:${server.address().port}`, { requestTimeout: 0 }) | ||
const client = new Client(`http://localhost:${server.address().port}`) | ||
t.teardown(client.destroy.bind(client)) | ||
@@ -641,3 +588,3 @@ | ||
server.listen(0, () => { | ||
const client = new Client(`http://localhost:${server.address().port}`, { requestTimeout: 0 }) | ||
const client = new Client(`http://localhost:${server.address().port}`) | ||
t.teardown(client.destroy.bind(client)) | ||
@@ -676,1 +623,35 @@ | ||
}) | ||
test('client.close should not deadlock', (t) => { | ||
t.plan(2) | ||
const clock = FakeTimers.install() | ||
t.teardown(clock.uninstall.bind(clock)) | ||
const server = createServer((req, res) => { | ||
}) | ||
t.teardown(server.close.bind(server)) | ||
server.listen(0, () => { | ||
const client = new Client(`http://localhost:${server.address().port}`, { | ||
socketTimeout: 200 | ||
}) | ||
t.teardown(client.destroy.bind(client)) | ||
client[kConnect](() => { | ||
client.request({ | ||
path: '/', | ||
method: 'GET', | ||
requestTimeout: 100 | ||
}, (err, response) => { | ||
t.ok(err instanceof errors.RequestTimeoutError) | ||
}) | ||
client.close((err) => { | ||
t.error(err) | ||
}) | ||
clock.tick(100) | ||
}) | ||
}) | ||
}) |
@@ -69,8 +69,7 @@ 'use strict' | ||
const client = new Client(`http://localhost:${server.address().port}`, { | ||
socketTimeout: 0, | ||
requestTimeout: 0 | ||
socketTimeout: 0 | ||
}) | ||
t.tearDown(client.close.bind(client)) | ||
client.request({ path: '/', method: 'GET' }, (err, result) => { | ||
client.request({ path: '/', method: 'GET', requestTimeout: 0 }, (err, result) => { | ||
t.error(err) | ||
@@ -77,0 +76,0 @@ const bufs = [] |
@@ -5,3 +5,3 @@ 'use strict' | ||
const { Client } = require('..') | ||
const { kSocket } = require('../lib/symbols') | ||
const { kSocket } = require('../lib/core/symbols') | ||
const { Readable } = require('stream') | ||
@@ -8,0 +8,0 @@ |
@@ -19,3 +19,3 @@ 'use strict' | ||
test('path', (t) => { | ||
t.plan(6) | ||
t.plan(4) | ||
@@ -28,3 +28,2 @@ const client = new Client(url) | ||
t.strictEqual(err.message, 'path must be a valid path') | ||
t.strictEqual(res, null) | ||
}) | ||
@@ -35,3 +34,2 @@ | ||
t.strictEqual(err.message, 'path must be a valid path') | ||
t.strictEqual(res, null) | ||
}) | ||
@@ -41,3 +39,3 @@ }) | ||
test('method', (t) => { | ||
t.plan(3) | ||
t.plan(2) | ||
@@ -50,3 +48,2 @@ const client = new Client(url) | ||
t.strictEqual(err.message, 'method must be a string') | ||
t.strictEqual(res, null) | ||
}) | ||
@@ -56,3 +53,3 @@ }) | ||
test('body', (t) => { | ||
t.plan(6) | ||
t.plan(4) | ||
@@ -65,3 +62,2 @@ const client = new Client(url) | ||
t.strictEqual(err.message, 'body must be a string, a Buffer or a Readable stream') | ||
t.strictEqual(res, null) | ||
}) | ||
@@ -72,5 +68,4 @@ | ||
t.strictEqual(err.message, 'body must be a string, a Buffer or a Readable stream') | ||
t.strictEqual(res, null) | ||
}) | ||
}) | ||
}) |
Sorry, the diff of this file is not supported yet
License Policy Violation
LicenseThis package is not allowed per your license policy. Review the package's license to ensure compliance.
Found 1 instance in 1 package
License Policy Violation
LicenseThis package is not allowed per your license policy. Review the package's license to ensure compliance.
Found 1 instance in 1 package
335224
65
10963
648
11
12
47