Socket
Socket
Sign inDemoInstall

undici

Package Overview
Dependencies
Maintainers
2
Versions
212
Alerts
File Explorer

Advanced tools

Socket logo

Install Socket

Detect and block malicious and high-risk dependencies

Install

undici - npm Package Compare versions

Comparing version 1.3.1 to 2.0.0

CODE_OF_CONDUCT.md

22

benchmarks/index.js

@@ -6,3 +6,2 @@ 'use strict'

const undici = require('..')
const { kGetNext } = require('../lib/symbols')

@@ -33,3 +32,4 @@ // # Start the h2o server (in h2o repository)

path: '/',
method: 'GET'
method: 'GET',
requestTimeout: 0
}

@@ -39,4 +39,3 @@

connections: 100,
pipelining: 10,
requestTimeout: 0
pipelining: 10
})

@@ -137,4 +136,3 @@

})
const client = pool[kGetNext]()
client.dispatch(undiciOptions, new SimpleRequest(stream))
pool.dispatch(undiciOptions, new SimpleRequest(stream))
}

@@ -145,4 +143,3 @@ })

fn: deferred => {
const client = pool[kGetNext]()
client.dispatch(undiciOptions, new NoopRequest(deferred))
pool.dispatch(undiciOptions, new NoopRequest(deferred))
}

@@ -160,6 +157,10 @@ })

onHeaders () {
onConnect (abort) {
}
onHeaders (statusCode, headers, resume) {
}
onData (chunk) {

@@ -179,2 +180,5 @@ return true

onConnect (abort) {
}
onHeaders (statusCode, headers, resume) {

@@ -181,0 +185,0 @@ this.dst.on('drain', resume)

'use strict'
const Client = require('./lib/client')
const Client = require('./lib/core/client')
const errors = require('./lib/core/errors')
const Pool = require('./lib/pool')
const errors = require('./lib/errors')
Client.prototype.request = require('./lib/client-request')
Client.prototype.stream = require('./lib/client-stream')
Client.prototype.pipeline = require('./lib/client-pipeline')
Client.prototype.upgrade = require('./lib/client-upgrade')
Client.prototype.connect = require('./lib/client-connect')
Pool.prototype.request = require('./lib/client-request')
Pool.prototype.stream = require('./lib/client-stream')
Pool.prototype.pipeline = require('./lib/client-pipeline')
Pool.prototype.upgrade = require('./lib/client-upgrade')
Pool.prototype.connect = require('./lib/client-connect')
function undici (url, opts) {

@@ -8,0 +20,0 @@ return new Pool(url, opts)

'use strict'
const {
InvalidArgumentError
} = require('./errors')
InvalidArgumentError,
RequestAbortedError
} = require('./core/errors')
const { AsyncResource } = require('async_hooks')
const util = require('./core/util')
class ConnectRequest {
class ConnectHandler extends AsyncResource {
constructor (opts, callback) {
this.opaque = opts.opaque || null
if (!opts || typeof opts !== 'object') {
throw new InvalidArgumentError('invalid opts')
}
const { signal, opaque } = opts
if (signal && typeof signal.on !== 'function' && typeof signal.addEventListener !== 'function') {
throw new InvalidArgumentError('signal must be an EventEmitter or EventTarget')
}
super('UNDICI_CONNECT')
this.opaque = opaque || null
this.callback = callback
this.abort = null
if (signal) {
util.addListener(signal, () => {
if (this.abort) {
this.abort()
} else {
this.onError(new RequestAbortedError())
}
})
}
}
onConnect (abort) {
if (!this.callback) {
abort()
} else {
this.abort = abort
}
}
onUpgrade (statusCode, headers, socket) {

@@ -17,5 +51,5 @@ const { callback, opaque } = this

this.callback = null
callback(null, {
this.runInAsyncScope(callback, null, null, {
statusCode,
headers,
headers: util.parseHeaders(headers),
socket,

@@ -29,11 +63,15 @@ opaque

this.callback = null
callback(err, { opaque })
if (callback) {
this.callback = null
process.nextTick((self, callback, err, opaque) => {
self.runInAsyncScope(callback, null, err, { opaque })
}, this, callback, err, opaque)
}
}
}
module.exports = function connect (client, opts, callback) {
function connect (opts, callback) {
if (callback === undefined) {
return new Promise((resolve, reject) => {
connect(client, opts, (err, data) => {
connect.call(this, opts, (err, data) => {
return err ? reject(err) : resolve(data)

@@ -49,6 +87,3 @@ })

try {
if (!opts || typeof opts !== 'object') {
throw new InvalidArgumentError('invalid opts')
}
const connectHandler = new ConnectHandler(opts, callback)
const {

@@ -61,3 +96,3 @@ path,

} = opts
client.dispatch({
this.dispatch({
path,

@@ -69,6 +104,8 @@ method: 'CONNECT',

requestTimeout
}, new ConnectRequest(opts, callback))
}, connectHandler)
} catch (err) {
process.nextTick(callback, err, null)
process.nextTick(callback, err, { opaque: opts && opts.opaque })
}
}
module.exports = connect

@@ -12,5 +12,6 @@ 'use strict'

RequestAbortedError
} = require('./errors')
const util = require('./util')
} = require('./core/errors')
const util = require('./core/util')
const { AsyncResource } = require('async_hooks')
const { assert } = require('console')

@@ -38,6 +39,3 @@ const kResume = Symbol('resume')

if (!err && !this._readableState.endEmitted) {
// This can happen if the server doesn't care
// about the entire request body.
}
assert(err || this._readableState.endEmitted)

@@ -50,8 +48,11 @@ callback(err)

constructor (resume) {
super({ autoDestroy: true, read: resume })
super({ autoDestroy: true })
this[kResume] = resume
}
_read () {
this[kResume]()
}
_destroy (err, callback) {
this._read()
if (!err && !this._readableState.endEmitted) {

@@ -67,9 +68,34 @@ err = new RequestAbortedError()

constructor (opts, handler) {
if (!opts || typeof opts !== 'object') {
throw new InvalidArgumentError('invalid opts')
}
if (typeof handler !== 'function') {
throw new InvalidArgumentError('invalid handler')
}
const { signal, method, opaque } = opts
if (signal && typeof signal.on !== 'function' && typeof signal.addEventListener !== 'function') {
throw new InvalidArgumentError('signal must be an EventEmitter or EventTarget')
}
if (method === 'CONNECT') {
throw new InvalidArgumentError('invalid method')
}
super('UNDICI_PIPELINE')
this.opaque = opts.opaque || null
this.opaque = opaque || null
this.handler = handler
this.abort = null
this.req = new PipelineRequest()
if (signal) {
util.addListener(signal, () => {
util.destroy(this.ret, new RequestAbortedError())
})
}
this.req = new PipelineRequest().on('error', util.nop)
this.ret = new Duplex({

@@ -95,3 +121,3 @@ readableObjectMode: opts.objectMode,

destroy: (err, callback) => {
const { body, req, res, ret } = this
const { body, req, res, ret, abort } = this

@@ -102,2 +128,6 @@ if (!err && !ret._readableState.endEmitted) {

if (abort && err) {
abort()
}
util.destroy(body, err)

@@ -119,4 +149,14 @@ util.destroy(req, err)

onConnect (abort) {
const { ret } = this
if (ret.destroyed) {
abort()
} else {
this.abort = abort
}
}
onHeaders (statusCode, headers, resume) {
const { opaque, handler, ret } = this
const { opaque, handler } = this

@@ -134,3 +174,3 @@ if (statusCode < 200) {

statusCode,
headers,
headers: util.parseHeaders(headers),
opaque,

@@ -141,12 +181,7 @@ body: this.res

this.res.on('error', util.nop)
util.destroy(ret, err)
return
throw err
}
if (
!body ||
typeof body.on !== 'function'
) {
util.destroy(ret, new InvalidReturnValueError('expected Readable'))
return
if (!body || typeof body.on !== 'function') {
throw new InvalidReturnValueError('expected Readable')
}

@@ -185,7 +220,2 @@

const { res } = this
if (res._readableState.destroyed) {
return
}
return res.push(chunk)

@@ -196,7 +226,2 @@ }

const { res } = this
if (res._readableState.destroyed) {
return
}
res.push(null)

@@ -207,5 +232,3 @@ }

const { ret } = this
this.handler = null
util.destroy(ret, err)

@@ -215,18 +238,5 @@ }

module.exports = function (client, opts, handler) {
function pipeline (opts, handler) {
try {
if (!opts || typeof opts !== 'object') {
throw new InvalidArgumentError('invalid opts')
}
if (typeof handler !== 'function') {
throw new InvalidArgumentError('invalid handler')
}
if (opts.method === 'CONNECT') {
throw new InvalidArgumentError('invalid method')
}
const pipeline = new PipelineHandler(opts, handler)
const pipelineHandler = new PipelineHandler(opts, handler)
const {

@@ -241,7 +251,6 @@ path,

} = opts
client.dispatch({
this.dispatch({
path,
method,
body: pipeline.req,
body: pipelineHandler.req,
headers,

@@ -252,5 +261,4 @@ idempotent,

requestTimeout
}, pipeline)
return pipeline.ret
}, pipelineHandler)
return pipelineHandler.ret
} catch (err) {

@@ -260,1 +268,3 @@ return new PassThrough().destroy(err)

}
module.exports = pipeline

@@ -7,14 +7,15 @@ 'use strict'

RequestAbortedError
} = require('./errors')
const util = require('./util')
} = require('./core/errors')
const util = require('./core/util')
const { AsyncResource } = require('async_hooks')
const kAbort = Symbol('abort')
class RequestResponse extends Readable {
constructor (resume) {
constructor (resume, abort) {
super({ autoDestroy: true, read: resume })
this[kAbort] = abort
}
_destroy (err, callback) {
this._read()
if (!err && !this._readableState.endEmitted) {

@@ -24,2 +25,6 @@ err = new RequestAbortedError()

if (err) {
this[kAbort]()
}
callback(err)

@@ -31,11 +36,62 @@ }

constructor (opts, callback) {
super('UNDICI_REQUEST')
if (!opts || typeof opts !== 'object') {
throw new InvalidArgumentError('invalid opts')
}
this.opaque = opts.opaque || null
const { signal, method, opaque, body } = opts
try {
if (typeof callback !== 'function') {
throw new InvalidArgumentError('invalid callback')
}
if (signal && typeof signal.on !== 'function' && typeof signal.addEventListener !== 'function') {
throw new InvalidArgumentError('signal must be an EventEmitter or EventTarget')
}
if (method === 'CONNECT') {
throw new InvalidArgumentError('invalid method')
}
super('UNDICI_REQUEST')
} catch (err) {
if (util.isStream(body)) {
util.destroy(body.on('error', util.nop), err)
}
throw err
}
this.opaque = opaque || null
this.callback = callback
this.res = null
this.abort = null
this.body = body
if (util.isStream(body)) {
body.on('error', (err) => {
this.onError(err)
})
}
if (signal) {
util.addListener(signal, () => {
if (this.abort) {
this.abort()
} else {
this.onError(new RequestAbortedError())
}
})
}
}
onConnect (abort) {
if (!this.callback) {
abort()
} else {
this.abort = abort
}
}
onHeaders (statusCode, headers, resume) {
const { callback, opaque } = this
const { callback, opaque, abort } = this

@@ -46,3 +102,3 @@ if (statusCode < 200) {

const body = new RequestResponse(resume)
const body = new RequestResponse(resume, abort)

@@ -54,3 +110,3 @@ this.callback = null

statusCode,
headers,
headers: util.parseHeaders(headers),
opaque,

@@ -63,7 +119,2 @@ body

const { res } = this
if (res._readableState.destroyed) {
return
}
return res.push(chunk)

@@ -74,7 +125,2 @@ }

const { res } = this
if (res._readableState.destroyed) {
return
}
res.push(null)

@@ -84,7 +130,9 @@ }

onError (err) {
const { res, callback, opaque } = this
const { res, callback, body, opaque } = this
if (callback) {
this.callback = null
this.runInAsyncScope(callback, null, err, { opaque })
process.nextTick((self, callback, err, opaque) => {
self.runInAsyncScope(callback, null, err, { opaque })
}, this, callback, err, opaque)
}

@@ -96,9 +144,14 @@

}
if (body) {
this.body = null
util.destroy(body, err)
}
}
}
module.exports = function request (client, opts, callback) {
function request (opts, callback) {
if (callback === undefined) {
return new Promise((resolve, reject) => {
request(client, opts, (err, data) => {
request.call(this, opts, (err, data) => {
return err ? reject(err) : resolve(data)

@@ -109,19 +162,13 @@ })

if (typeof callback !== 'function') {
throw new InvalidArgumentError('invalid callback')
}
try {
if (!opts || typeof opts !== 'object') {
throw new InvalidArgumentError('invalid opts')
this.dispatch(opts, new RequestHandler(opts, callback))
} catch (err) {
if (typeof callback === 'function') {
process.nextTick(callback, err, { opaque: opts && opts.opaque })
} else {
throw err
}
if (opts.method === 'CONNECT') {
throw new InvalidArgumentError('invalid method')
}
client.dispatch(opts, new RequestHandler(opts, callback))
} catch (err) {
process.nextTick(callback, err, null)
}
}
module.exports = request

@@ -6,5 +6,6 @@ 'use strict'

InvalidArgumentError,
InvalidReturnValueError
} = require('./errors')
const util = require('./util')
InvalidReturnValueError,
RequestAbortedError
} = require('./core/errors')
const util = require('./core/util')
const { AsyncResource } = require('async_hooks')

@@ -14,13 +15,68 @@

constructor (opts, factory, callback) {
super('UNDICI_STREAM')
if (!opts || typeof opts !== 'object') {
throw new InvalidArgumentError('invalid opts')
}
this.opaque = opts.opaque || null
const { signal, method, opaque, body } = opts
try {
if (typeof callback !== 'function') {
throw new InvalidArgumentError('invalid callback')
}
if (typeof factory !== 'function') {
throw new InvalidArgumentError('invalid factory')
}
if (signal && typeof signal.on !== 'function' && typeof signal.addEventListener !== 'function') {
throw new InvalidArgumentError('signal must be an EventEmitter or EventTarget')
}
if (method === 'CONNECT') {
throw new InvalidArgumentError('invalid method')
}
super('UNDICI_STREAM')
} catch (err) {
if (util.isStream(body)) {
util.destroy(body.on('error', util.nop), err)
}
throw err
}
this.opaque = opaque || null
this.factory = factory
this.callback = callback
this.res = null
this.abort = null
this.trailers = null
this.body = body
if (util.isStream(body)) {
body.on('error', (err) => {
this.onError(err)
})
}
if (signal) {
util.addListener(signal, () => {
if (this.abort) {
this.abort()
} else {
this.onError(new RequestAbortedError())
}
})
}
}
onConnect (abort) {
if (!this.callback) {
abort()
} else {
this.abort = abort
}
}
onHeaders (statusCode, headers, resume) {
const { factory, callback, opaque } = this
const { factory, opaque } = this

@@ -31,23 +87,16 @@ if (statusCode < 200) {

let res
try {
this.factory = null
res = this.runInAsyncScope(factory, null, {
statusCode,
headers,
opaque
})
this.factory = null
const res = this.runInAsyncScope(factory, null, {
statusCode,
headers: util.parseHeaders(headers),
opaque
})
if (
!res ||
typeof res.write !== 'function' ||
typeof res.end !== 'function' ||
typeof res.on !== 'function'
) {
throw new InvalidReturnValueError('expected Writable')
}
} catch (err) {
this.callback = null
this.runInAsyncScope(callback, null, err, { opaque })
return
if (
!res ||
typeof res.write !== 'function' ||
typeof res.end !== 'function' ||
typeof res.on !== 'function'
) {
throw new InvalidReturnValueError('expected Writable')
}

@@ -58,3 +107,3 @@

finished(res, { readable: false }, (err) => {
const { callback, res, opaque, trailers } = this
const { callback, res, opaque, trailers, abort } = this

@@ -68,2 +117,6 @@ this.res = null

this.runInAsyncScope(callback, null, err || null, { opaque, trailers })
if (err) {
abort()
}
})

@@ -76,7 +129,2 @@

const { res } = this
if (util.isDestroyed(res)) {
return
}
return res.write(chunk)

@@ -88,8 +136,4 @@ }

if (util.isDestroyed(res)) {
return
}
this.trailers = trailers ? util.parseHeaders(trailers) : {}
this.trailers = trailers || {}
res.end()

@@ -99,3 +143,3 @@ }

onError (err) {
const { res, callback, opaque } = this
const { res, callback, opaque, body } = this

@@ -109,11 +153,18 @@ this.factory = null

this.callback = null
this.runInAsyncScope(callback, null, err, { opaque })
process.nextTick((self, callback, err, opaque) => {
self.runInAsyncScope(callback, null, err, { opaque })
}, this, callback, err, opaque)
}
if (body) {
this.body = null
util.destroy(body, err)
}
}
}
module.exports = function stream (client, opts, factory, callback) {
function stream (opts, factory, callback) {
if (callback === undefined) {
return new Promise((resolve, reject) => {
stream(client, opts, factory, (err, data) => {
stream.call(this, opts, factory, (err, data) => {
return err ? reject(err) : resolve(data)

@@ -124,23 +175,13 @@ })

if (typeof callback !== 'function') {
throw new InvalidArgumentError('invalid callback')
}
try {
if (!opts || typeof opts !== 'object') {
throw new InvalidArgumentError('invalid opts')
this.dispatch(opts, new StreamHandler(opts, factory, callback))
} catch (err) {
if (typeof callback === 'function') {
process.nextTick(callback, err, { opaque: opts && opts.opaque })
} else {
throw err
}
if (typeof factory !== 'function') {
throw new InvalidArgumentError('invalid factory')
}
if (opts.method === 'CONNECT') {
throw new InvalidArgumentError('invalid method')
}
client.dispatch(opts, new StreamHandler(opts, factory, callback))
} catch (err) {
process.nextTick(callback, err, null)
}
}
module.exports = stream
'use strict'
const {
InvalidArgumentError
} = require('./errors')
InvalidArgumentError,
RequestAbortedError
} = require('./core/errors')
const { AsyncResource } = require('async_hooks')
const util = require('./core/util')
class UpgradeHandler {
class UpgradeHandler extends AsyncResource {
constructor (opts, callback) {
this.opaque = opts.opaque || null
if (!opts || typeof opts !== 'object') {
throw new InvalidArgumentError('invalid opts')
}
const { signal, opaque } = opts
if (signal && typeof signal.on !== 'function' && typeof signal.addEventListener !== 'function') {
throw new InvalidArgumentError('signal must be an EventEmitter or EventTarget')
}
super('UNDICI_UPGRADE')
this.opaque = opaque || null
this.callback = callback
this.abort = null
if (signal) {
util.addListener(signal, () => {
if (this.abort) {
this.abort()
} else {
this.onError(new RequestAbortedError())
}
})
}
}
onConnect (abort) {
if (!this.callback) {
abort()
} else {
this.abort = abort
}
}
onUpgrade (statusCode, headers, socket) {

@@ -17,4 +51,4 @@ const { callback, opaque } = this

this.callback = null
callback(null, {
headers,
this.runInAsyncScope(callback, null, null, {
headers: util.parseHeaders(headers),
socket,

@@ -28,11 +62,15 @@ opaque

this.callback = null
callback(err, { opaque })
if (callback) {
this.callback = null
process.nextTick((self, callback, err, opaque) => {
self.runInAsyncScope(callback, null, err, { opaque })
}, this, callback, err, opaque)
}
}
}
module.exports = function upgrade (client, opts, callback) {
function upgrade (opts, callback) {
if (callback === undefined) {
return new Promise((resolve, reject) => {
upgrade(client, opts, (err, data) => {
upgrade.call(this, opts, (err, data) => {
return err ? reject(err) : resolve(data)

@@ -48,6 +86,3 @@ })

try {
if (!opts || typeof opts !== 'object') {
throw new InvalidArgumentError('invalid opts')
}
const upgradeHandler = new UpgradeHandler(opts, callback)
const {

@@ -62,3 +97,3 @@ path,

} = opts
client.dispatch({
this.dispatch({
path,

@@ -71,6 +106,8 @@ method: method || 'GET',

upgrade: protocol || 'Websocket'
}, new UpgradeHandler(opts, callback))
}, upgradeHandler)
} catch (err) {
process.nextTick(callback, err, null)
process.nextTick(callback, err, { opaque: opts && opts.opaque })
}
}
module.exports = upgrade
'use strict'
const Client = require('./client')
const Client = require('./core/client')
const {
InvalidArgumentError
} = require('./errors')
const {
kClients,
kGetNext
} = require('./symbols')
ClientClosedError,
InvalidArgumentError,
ClientDestroyedError
} = require('./core/errors')
const FixedQueue = require('./core/node/fixed-queue')
const kClients = Symbol('clients')
const kQueue = Symbol('queue')
const kDestroyed = Symbol('destroyed')
const kClosedPromise = Symbol('closed promise')
const kClosedResolve = Symbol('closed resolve')
class Pool {

@@ -21,58 +26,82 @@ constructor (url, {

this[kQueue] = new FixedQueue()
this[kClosedPromise] = null
this[kClosedResolve] = null
this[kDestroyed] = false
this[kClients] = Array.from({
length: connections || 10
}, () => new Client(url, options))
}
/* istanbul ignore next: use by benchmark */
[kGetNext] () {
return getNext(this)
}
const pool = this
function onDrain () {
const queue = pool[kQueue]
stream (opts, factory, cb) {
if (cb === undefined) {
return new Promise((resolve, reject) => {
this.stream(opts, factory, (err, data) => {
return err ? reject(err) : resolve(data)
})
})
while (!this.busy) {
const item = queue.shift()
if (!item) {
break
}
this.dispatch(item.opts, item.handler)
}
if (pool[kClosedResolve] && queue.isEmpty()) {
Promise
.all(pool[kClients].map(c => c.close()))
.then(pool[kClosedResolve])
}
}
getNext(this).stream(opts, factory, cb)
for (const client of this[kClients]) {
client.on('drain', onDrain)
}
}
pipeline (opts, handler) {
return getNext(this).pipeline(opts, handler)
}
dispatch (opts, handler) {
try {
if (this[kDestroyed]) {
throw new ClientDestroyedError()
}
request (opts, cb) {
if (cb === undefined) {
return new Promise((resolve, reject) => {
this.request(opts, (err, data) => {
return err ? reject(err) : resolve(data)
})
})
if (this[kClosedPromise]) {
throw new ClientClosedError()
}
const client = this[kClients].find(client => !client.busy)
if (!client) {
this[kQueue].push({ opts, handler })
} else {
client.dispatch(opts, handler)
}
} catch (err) {
handler.onError(err)
}
getNext(this).request(opts, cb)
}
upgrade (opts, callback) {
return getNext(this).upgrade(opts, callback)
}
close (cb) {
try {
if (this[kDestroyed]) {
throw new ClientDestroyedError()
}
connect (opts, callback) {
return getNext(this).connect(opts, callback)
}
if (!this[kClosedPromise]) {
if (this[kQueue].isEmpty()) {
this[kClosedPromise] = Promise.all(this[kClients].map(c => c.close()))
} else {
this[kClosedPromise] = new Promise((resolve) => {
this[kClosedResolve] = resolve
})
}
}
dispatch (opts, handler) {
return getNext(this).dispatch(opts, handler)
}
close (cb) {
const promise = Promise.all(this[kClients].map(c => c.close()))
if (cb) {
promise.then(() => cb(null, null), (err) => cb(err, null))
} else {
return promise
if (cb) {
this[kClosedPromise].then(() => cb(null, null))
} else {
return this[kClosedPromise]
}
} catch (err) {
if (cb) {
cb(err)
} else {
return Promise.reject(err)
}
}

@@ -82,2 +111,4 @@ }

destroy (err, cb) {
this[kDestroyed] = true
if (typeof err === 'function') {

@@ -88,2 +119,14 @@ cb = err

if (!err) {
err = new ClientDestroyedError()
}
while (true) {
const item = this[kQueue].shift()
if (!item) {
break
}
item.handler.onError(err)
}
const promise = Promise.all(this[kClients].map(c => c.destroy(err)))

@@ -98,25 +141,2 @@ if (cb) {

function getNext (pool) {
let next
for (const client of pool[kClients]) {
if (client.busy) {
continue
}
if (!next) {
next = client
}
if (client.connected) {
return client
}
}
if (next) {
return next
}
return pool[kClients][Math.floor(Math.random() * pool[kClients].length)]
}
module.exports = Pool
{
"name": "undici",
"version": "1.3.1",
"version": "2.0.0",
"description": "An HTTP/1.1 client, written from scratch for Node.js",
"main": "index.js",
"module": "wrapper.mjs",
"scripts": {
"lint": "standard | snazzy",
"test": "tap test/*.js --no-coverage",
"test": "tap test/*.{mjs,js} --no-coverage && jest test/jest/test",
"coverage": "standard | snazzy && tap test/*.js",

@@ -14,3 +15,3 @@ "bench": "npx concurrently -k -s first \"node benchmarks/server.js\" \"node -e 'setTimeout(() => {}, 1000)' && node benchmarks\""

"type": "git",
"url": "git+https://github.com/mcollina/undici.git"
"url": "git+https://github.com/nodejs/undici.git"
},

@@ -27,5 +28,5 @@ "author": "Matteo Collina <hello@matteocollina.com>",

"bugs": {
"url": "https://github.com/mcollina/undici/issues"
"url": "https://github.com/nodejs/undici/issues"
},
"homepage": "https://github.com/mcollina/undici#readme",
"homepage": "https://github.com/nodejs/undici#readme",
"devDependencies": {

@@ -37,2 +38,3 @@ "@sinonjs/fake-timers": "^6.0.1",

"https-pem": "^2.0.0",
"jest": "^26.4.0",
"pre-commit": "^1.2.2",

@@ -39,0 +41,0 @@ "proxyquire": "^2.0.1",

# undici
![Node CI](https://github.com/mcollina/undici/workflows/Node%20CI/badge.svg) [![js-standard-style](https://img.shields.io/badge/code%20style-standard-brightgreen.svg?style=flat)](http://standardjs.com/)
![Node CI](https://github.com/mcollina/undici/workflows/Node%20CI/badge.svg) [![js-standard-style](https://img.shields.io/badge/code%20style-standard-brightgreen.svg?style=flat)](http://standardjs.com/) [![npm version](https://badge.fury.io/js/undici.svg)](https://badge.fury.io/js/undici)

@@ -26,7 +26,7 @@ A HTTP/1.1 client, written from scratch for Node.js.

```
http - keepalive x 5,847 ops/sec ±2.69% (276 runs sampled)
undici - pipeline x 8,748 ops/sec ±2.90% (277 runs sampled)
undici - request x 12,166 ops/sec ±0.80% (278 runs sampled)
undici - stream x 12,969 ops/sec ±0.82% (278 runs sampled)
undici - dispatch x 13,736 ops/sec ±0.60% (280 runs sampled)
http - keepalive x 5,882 ops/sec ±1.87% (274 runs sampled)
undici - pipeline x 9,189 ops/sec ±2.02% (272 runs sampled)
undici - request x 12,623 ops/sec ±0.89% (277 runs sampled)
undici - stream x 14,136 ops/sec ±0.61% (280 runs sampled)
undici - dispatch x 14,883 ops/sec ±0.44% (281 runs sampled)
```

@@ -61,3 +61,6 @@

- `maxKeepAliveTimeout: Number`, the maximum allowed `idleTimeout` when overriden by
- `keepAlive: Boolean`, enable or disable keep alive connections.
Default: `true`.
- `keepAliveMaxTimeout: Number`, the maximum allowed `idleTimeout` when overriden by
*keep-alive* hints from the server.

@@ -71,12 +74,2 @@ Default: `600e3` milliseconds (10min).

- `requestTimeout: Number`, the timeout after which a request will time out.
Monitors time between request being enqueued and receiving
a response. Use `0` to disable it entirely.
Default: `30e3` milliseconds (30s).
- `maxAbortedPayload: Number`, the maximum number of bytes read after which an
aborted response will close the connection. Closing the connection
will error other inflight requests in the pipeline.
Default: `1048576` bytes (1MiB).
- `pipelining: Number`, the amount of concurrent requests to be sent over the

@@ -91,3 +84,3 @@ single TCP/TLS connection according to

- `maxHeaderSize: Number`, the maximum length of request headers in bytes.
- `maxHeaderSize: Number`, the maximum length of request headers in bytes.
Default: `16384` (16KiB).

@@ -109,7 +102,7 @@

* `opaque: Any`
* `body: String|Buffer|Uint8Array|stream.Readable|Null`.
* `body: String|Buffer|Uint8Array|stream.Readable|Null`
Default: `null`.
* `headers: Object|Null`, an object with header-value pairs.
Default: `null`.
* `signal: AbortController|EventEmitter|Null`.
* `signal: AbortController|EventEmitter|Null`
Default: `null`.

@@ -431,26 +424,49 @@ * `requestTimeout: Number`, the timeout after which a request will time out, in

This is the low level API which all the preceeding API's are implemented on top of.
This is the low level API which all the preceeding APIs are implemented on top of.
This API is expected to evolve through semver-major versions and is less stable
than the preceeding higher level APIs. It is primarily intended for library developers
who implement higher level APIs on top of this.
Options:
* ... same as [`client.request(opts[, callback])`][request].
* `path: String`
* `method: String`
* `body: String|Buffer|Uint8Array|stream.Readable|Null`
Default: `null`.
* `headers: Object|Null`, an object with header-value pairs.
Default: `null`.
* `requestTimeout: Number`, the timeout after which a request will time out, in
milliseconds. Monitors time between request being enqueued and receiving
a response. Use `0` to disable it entirely.
Default: `30e3` milliseconds (30s).
* `idempotent: Boolean`, whether the requests can be safely retried or not.
If `false` the request won't be sent until all preceeding
requests in the pipeline has completed.
Default: `true` if `method` is `HEAD` or `GET`.
The `handler` parameter is defined as follow:
* `onConnect(abort)`, invoked before request is dispatched on socket.
May be invoked multiple times when a request is retried when the request at the head of the pipeline fails.
* `abort(): Void`, abort request.
* `onUpgrade(statusCode, headers, socket): Void`, invoked when request is upgraded either due to a `Upgrade` header or `CONNECT` method.
* `statusCode: Number`
* `headers: Object`
* `headers: Array|Null`
* `socket: Duplex`
* `onHeaders(statusCode, headers, resume): Void`, invoked when statusCode and headers have been received.
* `onHeaders(statusCode, headers, resume): Void`, invoked when statusCode and headers have been received.
May be invoked multiple times due to 1xx informational headers.
* `statusCode: Number`
* `headers: Object`
* `headers: Array|Null`, an array of key-value pairs. Keys are not automatically lowercased.
* `resume(): Void`, resume `onData` after returning `false`.
* `onData(chunk): Null|Boolean`, invoked when response payload data is received.
* `onData(chunk): Boolean`, invoked when response payload data is received.
* `chunk: Buffer`
* `onComplete(trailers): Void`, invoked when response payload and trailers have been received and the request has completed.
* `trailers: Object`
* `trailers: Array|Null`
* `onError(err): Void`, invoked when an error has occured.
* `err: Error`
The caller is responsible for handling the `body` argument, in terms of `'error'` events and `destroy()`:ing up until
the `onConnect` handler has been invoked.
<a name='close'></a>

@@ -512,2 +528,5 @@ #### `client.close([callback]): Promise|Void`

* `'drain'`, emitted when pipeline is no longer fully
saturated.
* `'connect'`, emitted when a socket has been created and

@@ -518,3 +537,3 @@ connected. The client will connect once `client.size > 0`.

first argument of the event is the error which caused the
socket to disconnect. The client will reconnect if or once
socket to disconnect. The client will reconnect if or once
`client.size > 0`.

@@ -531,4 +550,7 @@

* `connections`, the number of clients to create.
Default `100`.
Default `10`.
`Pool` does not guarantee that requests are dispatched in
order of invocation.
#### `pool.request(opts[, callback]): Promise|Void`

@@ -592,3 +614,3 @@

This section documents parts of the HTTP/1.1 specification which Undici does
This section documents parts of the HTTP/1.1 specification which Undici does
not support or does not fully implement.

@@ -599,3 +621,3 @@

Undici does not support the `Expect` request header field. The request
body is always immediately sent and the `100 Continue` response will be
body is always immediately sent and the `100 Continue` response will be
ignored.

@@ -607,8 +629,8 @@

Uncidi will only use pipelining if configured with a `pipelining` factor
Uncidi will only use pipelining if configured with a `pipelining` factor
greater than `1`.
Undici always assumes that connections are persistent and will immediatly
Undici always assumes that connections are persistent and will immediatly
pipeline requests, without checking whether the connection is persistent.
Hence, automatic fallback to HTTP/1.0 or HTTP/1.1 without pipelining is
Hence, automatic fallback to HTTP/1.0 or HTTP/1.1 without pipelining is
not supported.

@@ -615,0 +637,0 @@

@@ -203,27 +203,1 @@ 'use strict'

writeBodyStartedWithBody(new Uint8Array([42]), 'Uint8Array')
test('cleanup listener', (t) => {
t.plan(4)
const abortController = new AbortController()
abortController.signal.addEventListener = (name) => t.strictEqual(name, 'abort')
abortController.signal.removeEventListener = (name) => t.strictEqual(name, 'abort')
const server = createServer((req, res) => {
res.writeHead(200, { 'content-type': 'text/plain' })
res.write('hello')
res.end('world')
})
t.teardown(server.close.bind(server))
server.listen(0, () => {
const client = new Client(`http://localhost:${server.address().port}`)
t.teardown(client.destroy.bind(client))
client.request({ path: '/', method: 'GET', signal: abortController.signal }, (err, response) => {
t.error(err)
response.body.on('end', () => {
t.pass()
}).resume()
})
})
})

@@ -214,27 +214,1 @@ 'use strict'

writeBodyStartedWithBody(new Uint8Array([42]), 'Uint8Array')
test('cleanup listener', (t) => {
t.plan(4)
const ee = new EventEmitter()
ee.addListener = (name) => t.strictEqual(name, 'abort')
ee.removeListener = (name) => t.strictEqual(name, 'abort')
const server = createServer((req, res) => {
res.writeHead(200, { 'content-type': 'text/plain' })
res.write('hello')
res.end('world')
})
t.teardown(server.close.bind(server))
server.listen(0, () => {
const client = new Client(`http://localhost:${server.address().port}`)
t.teardown(client.destroy.bind(client))
client.request({ path: '/', method: 'GET', signal: ee }, (err, response) => {
t.error(err)
response.body.on('end', () => {
t.pass()
}).resume()
})
})
})

@@ -37,154 +37,2 @@ 'use strict'

test('aborted GET maxAbortedPayload reset', (t) => {
t.plan(5)
const server = createServer((req, res) => {
res.end(Buffer.alloc(1e6 - 1))
})
t.tearDown(server.close.bind(server))
server.listen(0, () => {
const client = new Client(`http://localhost:${server.address().port}`, {
pipelining: 2,
maxAbortedPayload: 1e6
})
t.tearDown(client.close.bind(client))
client.on('disconnect', () => {
t.fail()
})
client.request({
path: '/',
method: 'GET'
}, (err, { body }) => {
t.error(err)
body.once('data', (chunk) => {
body.destroy()
}).once('error', (err) => {
t.ok(err)
}).on('end', () => {
t.fail()
})
})
// Make sure read counter is reset.
client.request({
path: '/',
method: 'GET'
}, (err, { body }) => {
t.error(err)
body.once('data', (chunk) => {
body.destroy()
}).on('error', (err) => {
t.ok(err)
}).on('end', () => {
t.fail()
})
})
client.request({
path: '/',
method: 'GET'
}, (err, { body }) => {
t.error(err)
body.resume()
})
})
})
test('aborted GET maxAbortedPayload', (t) => {
t.plan(5)
const server = createServer((req, res) => {
res.end(Buffer.alloc(100000 + 1, 'a'))
})
t.tearDown(server.close.bind(server))
server.listen(0, () => {
const client = new Client(`http://localhost:${server.address().port}`, {
pipelining: 1,
maxAbortedPayload: 100000
})
t.tearDown(client.destroy.bind(client))
client.request({
path: '/',
method: 'GET'
}, (err, { body }) => {
t.error(err)
body.once('data', () => {
body.destroy()
}).once('error', (err) => {
t.ok(err)
})
// old Readable emits error twice
.on('error', () => {})
})
client.on('disconnect', () => {
t.pass()
})
client.request({
path: '/',
method: 'GET'
}, (err, { body }) => {
t.error(err)
body.resume()
})
client.close((err) => {
t.error(err)
})
})
})
test('aborted GET maxAbortedPayload less than HWM', (t) => {
t.plan(4)
const server = createServer((req, res) => {
res.end(Buffer.alloc(4 + 1, 'a'))
})
t.tearDown(server.close.bind(server))
server.listen(0, () => {
const client = new Client(`http://localhost:${server.address().port}`, {
pipelining: 1,
maxAbortedPayload: 4
})
t.tearDown(client.destroy.bind(client))
client.request({
path: '/',
method: 'GET'
}, (err, { body }) => {
t.error(err)
body.once('data', () => {
body.destroy()
}).once('error', (err) => {
t.ok(err)
})
// old Readable emits error twice
.on('error', () => {})
})
client.on('disconnect', () => {
t.fail()
})
client.request({
path: '/',
method: 'GET'
}, (err, { body }) => {
t.error(err)
body.resume()
})
client.close((err) => {
t.error(err)
})
})
})
test('aborted req', (t) => {

@@ -191,0 +39,0 @@ t.plan(1)

@@ -192,3 +192,3 @@ 'use strict'

test('connect aborted', (t) => {
t.plan(3)
t.plan(4)

@@ -203,8 +203,12 @@ const server = http.createServer((req, res) => {

server.listen(0, async () => {
server.listen(0, () => {
const client = new Client(`http://localhost:${server.address().port}`, {
pipelining: 3
})
t.tearDown(client.close.bind(client))
t.tearDown(client.destroy.bind(client))
client.on('disconnect', () => {
t.fail()
})
const signal = new EE()

@@ -221,3 +225,108 @@ client.connect({

signal.emit('abort')
client.close(() => {
t.pass()
})
})
})
test('basic connect error', (t) => {
t.plan(2)
const server = http.createServer((c) => {
t.fail()
})
server.on('connect', (req, socket, firstBodyChunk) => {
socket.write('HTTP/1.1 200 Connection established\r\n\r\n')
let data = firstBodyChunk.toString()
socket.on('data', (buf) => {
data += buf.toString()
})
socket.on('end', () => {
socket.end(data)
})
})
t.tearDown(server.close.bind(server))
server.listen(0, async () => {
const client = new Client(`http://localhost:${server.address().port}`)
t.tearDown(client.close.bind(client))
const _err = new Error()
client.connect({
path: '/'
}, (err, { socket }) => {
t.error(err)
socket.on('error', (err) => {
t.strictEqual(err, _err)
})
throw _err
})
})
})
test('connect invalid signal', (t) => {
t.plan(2)
const server = http.createServer((req, res) => {
t.fail()
})
server.on('connect', (req, c, firstBodyChunk) => {
t.fail()
})
t.tearDown(server.close.bind(server))
server.listen(0, () => {
const client = new Client(`http://localhost:${server.address().port}`)
t.tearDown(client.destroy.bind(client))
client.on('disconnect', () => {
t.fail()
})
client.connect({
path: '/',
signal: 'error',
opaque: 'asd'
}, (err, { opaque }) => {
t.strictEqual(opaque, 'asd')
t.ok(err instanceof errors.InvalidArgumentError)
})
})
})
test('connect aborted after connect', (t) => {
t.plan(3)
const signal = new EE()
const server = http.createServer((req, res) => {
t.fail()
})
server.on('connect', (req, c, firstBodyChunk) => {
signal.emit('abort')
})
t.tearDown(server.close.bind(server))
server.listen(0, () => {
const client = new Client(`http://localhost:${server.address().port}`, {
pipelining: 3
})
t.tearDown(client.destroy.bind(client))
client.on('disconnect', () => {
t.fail()
})
client.connect({
path: '/',
signal,
opaque: 'asd'
}, (err, { opaque }) => {
t.strictEqual(opaque, 'asd')
t.ok(err instanceof errors.RequestAbortedError)
})
t.strictEqual(client.busy, true)
})
})

@@ -5,2 +5,3 @@ 'use strict'

const { Client, errors } = require('..')
const http = require('http')

@@ -12,11 +13,320 @@ test('dispatch invalid opts', (t) => {

try {
client.dispatch({
path: '/',
method: 'GET',
upgrade: 1
}, {
onError (err) {
t.ok(err instanceof errors.InvalidArgumentError)
}
})
})
test('basic dispatch get', (t) => {
t.plan(10)
const server = http.createServer((req, res) => {
t.strictEqual('/', req.url)
t.strictEqual('GET', req.method)
t.strictEqual('localhost', req.headers.host)
t.strictEqual(undefined, req.headers.foo)
t.strictEqual('bar', req.headers.bar)
t.strictEqual(undefined, req.headers['content-length'])
res.end('hello')
})
t.tearDown(server.close.bind(server))
const reqHeaders = {
foo: undefined,
bar: 'bar'
}
server.listen(0, () => {
const client = new Client(`http://localhost:${server.address().port}`)
t.tearDown(client.close.bind(client))
const bufs = []
client.dispatch({
path: '/',
method: 'GET',
upgrade: 1
headers: reqHeaders
}, {
onConnect () {
},
onHeaders (statusCode, headers) {
t.strictEqual(statusCode, 200)
t.strictEqual(Array.isArray(headers), true)
},
onData (buf) {
bufs.push(buf)
},
onComplete (trailers) {
t.strictEqual(trailers, null)
t.strictEqual('hello', Buffer.concat(bufs).toString('utf8'))
},
onError () {
t.fail()
}
})
} catch (err) {
t.ok(err instanceof errors.InvalidArgumentError)
})
})
test('trailers dispatch get', (t) => {
t.plan(12)
const server = http.createServer((req, res) => {
t.strictEqual('/', req.url)
t.strictEqual('GET', req.method)
t.strictEqual('localhost', req.headers.host)
t.strictEqual(undefined, req.headers.foo)
t.strictEqual('bar', req.headers.bar)
t.strictEqual(undefined, req.headers['content-length'])
res.addTrailers({ 'Content-MD5': 'test' })
res.setHeader('Content-Type', 'text/plain')
res.setHeader('Trailer', 'Content-MD5')
res.end('hello')
})
t.tearDown(server.close.bind(server))
const reqHeaders = {
foo: undefined,
bar: 'bar'
}
server.listen(0, () => {
const client = new Client(`http://localhost:${server.address().port}`)
t.tearDown(client.close.bind(client))
const bufs = []
client.dispatch({
path: '/',
method: 'GET',
headers: reqHeaders
}, {
onConnect () {
},
onHeaders (statusCode, headers) {
t.strictEqual(statusCode, 200)
t.strictEqual(Array.isArray(headers), true)
{
const contentTypeIdx = headers.findIndex(x => x === 'Content-Type')
t.strictEqual(headers[contentTypeIdx + 1], 'text/plain')
}
},
onData (buf) {
bufs.push(buf)
},
onComplete (trailers) {
t.strictEqual(Array.isArray(trailers), true)
{
const contentMD5Idx = trailers.findIndex(x => x === 'Content-MD5')
t.strictEqual(trailers[contentMD5Idx + 1], 'test')
}
t.strictEqual('hello', Buffer.concat(bufs).toString('utf8'))
},
onError () {
t.fail()
}
})
})
})
test('dispatch onHeaders error', (t) => {
t.plan(1)
const server = http.createServer((req, res) => {
res.end()
})
t.tearDown(server.close.bind(server))
server.listen(0, () => {
const client = new Client(`http://localhost:${server.address().port}`)
t.tearDown(client.close.bind(client))
const _err = new Error()
client.dispatch({
path: '/',
method: 'GET'
}, {
onConnect () {
},
onHeaders (statusCode, headers) {
throw _err
},
onData (buf) {
t.fail()
},
onComplete (trailers) {
t.fail()
},
onError (err) {
t.strictEqual(err, _err)
}
})
})
})
test('dispatch onComplete error', (t) => {
t.plan(2)
const server = http.createServer((req, res) => {
res.end()
})
t.tearDown(server.close.bind(server))
server.listen(0, () => {
const client = new Client(`http://localhost:${server.address().port}`)
t.tearDown(client.close.bind(client))
const _err = new Error()
client.dispatch({
path: '/',
method: 'GET'
}, {
onConnect () {
},
onHeaders (statusCode, headers) {
t.pass()
},
onData (buf) {
t.fail()
},
onComplete (trailers) {
throw _err
},
onError (err) {
t.strictEqual(err, _err)
}
})
})
})
test('dispatch onData error', (t) => {
t.plan(2)
const server = http.createServer((req, res) => {
res.end('ad')
})
t.tearDown(server.close.bind(server))
server.listen(0, () => {
const client = new Client(`http://localhost:${server.address().port}`)
t.tearDown(client.close.bind(client))
const _err = new Error()
client.dispatch({
path: '/',
method: 'GET'
}, {
onConnect () {
},
onHeaders (statusCode, headers) {
t.pass()
},
onData (buf) {
throw _err
},
onComplete (trailers) {
t.fail()
},
onError (err) {
t.strictEqual(err, _err)
}
})
})
})
test('dispatch onConnect error', (t) => {
t.plan(1)
const server = http.createServer((req, res) => {
res.end('ad')
})
t.tearDown(server.close.bind(server))
server.listen(0, () => {
const client = new Client(`http://localhost:${server.address().port}`)
t.tearDown(client.close.bind(client))
const _err = new Error()
client.dispatch({
path: '/',
method: 'GET'
}, {
onConnect () {
throw _err
},
onHeaders (statusCode, headers) {
t.fail()
},
onData (buf) {
t.fail()
},
onComplete (trailers) {
t.fail()
},
onError (err) {
t.strictEqual(err, _err)
}
})
})
})
test('connect call onUpgrade once', (t) => {
t.plan(2)
const server = http.createServer((c) => {
t.fail()
})
server.on('connect', (req, socket, firstBodyChunk) => {
socket.write('HTTP/1.1 200 Connection established\r\n\r\n')
let data = firstBodyChunk.toString()
socket.on('data', (buf) => {
data += buf.toString()
})
socket.on('end', () => {
socket.end(data)
})
})
t.tearDown(server.close.bind(server))
server.listen(0, async () => {
const client = new Client(`http://localhost:${server.address().port}`)
t.tearDown(client.close.bind(client))
let recvData = ''
let count = 0
client.dispatch({
method: 'CONNECT',
path: '/'
}, {
onConnect () {
},
onUpgrade (statusCode, headers, socket) {
t.strictEqual(count++, 0)
socket.on('data', (d) => {
recvData += d
})
socket.on('end', () => {
t.strictEqual(recvData.toString(), 'Body')
})
socket.write('Body')
socket.end()
},
onData (buf) {
t.fail()
},
onComplete (trailers) {
t.fail()
},
onError () {
t.fail()
}
})
})
})

@@ -9,4 +9,3 @@ 'use strict'

const { kSocket } = require('../lib/symbols')
const { InvalidArgumentError } = require('../lib/errors')
const { kSocket } = require('../lib/core/symbols')

@@ -245,4 +244,2 @@ test('GET errors and reconnect with pipelining 1', (t) => {

test('invalid options throws', (t) => {
t.plan(38)
try {

@@ -278,11 +275,2 @@ new Client({ port: 'foobar' }) // eslint-disable-line

new Client(new URL('http://localhost:200'), { // eslint-disable-line
maxAbortedPayload: 'asd'
})
} catch (err) {
t.ok(err instanceof errors.InvalidArgumentError)
t.strictEqual(err.message, 'invalid maxAbortedPayload')
}
try {
new Client(new URL('http://localhost:200'), { // eslint-disable-line
socketPath: 1

@@ -315,7 +303,7 @@ })

new Client(new URL('http://localhost:200'), { // eslint-disable-line
maxKeepAliveTimeout: 'asd'
keepAliveMaxTimeout: 'asd'
}) // eslint-disable-line
} catch (err) {
t.ok(err instanceof errors.InvalidArgumentError)
t.strictEqual(err.message, 'invalid maxKeepAliveTimeout')
t.strictEqual(err.message, 'invalid keepAliveMaxTimeout')
}

@@ -325,7 +313,7 @@

new Client(new URL('http://localhost:200'), { // eslint-disable-line
maxKeepAliveTimeout: 0
keepAliveMaxTimeout: 0
}) // eslint-disable-line
} catch (err) {
t.ok(err instanceof errors.InvalidArgumentError)
t.strictEqual(err.message, 'invalid maxKeepAliveTimeout')
t.strictEqual(err.message, 'invalid keepAliveMaxTimeout')
}

@@ -352,11 +340,2 @@

try {
new Client(new URL('http://localhost:200'), { // eslint-disable-line
requestTimeout: 'asd'
})
} catch (err) {
t.ok(err instanceof errors.InvalidArgumentError)
t.strictEqual(err.message, 'invalid requestTimeout')
}
try {
new Client({ // eslint-disable-line

@@ -410,2 +389,11 @@ protocol: 'asd'

}
try {
new Client(new URL('http://localhost:200'), { keepAlive: 'true' }) // eslint-disable-line
} catch (err) {
t.ok(err instanceof errors.InvalidArgumentError)
t.strictEqual(err.message, 'invalid keepAlive')
}
t.end()
})

@@ -697,3 +685,3 @@

test('queued request should not fail on socket destroy', (t) => {
t.plan(2)
t.plan(4)

@@ -717,3 +705,5 @@ const server = createServer()

t.error(err)
data.body.resume()
data.body.resume().on('error', () => {
t.pass()
})
client[kSocket].destroy()

@@ -725,3 +715,5 @@ client.request({

t.error(err)
data.body.resume()
data.body.resume().on('end', () => {
t.pass()
})
})

@@ -733,3 +725,3 @@ })

test('queued request should fail on client destroy', (t) => {
t.plan(5)
t.plan(6)

@@ -755,2 +747,5 @@ const server = createServer()

data.body.resume()
.on('error', () => {
t.pass()
})
client.destroy((err) => {

@@ -786,3 +781,3 @@ t.error(err)

})
t.tearDown(client.destroy.bind(client))
t.tearDown(client.close.bind(client))

@@ -897,3 +892,3 @@ client.request({

test('invalid signal', (t) => {
t.plan(3)
t.plan(8)

@@ -903,12 +898,19 @@ const client = new Client('http://localhost:3333')

client.request({ path: '/', method: 'GET', signal: {} }, (err) => {
t.ok(err instanceof InvalidArgumentError)
let ticked = false
client.request({ path: '/', method: 'GET', signal: {}, opaque: 'asd' }, (err, { opaque }) => {
t.strictEqual(ticked, true)
t.strictEqual(opaque, 'asd')
t.ok(err instanceof errors.InvalidArgumentError)
})
client.pipeline({ path: '/', method: 'GET', signal: {} }, () => {})
.on('error', (err) => {
t.ok(err instanceof InvalidArgumentError)
t.strictEqual(ticked, true)
t.ok(err instanceof errors.InvalidArgumentError)
})
client.stream({ path: '/', method: 'GET', signal: {} }, () => {}, (err) => {
t.ok(err instanceof InvalidArgumentError)
client.stream({ path: '/', method: 'GET', signal: {}, opaque: 'asd' }, () => {}, (err, { opaque }) => {
t.strictEqual(ticked, true)
t.strictEqual(opaque, 'asd')
t.ok(err instanceof errors.InvalidArgumentError)
})
ticked = true
})

@@ -5,3 +5,5 @@ 'use strict'

const { Client } = require('..')
const { kConnect } = require('../lib/core/symbols')
const { createServer } = require('net')
const http = require('http')

@@ -164,3 +166,3 @@ test('keep-alive header', (t) => {

idleTimeout: 30e3,
maxKeepAliveTimeout: 1e3
keepAliveMaxTimeout: 1e3
})

@@ -186,1 +188,97 @@ t.teardown(client.destroy.bind(client))

})
test('connection close', (t) => {
t.plan(4)
let close = false
const server = createServer((socket) => {
if (close) {
return
}
close = true
socket.write('HTTP/1.1 200 OK\r\n')
socket.write('Content-Length: 0\r\n')
socket.write('Connection: close\r\n')
socket.write('\r\n\r\n')
})
t.teardown(server.close.bind(server))
server.listen(0, () => {
const client = new Client(`http://localhost:${server.address().port}`, {
pipelining: 2
})
t.teardown(client.destroy.bind(client))
client[kConnect](() => {
client.request({
path: '/',
method: 'GET'
}, (err, { body }) => {
t.error(err)
body.on('end', () => {
const timeout = setTimeout(() => {
t.fail()
}, 3e3)
client.once('disconnect', () => {
close = false
t.pass()
clearTimeout(timeout)
})
}).resume()
})
client.request({
path: '/',
method: 'GET'
}, (err, { body }) => {
t.error(err)
body.on('end', () => {
const timeout = setTimeout(() => {
t.fail()
}, 3e3)
client.once('disconnect', () => {
t.pass()
clearTimeout(timeout)
})
}).resume()
})
})
})
})
test('Disable keep alive', (t) => {
t.plan(7)
const ports = []
const server = http.createServer((req, res) => {
t.false(ports.includes(req.socket.remotePort))
ports.push(req.socket.remotePort)
t.match(req.headers, { connection: 'close' })
res.writeHead(200, { connection: 'close' })
res.end()
})
t.teardown(server.close.bind(server))
server.listen(0, () => {
const client = new Client(`http://localhost:${server.address().port}`, { keepAlive: false })
t.teardown(client.destroy.bind(client))
client.request({
path: '/',
method: 'GET'
}, (err, { body }) => {
t.error(err)
body.on('end', () => {
client.request({
path: '/',
method: 'GET'
}, (err, { body }) => {
t.error(err)
body.on('end', () => {
t.pass()
}).resume()
})
}).resume()
})
})
})

@@ -415,6 +415,8 @@ 'use strict'

test('pipeline abort res', (t) => {
t.plan(1)
t.plan(2)
let _res
const server = createServer((req, res) => {
res.write('asd')
_res = res
})

@@ -433,2 +435,10 @@ t.tearDown(server.close.bind(server))

body.destroy()
_res.write('asdasdadasd')
const timeout = setTimeout(() => {
t.fail()
}, 100)
client.on('disconnect', () => {
clearTimeout(timeout)
t.pass()
})
})

@@ -750,3 +760,3 @@ return body

const client = new Client(`http://localhost:${server.address().port}`)
t.tearDown(client.close.bind(client))
t.tearDown(client.destroy.bind(client))

@@ -973,2 +983,5 @@ client

res.write('asd')
setImmediate(() => {
res.write('asd')
})
})

@@ -975,0 +988,0 @@ t.tearDown(server.close.bind(server))

@@ -7,2 +7,4 @@ 'use strict'

const { finished, Readable } = require('stream')
const { kConnect } = require('../lib/core/symbols')
const EE = require('events')

@@ -388,3 +390,3 @@ test('20 times GET with pipelining 10', (t) => {

test('pipelining HEAD busy', (t) => {
t.plan(6)
t.plan(7)

@@ -403,10 +405,87 @@ const server = createServer()

{
client[kConnect](() => {
let ended = false
client.once('disconnect', () => {
t.strictEqual(ended, true)
})
{
const body = new Readable({
read () { }
})
client.request({
path: '/',
method: 'GET',
body
}, (err, data) => {
t.error(err)
data.body
.resume()
.on('end', () => {
t.pass()
})
})
body.push(null)
t.strictEqual(client.busy, true)
}
{
const body = new Readable({
read () { }
})
client.request({
path: '/',
method: 'HEAD',
body
}, (err, data) => {
t.error(err)
data.body
.resume()
.on('end', () => {
ended = true
t.pass()
})
})
body.push(null)
t.strictEqual(client.busy, true)
}
})
})
})
test('pipelining empty pipeline before reset', (t) => {
t.plan(7)
let c = 0
const server = createServer()
server.on('request', (req, res) => {
if (c++ === 0) {
res.end('asd')
} else {
setTimeout(() => {
res.end('asd')
}, 100)
}
})
t.tearDown(server.close.bind(server))
server.listen(0, () => {
const client = new Client(`http://localhost:${server.address().port}`, {
pipelining: 10
})
t.tearDown(client.close.bind(client))
client[kConnect](() => {
let ended = false
client.once('disconnect', () => {
t.strictEqual(ended, true)
})
const body = new Readable({
read () { }
})
client.request({
path: '/',
method: 'GET',
body
method: 'GET'
}, (err, data) => {

@@ -418,16 +497,11 @@ t.error(err)

t.pass()
body.push(null)
})
})
body.push(null)
t.strictEqual(client.busy, false)
}
{
const body = new Readable({
read () { }
})
client.request({
path: '/',
method: 'HEAD',
body
body: 'asd'
}, (err, data) => {

@@ -438,8 +512,9 @@ t.error(err)

.on('end', () => {
ended = true
t.pass()
})
})
body.push(null)
t.strictEqual(client.busy, true)
}
t.strictEqual(client.running, 2)
})
})

@@ -449,3 +524,3 @@ })

test('pipelining idempotent busy', (t) => {
t.plan(6)
t.plan(12)

@@ -481,26 +556,66 @@ const server = createServer()

body.push(null)
t.strictEqual(client.busy, false)
t.strictEqual(client.busy, true)
}
{
const body = new Readable({
read () { }
})
client.request({
path: '/',
method: 'GET',
idempotent: false,
body
}, (err, data) => {
t.error(err)
data.body
.resume()
.on('end', () => {
t.pass()
})
})
body.push(null)
t.strictEqual(client.busy, true)
}
client[kConnect](() => {
{
const body = new Readable({
read () { }
})
client.request({
path: '/',
method: 'GET',
body
}, (err, data) => {
t.error(err)
data.body
.resume()
.on('end', () => {
t.pass()
})
})
body.push(null)
t.strictEqual(client.busy, true)
}
{
const signal = new EE()
const body = new Readable({
read () { }
})
client.request({
path: '/',
method: 'GET',
body,
signal
}, (err, data) => {
t.ok(err)
})
t.strictEqual(client.busy, true)
signal.emit('abort')
t.strictEqual(client.busy, true)
}
{
const body = new Readable({
read () { }
})
client.request({
path: '/',
method: 'GET',
idempotent: false,
body
}, (err, data) => {
t.error(err)
data.body
.resume()
.on('end', () => {
t.pass()
})
})
body.push(null)
t.strictEqual(client.busy, true)
}
})
})
})

@@ -9,3 +9,3 @@ 'use strict'

test('multiple reconnect', (t) => {
t.plan(2)
t.plan(3)

@@ -25,3 +25,7 @@ const clock = FakeTimers.install()

t.error(err)
data.body.resume()
data.body
.resume()
.on('end', () => {
t.pass()
})
})

@@ -28,0 +32,0 @@

@@ -7,3 +7,4 @@ 'use strict'

const EE = require('events')
const { kConnect } = require('../lib/symbols')
const { kConnect } = require('../lib/core/symbols')
const { Readable } = require('stream')

@@ -22,3 +23,3 @@ test('request abort before headers', (t) => {

const client = new Client(`http://localhost:${server.address().port}`)
t.tearDown(client.close.bind(client))
t.tearDown(client.destroy.bind(client))

@@ -43,1 +44,27 @@ client[kConnect](() => {

})
test('request body destroyed on invalid callback', (t) => {
t.plan(1)
const server = createServer((req, res) => {
})
t.tearDown(server.close.bind(server))
server.listen(0, () => {
const client = new Client(`http://localhost:${server.address().port}`)
t.tearDown(client.destroy.bind(client))
const body = new Readable({
read () {}
})
try {
client.request({
path: '/',
method: 'GET',
body
}, null)
} catch (err) {
t.strictEqual(body.destroyed, true)
}
})
})

@@ -6,3 +6,3 @@ 'use strict'

const { createServer } = require('http')
const { PassThrough, Writable } = require('stream')
const { PassThrough, Writable, Readable } = require('stream')
const EE = require('events')

@@ -196,5 +196,3 @@

server.listen(0, () => {
const client = new Client(`http://localhost:${server.address().port}`, {
maxAbortedPayload: 1e5
})
const client = new Client(`http://localhost:${server.address().port}`)
t.tearDown(client.close.bind(client))

@@ -460,5 +458,2 @@

})
client.on('disconnect', () => {
t.fail()
})
})

@@ -522,2 +517,32 @@ })

test('stream abort before dispatch', (t) => {
t.plan(1)
const server = createServer((req, res) => {
res.end('asd')
})
t.tearDown(server.close.bind(server))
server.listen(0, () => {
const client = new Client(`http://localhost:${server.address().port}`)
t.tearDown(client.destroy.bind(client))
const pt = new PassThrough()
const signal = new EE()
client.stream({
path: '/',
method: 'GET',
signal
}, () => {
return pt
}, (err) => {
t.ok(err instanceof errors.RequestAbortedError)
})
signal.emit('abort')
client.on('disconnect', () => {
t.fail()
})
})
})
test('trailers', (t) => {

@@ -607,1 +632,27 @@ t.plan(2)

})
test('stream body destroyed on invalid callback', (t) => {
t.plan(1)
const server = createServer((req, res) => {
})
t.tearDown(server.close.bind(server))
server.listen(0, () => {
const client = new Client(`http://localhost:${server.address().port}`)
t.tearDown(client.destroy.bind(client))
const body = new Readable({
read () {}
})
try {
client.stream({
path: '/',
method: 'GET',
body
}, () => {}, null)
} catch (err) {
t.strictEqual(body.destroyed, true)
}
})
})

@@ -118,5 +118,5 @@ 'use strict'

})
c.on('end', () => {
c.end()
c.on('error', () => {
// Whether we get an error, end or close is undefined.
// Ignore error.
})

@@ -273,3 +273,3 @@ })

test('upgrade aborted', (t) => {
t.plan(3)
t.plan(4)

@@ -288,3 +288,3 @@ const server = http.createServer((req, res) => {

})
t.tearDown(client.close.bind(client))
t.tearDown(client.destroy.bind(client))

@@ -302,2 +302,6 @@ const signal = new EE()

signal.emit('abort')
client.close(() => {
t.pass()
})
})

@@ -319,2 +323,5 @@ })

c.end()
c.on('error', () => {
})
signal.emit('abort')

@@ -338,1 +345,67 @@ })

})
test('basic upgrade error', (t) => {
t.plan(2)
const server = net.createServer((c) => {
c.on('data', (d) => {
c.write('HTTP/1.1 101\r\n')
c.write('hello: world\r\n')
c.write('connection: upgrade\r\n')
c.write('upgrade: websocket\r\n')
c.write('\r\n')
c.write('Body')
})
c.on('error', () => {
})
})
t.tearDown(server.close.bind(server))
server.listen(0, () => {
const client = new Client(`http://localhost:${server.address().port}`)
t.tearDown(client.close.bind(client))
const _err = new Error()
client.upgrade({
path: '/',
method: 'GET',
protocol: 'Websocket'
}, (err, data) => {
t.error(err)
data.socket.on('error', (err) => {
t.strictEqual(err, _err)
})
throw _err
})
})
})
test('upgrade invalid signal', (t) => {
t.plan(2)
const server = net.createServer(() => {
t.fail()
})
t.tearDown(server.close.bind(server))
server.listen(0, () => {
const client = new Client(`http://localhost:${server.address().port}`)
t.tearDown(client.destroy.bind(client))
client.on('disconnect', () => {
t.fail()
})
client.upgrade({
path: '/',
method: 'GET',
protocol: 'Websocket',
signal: 'error',
opaque: 'asd'
}, (err, { opaque }) => {
t.strictEqual(opaque, 'asd')
t.ok(err instanceof errors.InvalidArgumentError)
})
})
})

@@ -8,3 +8,3 @@ 'use strict'

const { Readable } = require('stream')
const { kSocket } = require('../lib/symbols')
const { kSocket } = require('../lib/core/symbols')
const EE = require('events')

@@ -76,3 +76,3 @@

const server = createServer((req, res) => {
t.strictEqual('/', req.url)
t.strictEqual('/123', req.url)
t.strictEqual('HEAD', req.method)

@@ -89,3 +89,3 @@ t.strictEqual('localhost', req.headers.host)

client.request({ path: '/', method: 'HEAD' }, (err, { statusCode, headers, body }) => {
client.request({ path: '/123', method: 'HEAD' }, (err, { statusCode, headers, body }) => {
t.error(err)

@@ -101,3 +101,3 @@ t.strictEqual(statusCode, 200)

client.request({ path: '/', method: 'HEAD' }, (err, { statusCode, headers, body }) => {
client.request({ path: '/123', method: 'HEAD' }, (err, { statusCode, headers, body }) => {
t.error(err)

@@ -627,3 +627,3 @@ t.strictEqual(statusCode, 200)

test('multiple destroy callback', (t) => {
t.plan(3)
t.plan(4)

@@ -645,3 +645,7 @@ const server = createServer((req, res) => {

t.error(err)
data.body.resume()
data.body
.resume()
.on('error', () => {
t.pass()
})
client.destroy(new Error(), (err) => {

@@ -658,3 +662,3 @@ t.error(err)

test('only one streaming req at a time', (t) => {
t.plan(6)
t.plan(7)

@@ -703,3 +707,7 @@ const server = createServer((req, res) => {

t.error(err)
data.body.resume()
data.body
.resume()
.on('end', () => {
t.pass()
})
})

@@ -712,3 +720,3 @@ t.strictEqual(client.busy, true)

test('300 requests succeed', (t) => {
t.plan(300 * 2)
t.plan(300 * 3)

@@ -732,2 +740,4 @@ const server = createServer((req, res) => {

t.strictEqual(chunk.toString(), 'asd')
}).on('end', () => {
t.pass()
})

@@ -734,0 +744,0 @@ })

@@ -6,3 +6,3 @@ 'use strict'

const { createServer } = require('http')
const { kSocket } = require('../lib/symbols')
const { kSocket } = require('../lib/core/symbols')

@@ -9,0 +9,0 @@ test('close waits for queued requests to finish', (t) => {

@@ -7,3 +7,3 @@ 'use strict'

const { Readable } = require('stream')
const { kConnect } = require('../lib/symbols')
const { kConnect } = require('../lib/core/symbols')

@@ -77,22 +77,22 @@ test('GET and HEAD with body should reset connection', (t) => {

test('GET with body should work when target parses body as request', (t) => {
t.plan(4)
// TODO: Avoid external dependency.
// test('GET with body should work when target parses body as request', (t) => {
// t.plan(4)
// This URL will send double responses when receiving a
// GET request with body.
// TODO: Avoid external dependency.
const client = new Client('http://feeds.bbci.co.uk')
t.teardown(client.close.bind(client))
// // This URL will send double responses when receiving a
// // GET request with body.
// const client = new Client('http://feeds.bbci.co.uk')
// t.teardown(client.close.bind(client))
client.request({ method: 'GET', path: '/news/rss.xml', body: 'asd' }, (err, data) => {
t.error(err)
t.strictEqual(data.statusCode, 200)
data.body.resume()
})
client.request({ method: 'GET', path: '/news/rss.xml', body: 'asd' }, (err, data) => {
t.error(err)
t.strictEqual(data.statusCode, 200)
data.body.resume()
})
})
// client.request({ method: 'GET', path: '/news/rss.xml', body: 'asd' }, (err, data) => {
// t.error(err)
// t.strictEqual(data.statusCode, 200)
// data.body.resume()
// })
// client.request({ method: 'GET', path: '/news/rss.xml', body: 'asd' }, (err, data) => {
// t.error(err)
// t.strictEqual(data.statusCode, 200)
// data.body.resume()
// })
// })

@@ -99,0 +99,0 @@ test('HEAD should reset connection', (t) => {

@@ -6,3 +6,3 @@ 'use strict'

const { createServer } = require('http')
const { kConnect } = require('../lib/symbols')
const { kConnect } = require('../lib/core/symbols')

@@ -34,3 +34,3 @@ test('pipeline pipelining', (t) => {

}, ({ body }) => body).end().resume()
t.strictEqual(client.busy, false)
t.strictEqual(client.busy, true)
t.strictDeepEqual(client.running, 0)

@@ -87,3 +87,3 @@ t.strictDeepEqual(client.pending, 1)

})
t.strictEqual(client.busy, false)
t.strictEqual(client.busy, true)
t.strictDeepEqual(client.running, 0)

@@ -96,3 +96,3 @@ t.strictDeepEqual(client.pending, 1)

}, ({ body }) => body).end().resume()
t.strictEqual(client.busy, false)
t.strictEqual(client.busy, true)
t.strictDeepEqual(client.running, 0)

@@ -99,0 +99,0 @@ t.strictDeepEqual(client.pending, 2)

@@ -10,5 +10,6 @@ 'use strict'

const { promisify } = require('util')
const { PassThrough } = require('stream')
const { PassThrough, Readable } = require('stream')
const eos = require('stream').finished
const net = require('net')
const EE = require('events')

@@ -196,3 +197,3 @@ test('basic get', (t) => {

request (req, cb) {
dispatch (req, cb) {
seen.push({ req, cb, client: this, id: this.id })

@@ -203,3 +204,3 @@ }

const Pool = proxyquire('../lib/pool', {
'./client': FakeClient
'./core/client': FakeClient
})

@@ -211,4 +212,4 @@

pool.request({}, noop)
pool.request({}, noop)
pool.dispatch({}, noop)
pool.dispatch({}, noop)

@@ -222,7 +223,7 @@ const d1 = seen.shift() // d1 = c0

pool.request({}, noop) // d3 = c0
pool.dispatch({}, noop) // d3 = c0
d1.client._busy = true
pool.request({}, noop) // d4 = c1
pool.dispatch({}, noop) // d4 = c1

@@ -237,7 +238,7 @@ const d3 = seen.shift()

pool.request({}, noop) // d5 = c1
pool.dispatch({}, noop) // d5 = c1
d1.client._busy = false
pool.request({}, noop) // d6 = c0
pool.dispatch({}, noop) // d6 = c0

@@ -423,2 +424,4 @@ const d5 = seen.shift()

}, {
onConnect () {
},
onHeaders (statusCode, headers) {

@@ -436,1 +439,503 @@ t.strictEqual(statusCode, 200)

})
test('pool pipeline args validation', (t) => {
t.plan(2)
const client = new Pool('http://localhost:5000')
const ret = client.pipeline(null, () => {})
ret.on('error', (err) => {
t.ok(/opts/.test(err.message))
t.ok(err instanceof errors.InvalidArgumentError)
})
})
test('300 requests succeed', (t) => {
t.plan(300 * 3)
const server = createServer((req, res) => {
res.end('asd')
})
t.tearDown(server.close.bind(server))
server.listen(0, () => {
const client = new Pool(`http://localhost:${server.address().port}`, {
connections: 1
})
t.tearDown(client.destroy.bind(client))
for (let n = 0; n < 300; ++n) {
client.request({
path: '/',
method: 'GET'
}, (err, data) => {
t.error(err)
data.body.on('data', (chunk) => {
t.strictEqual(chunk.toString(), 'asd')
}).on('end', () => {
t.pass()
})
})
}
})
})
test('pool connect error', (t) => {
t.plan(1)
const server = createServer((c) => {
t.fail()
})
server.on('connect', (req, socket, firstBodyChunk) => {
socket.destroy()
})
t.tearDown(server.close.bind(server))
server.listen(0, async () => {
const client = new Pool(`http://localhost:${server.address().port}`)
t.tearDown(client.close.bind(client))
try {
await client.connect({
path: '/'
})
} catch (err) {
t.ok(err)
}
})
})
test('pool upgrade error', (t) => {
t.plan(1)
const server = net.createServer((c) => {
c.on('data', (d) => {
c.write('HTTP/1.1 101\r\n')
c.write('hello: world\r\n')
c.write('connection: upgrade\r\n')
c.write('\r\n')
c.write('Body')
})
c.on('error', () => {
// Whether we get an error, end or close is undefined.
// Ignore error.
})
})
t.tearDown(server.close.bind(server))
server.listen(0, async () => {
const client = new Pool(`http://localhost:${server.address().port}`)
t.tearDown(client.close.bind(client))
try {
await client.upgrade({
path: '/',
method: 'GET',
protocol: 'Websocket'
})
} catch (err) {
t.ok(err)
}
})
})
test('pool dispatch error', (t) => {
t.plan(3)
const server = createServer((req, res) => {
res.end('asd')
})
t.tearDown(server.close.bind(server))
server.listen(0, async () => {
const client = new Pool(`http://localhost:${server.address().port}`, {
connections: 1,
pipelining: 1
})
t.tearDown(client.close.bind(client))
client.dispatch({
path: '/',
method: 'GET'
}, {
onConnect () {
},
onHeaders (statusCode, headers) {
t.strictEqual(statusCode, 200)
},
onData (chunk) {
},
onComplete () {
t.pass()
}
})
client.dispatch({
path: '/',
method: 'GET',
headers: {
'transfer-encoding': 'fail'
}
}, {
onConnect () {
t.fail()
},
onHeaders (statusCode, headers) {
t.fail()
},
onData (chunk) {
t.fail()
},
onError (err) {
t.strictEqual(err.code, 'UND_ERR_INVALID_ARG')
}
})
})
})
test('pool request abort in queue', (t) => {
t.plan(3)
const server = createServer((req, res) => {
res.end('asd')
})
t.tearDown(server.close.bind(server))
server.listen(0, async () => {
const client = new Pool(`http://localhost:${server.address().port}`, {
connections: 1,
pipelining: 1
})
t.tearDown(client.close.bind(client))
client.dispatch({
path: '/',
method: 'GET'
}, {
onConnect () {
},
onHeaders (statusCode, headers) {
t.strictEqual(statusCode, 200)
},
onData (chunk) {
},
onComplete () {
t.pass()
}
})
const signal = new EE()
client.request({
path: '/',
method: 'GET',
signal
}, (err) => {
t.strictEqual(err.code, 'UND_ERR_ABORTED')
})
signal.emit('abort')
})
})
test('pool stream abort in queue', (t) => {
t.plan(3)
const server = createServer((req, res) => {
res.end('asd')
})
t.tearDown(server.close.bind(server))
server.listen(0, async () => {
const client = new Pool(`http://localhost:${server.address().port}`, {
connections: 1,
pipelining: 1
})
t.tearDown(client.close.bind(client))
client.dispatch({
path: '/',
method: 'GET'
}, {
onConnect () {
},
onHeaders (statusCode, headers) {
t.strictEqual(statusCode, 200)
},
onData (chunk) {
},
onComplete () {
t.pass()
}
})
const signal = new EE()
client.stream({
path: '/',
method: 'GET',
signal
}, ({ body }) => body, (err) => {
t.strictEqual(err.code, 'UND_ERR_ABORTED')
})
signal.emit('abort')
})
})
test('pool pipeline abort in queue', (t) => {
t.plan(3)
const server = createServer((req, res) => {
res.end('asd')
})
t.tearDown(server.close.bind(server))
server.listen(0, async () => {
const client = new Pool(`http://localhost:${server.address().port}`, {
connections: 1,
pipelining: 1
})
t.tearDown(client.close.bind(client))
client.dispatch({
path: '/',
method: 'GET'
}, {
onConnect () {
},
onHeaders (statusCode, headers) {
t.strictEqual(statusCode, 200)
},
onData (chunk) {
},
onComplete () {
t.pass()
}
})
const signal = new EE()
client.pipeline({
path: '/',
method: 'GET',
signal
}, ({ body }) => body).end().on('error', (err) => {
t.strictEqual(err.code, 'UND_ERR_ABORTED')
})
signal.emit('abort')
})
})
test('pool stream constructor error destroy body', (t) => {
t.plan(4)
const server = createServer((req, res) => {
res.end('asd')
})
t.tearDown(server.close.bind(server))
server.listen(0, async () => {
const client = new Pool(`http://localhost:${server.address().port}`, {
connections: 1,
pipelining: 1
})
t.tearDown(client.close.bind(client))
{
const body = new Readable({
read () {
}
})
client.stream({
path: '/',
method: 'GET',
body,
headers: {
'transfer-encoding': 'fail'
}
}, () => {
t.fail()
}, (err) => {
t.strictEqual(err.code, 'UND_ERR_INVALID_ARG')
t.strictEqual(body.destroyed, true)
})
}
{
const body = new Readable({
read () {
}
})
client.stream({
path: '/',
method: 'CONNECT',
body
}, () => {
t.fail()
}, (err) => {
t.strictEqual(err.code, 'UND_ERR_INVALID_ARG')
t.strictEqual(body.destroyed, true)
})
}
})
})
test('pool request constructor error destroy body', (t) => {
t.plan(4)
const server = createServer((req, res) => {
res.end('asd')
})
t.tearDown(server.close.bind(server))
server.listen(0, async () => {
const client = new Pool(`http://localhost:${server.address().port}`, {
connections: 1,
pipelining: 1
})
t.tearDown(client.close.bind(client))
{
const body = new Readable({
read () {
}
})
client.request({
path: '/',
method: 'GET',
body,
headers: {
'transfer-encoding': 'fail'
}
}, (err) => {
t.strictEqual(err.code, 'UND_ERR_INVALID_ARG')
t.strictEqual(body.destroyed, true)
})
}
{
const body = new Readable({
read () {
}
})
client.request({
path: '/',
method: 'CONNECT',
body
}, (err) => {
t.strictEqual(err.code, 'UND_ERR_INVALID_ARG')
t.strictEqual(body.destroyed, true)
})
}
})
})
test('pool close waits for all requests', (t) => {
t.plan(5)
const server = createServer((req, res) => {
res.end('asd')
})
t.tearDown(server.close.bind(server))
server.listen(0, () => {
const client = new Pool(`http://localhost:${server.address().port}`, {
connections: 1,
pipelining: 1
})
t.tearDown(client.destroy.bind(client))
client.request({
path: '/',
method: 'GET'
}, (err) => {
t.error(err)
})
client.request({
path: '/',
method: 'GET'
}, (err) => {
t.error(err)
})
client.close(() => {
t.pass()
})
client.close(() => {
t.pass()
})
client.request({
path: '/',
method: 'GET'
}, (err) => {
t.ok(err instanceof errors.ClientClosedError)
})
})
})
test('pool destroyed', (t) => {
t.plan(1)
const server = createServer((req, res) => {
res.end('asd')
})
t.tearDown(server.close.bind(server))
server.listen(0, () => {
const client = new Pool(`http://localhost:${server.address().port}`, {
connections: 1,
pipelining: 1
})
t.tearDown(client.destroy.bind(client))
client.destroy()
client.request({
path: '/',
method: 'GET'
}, (err) => {
t.ok(err instanceof errors.ClientDestroyedError)
})
})
})
test('pool destroy fails queued requests', (t) => {
t.plan(4)
const server = createServer((req, res) => {
res.end('asd')
})
t.tearDown(server.close.bind(server))
server.listen(0, async () => {
const client = new Pool(`http://localhost:${server.address().port}`, {
connections: 1,
pipelining: 1
})
t.tearDown(client.destroy.bind(client))
const _err = new Error()
client.request({
path: '/',
method: 'GET'
}, (err) => {
t.strictEqual(err, _err)
})
client.request({
path: '/',
method: 'GET'
}, (err) => {
t.strictEqual(err, _err)
})
client.destroy(_err, () => {
t.pass()
})
client.request({
path: '/',
method: 'GET'
}, (err) => {
t.ok(err instanceof errors.ClientDestroyedError)
})
})
})

@@ -5,2 +5,3 @@ 'use strict'

const { Client, errors } = require('..')
const { kConnect } = require('../lib/core/symbols')
const { createServer } = require('http')

@@ -43,4 +44,4 @@ const EventEmitter = require('events')

test('request timeout immutable opts', (t) => {
t.plan(2)
test('Subsequent request starves', (t) => {
t.plan(3)

@@ -54,3 +55,3 @@ const clock = FakeTimers.install()

}, 100)
clock.tick(100)
clock.tick(50)
})

@@ -61,36 +62,13 @@ t.teardown(server.close.bind(server))

const client = new Client(`http://localhost:${server.address().port}`, {
requestTimeout: 50
pipelining: 2
})
t.teardown(client.destroy.bind(client))
const opts = { path: '/', method: 'GET' }
client.request(opts, (err, response) => {
t.ok(err instanceof errors.RequestTimeoutError)
t.strictEqual(opts.requestTimeout, undefined)
})
clock.tick(50)
})
})
test('Subsequent request starves', (t) => {
t.plan(2)
const clock = FakeTimers.install()
t.teardown(clock.uninstall.bind(clock))
const server = createServer((req, res) => {
setTimeout(() => {
res.end('hello')
}, 100)
clock.tick(50)
})
t.teardown(server.close.bind(server))
server.listen(0, () => {
const client = new Client(`http://localhost:${server.address().port}`)
t.teardown(client.destroy.bind(client))
client.request({ path: '/', method: 'GET' }, (err, response) => {
t.error(err)
response.body
.resume()
.on('end', () => {
t.pass()
})
})

@@ -267,37 +245,2 @@

test('If a request starves, the server should never receive the request', (t) => {
t.plan(4)
const clock = FakeTimers.install()
t.teardown(clock.uninstall.bind(clock))
let count = 0
const server = createServer((req, res) => {
count += 1
t.is(count, 1)
setTimeout(() => {
res.end('hello')
}, 100)
clock.tick(50)
})
t.teardown(server.close.bind(server))
server.listen(0, () => {
const client = new Client(`http://localhost:${server.address().port}`)
t.teardown(client.destroy.bind(client))
client.request({ path: '/', method: 'GET', requestTimeout: 50 }, (err, response) => {
t.ok(err instanceof errors.RequestTimeoutError)
})
client.request({ path: '/', method: 'GET', requestTimeout: 50 }, (err, response) => {
t.ok(err instanceof errors.RequestTimeoutError)
})
client.request({ path: '/', method: 'GET', requestTimeout: 50 }, (err, response) => {
t.ok(err instanceof errors.RequestTimeoutError)
})
})
})
test('Timeout with pipelining', (t) => {

@@ -350,6 +293,6 @@ t.plan(3)

server.listen(0, () => {
const client = new Client(`http://localhost:${server.address().port}`, { requestTimeout: 50 })
const client = new Client(`http://localhost:${server.address().port}`)
t.teardown(client.destroy.bind(client))
client.request({ path: '/', method: 'GET' }, (err, response) => {
client.request({ path: '/', method: 'GET', requestTimeout: 50 }, (err, response) => {
t.ok(err instanceof errors.RequestTimeoutError)

@@ -377,3 +320,3 @@ })

server.listen(0, () => {
const client = new Client(`http://localhost:${server.address().port}`, { requestTimeout: 100 })
const client = new Client(`http://localhost:${server.address().port}`)
t.teardown(client.destroy.bind(client))

@@ -432,3 +375,7 @@

clock.tick(100)
client.on('connect', () => {
process.nextTick(() => {
clock.tick(100)
})
})
})

@@ -463,6 +410,6 @@ })

server.listen(0, () => {
const client = new Client(`http://localhost:${server.address().port}`, { requestTimeout: 0 })
const client = new Client(`http://localhost:${server.address().port}`)
t.teardown(client.destroy.bind(client))
client.request({ path: '/', method: 'GET' }, (err, response) => {
client.request({ path: '/', method: 'GET', requestTimeout: 0 }, (err, response) => {
t.error(err)

@@ -560,3 +507,3 @@ const bufs = []

server.listen(0, () => {
const client = new Client(`http://localhost:${server.address().port}`, { requestTimeout: 0 })
const client = new Client(`http://localhost:${server.address().port}`)
t.teardown(client.destroy.bind(client))

@@ -641,3 +588,3 @@

server.listen(0, () => {
const client = new Client(`http://localhost:${server.address().port}`, { requestTimeout: 0 })
const client = new Client(`http://localhost:${server.address().port}`)
t.teardown(client.destroy.bind(client))

@@ -676,1 +623,35 @@

})
test('client.close should not deadlock', (t) => {
t.plan(2)
const clock = FakeTimers.install()
t.teardown(clock.uninstall.bind(clock))
const server = createServer((req, res) => {
})
t.teardown(server.close.bind(server))
server.listen(0, () => {
const client = new Client(`http://localhost:${server.address().port}`, {
socketTimeout: 200
})
t.teardown(client.destroy.bind(client))
client[kConnect](() => {
client.request({
path: '/',
method: 'GET',
requestTimeout: 100
}, (err, response) => {
t.ok(err instanceof errors.RequestTimeoutError)
})
client.close((err) => {
t.error(err)
})
clock.tick(100)
})
})
})

@@ -69,8 +69,7 @@ 'use strict'

const client = new Client(`http://localhost:${server.address().port}`, {
socketTimeout: 0,
requestTimeout: 0
socketTimeout: 0
})
t.tearDown(client.close.bind(client))
client.request({ path: '/', method: 'GET' }, (err, result) => {
client.request({ path: '/', method: 'GET', requestTimeout: 0 }, (err, result) => {
t.error(err)

@@ -77,0 +76,0 @@ const bufs = []

@@ -5,3 +5,3 @@ 'use strict'

const { Client } = require('..')
const { kSocket } = require('../lib/symbols')
const { kSocket } = require('../lib/core/symbols')
const { Readable } = require('stream')

@@ -8,0 +8,0 @@

@@ -19,3 +19,3 @@ 'use strict'

test('path', (t) => {
t.plan(6)
t.plan(4)

@@ -28,3 +28,2 @@ const client = new Client(url)

t.strictEqual(err.message, 'path must be a valid path')
t.strictEqual(res, null)
})

@@ -35,3 +34,2 @@

t.strictEqual(err.message, 'path must be a valid path')
t.strictEqual(res, null)
})

@@ -41,3 +39,3 @@ })

test('method', (t) => {
t.plan(3)
t.plan(2)

@@ -50,3 +48,2 @@ const client = new Client(url)

t.strictEqual(err.message, 'method must be a string')
t.strictEqual(res, null)
})

@@ -56,3 +53,3 @@ })

test('body', (t) => {
t.plan(6)
t.plan(4)

@@ -65,3 +62,2 @@ const client = new Client(url)

t.strictEqual(err.message, 'body must be a string, a Buffer or a Readable stream')
t.strictEqual(res, null)
})

@@ -72,5 +68,4 @@

t.strictEqual(err.message, 'body must be a string, a Buffer or a Readable stream')
t.strictEqual(res, null)
})
})
})

Sorry, the diff of this file is not supported yet

SocketSocket SOC 2 Logo

Product

  • Package Alerts
  • Integrations
  • Docs
  • Pricing
  • FAQ
  • Roadmap
  • Changelog

Packages

npm

Stay in touch

Get open source security insights delivered straight into your inbox.


  • Terms
  • Privacy
  • Security

Made with ⚡️ by Socket Inc