Socket
Socket
Sign inDemoInstall

undici

Package Overview
Dependencies
Maintainers
2
Versions
212
Alerts
File Explorer

Advanced tools

Socket logo

Install Socket

Detect and block malicious and high-risk dependencies

Install

undici - npm Package Compare versions

Comparing version 0.5.0 to 1.0.0

.github/workflows/nodejs.yml

2

index.js

@@ -5,2 +5,3 @@ 'use strict'

const Pool = require('./lib/pool')
const errors = require('./lib/errors')

@@ -13,3 +14,4 @@ function undici (url, opts) {

undici.Client = Client
undici.errors = errors
module.exports = undici

609

lib/client.js

@@ -1,204 +0,162 @@

'use strict'
const {
Readable,
Duplex,
PassThrough,
finished
} = require('stream')
const {
InvalidArgumentError,
InvalidReturnValueError,
RequestAbortedError
} = require('./errors')
const {
kEnqueue,
kResume
} = require('./symbols')
const ClientBase = require('./client-base')
const assert = require('assert')
/* eslint no-prototype-builtins: "off" */
const { URL } = require('url')
const net = require('net')
const tls = require('tls')
const Q = require('fastq')
const { HTTPParser } = require('http-parser-js')
const { Readable } = require('readable-stream')
const eos = require('end-of-stream')
const syncthrough = require('syncthrough')
const retimer = require('retimer')
const { EventEmitter } = require('events')
const Request = require('./request')
const kRead = Symbol('read')
const kReadCb = Symbol('readCallback')
const kIsWaiting = Symbol('isWaiting')
const kQueue = Symbol('queue')
const kCallbacks = Symbol('callbacks')
const kRequests = Symbol('requests')
const kTimer = Symbol('kTimer')
const kTLSOpts = Symbol('TLS Options')
function connect (client) {
var socket = null
var url = client.url
// the defaults port are needed because of the URL spec
if (url.protocol === 'https:') {
socket = tls.connect(url.port || 443, url.hostname, client[kTLSOpts])
} else {
socket = net.connect(url.port || 80, url.hostname)
}
client.socket = socket
// stop the queue and reset the parsing state
client[kQueue].pause()
client[kIsWaiting] = false
client._needHeaders = 0
client._lastBody = null
socket.on('connect', () => {
client[kQueue].resume()
})
socket.on('end', () => {
reconnect(client, new Error('other side closed - ended'))
})
socket.on('finish', () => {
reconnect(client, new Error('this side closed - finished'))
})
socket.on('error', reconnect.bind(undefined, client))
}
function reconnect (client, err) {
if (client.closed) {
// TODO what do we do with the error?
return
}
// reset events
client.socket.removeAllListeners('end')
client.socket.removeAllListeners('finish')
client.socket.removeAllListeners('error')
client.socket.on('error', () => {})
client.socket = null
// we reset the callbacks
const callbacks = client[kCallbacks]
client[kCallbacks] = []
client[kRequests] = []
if (client[kQueue].length() > 0) {
connect(client)
}
for (const cb of callbacks) {
cb(err, null)
}
}
class Client extends EventEmitter {
constructor (url, opts = {}) {
super()
if (!(url instanceof URL)) {
url = new URL(url)
class Client extends ClientBase {
request (opts, callback) {
if (callback === undefined) {
return new Promise((resolve, reject) => {
this.request(opts, (err, data) => {
return err ? reject(err) : resolve(data)
})
})
}
this.url = url
// state machine, might need more states
this.closed = false
this.parser = new HTTPParser(HTTPParser.RESPONSE)
this[kTLSOpts] = opts.tls || opts.https
const endRequest = () => {
this.socket.write('\r\n', 'ascii')
this.socket.uncork()
this._needHeaders++
this[kRead]()
if (typeof callback !== 'function') {
throw new InvalidArgumentError('invalid callback')
}
this.timeout = opts.timeout || 30000 // 30 seconds
this[kCallbacks] = []
this[kRequests] = []
const timerCb = () => {
if (this[kCallbacks].length > 0) {
this.socket.destroy(new Error('timeout'))
this[kTimer] = null
}
if (!opts || typeof opts !== 'object') {
process.nextTick(callback, new InvalidArgumentError('invalid opts'), null)
return
}
this[kQueue] = Q((request, cb) => {
if (this.closed) {
return cb(new Error('The client is closed'))
// TODO: Avoid closure due to callback capture.
this[kEnqueue](opts, function (err, data) {
if (err) {
callback(err, null)
return
}
if (this[kTimer]) {
this[kTimer].reschedule(this.timeout)
} else {
this[kTimer] = retimer(timerCb, this.timeout)
}
const {
statusCode,
headers,
opaque,
resume
} = data
var { method, path, body } = request
const headers = request.headers || {}
const reqArr = [
`${method} ${path} HTTP/1.1\r\nConnection: keep-alive\r\n`
]
const body = new Readable({
autoDestroy: true,
read: resume,
destroy (err, callback) {
if (!err && !this._readableState.endEmitted) {
err = new RequestAbortedError()
}
if (err) {
resume()
}
callback(err, null)
}
})
body.destroy = this.wrap(body, body.destroy)
// wrap the callback in a AsyncResource
cb = request.wrap(cb)
callback(null, {
statusCode,
headers,
opaque,
body
})
this[kRequests].push(request)
this[kCallbacks].push(cb)
this.socket.cork()
return this.wrap(body, function (err, chunk) {
if (this.destroyed) {
return null
} else if (err) {
this.destroy(err)
} else {
const ret = this.push(chunk)
return this.destroyed ? null : ret
}
})
})
}
if (!(headers.host || headers.Host)) {
reqArr.push('Host: ' + url.hostname + '\r\n')
}
const headerNames = Object.keys(headers)
for (let i = 0; i < headerNames.length; i++) {
const name = headerNames[i]
reqArr.push(name + ': ' + headers[name] + '\r\n')
}
pipeline (opts, handler) {
if (typeof handler !== 'function') {
return new PassThrough().destroy(new InvalidArgumentError('invalid handler'))
}
for (let i = 0; i < reqArr.length; i++) {
this.socket.write(reqArr[i], 'ascii')
const req = new Readable({
autoDestroy: true,
read () {
if (this[kResume]) {
const resume = this[kResume]
this[kResume] = null
resume()
}
},
destroy (err, callback) {
if (err) {
if (this[kResume]) {
const resume = this[kResume]
this[kResume] = null
resume(err)
} else if (!ret.destroyed) {
// Stop ret from scheduling more writes.
ret.destroy(err)
}
} else {
assert(this._readableState.endEmitted)
assert(!this[kResume])
}
callback(err)
}
})
let res
let body
if (typeof body === 'string' || body instanceof Uint8Array) {
if (headers.hasOwnProperty('content-length')) {
// we have already written the content-length header
this.socket.write('\r\n')
const ret = new Duplex({
autoDestroy: true,
read () {
if (body) {
body.resume()
}
},
write (chunk, encoding, callback) {
assert(!req.destroyed)
if (req.push(chunk, encoding)) {
callback()
} else {
this.socket.write(`content-length: ${Buffer.byteLength(body)}\r\n\r\n`, 'ascii')
req[kResume] = callback
}
this.socket.write(body)
} else if (body && typeof body.pipe === 'function') {
// TODO we should pause the queue while we are piping
if (headers.hasOwnProperty('content-length')) {
this.socket.write('\r\n', 'ascii')
body.pipe(this.socket, { end: false })
this.socket.uncork()
eos(body, (err) => {
if (err) {
// TODO we might want to wait before previous in-flight
// requests are finished before destroying
this.socket.destroy(err)
// needed because destroy will be delayed
setImmediate(cb, err, null)
return
}
},
final (callback) {
req.push(null)
callback()
},
destroy (err, callback) {
if (!err && !this._readableState.endEmitted) {
err = new RequestAbortedError()
}
if (!req.destroyed) {
req.destroy(err)
}
if (res && !res.destroyed) {
res.destroy(err)
}
callback(err)
}
})
endRequest()
})
} else {
this.socket.write('transfer-encoding: chunked\r\n', 'ascii')
var through = syncthrough(addTransferEncoding)
body.pipe(through)
through.pipe(this.socket, { end: false })
this.socket.uncork()
eos(body, (err) => {
if (err) {
// TODO we might want to wait before previous in-flight
// requests are finished before destroying
this.socket.destroy(err)
// needed because destroy will be delayed
setImmediate(cb, err, null)
return
}
// TODO: Avoid copy.
opts = { ...opts, body: req }
this.socket.cork()
this.socket.write('\r\n0\r\n', 'ascii')
endRequest()
})
this[kEnqueue](opts, function (err, data) {
if (err) {
if (!ret.destroyed) {
ret.destroy(err)
}

@@ -208,91 +166,95 @@ return

endRequest()
})
const {
statusCode,
headers,
opaque,
resume
} = data
this.pipelining = opts.pipelining || 1
res = new Readable({
autoDestroy: true,
read: resume,
destroy (err, callback) {
if (!err && !this._readableState.endEmitted) {
err = new RequestAbortedError()
}
if (err) {
if (!ret.destroyed) {
ret.destroy(err)
}
resume()
}
callback(err, null)
}
})
res.destroy = this.wrap(res, res.destroy)
this[kQueue].drain = () => {
this.emit('drain')
}
// TODO: Should this somehow be wrapped earlier?
ret.destroy = this.wrap(ret, ret.destroy)
this.parser[HTTPParser.kOnHeaders] = () => {}
this.parser[HTTPParser.kOnHeadersComplete] = ({ statusCode, headers }) => {
// TODO move this[kCallbacks] from being an array. The array allocation
// is showing up in the flamegraph.
const cb = this[kCallbacks].shift()
const request = this[kRequests].shift()
const skipBody = request.method === 'HEAD'
if (!skipBody) {
this._lastBody = new Readable({ read: this[kRead].bind(this) })
this._lastBody.push = request.wrapSimple(this._lastBody, this._lastBody.push)
try {
body = handler({
statusCode,
headers,
opaque,
body: res
})
} catch (err) {
if (!ret.destroyed) {
ret.destroy(err)
}
return
}
cb(null, {
statusCode,
headers: parseHeaders(headers),
body: this._lastBody
})
if (this.closed && this[kQueue].length() === 0) {
this.destroy()
// TODO: Should we allow !body?
if (!body || typeof body.pipe !== 'function') {
if (!ret.destroyed) {
ret.destroy(new InvalidReturnValueError('expected Readable'))
}
return
}
return skipBody
}
// TODO: If body === res then avoid intermediate
// and write directly to ret.push? Or should this
// happen when body is null?
this.parser[HTTPParser.kOnBody] = (chunk, offset, length) => {
this._lastBody.push(chunk.slice(offset, offset + length))
}
body
.on('data', function (chunk) {
if (!ret.push(chunk)) {
this.pause()
}
})
.on('error', function (err) {
if (!ret.destroyed) {
ret.destroy(err)
}
})
.on('end', function () {
ret.push(null)
})
.on('close', function () {
if (!this._readableState.endEmitted && !ret.destroyed) {
ret.destroy(new RequestAbortedError())
}
})
this.parser[HTTPParser.kOnMessageComplete] = () => {
const body = this._lastBody
this._lastBody = null
if (body !== null) {
body.push(null)
}
}
return this.wrap(res, function (err, chunk) {
if (this.destroyed) {
return null
} else if (err) {
this.destroy(err)
} else {
const ret = this.push(chunk)
return this.destroyed ? null : ret
}
})
})
this[kReadCb] = () => {
this[kIsWaiting] = false
this[kRead]()
}
return ret
}
get pipelining () {
return this[kQueue].concurrency
}
set pipelining (v) {
this[kQueue].concurrency = v
}
get full () {
// TODO q.length is slowish, optimize
return this[kQueue].length() > this.pipelining
}
[kRead] () {
var socket = this.socket
if (!socket) {
// TODO this should not happen
return
}
var chunk = null
var hasRead = false
while ((chunk = socket.read()) !== null) {
hasRead = true
this.parser.execute(chunk)
}
if (!this[kIsWaiting] && (!hasRead || this._needHeaders > 0)) {
this[kIsWaiting] = true
socket.once('readable', this[kReadCb])
}
}
request (opts, cb) {
if (cb === undefined) {
stream (opts, factory, callback) {
if (callback === undefined) {
return new Promise((resolve, reject) => {
this.request(opts, (err, data) => {
this.stream(opts, factory, (err, data) => {
return err ? reject(err) : resolve(data)

@@ -303,74 +265,87 @@ })

if (this.closed) {
process.nextTick(cb, new Error('The client is closed'))
return false
if (typeof callback !== 'function') {
throw new InvalidArgumentError('invalid callback')
}
if (!this.socket) {
connect(this)
if (!opts || typeof opts !== 'object') {
process.nextTick(callback, new InvalidArgumentError('invalid opts'), null)
return
}
try {
const req = new Request(opts)
this[kQueue].push(req, cb)
} catch (err) {
process.nextTick(cb, err, null)
if (typeof factory !== 'function') {
process.nextTick(callback, new InvalidArgumentError('invalid factory'), null)
return
}
return !this.full
}
// TODO: Avoid closure due to callback capture.
this[kEnqueue](opts, function (err, data) {
if (err) {
callback(err)
return
}
close () {
if (this[kTimer]) {
this[kTimer].clear()
this[kTimer] = null
}
this.closed = true
const {
statusCode,
headers,
opaque,
resume
} = data
// TODO test this
if (this[kQueue].length() === 0 && this.socket) {
this.socket.end()
this.socket = null
}
}
let body
try {
body = factory({
statusCode,
headers,
opaque
})
} catch (err) {
callback(err, null)
return
}
destroy (err) {
if (this[kTimer]) {
this[kTimer].clear()
this[kTimer] = null
}
this.closed = true
if (this.socket) {
// TODO make sure we error everything that
// is in flight
this.socket.destroy(err)
this.socket = null
}
}
}
if (!body) {
callback(null, null)
return
}
function parseHeaders (headers) {
const obj = {}
for (var i = 0; i < headers.length; i += 2) {
var key = headers[i]
var val = obj[key]
if (!val) {
obj[key] = headers[i + 1]
} else {
if (!Array.isArray(val)) {
val = [val]
obj[key] = val
if (
typeof body.write !== 'function' ||
typeof body.destroy !== 'function' ||
typeof body.destroyed !== 'boolean'
) {
callback(new InvalidReturnValueError('expected Writable'), null)
return
}
val.push(headers[i + 1])
}
body.on('drain', resume)
// TODO: Avoid finished. It registers an unecessary amount of listeners.
finished(body, { readable: false }, (err) => {
body.removeListener('drain', resume)
if (err) {
if (!body.destroyed) {
body.destroy(err)
}
resume()
}
callback(err, null)
})
body.destroy = this.wrap(body, body.destroy)
return this.wrap(body, function (err, chunk) {
if (this.destroyed) {
return null
} else if (err) {
this.destroy(err)
} else if (chunk == null) {
this.end()
} else {
const ret = this.write(chunk)
return this.destroyed ? null : ret
}
})
})
}
return obj
}
function addTransferEncoding (chunk) {
var toWrite = '\r\n' + Buffer.byteLength(chunk).toString(16) + '\r\n'
this.push(toWrite)
return chunk
}
module.exports = Client
'use strict'
const Client = require('./client')
const current = Symbol('current')
const {
InvalidArgumentError
} = require('./errors')
const {
kClients
} = require('./symbols')
class Pool {
constructor (url, opts = {}) {
let {
connections,
constructor (url, {
connections,
maxAbortedPayload,
socketTimeout,
requestTimeout,
pipelining,
tls
} = {}) {
if (connections != null && (!Number.isFinite(connections) || connections <= 0)) {
throw new InvalidArgumentError('invalid connections')
}
this[kClients] = Array.from({
length: connections || 10
}, () => new Client(url, {
maxAbortedPayload,
socketTimeout,
requestTimeout,
pipelining,
timeout
} = opts
connections = connections || 10
pipelining = pipelining || 1
this.clients = Array.from({
length: connections
}, x => new Client(url, {
pipelining,
timeout
tls
}))
}
for (const client of this.clients) {
client.on('drain', onDrain)
stream (opts, factory, cb) {
// needed because we need the return value from client.stream
if (cb === undefined) {
return new Promise((resolve, reject) => {
this.stream(opts, factory, (err, data) => {
return err ? reject(err) : resolve(data)
})
})
}
this.drained = []
this[current] = null
getNext(this).stream(opts, factory, cb)
}
const that = this
function onDrain () {
// this is the client
that.drained.push(this)
}
pipeline (opts, handler) {
return getNext(this).pipeline(opts, handler)
}

@@ -46,34 +62,52 @@

if (this[current] === null) {
if (this.drained.length > 0) {
// LIFO QUEUE
// we use the last one that drained, because that's the one
// that is more probable to have an alive socket
this[current] = this.drained.pop()
} else {
// if no one drained recently, let's just pick one randomly
this[current] = this.clients[Math.floor(Math.random() * this.clients.length)]
}
getNext(this).request(opts, cb)
}
close (cb) {
const promise = Promise.all(this[kClients].map(c => c.close()))
if (cb) {
promise.then(() => cb(null, null), (err) => cb(err, null))
} else {
return promise
}
}
const writeMore = this[current].request(opts, cb)
destroy (err, cb) {
if (typeof err === 'function') {
cb = err
err = null
}
if (!writeMore) {
this[current] = null
const promise = Promise.all(this[kClients].map(c => c.destroy(err)))
if (cb) {
promise.then(() => cb(null, null))
} else {
return promise
}
}
}
close () {
for (const client of this.clients) {
client.close()
function getNext (pool) {
let next
for (const client of pool[kClients]) {
if (client.full) {
continue
}
}
destroy () {
for (const client of this.clients) {
client.destroy()
if (!next) {
next = client
}
if (client.connected) {
return client
}
}
if (next) {
return next
}
return pool[kClients][Math.floor(Math.random() * pool[kClients].length)]
}
module.exports = Pool
'use strict'
const { AsyncResource } = require('async_hooks')
const {
InvalidArgumentError,
RequestAbortedError,
RequestTimeoutError
} = require('./errors')
const EE = require('events')
const assert = require('assert')

@@ -46,7 +53,4 @@ const methods = [

function isValidBody (body) {
if (!body) {
return true
}
return body instanceof Buffer ||
return body == null ||
body instanceof Uint8Array ||
typeof body === 'string' ||

@@ -57,58 +61,143 @@ typeof body.pipe === 'function'

class Request extends AsyncResource {
constructor (opts) {
constructor ({
path,
method,
body,
headers,
idempotent,
opaque,
signal,
requestTimeout
}, callback) {
super('UNDICI_REQ')
if (!opts) {
throw new Error('no options passed')
assert(typeof callback === 'function')
if (typeof path !== 'string' || path[0] !== '/') {
throw new InvalidArgumentError('path must be a valid path')
}
if (!(typeof opts.path === 'string' && opts.path[0] === '/')) {
throw new Error('path must be a valid path')
if (typeof method !== 'string' || !methods[method]) {
throw new InvalidArgumentError('method must be a valid method')
}
this.method = opts.method
if (!(typeof opts.method === 'string' && methods[opts.method])) {
throw new Error('method must be a valid method')
if (requestTimeout != null && (!Number.isInteger(requestTimeout) || requestTimeout < 0)) {
throw new InvalidArgumentError('requestTimeout must be a positive integer or zero')
}
this.path = opts.path
// TODO we should validate that the http method accepts a body or not
if (!isValidBody(opts.body)) {
throw new Error('body must be a string, a Buffer or a Readable stream')
if (signal && typeof signal.on !== 'function' && typeof signal.addEventListener !== 'function') {
throw new InvalidArgumentError('signal must implement .on(name, callback)')
}
this.body = opts.body
// should we validate the headers?
this.headers = opts.headers
}
if (!isValidBody(body)) {
throw new InvalidArgumentError('body must be a string, a Buffer or a Readable stream')
}
wrap (cb) {
// happy path for Node 10+
if (this.runInAsyncScope) {
return this.runInAsyncScope.bind(this, cb, undefined)
this.timeout = null
this.signal = null
this.method = method
this.path = path
this.streaming = body && typeof body.pipe === 'function'
this.body = typeof body === 'string'
? Buffer.from(body)
: body
this.host = headers && Boolean(headers.host || headers.Host)
this.chunked = !headers || headers['content-length'] === undefined
this.callback = callback
this.opaque = opaque
this.idempotent = idempotent == null
? method === 'HEAD' || method === 'GET'
: idempotent
{
// TODO (perf): Build directy into buffer instead of
// using an intermediate string.
let headersStr = ''
if (headers) {
const headerNames = Object.keys(headers)
for (let i = 0; i < headerNames.length; i++) {
const name = headerNames[i]
headersStr += name + ': ' + headers[name] + '\r\n'
}
}
if (this.body && this.chunked && !this.streaming) {
headersStr += `content-length: ${Buffer.byteLength(this.body)}\r\n`
}
this.headers = headersStr ? Buffer.from(headersStr, 'ascii') : null
}
// old API for Node 8
return (err, val) => {
this.emitBefore()
cb(err, val)
this.emitAfter()
if (signal) {
/* istanbul ignore else: can't happen but kept in case of refactoring */
if (!this.signal) {
this.signal = new EE()
}
const onAbort = () => {
this.signal.emit('error', new RequestAbortedError())
}
if ('addEventListener' in signal) {
signal.addEventListener('abort', onAbort)
} else {
signal.once('abort', onAbort)
}
}
}
wrapSimple (that, cb) {
// happy path for Node 10+
if (this.runInAsyncScope) {
return this.runInAsyncScope.bind(this, cb, that)
if (requestTimeout) {
if (!this.signal) {
this.signal = new EE()
}
const onTimeout = () => {
this.signal.emit('error', new RequestTimeoutError())
}
this.timeout = setTimeout(onTimeout, requestTimeout)
}
// old API for Node 8
return (a) => {
this.emitBefore()
cb.call(that, a)
this.emitAfter()
if (this.signal) {
this.signal.on('error', (err) => {
assert(err)
this.invoke(err, null)
})
}
}
wrap (that, cb) {
return this.runInAsyncScope.bind(this, cb, that)
}
invoke (err, val) {
const { callback } = this
if (!callback) {
return
}
clearTimeout(this.timeout)
this.timeout = null
this.path = null
this.body = null
this.callback = null
this.opaque = null
this.headers = null
return this.runInAsyncScope(callback, this, err, val)
}
}
module.exports = Request
{
"name": "undici",
"version": "0.5.0",
"version": "1.0.0",
"description": "An HTTP/1.1 client, written from scratch for Node.js",
"main": "index.js",
"scripts": {
"test": "standard | snazzy && tap test/*.js"
"lint": "standard | snazzy",
"test": "tap test/*.js --no-coverage",
"coverage": "standard | snazzy && tap test/*.js"
},

@@ -14,2 +16,9 @@ "repository": {

"author": "Matteo Collina <hello@matteocollina.com>",
"contributors": [
{
"name": "Robert Nagy",
"url": "https://github.com/ronag",
"author": true
}
],
"license": "MIT",

@@ -21,2 +30,5 @@ "bugs": {

"devDependencies": {
"@sinonjs/fake-timers": "^6.0.1",
"abort-controller": "^3.0.0",
"benchmark": "^2.1.4",
"https-pem": "^2.0.0",

@@ -30,9 +42,7 @@ "pre-commit": "^1.2.2",

"dependencies": {
"end-of-stream": "^1.4.1",
"fastq": "^1.6.0",
"readable-stream": "^3.0.0",
"http-parser-js": "^0.5.2",
"retimer": "^2.0.0",
"syncthrough": "^0.5.0"
}
"http-parser-js": "^0.5.2"
},
"pre-commit": [
"coverage"
]
}
# undici
[![Build
Status](https://travis-ci.com/mcollina/undici.svg?branch=master)](https://travis-ci.com/mcollina/undici)
![Node CI](https://github.com/mcollina/undici/workflows/Node%20CI/badge.svg) [![js-standard-style](https://img.shields.io/badge/code%20style-standard-brightgreen.svg?style=flat)](http://standardjs.com/)

@@ -21,6 +20,20 @@ An HTTP/1.1 client, written from scratch for Node.js.

## Benchmarks
Machine: 2.7 GHz Quad-Core Intel Core i7<br/>
Configuration: Node v14.2, HTTP/1.1 without TLS, 100 connections
```
http - keepalive - pipe x 6,545 ops/sec ±12.47% (64 runs sampled)
undici - pipeline - pipe x 9,560 ops/sec ±3.68% (77 runs sampled)
undici - request - pipe x 9,797 ops/sec ±6.80% (77 runs sampled)
undici - stream - pipe x 11,599 ops/sec ±0.89% (78 runs sampled)
```
The benchmark is a simple `hello world` [example](benchmarks/index.js).
## API
<a name='client'></a>
### new undici.Client(url, opts)
### `new undici.Client(url, opts)`

@@ -35,12 +48,26 @@ A basic HTTP/1.1 client, mapped on top a single TCP/TLS connection.

- `timeout`, the timeout after which a request will time out, in
milliseconds. Default:
`30000` milliseconds.
- `socketTimeout`, the timeout after which a socket will time out, in
milliseconds. Monitors time between activity on a connected socket.
Use `0` to disable it entirely. Default: `30e3` milliseconds (30s).
- `requestTimeout`, the timeout after which a request will time out, in
milliseconds. Monitors time between request being enqueued and receiving
a response. Use `0` to disable it entirely.
Default: `30e3` milliseconds (30s).
- `maxAbortedPayload`, the maximum number of bytes read after which an
aborted response will close the connection. Closing the connection
will error other inflight requests in the pipeline.
Default: `1e6` bytes (1MiB).
- `pipelining`, the amount of concurrent requests to be sent over the
single TCP/TLS connection according to
[RFC7230](https://tools.ietf.org/html/rfc7230#section-6.3.2). Default: `1`.
[RFC7230](https://tools.ietf.org/html/rfc7230#section-6.3.2).
Default: `1`.
- `tls`, an options object which in the case of `https` will be passed to
[`tls.connect`](https://nodejs.org/api/tls.html#tls_tls_connect_options_callback).
<a name='request'></a>
#### `client.request(opts, cb(err, data))`
#### `client.request(opts, callback(err, data))`

@@ -53,4 +80,13 @@ Performs an HTTP request.

* `method`
* `body`, it can be a `String`, a `Buffer` or a `stream.Readable`.
* `body`, it can be a `String`, a `Buffer`, `Uint8Array` or a `stream.Readable`.
* `headers`, an object with header-value pairs.
* `signal`, either an `AbortController` or an `EventEmitter`.
* `requestTimeout`, the timeout after which a request will time out, in
milliseconds. Monitors time between request being enqueued and receiving
a response. Use `0` to disable it entirely.
Default: `30e3` milliseconds (30s).
* `idempotent`, whether the requests can be safely retried or not.
If `false` the request won't be sent until all preceeding
requests in the pipeline has completed.
Default: `true` if `method` is `HEAD` or `GET`.

@@ -71,9 +107,14 @@ Headers are represented by an object like this:

The `data` parameter in the callback is defined as follow:
The `data` parameter in `callback` is defined as follow:
* `statusCode`
* `headers`
* `body`, a `stream.Readable` with the body to read. A user **must**
call `data.body.resume()`
* `body`, a `stream.Readable` with the body to read. A user **must**
either fully consume or destroy the body unless there is an error, or no further requests
will be processed.
`headers` is an object where all keys have been lowercased.
Returns a promise if no callback is provided.
Example:

@@ -110,57 +151,371 @@

Promises and async await are supported as well!
Non-idempotent requests will not be pipelined in order
to avoid indirect failures.
Idempotent requests will be automatically retried if
they fail due to indirect failure from the request
at the head of the pipeline. This does not apply to
idempotent requests with a stream request body.
##### Aborting a request
A request can may be aborted using either an `AbortController` or an `EventEmitter`.
To use `AbortController`, you will need to `npm i abort-controller`.
```js
const { statusCode, headers, body } = await client.request({
const { AbortController } = require('abort-controller')
const { Client } = require('undici')
const client = new Client'http://localhost:3000')
const abortController = new AbortController()
client.request({
path: '/',
method: 'GET'
method: 'GET',
signal: abortController.signal
}, function (err, data) {
console.log(err) // RequestAbortedError
client.close()
})
abortController.abort()
```
#### `client.pipelining`
Property to set the pipelining factor.
Alternatively, any `EventEmitter` that emits an `'abort'` event may be used as an abort controller:
#### `client.full`
```js
const EventEmitter = require('events')
const { Client } = require('undici')
True if the number of requests waiting to be sent is greater
than the pipelining factor. Keeping a client full ensures that once the
inflight set of requests finishes there is a full batch ready to go.
const client = new Client'http://localhost:3000')
const ee = new EventEmitter()
client.request({
path: '/',
method: 'GET',
signal: ee
}, function (err, data) {
console.log(err) // RequestAbortedError
client.close()
})
ee.emit('abort')
```
Destroying the request or response body will have the same effect.
<a name='stream'></a>
#### `client.stream(opts, factory(data), callback(err))`
A faster version of [`request`][request].
Unlike [`request`][request] this method expects `factory`
to return a [`Writable`](https://nodejs.org/api/stream.html#stream_class_stream_writable) which the response will be
written to. This improves performance by avoiding
creating an intermediate [`Readable`](https://nodejs.org/api/stream.html#stream_readable_streams) when the user
expects to directly pipe the response body to a
[`Writable`](https://nodejs.org/api/stream.html#stream_class_stream_writable).
Options:
* ... same as [`client.request(opts, callback)`][request].
* `opaque`, passed as `opaque` to `factory`. Used
to avoid creating a closure.
The `data` parameter in `factory` is defined as follow:
* `statusCode`
* `headers`
* `opaque`
`headers` is an object where all keys have been lowercased.
Returns a promise if no callback is provided.
```js
const { Client } = require('undici')
const client = new Client(`http://localhost:3000`)
const fs = require('fs')
client.stream({
path: '/',
method: 'GET',
opaque: filename
}, ({ statusCode, headers, opaque: filename }) => {
console.log('response received', statusCode)
console.log('headers', headers)
return fs.createWriteStream(filename)
}, (err) => {
if (err) {
console.error('failure', err)
} else {
console.log('success')
}
})
```
`opaque` makes it possible to avoid creating a closure
for the `factory` method:
```js
function (req, res) {
return client.stream({ ...opts, opaque: res }, proxy)
}
```
Instead of:
```js
function (req, res) {
return client.stream(opts, (data) => {
// Creates closure to capture `res`.
proxy({ ...data, opaque: res })
}
}
```
<a name='pipeline'></a>
#### `client.pipeline(opts, handler(data))`
For easy use with [`stream.pipeline`](https://nodejs.org/api/stream.html#stream_stream_pipeline_source_transforms_destination_callback).
Options:
* ... same as [`client.request(opts, callback)`][request].
* `opaque`, passed as `opaque` to `handler`. Used
to avoid creating a closure.
The `data` parameter in `handler` is defined as follow:
* `statusCode`
* `headers`
* `opaque`
* `body`, a `stream.Readable` with the body to read. A user **must**
either fully consume or destroy the body unless there is an error, or no further requests
will be processed.
`handler` should return a [`Writable`](https://nodejs.org/api/stream.html#stream_class_stream_writable) to which the response will be
written to. Usually it should just return the `body` argument unless
some kind of transformation needs to be performed based on e.g.
`headers` or `statusCode`.
`headers` is an object where all keys have been lowercased.
The `handler` should validate the response and save any
required state. If there is an error it should be thrown.
Returns a `Duplex` which writes to the request and reads from
the response.
```js
const { Client } = require('undici')
const client = new Client(`http://localhost:3000`)
const fs = require('fs')
const stream = require('stream')
stream.pipeline(
fs.createReadStream('source.raw'),
client.pipeline({
path: '/',
method: 'PUT',
}, ({ statusCode, headers, body }) => {
if (statusCode !== 201) {
throw new Error('invalid response')
}
if (isZipped(headers)) {
return pipeline(body, unzip(), () => {})
}
return body
}),
fs.createReadStream('response.raw'),
(err) => {
if (err) {
console.error('failed')
} else {
console.log('succeeded')
}
}
)
```
<a name='close'></a>
#### `client.close()`
#### `client.close([callback])`
Close the client as soon as all the enqueued requests are completed
Closes the client and gracefully waits fo enqueued requests to
complete before invoking the callback.
Returns a promise if no callback is provided.
<a name='destroy'></a>
#### `client.destroy(err)`
#### `client.destroy([err][, callback])`
Destroy the client abruptly with the given `err`. All the current and
enqueued requests will error.
Destroy the client abruptly with the given `err`. All the pending and running
requests will be aborted and error. Waits until socket is closed before
invoking the callback.
Returns a promise if no callback is provided.
#### `client.pipelining`
Property to get and set the pipelining factor.
#### `client.pending`
Number of queued requests.
#### `client.running`
Number of inflight requests.
#### `client.size`
Number of pending and running requests.
#### `client.connected`
True if the client has an active connection. The client will lazily
create a connection when it receives a request and will destroy it
if there is no activity for the duration of the `timeout` value.
#### `client.full`
True if `client.size` is greater than the `client.pipelining` factor.
Keeping a client full ensures that once a inflight requests finishes
the the pipeline will schedule new one and keep the pipeline saturated.
#### `client.closed`
True after `client.close()` has been called.
#### `client.destroyed`
True after `client.destroyed()` has been called or `client.close()` has been
called and the client shutdown has completed.
#### Events
* `'drain'`, emitted when the queue is empty.
* `'connect'`, emitted when a socket has been created and
connected. The client will connect once `client.size > 0`.
### undici.Pool
* `'disconnect'`, emitted when socket has disconnected. The
first argument of the event is the error which caused the
socket to disconnect. The client will reconnect if or once
`client.size > 0`.
<a name='pool'></a>
### `new undici.Pool(url, opts)`
A pool of [`Client`][] connected to the same upstream target.
A pool creates a fixed number of [`Client`][]
Options:
* `connections`, the number of clients to create. Default `100`.
* `pipelining`, the pipelining factor. Default `1`.
* `timeout`, the timeout for each request. Default `30000` milliseconds.
* ... same as [`Client`][].
* `connections`, the number of clients to create.
Default `100`.
#### `pool.request(req, cb)`
#### `pool.request(opts, callback)`
Calls [`client.request(req, cb)`][request] on one of the clients.
Calls [`client.request(opts, callback)`][request] on one of the clients.
#### `pool.close()`
#### `pool.stream(opts, factory, callback)`
Calls [`client.close()`](#close) on all the clients.
Calls [`client.stream(opts, factory, callback)`][stream] on one of the clients.
#### `pool.destroy()`
#### `pool.pipeline(opts, handler)`
Calls [`client.destroy()`](#destroy) on all the clients.
Calls [`client.pipeline(opts, handler)`][pipeline] on one of the clients.
#### `pool.close([callback])`
Calls [`client.close(callback)`](#close) on all the clients.
#### `pool.destroy([err][, callback])`
Calls [`client.destroy(err, callback)`](#destroy) on all the clients.
<a name='errors'></a>
### `undici.errors`
Undici exposes a variety of error objects that you can use to enhance your error handling.
You can find all the error objects inside the `errors` key.
```js
const { errors } = require('undici')
```
| Error | Error Codes | Description |
| --------------------------|-----------------------------------|------------------------------------------------|
| `InvalidArgumentError` | `UND_ERR_INVALID_ARG` | passed an invalid argument. |
| `InvalidReturnValueError` | `UND_ERR_INVALID_RETURN_VALUE` | returned an invalid value. |
| `SocketTimeoutError` | `UND_ERR_SOCKET_TIMEOUT` | a socket exceeds the `socketTimeout` option. |
| `RequestTimeoutError` | `UND_ERR_REQUEST_TIMEOUT` | a request exceeds the `requestTimeout` option. |
| `RequestAbortedError` | `UND_ERR_ABORTED` | the request has been aborted by the user |
| `ClientDestroyedError` | `UND_ERR_DESTROYED` | trying to use a destroyed client. |
| `ClientClosedError` | `UND_ERR_CLOSED` | trying to use a closed client. |
| `SocketError` | `UND_ERR_SOCKET` | there is an error with the socket. |
| `NotSupportedError` | `UND_ERR_NOT_SUPPORTED` | encountered unsupported functionality. |
## Specification Compliance
This section documents parts of the HTTP/1.1 specification which Undici does
not support or does not fully implement.
### Informational Responses
Undici does not support 1xx informational responses and will either
ignore or error them.
#### Expect
Undici does not support the `Expect` request header field. The request
body is always immediately sent and the `100 Continue` response will be
ignored.
Refs: https://tools.ietf.org/html/rfc7231#section-5.1.1
#### Upgrade
Undici does not support the the `Upgrade` request header field. A
`101 Switching Protocols` response will cause an `UND_ERR_NOT_SUPPORTED` error.
Refs: https://tools.ietf.org/html/rfc7230#section-6.7
#### Hints
Undici does not support early hints. A `103 Early Hint` response will
be ignored.
Refs: https://tools.ietf.org/html/rfc8297
### Trailer
Undici does not support the the `Trailer` response header field. Any response
trailer headers will be ignored.
Refs: https://tools.ietf.org/html/rfc7230#section-4.4
### Pipelining
Uncidi will only use pipelining if configured with a `pipelining` factor
greater than `1`.
Undici always assumes that connections are persistent and will immediatly
pipeline requests, without checking whether the connection is persistent.
Hence, automatic fallback to HTTP/1.0 or HTTP/1.1 without pipelining is
not supported.
Undici will immediately pipeline when retrying requests afters a failed
connection. However, Undici will not retry the first remaining requests in
the prior pipeline and instead error the corresponding callback/promise/stream.
Refs: https://tools.ietf.org/html/rfc2616#section-8.1.2.2<br/>
Refs: https://tools.ietf.org/html/rfc7230#section-6.3.2
## Collaborators
* [__Robert Nagy__](https://github.com/ronag), <https://www.npmjs.com/~ronag>
## License

@@ -172,1 +527,3 @@

[request]: #request
[stream]: #stream
[pipeline]: #pipeline
'use strict'
const t = require('tap')
const { test } = require('tap')
const { Client } = require('..')

@@ -8,2 +8,3 @@ const { createServer } = require('http')

const { readFile } = require('fs')
const { PassThrough } = require('stream')

@@ -35,67 +36,191 @@ const transactions = new Map()

t.plan(16)
test('async hooks', (t) => {
t.plan(31)
const server = createServer((req, res) => {
res.setHeader('content-type', 'text/plain')
readFile(__filename, (err, buf) => {
t.error(err)
const buf1 = buf.slice(0, buf.length / 2)
const buf2 = buf.slice(buf.length / 2)
// we split the file so that it's received in 2 chunks
// and it should restore the state on the second
res.write(buf1)
setTimeout(() => {
res.end(buf2)
}, 10)
const server = createServer((req, res) => {
res.setHeader('content-type', 'text/plain')
readFile(__filename, (err, buf) => {
t.error(err)
const buf1 = buf.slice(0, buf.length / 2)
const buf2 = buf.slice(buf.length / 2)
// we split the file so that it's received in 2 chunks
// and it should restore the state on the second
res.write(buf1)
setTimeout(() => {
res.end(buf2)
}, 10)
})
})
})
t.tearDown(server.close.bind(server))
t.tearDown(server.close.bind(server))
server.listen(0, () => {
const client = new Client(`http://localhost:${server.address().port}`)
t.tearDown(client.close.bind(client))
server.listen(0, () => {
const client = new Client(`http://localhost:${server.address().port}`)
t.tearDown(client.destroy.bind(client))
client.request({ path: '/', method: 'GET' }, (err, { statusCode, headers, body }) => {
t.error(err)
body.resume()
t.strictDeepEqual(getCurrentTransaction(), null)
client.request({ path: '/', method: 'GET' }, (err, { statusCode, headers, body }) => {
t.error(err)
body.resume()
t.strictDeepEqual(getCurrentTransaction(), null)
setCurrentTransaction({ hello: 'world2' })
setCurrentTransaction({ hello: 'world2' })
client.request({ path: '/', method: 'GET' }, (err, { statusCode, headers, body }) => {
t.error(err)
t.strictDeepEqual(getCurrentTransaction(), { hello: 'world2' })
body.once('data', () => {
t.strictDeepEqual(getCurrentTransaction(), { hello: 'world2' })
body.resume()
})
body.on('end', () => {
t.strictDeepEqual(getCurrentTransaction(), { hello: 'world2' })
})
})
})
client.request({ path: '/', method: 'GET' }, (err, { statusCode, headers, body }) => {
t.error(err)
t.strictDeepEqual(getCurrentTransaction(), { hello: 'world2' })
body.resume()
t.strictDeepEqual(getCurrentTransaction(), null)
body.once('data', () => {
t.strictDeepEqual(getCurrentTransaction(), { hello: 'world2' })
body.resume()
setCurrentTransaction({ hello: 'world' })
client.request({ path: '/', method: 'GET' }, (err, { statusCode, headers, body }) => {
t.error(err)
t.strictDeepEqual(getCurrentTransaction(), { hello: 'world' })
body.once('data', () => {
t.strictDeepEqual(getCurrentTransaction(), { hello: 'world' })
body.resume()
})
body.on('end', () => {
t.strictDeepEqual(getCurrentTransaction(), { hello: 'world' })
})
})
})
body.on('end', () => {
t.strictDeepEqual(getCurrentTransaction(), { hello: 'world2' })
client.request({ path: '/', method: 'HEAD' }, (err, { statusCode, headers, body }) => {
t.error(err)
body.resume()
t.strictDeepEqual(getCurrentTransaction(), null)
setCurrentTransaction({ hello: 'world' })
client.request({ path: '/', method: 'HEAD' }, (err, { statusCode, headers, body }) => {
t.error(err)
t.strictDeepEqual(getCurrentTransaction(), { hello: 'world' })
body.once('data', () => {
t.strictDeepEqual(getCurrentTransaction(), { hello: 'world' })
body.resume()
})
body.on('end', () => {
t.strictDeepEqual(getCurrentTransaction(), { hello: 'world' })
})
})
})
client.stream({ path: '/', method: 'GET' }, () => {
t.strictDeepEqual(getCurrentTransaction(), null)
return new PassThrough().resume()
}, (err) => {
t.error(err)
t.strictDeepEqual(getCurrentTransaction(), null)
setCurrentTransaction({ hello: 'world' })
client.stream({ path: '/', method: 'GET' }, () => {
t.strictDeepEqual(getCurrentTransaction(), { hello: 'world' })
return new PassThrough().resume()
}, (err) => {
t.error(err)
t.strictDeepEqual(getCurrentTransaction(), { hello: 'world' })
})
})
})
})
client.request({ path: '/', method: 'GET' }, (err, { statusCode, headers, body }) => {
t.error(err)
body.resume()
t.strictDeepEqual(getCurrentTransaction(), null)
test('async hooks client is destroyed', (t) => {
t.plan(7)
setCurrentTransaction({ hello: 'world' })
const server = createServer((req, res) => {
res.setHeader('content-type', 'text/plain')
readFile(__filename, (err, buf) => {
t.error(err)
const buf1 = buf.slice(0, buf.length / 2)
const buf2 = buf.slice(buf.length / 2)
// we split the file so that it's received in 2 chunks
// and it should restore the state on the second
res.write(buf1)
setTimeout(() => {
res.end(buf2)
}, 10)
})
})
t.tearDown(server.close.bind(server))
client.request({ path: '/', method: 'GET' }, (err, { statusCode, headers, body }) => {
server.listen(0, () => {
const client = new Client(`http://localhost:${server.address().port}`)
t.tearDown(client.destroy.bind(client))
client.request({ path: '/', method: 'GET' }, (err, { body }) => {
t.error(err)
t.strictDeepEqual(getCurrentTransaction(), { hello: 'world' })
body.resume()
body.on('error', (err) => {
t.ok(err)
})
t.strictDeepEqual(getCurrentTransaction(), null)
body.once('data', () => {
t.strictDeepEqual(getCurrentTransaction(), { hello: 'world' })
body.resume()
setCurrentTransaction({ hello: 'world2' })
client.request({ path: '/', method: 'GET' }, (err) => {
t.strictEqual(err.message, 'The client is destroyed')
t.strictDeepEqual(getCurrentTransaction(), { hello: 'world2' })
})
client.destroy((err) => {
t.error(err)
})
})
})
})
body.on('end', () => {
t.strictDeepEqual(getCurrentTransaction(), { hello: 'world' })
test('async hooks error and close', (t) => {
t.plan(6)
const server = createServer((req, res) => {
res.write('asd')
setImmediate(() => {
res.destroy()
})
})
t.tearDown(server.close.bind(server))
server.listen(0, () => {
const client = new Client(`http://localhost:${server.address().port}`)
t.tearDown(client.close.bind(client))
client.request({ path: '/', method: 'GET' }, (err, { body }) => {
t.error(err)
body.resume()
body.on('error', (err) => {
t.ok(err)
})
t.strictDeepEqual(getCurrentTransaction(), null)
setCurrentTransaction({ hello: 'world2' })
client.request({ path: '/', method: 'GET' }, (err, data) => {
t.error(err)
data.body.on('error', () => {
t.strictDeepEqual(getCurrentTransaction(), { hello: 'world2' })
})
data.body.on('close', () => {
t.strictDeepEqual(getCurrentTransaction(), { hello: 'world2' })
})
})
})
})
})
'use strict'
const { test } = require('tap')
const { Client } = require('..')
const { Client, errors } = require('..')
const { createServer } = require('http')
const { Readable } = require('readable-stream')
const net = require('net')
const { Readable } = require('stream')
const {
kParser,
kSocket,
kEnqueue
} = require('../lib/symbols')
test('GET errors and reconnect with pipelining 1', (t) => {

@@ -30,5 +37,5 @@ t.plan(9)

})
t.tearDown(client.close.bind(client))
t.tearDown(client.destroy.bind(client))
client.request({ path: '/', method: 'GET' }, (err, data) => {
client.request({ path: '/', method: 'GET', idempotent: false }, (err, data) => {
t.ok(err instanceof Error) // we are expecting an error

@@ -82,7 +89,7 @@ t.strictEqual(null, data)

})
t.tearDown(client.close.bind(client))
t.tearDown(client.destroy.bind(client))
// all of these will error
for (let i = 0; i < 3; i++) {
client.request({ path: '/', method: 'GET' }, (err, data) => {
client.request({ path: '/', method: 'GET', idempotent: false }, (err, data) => {
t.ok(err instanceof Error) // we are expecting an error

@@ -94,3 +101,3 @@ t.strictEqual(null, data)

// this will be queued up
client.request({ path: '/', method: 'GET' }, (err, { statusCode, headers, body }) => {
client.request({ path: '/', method: 'GET', idempotent: false }, (err, { statusCode, headers, body }) => {
t.error(err)

@@ -123,3 +130,2 @@ t.strictEqual(statusCode, 200)

})
// req.socket.on('end', console.log.bind(console, 'end'))

@@ -143,3 +149,3 @@ req.on('aborted', () => {

const client = new Client(`http://localhost:${server.address().port}`)
t.tearDown(client.close.bind(client))
t.tearDown(client.destroy.bind(client))

@@ -165,3 +171,3 @@ client.request({

// this will be queued up
client.request({ path: '/', method: 'GET' }, (err, { statusCode, headers, body }) => {
client.request({ path: '/', method: 'GET', idempotent: false }, (err, { statusCode, headers, body }) => {
t.error(err)

@@ -194,3 +200,2 @@ t.strictEqual(statusCode, 200)

})
// req.socket.on('end', console.log.bind(console, 'end'))

@@ -214,3 +219,3 @@ req.on('aborted', () => {

const client = new Client(`http://localhost:${server.address().port}`)
t.tearDown(client.close.bind(client))
t.tearDown(client.destroy.bind(client))

@@ -246,1 +251,596 @@ client.request({

})
test('invalid options throws', (t) => {
t.plan(24)
try {
new Client({ port: 'foobar' }) // eslint-disable-line
} catch (err) {
t.ok(err instanceof errors.InvalidArgumentError)
t.strictEqual(err.message, 'invalid port')
}
try {
new Client(new URL('http://asd:200/somepath')) // eslint-disable-line
} catch (err) {
t.ok(err instanceof errors.InvalidArgumentError)
t.strictEqual(err.message, 'invalid url')
}
try {
new Client(new URL('http://asd:200?q=asd')) // eslint-disable-line
} catch (err) {
t.ok(err instanceof errors.InvalidArgumentError)
t.strictEqual(err.message, 'invalid url')
}
try {
new Client(new URL('http://asd:200#asd')) // eslint-disable-line
} catch (err) {
t.ok(err instanceof errors.InvalidArgumentError)
t.strictEqual(err.message, 'invalid url')
}
try {
new Client(new URL('http://localhost:200'), { // eslint-disable-line
maxAbortedPayload: 'asd'
})
} catch (err) {
t.ok(err instanceof errors.InvalidArgumentError)
t.strictEqual(err.message, 'invalid maxAbortedPayload')
}
try {
new Client(new URL('http://localhost:200'), { // eslint-disable-line
socketTimeout: 'asd'
})
} catch (err) {
t.ok(err instanceof errors.InvalidArgumentError)
t.strictEqual(err.message, 'invalid socketTimeout')
}
try {
new Client(new URL('http://localhost:200'), { // eslint-disable-line
requestTimeout: 'asd'
})
} catch (err) {
t.ok(err instanceof errors.InvalidArgumentError)
t.strictEqual(err.message, 'invalid requestTimeout')
}
try {
new Client({ // eslint-disable-line
protocol: 'asd'
})
} catch (err) {
t.ok(err instanceof errors.InvalidArgumentError)
t.strictEqual(err.message, 'invalid protocol')
}
try {
new Client({ // eslint-disable-line
hostname: 1
})
} catch (err) {
t.ok(err instanceof errors.InvalidArgumentError)
t.strictEqual(err.message, 'invalid hostname')
}
try {
new Client(1) // eslint-disable-line
} catch (err) {
t.ok(err instanceof errors.InvalidArgumentError)
t.strictEqual(err.message, 'invalid url')
}
try {
const client = new Client(new URL('http://localhost:200'))
client.destroy(null, null)
} catch (err) {
t.ok(err instanceof errors.InvalidArgumentError)
t.strictEqual(err.message, 'invalid callback')
}
try {
const client = new Client(new URL('http://localhost:200'))
client.close(null)
} catch (err) {
t.ok(err instanceof errors.InvalidArgumentError)
t.strictEqual(err.message, 'invalid callback')
}
})
test('POST which fails should error response', (t) => {
t.plan(4)
const server = createServer()
server.on('request', (req, res) => {
req.once('data', () => {
res.destroy()
})
})
t.tearDown(server.close.bind(server))
server.listen(0, () => {
const client = new Client(`http://localhost:${server.address().port}`)
t.tearDown(client.destroy.bind(client))
function checkError (err) {
// Different platforms error with different codes...
t.ok(
err.code === 'EPIPE' ||
err.code === 'ECONNRESET' ||
err.message === 'other side closed'
)
}
{
const body = new Readable({ read () {} })
body.push('asd')
body.on('error', (err) => {
checkError(err)
})
client.request({
path: '/',
method: 'POST',
body
}, (err) => {
checkError(err)
})
}
{
const body = new Readable({ read () {} })
body.push('asd')
body.on('error', (err) => {
checkError(err)
})
client.request({
path: '/',
method: 'POST',
headers: {
'content-length': 100
},
body
}, (err) => {
checkError(err)
})
}
})
})
test('client destroy cleanup', (t) => {
t.plan(3)
const _err = new Error('kaboom')
let client
const server = createServer()
server.once('request', (req, res) => {
req.once('data', () => {
client.destroy(_err, (err) => {
t.error(err)
})
})
})
t.tearDown(server.close.bind(server))
server.listen(0, () => {
client = new Client(`http://localhost:${server.address().port}`)
t.tearDown(client.destroy.bind(client))
const body = new Readable({ read () {} })
body.push('asd')
body.on('error', (err) => {
t.strictEqual(err, _err)
})
client.request({
path: '/',
method: 'POST',
body
}, (err, data) => {
t.strictEqual(err, _err)
})
})
})
test('GET errors body', (t) => {
t.plan(2)
const server = createServer()
server.once('request', (req, res) => {
res.write('asd')
setTimeout(() => {
res.destroy()
}, 19)
})
t.tearDown(server.close.bind(server))
server.listen(0, () => {
const client = new Client(`http://localhost:${server.address().port}`)
t.tearDown(client.destroy.bind(client))
client.request({ path: '/', method: 'GET' }, (err, { statusCode, headers, body }) => {
t.error(err)
body.resume()
body.on('error', err => (
t.ok(err)
))
})
})
})
test('reset parser', (t) => {
t.plan(6)
const server = createServer()
let res2
server.on('request', (req, res) => {
res2 = res
res.write('asd')
})
t.tearDown(server.close.bind(server))
server.listen(0, () => {
const client = new Client(`http://localhost:${server.address().port}`)
t.tearDown(client.destroy.bind(client))
client.request({ path: '/', method: 'GET' }, (err, { body }) => {
t.error(err)
res2.destroy()
body.resume()
body.on('error', err => {
t.ok(err)
})
})
client.once('disconnect', () => {
client.request({ path: '/', method: 'GET' }, (err, { body }) => {
t.error(err)
res2.destroy()
body.resume()
body.on('error', err => {
t.ok(err)
})
})
client.on('connect', () => {
t.ok(!client[kSocket][kParser].chunk)
t.ok(!client[kSocket][kParser].offset)
})
})
})
})
test('validate request body', (t) => {
t.plan(6)
const server = createServer((req, res) => {
res.end('asd')
})
t.tearDown(server.close.bind(server))
server.listen(0, () => {
const client = new Client(`http://localhost:${server.address().port}`)
t.tearDown(client.close.bind(client))
client.request({
path: '/',
method: 'POST',
body: /asdasd/
}, (err, data) => {
t.ok(err instanceof errors.InvalidArgumentError)
})
client.request({
path: '/',
method: 'POST',
body: 0
}, (err, data) => {
t.ok(err instanceof errors.InvalidArgumentError)
})
client.request({
path: '/',
method: 'POST',
body: false
}, (err, data) => {
t.ok(err instanceof errors.InvalidArgumentError)
})
client.request({
path: '/',
method: 'POST',
body: ''
}, (err, data) => {
t.error(err instanceof errors.InvalidArgumentError)
data.body.resume()
})
client.request({
path: '/',
method: 'POST',
body: new Uint8Array()
}, (err, data) => {
t.error(err instanceof errors.InvalidArgumentError)
data.body.resume()
})
client.request({
path: '/',
method: 'POST',
body: Buffer.alloc(10)
}, (err, data) => {
t.error(err instanceof errors.InvalidArgumentError)
data.body.resume()
})
})
})
test('parser error', (t) => {
t.plan(2)
const server = net.createServer()
server.once('connection', (socket) => {
socket.write('asd\n\r213123')
})
t.tearDown(server.close.bind(server))
server.listen(0, () => {
const client = new Client(`http://localhost:${server.address().port}`)
t.tearDown(client.destroy.bind(client))
client.request({ path: '/', method: 'GET' }, (err) => {
t.ok(err)
client.close((err) => {
t.error(err)
})
})
})
})
test('socket fail while writing request body', (t) => {
t.plan(2)
const server = createServer()
server.once('request', (req, res) => {
})
t.tearDown(server.close.bind(server))
server.listen(0, () => {
const client = new Client(`http://localhost:${server.address().port}`)
t.tearDown(client.destroy.bind(client))
const body = new Readable({ read () {} })
body.push('asd')
client.on('connect', () => {
process.nextTick(() => {
client[kSocket].destroy('kaboom')
})
})
client.request({
path: '/',
method: 'POST',
body
}, (err) => {
t.ok(err)
})
client.close((err) => {
t.error(err)
})
})
})
test('socket fail while ending request body', (t) => {
t.plan(3)
const server = createServer()
server.once('request', (req, res) => {
res.end()
})
t.tearDown(server.close.bind(server))
server.listen(0, () => {
const client = new Client(`http://localhost:${server.address().port}`, {
pipelining: 2
})
t.tearDown(client.destroy.bind(client))
const _err = new Error('kaboom')
client.on('connect', () => {
process.nextTick(() => {
client[kSocket].destroy(_err)
})
})
const body = new Readable({ read () {} })
body.push(null)
client.request({
path: '/',
method: 'POST',
body
}, (err) => {
t.strictEqual(err, _err)
})
client.close((err) => {
t.error(err)
client.close((err) => {
t.ok(err instanceof errors.ClientDestroyedError)
})
})
})
})
test('queued request should not fail on socket destroy', (t) => {
t.plan(2)
const server = createServer()
server.on('request', (req, res) => {
res.end()
})
t.tearDown(server.close.bind(server))
server.listen(0, () => {
const client = new Client(`http://localhost:${server.address().port}`, {
pipelining: 1
})
t.tearDown(client.destroy.bind(client))
client.request({
path: '/',
method: 'GET'
}, (err, data) => {
t.error(err)
data.body.resume()
client[kSocket].destroy()
client.request({
path: '/',
method: 'GET'
}, (err, data) => {
t.error(err)
data.body.resume()
})
})
})
})
test('queued request should fail on client destroy', (t) => {
t.plan(5)
const server = createServer()
server.on('request', (req, res) => {
res.end()
})
t.tearDown(server.close.bind(server))
server.listen(0, () => {
const client = new Client(`http://localhost:${server.address().port}`, {
pipelining: 1
})
t.tearDown(client.destroy.bind(client))
let requestErrored = false
client.request({
path: '/',
method: 'GET'
}, (err, data) => {
t.error(err)
data.body.resume()
client.destroy((err) => {
t.error(err)
t.strictEqual(requestErrored, true)
})
})
client.request({
path: '/',
method: 'GET'
}, (err, data) => {
requestErrored = true
t.ok(err)
t.strictEqual(data, null)
})
})
})
test('retry idempotent inflight', (t) => {
t.plan(3)
const server = createServer()
server.on('request', (req, res) => {
res.end()
})
t.tearDown(server.close.bind(server))
server.listen(0, () => {
const client = new Client(`http://localhost:${server.address().port}`, {
pipelining: 3
})
t.tearDown(client.destroy.bind(client))
client.request({
path: '/',
method: 'POST',
body: new Readable({
read () {
this.destroy(new Error('kaboom'))
}
})
}, (err) => {
t.ok(err)
})
client.request({
path: '/',
method: 'GET'
}, (err, data) => {
t.error(err)
data.body.resume()
})
client.request({
path: '/',
method: 'GET'
}, (err, data) => {
t.error(err)
data.body.resume()
})
})
})
test('invalid opts', (t) => {
t.plan(6)
const client = new Client('http://localhost:5000')
client.request(null, (err) => {
t.ok(err instanceof errors.InvalidArgumentError)
})
client.pipeline(null).on('error', (err) => {
t.ok(err instanceof errors.InvalidArgumentError)
})
client[kEnqueue](null, (err) => {
t.ok(err instanceof errors.InvalidArgumentError)
})
client[kEnqueue]({ path: '/', method: 'GET', signal: 1 }, (err) => {
t.ok(err instanceof errors.InvalidArgumentError)
})
client[kEnqueue]({ path: '/', method: 'GET', signal: {} }, (err) => {
t.ok(err instanceof errors.InvalidArgumentError)
})
try {
client[kEnqueue]({ path: '/', method: 'GET', signal: {} }, null)
} catch (err) {
t.ok(err instanceof errors.InvalidArgumentError)
}
})
test('default port for http and https', (t) => {
t.plan(4)
try {
new Client(new URL('http://localhost:80')) // eslint-disable-line
t.pass('Should not throw')
} catch (err) {
t.fail(err)
}
try {
new Client(new URL('http://localhost')) // eslint-disable-line
t.pass('Should not throw')
} catch (err) {
t.fail(err)
}
try {
new Client(new URL('https://localhost:443')) // eslint-disable-line
t.pass('Should not throw')
} catch (err) {
t.fail(err)
}
try {
new Client(new URL('https://localhost')) // eslint-disable-line
t.pass('Should not throw')
} catch (err) {
t.fail(err)
}
})
'use strict'
const { test } = require('tap')
const { Client } = require('..')
const { Client, errors } = require('..')
const { createServer } = require('http')
const { readFileSync, createReadStream } = require('fs')
const { Readable } = require('stream')

@@ -15,3 +16,3 @@ test('basic get', (t) => {

t.strictEqual('localhost', req.headers.host)
res.setHeader('content-type', 'text/plain')
res.setHeader('Content-Type', 'text/plain')
res.end('hello')

@@ -60,3 +61,7 @@ })

t.strictEqual(headers['content-type'], 'text/plain')
t.strictEqual(body, null)
body
.resume()
.on('end', () => {
t.pass()
})
})

@@ -117,3 +122,7 @@ })

t.strictEqual(headers['content-type'], 'text/plain')
t.strictEqual(body, null)
body
.resume()
.on('end', () => {
t.pass()
})
})

@@ -267,2 +276,3 @@ })

},
requestTimeout: 0,
body: createReadStream(__filename)

@@ -368,3 +378,7 @@ }, (err, { statusCode, headers, body }) => {

t.strictEqual(statusCode, 200)
t.strictEqual(body, null)
body
.resume()
.on('end', () => {
t.pass()
})
})

@@ -375,99 +389,178 @@ }

test('20 times GET with pipelining 10', (t) => {
const num = 20
t.plan(3 * num + 1)
test('Set-Cookie', (t) => {
t.plan(4)
let count = 0
let countGreaterThanOne = false
const server = createServer((req, res) => {
count++
setTimeout(function () {
countGreaterThanOne = countGreaterThanOne || count > 1
res.end(req.url)
}, 10)
res.setHeader('content-type', 'text/plain')
res.setHeader('Set-Cookie', ['a cookie', 'another cookie', 'more cookies'])
res.end('hello')
})
t.tearDown(server.close.bind(server))
// needed to check for a warning on the maxListeners on the socket
process.on('warning', t.fail)
t.tearDown(() => {
process.removeListener('warning', t.fail)
server.listen(0, () => {
const client = new Client(`http://localhost:${server.address().port}`)
t.tearDown(client.close.bind(client))
client.request({ path: '/', method: 'GET' }, (err, { statusCode, headers, body }) => {
t.error(err)
t.strictEqual(statusCode, 200)
t.strictDeepEqual(headers['set-cookie'], ['a cookie', 'another cookie', 'more cookies'])
const bufs = []
body.on('data', (buf) => {
bufs.push(buf)
})
body.on('end', () => {
t.strictEqual('hello', Buffer.concat(bufs).toString('utf8'))
})
})
})
})
test('ignore request header mutations', (t) => {
t.plan(2)
const server = createServer((req, res) => {
t.strictEqual(req.headers.test, 'test')
res.end()
})
t.tearDown(server.close.bind(server))
server.listen(0, () => {
const client = new Client(`http://localhost:${server.address().port}`, {
pipelining: 10
})
const client = new Client(`http://localhost:${server.address().port}`)
t.tearDown(client.close.bind(client))
for (var i = 0; i < num; i++) {
makeRequest(i)
}
const headers = { test: 'test' }
client.request({
path: '/',
method: 'GET',
headers
}, (err, { body }) => {
t.error(err)
body.resume()
})
headers.test = 'asd'
})
})
function makeRequest (i) {
makeRequestAndExpectUrl(client, i, t, () => {
count--
test('url-like url', (t) => {
t.plan(1)
if (i === num - 1) {
t.ok(countGreaterThanOne, 'seen more than one parallel request')
}
})
}
const server = createServer((req, res) => {
res.end()
})
})
t.tearDown(server.close.bind(server))
function makeRequestAndExpectUrl (client, i, t, cb) {
return client.request({ path: '/' + i, method: 'GET' }, (err, { statusCode, headers, body }) => {
cb()
t.error(err)
t.strictEqual(statusCode, 200)
const bufs = []
body.on('data', (buf) => {
bufs.push(buf)
server.listen(0, () => {
const client = new Client({
hostname: 'localhost',
port: server.address().port,
protocol: 'http'
})
body.on('end', () => {
t.strictEqual('/' + i, Buffer.concat(bufs).toString('utf8'))
t.tearDown(client.close.bind(client))
client.request({ path: '/', method: 'GET' }, (err, data) => {
t.error(err)
data.body.resume()
})
})
}
})
test('20 times HEAD with pipelining 10', (t) => {
const num = 20
t.plan(3 * num + 1)
test('multiple destroy callback', (t) => {
t.plan(3)
let count = 0
let countGreaterThanOne = false
const server = createServer((req, res) => {
count++
setTimeout(function () {
countGreaterThanOne = countGreaterThanOne || count > 1
res.end(req.url)
}, 10)
res.end()
})
t.tearDown(server.close.bind(server))
// needed to check for a warning on the maxListeners on the socket
process.on('warning', t.fail)
t.tearDown(() => {
process.removeListener('warning', t.fail)
server.listen(0, () => {
const client = new Client({
hostname: 'localhost',
port: server.address().port,
protocol: 'http'
})
t.tearDown(client.destroy.bind(client))
client.request({ path: '/', method: 'GET' }, (err, data) => {
t.error(err)
data.body.resume()
client.destroy(new Error(), (err) => {
t.error(err)
})
client.destroy(new Error(), (err) => {
t.error(err)
})
})
})
})
test('only one streaming req at a time', (t) => {
t.plan(4)
const server = createServer((req, res) => {
req.pipe(res)
})
t.tearDown(server.close.bind(server))
server.listen(0, () => {
const client = new Client(`http://localhost:${server.address().port}`, {
pipelining: 10
pipelining: 4
})
t.tearDown(client.close.bind(client))
t.tearDown(client.destroy.bind(client))
for (let i = 0; i < num; i++) {
makeRequest(i)
}
client.request({
path: '/',
method: 'GET'
}, (err, data) => {
t.error(err)
data.body.resume()
function makeRequest (i) {
makeHeadRequestAndExpectUrl(client, i, t, () => {
count--
client.request({
path: '/',
method: 'GET'
}, (err, data) => {
t.error(err)
data.body.resume()
})
if (i === num - 1) {
t.ok(countGreaterThanOne, 'seen more than one parallel request')
}
client.request({
path: '/',
method: 'PUT',
idempotent: true,
body: new Readable({
read () {
t.strictEqual(client.size, 1)
this.push(null)
}
})
}, (err, data) => {
t.error(err)
data.body.resume()
})
})
})
})
test('300 requests succeed', (t) => {
t.plan(300 * 2)
const server = createServer((req, res) => {
res.end('asd')
})
t.tearDown(server.close.bind(server))
server.listen(0, () => {
const client = new Client(`http://localhost:${server.address().port}`)
t.tearDown(client.destroy.bind(client))
for (let n = 0; n < 300; ++n) {
client.request({
path: '/',
method: 'GET'
}, (err, data) => {
t.error(err)
data.body.on('data', (chunk) => {
t.strictEqual(chunk.toString(), 'asd')
})
})
}

@@ -477,25 +570,33 @@ })

function makeHeadRequestAndExpectUrl (client, i, t, cb) {
return client.request({ path: '/' + i, method: 'HEAD' }, (err, { statusCode, headers, body }) => {
cb()
t.error(err)
t.strictEqual(statusCode, 200)
t.strictEqual(body, null)
test('request args validation', (t) => {
t.plan(2)
const client = new Client('http://localhost:5000')
client.request(null, (err) => {
t.ok(err instanceof errors.InvalidArgumentError)
})
}
test('A client should enqueue as much as twice its pipelining factor', (t) => {
const num = 10
let sent = 0
t.plan(6 * num + 5)
try {
client.request(null, 'asd')
} catch (err) {
t.ok(err instanceof errors.InvalidArgumentError)
}
})
let count = 0
let countGreaterThanOne = false
test('request args validation promise', (t) => {
t.plan(1)
const client = new Client('http://localhost:5000')
client.request(null).catch((err) => {
t.ok(err instanceof errors.InvalidArgumentError)
})
})
test('increase pipelining', (t) => {
t.plan(4)
const server = createServer((req, res) => {
count++
t.ok(count <= 5)
setTimeout(function () {
countGreaterThanOne = countGreaterThanOne || count > 1
res.end(req.url)
}, 10)
req.resume()
})

@@ -505,38 +606,42 @@ t.tearDown(server.close.bind(server))

server.listen(0, () => {
const client = new Client(`http://localhost:${server.address().port}`, {
pipelining: 2
const client = new Client(`http://localhost:${server.address().port}`)
t.tearDown(client.destroy.bind(client))
client.request({
path: '/',
method: 'GET'
}, () => {
if (!client.destroyed) {
t.fail()
}
})
t.tearDown(client.close.bind(client))
for (; sent < 2;) {
t.notOk(client.full, 'client is not full')
t.ok(makeRequest(), 'we can send more requests')
}
t.notOk(client.full, 'client is full')
t.notOk(makeRequest(), 'we must stop now')
t.ok(client.full, 'client is full')
client.on('drain', () => {
t.ok(countGreaterThanOne, 'seen more than one parallel request')
const start = sent
for (; sent < start + 3 && sent < num;) {
t.notOk(client.full, 'client is not full')
t.ok(makeRequest())
client.request({
path: '/',
method: 'GET'
}, () => {
if (!client.destroyed) {
t.fail()
}
})
function makeRequest () {
return makeRequestAndExpectUrl(client, sent++, t, () => count--)
}
t.strictEqual(client.running, 0)
client.on('connect', () => {
t.strictEqual(client.running, 0)
process.nextTick(() => {
t.strictEqual(client.running, 1)
client.pipelining = 3
t.strictEqual(client.running, 2)
})
})
})
})
test('Set-Cookie', (t) => {
test('destroy in push', (t) => {
t.plan(4)
let _res
const server = createServer((req, res) => {
res.setHeader('content-type', 'text/plain')
res.setHeader('Set-Cookie', ['a cookie', 'another cookie'])
res.end('hello')
res.write('asd')
_res = res
})

@@ -549,12 +654,23 @@ t.tearDown(server.close.bind(server))

client.request({ path: '/', method: 'GET' }, (err, { statusCode, headers, body }) => {
client.request({ path: '/', method: 'GET' }, (err, { body }) => {
t.error(err)
t.strictEqual(statusCode, 200)
t.strictDeepEqual(headers['Set-Cookie'], ['a cookie', 'another cookie'])
const bufs = []
body.on('data', (buf) => {
bufs.push(buf)
body.once('data', () => {
_res.write('asd')
body.on('data', (buf) => {
body.destroy()
_res.end()
}).on('error', (err) => {
t.ok(err)
})
})
body.on('end', () => {
t.strictEqual('hello', Buffer.concat(bufs).toString('utf8'))
})
client.request({ path: '/', method: 'GET' }, (err, { body }) => {
t.error(err)
let buf = ''
body.on('data', (chunk) => {
buf = chunk.toString()
_res.end()
}).on('end', () => {
t.strictEqual('asd', buf)
})

@@ -561,0 +677,0 @@ })

'use strict'
const { test } = require('tap')
const { Client } = require('..')
const { Client, errors } = require('..')
const { createServer } = require('http')
const { kSocket } = require('../lib/symbols')
test('close waits for the in-flight requests to finish', (t) => {
t.plan(10)
test('close waits for queued requests to finish', (t) => {
t.plan(16)

@@ -19,5 +20,4 @@ const server = createServer()

server.listen(0, () => {
const client = new Client(`http://localhost:${server.address().port}`, {
pipelining: 1
})
const client = new Client(`http://localhost:${server.address().port}`)
t.tearDown(client.destroy.bind(client))

@@ -28,4 +28,4 @@ client.request({ path: '/', method: 'GET' }, function (err, data) {

client.request({ path: '/', method: 'GET' }, onRequest)
client.request({ path: '/', method: 'GET' }, reqClosed)
client.request({ path: '/', method: 'GET' }, reqClosed)
client.request({ path: '/', method: 'GET' }, onRequest)
client.request({ path: '/', method: 'GET' }, onRequest)

@@ -51,6 +51,168 @@ // needed because the next element in the queue will be called

}
})
function reqClosed (err) {
t.equal(err.message, 'The client is closed')
}
test('destroy invoked all pending callbacks', (t) => {
t.plan(4)
const server = createServer()
server.on('request', (req, res) => {
res.write('hello')
})
t.tearDown(server.close.bind(server))
server.listen(0, () => {
const client = new Client(`http://localhost:${server.address().port}`, {
pipelining: 2
})
t.tearDown(client.destroy.bind(client))
client.request({ path: '/', method: 'GET' }, (err, data) => {
t.error(err)
data.body.on('error', (err) => {
t.ok(err)
}).resume()
client.destroy()
})
client.request({ path: '/', method: 'GET' }, (err) => {
t.ok(err instanceof errors.ClientDestroyedError)
})
client.request({ path: '/', method: 'GET' }, (err) => {
t.ok(err instanceof errors.ClientDestroyedError)
})
})
})
test('close waits until socket is destroyed', (t) => {
t.plan(4)
const server = createServer((req, res) => {
res.end(req.url)
})
t.tearDown(server.close.bind(server))
server.listen(0, () => {
const client = new Client(`http://localhost:${server.address().port}`)
t.tearDown(client.destroy.bind(client))
makeRequest()
client.once('connect', () => {
let done = false
client[kSocket].on('close', () => {
done = true
})
client.close((err) => {
t.error(err)
t.strictEqual(client.closed, true)
t.strictEqual(done, true)
})
})
function makeRequest () {
client.request({ path: '/', method: 'GET' }, (err, data) => {
t.error(err instanceof errors.ClientClosedError)
})
return !client.full
}
})
})
test('close should still reconnect', (t) => {
t.plan(6)
const server = createServer((req, res) => {
res.end(req.url)
})
t.tearDown(server.close.bind(server))
server.listen(0, () => {
const client = new Client(`http://localhost:${server.address().port}`)
t.tearDown(client.destroy.bind(client))
t.ok(makeRequest())
t.ok(!makeRequest())
client.close((err) => {
t.strictEqual(err, null)
t.strictEqual(client.closed, true)
})
client[kSocket].destroy()
function makeRequest () {
client.request({ path: '/', method: 'GET' }, (err, data) => {
data.body.resume()
t.error(err)
})
return !client.full
}
})
})
test('close should call callback once finished', (t) => {
t.plan(6)
const server = createServer((req, res) => {
setImmediate(function () {
res.end(req.url)
})
})
t.tearDown(server.close.bind(server))
server.listen(0, () => {
const client = new Client(`http://localhost:${server.address().port}`)
t.tearDown(client.destroy.bind(client))
t.ok(makeRequest())
t.ok(!makeRequest())
client.close((err) => {
t.strictEqual(err, null)
t.strictEqual(client.closed, true)
})
function makeRequest () {
client.request({ path: '/', method: 'GET' }, (err, data) => {
t.error(err)
data.body.resume()
})
return !client.full
}
})
})
test('closed and destroyed errors', (t) => {
t.plan(4)
const client = new Client('http://localhost:4000')
t.tearDown(client.destroy.bind(client))
client.request({ path: '/', method: 'GET' }, (err) => {
t.ok(err)
})
client.close((err) => {
t.error(err)
})
client.request({}, (err) => {
t.ok(err instanceof errors.ClientClosedError)
client.destroy()
client.request({}, (err) => {
t.ok(err instanceof errors.ClientDestroyedError)
})
})
})
test('close after and destroy should error', (t) => {
t.plan(2)
const client = new Client('http://localhost:4000')
t.tearDown(client.destroy.bind(client))
client.destroy()
client.close((err) => {
t.ok(err instanceof errors.ClientDestroyedError)
})
client.close().catch((err) => {
t.ok(err instanceof errors.ClientDestroyedError)
})
})

@@ -41,35 +41,1 @@ 'use strict'

})
test('https get with https opts', (t) => {
t.plan(6)
const server = createServer(pem, (req, res) => {
t.strictEqual('/', req.url)
t.strictEqual('GET', req.method)
res.setHeader('content-type', 'text/plain')
res.end('hello')
})
t.tearDown(server.close.bind(server))
server.listen(0, () => {
const client = new Client(`https://localhost:${server.address().port}`, {
https: {
rejectUnauthorized: false
}
})
t.tearDown(client.close.bind(client))
client.request({ path: '/', method: 'GET' }, (err, { statusCode, headers, body }) => {
t.error(err)
t.strictEqual(statusCode, 200)
t.strictEqual(headers['content-type'], 'text/plain')
const bufs = []
body.on('data', (buf) => {
bufs.push(buf)
})
body.on('end', () => {
t.strictEqual('hello', Buffer.concat(bufs).toString('utf8'))
})
})
})
})

@@ -5,10 +5,11 @@ 'use strict'

const { test } = require('tap')
const { Pool } = require('..')
const undici = require('..')
const { Pool, errors } = require('..')
const { createServer } = require('http')
const { EventEmitter } = require('events')
const { promisify } = require('util')
const eos = require('end-of-stream')
const eos = require('stream').finished
test('basic get', (t) => {
t.plan(6)
t.plan(9)

@@ -23,5 +24,5 @@ const server = createServer((req, res) => {

server.listen(0, () => {
const client = new Pool(`http://localhost:${server.address().port}`)
t.tearDown(client.close.bind(client))
server.listen(0, async () => {
const client = undici(`http://localhost:${server.address().port}`)
t.tearDown(client.destroy.bind(client))

@@ -40,5 +41,40 @@ client.request({ path: '/', method: 'GET' }, (err, { statusCode, headers, body }) => {

})
client.close((err) => {
t.error(err)
client.destroy((err) => {
t.error(err)
client.close((err) => {
t.ok(err instanceof errors.ClientDestroyedError)
})
})
})
})
})
test('basic get error async/await', (t) => {
t.plan(2)
const server = createServer((req, res) => {
res.destroy()
})
t.tearDown(server.close.bind(server))
server.listen(0, async () => {
const client = undici(`http://localhost:${server.address().port}`)
t.tearDown(client.destroy.bind(client))
await client.request({ path: '/', method: 'GET' })
.catch((err) => {
t.ok(err)
})
await client.destroy()
await client.close().catch((err) => {
t.ok(err instanceof errors.ClientDestroyedError)
})
})
})
test('basic get with async/await', async (t) => {

@@ -55,3 +91,3 @@ const server = createServer((req, res) => {

const client = new Pool(`http://localhost:${server.address().port}`)
t.tearDown(client.close.bind(client))
t.tearDown(client.destroy.bind(client))

@@ -64,8 +100,81 @@ const { statusCode, headers, body } = await client.request({ path: '/', method: 'GET' })

await promisify(eos)(body)
await client.close()
await client.destroy()
})
test('stream get async/await', async (t) => {
const server = createServer((req, res) => {
t.strictEqual('/', req.url)
t.strictEqual('GET', req.method)
res.setHeader('content-type', 'text/plain')
res.end('hello')
})
t.tearDown(server.close.bind(server))
await promisify(server.listen.bind(server))(0)
const client = new Pool(`http://localhost:${server.address().port}`)
t.tearDown(client.destroy.bind(client))
await client.stream({ path: '/', method: 'GET' }, ({ statusCode, headers }) => {
t.strictEqual(statusCode, 200)
t.strictEqual(headers['content-type'], 'text/plain')
})
})
test('stream get error async/await', (t) => {
t.plan(1)
const server = createServer((req, res) => {
res.destroy()
})
t.tearDown(server.close.bind(server))
server.listen(0, async () => {
const client = undici(`http://localhost:${server.address().port}`)
t.tearDown(client.destroy.bind(client))
await client.stream({ path: '/', method: 'GET' }, () => {
})
.catch((err) => {
t.ok(err)
})
})
})
test('pipeline get', (t) => {
t.plan(5)
const server = createServer((req, res) => {
t.strictEqual('/', req.url)
t.strictEqual('GET', req.method)
res.setHeader('content-type', 'text/plain')
res.end('hello')
})
t.tearDown(server.close.bind(server))
server.listen(0, async () => {
const client = undici(`http://localhost:${server.address().port}`)
t.tearDown(client.destroy.bind(client))
const bufs = []
client.pipeline({ path: '/', method: 'GET' }, ({ statusCode, headers, body }) => {
t.strictEqual(statusCode, 200)
t.strictEqual(headers['content-type'], 'text/plain')
return body
})
.end()
.on('data', (buf) => {
bufs.push(buf)
})
.on('end', () => {
t.strictEqual('hello', Buffer.concat(bufs).toString('utf8'))
})
})
})
test('backpressure algorithm', (t) => {
const seen = []
let total = 0
let writeMore = false

@@ -77,7 +186,15 @@ class FakeClient extends EventEmitter {

this.id = total++
this._full = false
}
get full () {
return this._full
}
get connected () {
return true
}
request (req, cb) {
seen.push({ req, cb, client: this })
return writeMore
seen.push({ req, cb, client: this, id: this.id })
}

@@ -94,37 +211,39 @@ }

writeMore = true
pool.request({}, noop)
pool.request({}, noop)
const d1 = seen.shift()
const d2 = seen.shift()
const d1 = seen.shift() // d1 = c0
t.strictEqual(d1.id, 0)
const d2 = seen.shift() // d1 = c0
t.strictEqual(d1.id, 0)
t.strictEqual(d1.client, d2.client)
t.strictEqual(d1.id, d2.id)
writeMore = false
pool.request({}, noop)
pool.request({}, noop) // d3 = c0
writeMore = true
pool.request({}, noop)
d1.client._full = true
pool.request({}, noop) // d4 = c1
const d3 = seen.shift()
t.strictEqual(d3.id, 0)
const d4 = seen.shift()
t.strictEqual(d4.id, 1)
t.strictEqual(d3.client, d2.client)
t.notStrictEqual(d3.client, d4.client)
t.strictEqual(d3.id, d2.id)
t.notStrictEqual(d3.id, d4.id)
d3.client.emit('drain')
pool.request({}, noop) // d5 = c1
writeMore = false
pool.request({}, noop)
d1.client._full = false
writeMore = true
pool.request({}, noop)
pool.request({}, noop) // d6 = c0
const d5 = seen.shift()
t.strictEqual(d5.id, 1)
const d6 = seen.shift()
t.strictEqual(d6.id, 0)
t.strictEqual(d5.client, d4.client)
t.strictEqual(d3.client, d6.client)
t.strictEqual(d5.id, d4.id)
t.strictEqual(d3.id, d6.id)

@@ -135,1 +254,61 @@ t.end()

function noop () {}
test('full', (t) => {
t.plan(8 * 6)
const server = createServer((req, res) => {
t.strictEqual('/', req.url)
t.strictEqual('GET', req.method)
res.setHeader('content-type', 'text/plain')
res.end('hello')
})
t.tearDown(server.close.bind(server))
server.listen(0, async () => {
const client = undici(`http://localhost:${server.address().port}`, {
connections: 2,
pipelining: 2
})
t.tearDown(client.destroy.bind(client))
for (let n = 0; n < 8; ++n) {
client.request({ path: '/', method: 'GET' }, (err, { statusCode, headers, body }) => {
t.error(err)
t.strictEqual(statusCode, 200)
t.strictEqual(headers['content-type'], 'text/plain')
const bufs = []
body.on('data', (buf) => {
bufs.push(buf)
})
body.on('end', () => {
t.strictEqual('hello', Buffer.concat(bufs).toString('utf8'))
})
})
}
})
})
test('invalid options throws', (t) => {
t.plan(6)
try {
new Pool(null, { connections: 0 }) // eslint-disable-line
} catch (err) {
t.ok(err instanceof errors.InvalidArgumentError)
t.strictEqual(err.message, 'invalid connections')
}
try {
new Pool(null, { connections: -1 }) // eslint-disable-line
} catch (err) {
t.ok(err instanceof errors.InvalidArgumentError)
t.strictEqual(err.message, 'invalid connections')
}
try {
new Pool(null, { connections: true }) // eslint-disable-line
} catch (err) {
t.ok(err instanceof errors.InvalidArgumentError)
t.strictEqual(err.message, 'invalid connections')
}
})

@@ -6,3 +6,3 @@ 'use strict'

const { createServer } = require('http')
const { Client } = require('..')
const { Client, errors } = require('..')

@@ -26,3 +26,3 @@ const server = createServer((req, res) => {

client.request({ path: null, method: 'GET' }, (err, res) => {
t.ok(err)
t.ok(err instanceof errors.InvalidArgumentError)
t.strictEqual(err.message, 'path must be a valid path')

@@ -33,3 +33,3 @@ t.strictEqual(res, null)

client.request({ path: 'aaa', method: 'GET' }, (err, res) => {
t.ok(err)
t.ok(err instanceof errors.InvalidArgumentError)
t.strictEqual(err.message, 'path must be a valid path')

@@ -47,3 +47,3 @@ t.strictEqual(res, null)

client.request({ path: '/', method: null }, (err, res) => {
t.ok(err)
t.ok(err instanceof errors.InvalidArgumentError)
t.strictEqual(err.message, 'method must be a valid method')

@@ -54,3 +54,3 @@ t.strictEqual(res, null)

client.request({ path: '/', method: 'WOOW' }, (err, res) => {
t.ok(err)
t.ok(err instanceof errors.InvalidArgumentError)
t.strictEqual(err.message, 'method must be a valid method')

@@ -68,3 +68,3 @@ t.strictEqual(res, null)

client.request({ path: '/', method: 'POST', body: 42 }, (err, res) => {
t.ok(err)
t.ok(err instanceof errors.InvalidArgumentError)
t.strictEqual(err.message, 'body must be a string, a Buffer or a Readable stream')

@@ -75,3 +75,3 @@ t.strictEqual(res, null)

client.request({ path: '/', method: 'POST', body: { hello: 'world' } }, (err, res) => {
t.ok(err)
t.ok(err instanceof errors.InvalidArgumentError)
t.strictEqual(err.message, 'body must be a string, a Buffer or a Readable stream')

@@ -78,0 +78,0 @@ t.strictEqual(res, null)

SocketSocket SOC 2 Logo

Product

  • Package Alerts
  • Integrations
  • Docs
  • Pricing
  • FAQ
  • Roadmap
  • Changelog

Packages

npm

Stay in touch

Get open source security insights delivered straight into your inbox.


  • Terms
  • Privacy
  • Security

Made with ⚡️ by Socket Inc