Comparing version 0.5.0 to 1.0.0
@@ -5,2 +5,3 @@ 'use strict' | ||
const Pool = require('./lib/pool') | ||
const errors = require('./lib/errors') | ||
@@ -13,3 +14,4 @@ function undici (url, opts) { | ||
undici.Client = Client | ||
undici.errors = errors | ||
module.exports = undici |
@@ -1,204 +0,162 @@ | ||
'use strict' | ||
const { | ||
Readable, | ||
Duplex, | ||
PassThrough, | ||
finished | ||
} = require('stream') | ||
const { | ||
InvalidArgumentError, | ||
InvalidReturnValueError, | ||
RequestAbortedError | ||
} = require('./errors') | ||
const { | ||
kEnqueue, | ||
kResume | ||
} = require('./symbols') | ||
const ClientBase = require('./client-base') | ||
const assert = require('assert') | ||
/* eslint no-prototype-builtins: "off" */ | ||
const { URL } = require('url') | ||
const net = require('net') | ||
const tls = require('tls') | ||
const Q = require('fastq') | ||
const { HTTPParser } = require('http-parser-js') | ||
const { Readable } = require('readable-stream') | ||
const eos = require('end-of-stream') | ||
const syncthrough = require('syncthrough') | ||
const retimer = require('retimer') | ||
const { EventEmitter } = require('events') | ||
const Request = require('./request') | ||
const kRead = Symbol('read') | ||
const kReadCb = Symbol('readCallback') | ||
const kIsWaiting = Symbol('isWaiting') | ||
const kQueue = Symbol('queue') | ||
const kCallbacks = Symbol('callbacks') | ||
const kRequests = Symbol('requests') | ||
const kTimer = Symbol('kTimer') | ||
const kTLSOpts = Symbol('TLS Options') | ||
function connect (client) { | ||
var socket = null | ||
var url = client.url | ||
// the defaults port are needed because of the URL spec | ||
if (url.protocol === 'https:') { | ||
socket = tls.connect(url.port || 443, url.hostname, client[kTLSOpts]) | ||
} else { | ||
socket = net.connect(url.port || 80, url.hostname) | ||
} | ||
client.socket = socket | ||
// stop the queue and reset the parsing state | ||
client[kQueue].pause() | ||
client[kIsWaiting] = false | ||
client._needHeaders = 0 | ||
client._lastBody = null | ||
socket.on('connect', () => { | ||
client[kQueue].resume() | ||
}) | ||
socket.on('end', () => { | ||
reconnect(client, new Error('other side closed - ended')) | ||
}) | ||
socket.on('finish', () => { | ||
reconnect(client, new Error('this side closed - finished')) | ||
}) | ||
socket.on('error', reconnect.bind(undefined, client)) | ||
} | ||
function reconnect (client, err) { | ||
if (client.closed) { | ||
// TODO what do we do with the error? | ||
return | ||
} | ||
// reset events | ||
client.socket.removeAllListeners('end') | ||
client.socket.removeAllListeners('finish') | ||
client.socket.removeAllListeners('error') | ||
client.socket.on('error', () => {}) | ||
client.socket = null | ||
// we reset the callbacks | ||
const callbacks = client[kCallbacks] | ||
client[kCallbacks] = [] | ||
client[kRequests] = [] | ||
if (client[kQueue].length() > 0) { | ||
connect(client) | ||
} | ||
for (const cb of callbacks) { | ||
cb(err, null) | ||
} | ||
} | ||
class Client extends EventEmitter { | ||
constructor (url, opts = {}) { | ||
super() | ||
if (!(url instanceof URL)) { | ||
url = new URL(url) | ||
class Client extends ClientBase { | ||
request (opts, callback) { | ||
if (callback === undefined) { | ||
return new Promise((resolve, reject) => { | ||
this.request(opts, (err, data) => { | ||
return err ? reject(err) : resolve(data) | ||
}) | ||
}) | ||
} | ||
this.url = url | ||
// state machine, might need more states | ||
this.closed = false | ||
this.parser = new HTTPParser(HTTPParser.RESPONSE) | ||
this[kTLSOpts] = opts.tls || opts.https | ||
const endRequest = () => { | ||
this.socket.write('\r\n', 'ascii') | ||
this.socket.uncork() | ||
this._needHeaders++ | ||
this[kRead]() | ||
if (typeof callback !== 'function') { | ||
throw new InvalidArgumentError('invalid callback') | ||
} | ||
this.timeout = opts.timeout || 30000 // 30 seconds | ||
this[kCallbacks] = [] | ||
this[kRequests] = [] | ||
const timerCb = () => { | ||
if (this[kCallbacks].length > 0) { | ||
this.socket.destroy(new Error('timeout')) | ||
this[kTimer] = null | ||
} | ||
if (!opts || typeof opts !== 'object') { | ||
process.nextTick(callback, new InvalidArgumentError('invalid opts'), null) | ||
return | ||
} | ||
this[kQueue] = Q((request, cb) => { | ||
if (this.closed) { | ||
return cb(new Error('The client is closed')) | ||
// TODO: Avoid closure due to callback capture. | ||
this[kEnqueue](opts, function (err, data) { | ||
if (err) { | ||
callback(err, null) | ||
return | ||
} | ||
if (this[kTimer]) { | ||
this[kTimer].reschedule(this.timeout) | ||
} else { | ||
this[kTimer] = retimer(timerCb, this.timeout) | ||
} | ||
const { | ||
statusCode, | ||
headers, | ||
opaque, | ||
resume | ||
} = data | ||
var { method, path, body } = request | ||
const headers = request.headers || {} | ||
const reqArr = [ | ||
`${method} ${path} HTTP/1.1\r\nConnection: keep-alive\r\n` | ||
] | ||
const body = new Readable({ | ||
autoDestroy: true, | ||
read: resume, | ||
destroy (err, callback) { | ||
if (!err && !this._readableState.endEmitted) { | ||
err = new RequestAbortedError() | ||
} | ||
if (err) { | ||
resume() | ||
} | ||
callback(err, null) | ||
} | ||
}) | ||
body.destroy = this.wrap(body, body.destroy) | ||
// wrap the callback in a AsyncResource | ||
cb = request.wrap(cb) | ||
callback(null, { | ||
statusCode, | ||
headers, | ||
opaque, | ||
body | ||
}) | ||
this[kRequests].push(request) | ||
this[kCallbacks].push(cb) | ||
this.socket.cork() | ||
return this.wrap(body, function (err, chunk) { | ||
if (this.destroyed) { | ||
return null | ||
} else if (err) { | ||
this.destroy(err) | ||
} else { | ||
const ret = this.push(chunk) | ||
return this.destroyed ? null : ret | ||
} | ||
}) | ||
}) | ||
} | ||
if (!(headers.host || headers.Host)) { | ||
reqArr.push('Host: ' + url.hostname + '\r\n') | ||
} | ||
const headerNames = Object.keys(headers) | ||
for (let i = 0; i < headerNames.length; i++) { | ||
const name = headerNames[i] | ||
reqArr.push(name + ': ' + headers[name] + '\r\n') | ||
} | ||
pipeline (opts, handler) { | ||
if (typeof handler !== 'function') { | ||
return new PassThrough().destroy(new InvalidArgumentError('invalid handler')) | ||
} | ||
for (let i = 0; i < reqArr.length; i++) { | ||
this.socket.write(reqArr[i], 'ascii') | ||
const req = new Readable({ | ||
autoDestroy: true, | ||
read () { | ||
if (this[kResume]) { | ||
const resume = this[kResume] | ||
this[kResume] = null | ||
resume() | ||
} | ||
}, | ||
destroy (err, callback) { | ||
if (err) { | ||
if (this[kResume]) { | ||
const resume = this[kResume] | ||
this[kResume] = null | ||
resume(err) | ||
} else if (!ret.destroyed) { | ||
// Stop ret from scheduling more writes. | ||
ret.destroy(err) | ||
} | ||
} else { | ||
assert(this._readableState.endEmitted) | ||
assert(!this[kResume]) | ||
} | ||
callback(err) | ||
} | ||
}) | ||
let res | ||
let body | ||
if (typeof body === 'string' || body instanceof Uint8Array) { | ||
if (headers.hasOwnProperty('content-length')) { | ||
// we have already written the content-length header | ||
this.socket.write('\r\n') | ||
const ret = new Duplex({ | ||
autoDestroy: true, | ||
read () { | ||
if (body) { | ||
body.resume() | ||
} | ||
}, | ||
write (chunk, encoding, callback) { | ||
assert(!req.destroyed) | ||
if (req.push(chunk, encoding)) { | ||
callback() | ||
} else { | ||
this.socket.write(`content-length: ${Buffer.byteLength(body)}\r\n\r\n`, 'ascii') | ||
req[kResume] = callback | ||
} | ||
this.socket.write(body) | ||
} else if (body && typeof body.pipe === 'function') { | ||
// TODO we should pause the queue while we are piping | ||
if (headers.hasOwnProperty('content-length')) { | ||
this.socket.write('\r\n', 'ascii') | ||
body.pipe(this.socket, { end: false }) | ||
this.socket.uncork() | ||
eos(body, (err) => { | ||
if (err) { | ||
// TODO we might want to wait before previous in-flight | ||
// requests are finished before destroying | ||
this.socket.destroy(err) | ||
// needed because destroy will be delayed | ||
setImmediate(cb, err, null) | ||
return | ||
} | ||
}, | ||
final (callback) { | ||
req.push(null) | ||
callback() | ||
}, | ||
destroy (err, callback) { | ||
if (!err && !this._readableState.endEmitted) { | ||
err = new RequestAbortedError() | ||
} | ||
if (!req.destroyed) { | ||
req.destroy(err) | ||
} | ||
if (res && !res.destroyed) { | ||
res.destroy(err) | ||
} | ||
callback(err) | ||
} | ||
}) | ||
endRequest() | ||
}) | ||
} else { | ||
this.socket.write('transfer-encoding: chunked\r\n', 'ascii') | ||
var through = syncthrough(addTransferEncoding) | ||
body.pipe(through) | ||
through.pipe(this.socket, { end: false }) | ||
this.socket.uncork() | ||
eos(body, (err) => { | ||
if (err) { | ||
// TODO we might want to wait before previous in-flight | ||
// requests are finished before destroying | ||
this.socket.destroy(err) | ||
// needed because destroy will be delayed | ||
setImmediate(cb, err, null) | ||
return | ||
} | ||
// TODO: Avoid copy. | ||
opts = { ...opts, body: req } | ||
this.socket.cork() | ||
this.socket.write('\r\n0\r\n', 'ascii') | ||
endRequest() | ||
}) | ||
this[kEnqueue](opts, function (err, data) { | ||
if (err) { | ||
if (!ret.destroyed) { | ||
ret.destroy(err) | ||
} | ||
@@ -208,91 +166,95 @@ return | ||
endRequest() | ||
}) | ||
const { | ||
statusCode, | ||
headers, | ||
opaque, | ||
resume | ||
} = data | ||
this.pipelining = opts.pipelining || 1 | ||
res = new Readable({ | ||
autoDestroy: true, | ||
read: resume, | ||
destroy (err, callback) { | ||
if (!err && !this._readableState.endEmitted) { | ||
err = new RequestAbortedError() | ||
} | ||
if (err) { | ||
if (!ret.destroyed) { | ||
ret.destroy(err) | ||
} | ||
resume() | ||
} | ||
callback(err, null) | ||
} | ||
}) | ||
res.destroy = this.wrap(res, res.destroy) | ||
this[kQueue].drain = () => { | ||
this.emit('drain') | ||
} | ||
// TODO: Should this somehow be wrapped earlier? | ||
ret.destroy = this.wrap(ret, ret.destroy) | ||
this.parser[HTTPParser.kOnHeaders] = () => {} | ||
this.parser[HTTPParser.kOnHeadersComplete] = ({ statusCode, headers }) => { | ||
// TODO move this[kCallbacks] from being an array. The array allocation | ||
// is showing up in the flamegraph. | ||
const cb = this[kCallbacks].shift() | ||
const request = this[kRequests].shift() | ||
const skipBody = request.method === 'HEAD' | ||
if (!skipBody) { | ||
this._lastBody = new Readable({ read: this[kRead].bind(this) }) | ||
this._lastBody.push = request.wrapSimple(this._lastBody, this._lastBody.push) | ||
try { | ||
body = handler({ | ||
statusCode, | ||
headers, | ||
opaque, | ||
body: res | ||
}) | ||
} catch (err) { | ||
if (!ret.destroyed) { | ||
ret.destroy(err) | ||
} | ||
return | ||
} | ||
cb(null, { | ||
statusCode, | ||
headers: parseHeaders(headers), | ||
body: this._lastBody | ||
}) | ||
if (this.closed && this[kQueue].length() === 0) { | ||
this.destroy() | ||
// TODO: Should we allow !body? | ||
if (!body || typeof body.pipe !== 'function') { | ||
if (!ret.destroyed) { | ||
ret.destroy(new InvalidReturnValueError('expected Readable')) | ||
} | ||
return | ||
} | ||
return skipBody | ||
} | ||
// TODO: If body === res then avoid intermediate | ||
// and write directly to ret.push? Or should this | ||
// happen when body is null? | ||
this.parser[HTTPParser.kOnBody] = (chunk, offset, length) => { | ||
this._lastBody.push(chunk.slice(offset, offset + length)) | ||
} | ||
body | ||
.on('data', function (chunk) { | ||
if (!ret.push(chunk)) { | ||
this.pause() | ||
} | ||
}) | ||
.on('error', function (err) { | ||
if (!ret.destroyed) { | ||
ret.destroy(err) | ||
} | ||
}) | ||
.on('end', function () { | ||
ret.push(null) | ||
}) | ||
.on('close', function () { | ||
if (!this._readableState.endEmitted && !ret.destroyed) { | ||
ret.destroy(new RequestAbortedError()) | ||
} | ||
}) | ||
this.parser[HTTPParser.kOnMessageComplete] = () => { | ||
const body = this._lastBody | ||
this._lastBody = null | ||
if (body !== null) { | ||
body.push(null) | ||
} | ||
} | ||
return this.wrap(res, function (err, chunk) { | ||
if (this.destroyed) { | ||
return null | ||
} else if (err) { | ||
this.destroy(err) | ||
} else { | ||
const ret = this.push(chunk) | ||
return this.destroyed ? null : ret | ||
} | ||
}) | ||
}) | ||
this[kReadCb] = () => { | ||
this[kIsWaiting] = false | ||
this[kRead]() | ||
} | ||
return ret | ||
} | ||
get pipelining () { | ||
return this[kQueue].concurrency | ||
} | ||
set pipelining (v) { | ||
this[kQueue].concurrency = v | ||
} | ||
get full () { | ||
// TODO q.length is slowish, optimize | ||
return this[kQueue].length() > this.pipelining | ||
} | ||
[kRead] () { | ||
var socket = this.socket | ||
if (!socket) { | ||
// TODO this should not happen | ||
return | ||
} | ||
var chunk = null | ||
var hasRead = false | ||
while ((chunk = socket.read()) !== null) { | ||
hasRead = true | ||
this.parser.execute(chunk) | ||
} | ||
if (!this[kIsWaiting] && (!hasRead || this._needHeaders > 0)) { | ||
this[kIsWaiting] = true | ||
socket.once('readable', this[kReadCb]) | ||
} | ||
} | ||
request (opts, cb) { | ||
if (cb === undefined) { | ||
stream (opts, factory, callback) { | ||
if (callback === undefined) { | ||
return new Promise((resolve, reject) => { | ||
this.request(opts, (err, data) => { | ||
this.stream(opts, factory, (err, data) => { | ||
return err ? reject(err) : resolve(data) | ||
@@ -303,74 +265,87 @@ }) | ||
if (this.closed) { | ||
process.nextTick(cb, new Error('The client is closed')) | ||
return false | ||
if (typeof callback !== 'function') { | ||
throw new InvalidArgumentError('invalid callback') | ||
} | ||
if (!this.socket) { | ||
connect(this) | ||
if (!opts || typeof opts !== 'object') { | ||
process.nextTick(callback, new InvalidArgumentError('invalid opts'), null) | ||
return | ||
} | ||
try { | ||
const req = new Request(opts) | ||
this[kQueue].push(req, cb) | ||
} catch (err) { | ||
process.nextTick(cb, err, null) | ||
if (typeof factory !== 'function') { | ||
process.nextTick(callback, new InvalidArgumentError('invalid factory'), null) | ||
return | ||
} | ||
return !this.full | ||
} | ||
// TODO: Avoid closure due to callback capture. | ||
this[kEnqueue](opts, function (err, data) { | ||
if (err) { | ||
callback(err) | ||
return | ||
} | ||
close () { | ||
if (this[kTimer]) { | ||
this[kTimer].clear() | ||
this[kTimer] = null | ||
} | ||
this.closed = true | ||
const { | ||
statusCode, | ||
headers, | ||
opaque, | ||
resume | ||
} = data | ||
// TODO test this | ||
if (this[kQueue].length() === 0 && this.socket) { | ||
this.socket.end() | ||
this.socket = null | ||
} | ||
} | ||
let body | ||
try { | ||
body = factory({ | ||
statusCode, | ||
headers, | ||
opaque | ||
}) | ||
} catch (err) { | ||
callback(err, null) | ||
return | ||
} | ||
destroy (err) { | ||
if (this[kTimer]) { | ||
this[kTimer].clear() | ||
this[kTimer] = null | ||
} | ||
this.closed = true | ||
if (this.socket) { | ||
// TODO make sure we error everything that | ||
// is in flight | ||
this.socket.destroy(err) | ||
this.socket = null | ||
} | ||
} | ||
} | ||
if (!body) { | ||
callback(null, null) | ||
return | ||
} | ||
function parseHeaders (headers) { | ||
const obj = {} | ||
for (var i = 0; i < headers.length; i += 2) { | ||
var key = headers[i] | ||
var val = obj[key] | ||
if (!val) { | ||
obj[key] = headers[i + 1] | ||
} else { | ||
if (!Array.isArray(val)) { | ||
val = [val] | ||
obj[key] = val | ||
if ( | ||
typeof body.write !== 'function' || | ||
typeof body.destroy !== 'function' || | ||
typeof body.destroyed !== 'boolean' | ||
) { | ||
callback(new InvalidReturnValueError('expected Writable'), null) | ||
return | ||
} | ||
val.push(headers[i + 1]) | ||
} | ||
body.on('drain', resume) | ||
// TODO: Avoid finished. It registers an unecessary amount of listeners. | ||
finished(body, { readable: false }, (err) => { | ||
body.removeListener('drain', resume) | ||
if (err) { | ||
if (!body.destroyed) { | ||
body.destroy(err) | ||
} | ||
resume() | ||
} | ||
callback(err, null) | ||
}) | ||
body.destroy = this.wrap(body, body.destroy) | ||
return this.wrap(body, function (err, chunk) { | ||
if (this.destroyed) { | ||
return null | ||
} else if (err) { | ||
this.destroy(err) | ||
} else if (chunk == null) { | ||
this.end() | ||
} else { | ||
const ret = this.write(chunk) | ||
return this.destroyed ? null : ret | ||
} | ||
}) | ||
}) | ||
} | ||
return obj | ||
} | ||
function addTransferEncoding (chunk) { | ||
var toWrite = '\r\n' + Buffer.byteLength(chunk).toString(16) + '\r\n' | ||
this.push(toWrite) | ||
return chunk | ||
} | ||
module.exports = Client |
118
lib/pool.js
'use strict' | ||
const Client = require('./client') | ||
const current = Symbol('current') | ||
const { | ||
InvalidArgumentError | ||
} = require('./errors') | ||
const { | ||
kClients | ||
} = require('./symbols') | ||
class Pool { | ||
constructor (url, opts = {}) { | ||
let { | ||
connections, | ||
constructor (url, { | ||
connections, | ||
maxAbortedPayload, | ||
socketTimeout, | ||
requestTimeout, | ||
pipelining, | ||
tls | ||
} = {}) { | ||
if (connections != null && (!Number.isFinite(connections) || connections <= 0)) { | ||
throw new InvalidArgumentError('invalid connections') | ||
} | ||
this[kClients] = Array.from({ | ||
length: connections || 10 | ||
}, () => new Client(url, { | ||
maxAbortedPayload, | ||
socketTimeout, | ||
requestTimeout, | ||
pipelining, | ||
timeout | ||
} = opts | ||
connections = connections || 10 | ||
pipelining = pipelining || 1 | ||
this.clients = Array.from({ | ||
length: connections | ||
}, x => new Client(url, { | ||
pipelining, | ||
timeout | ||
tls | ||
})) | ||
} | ||
for (const client of this.clients) { | ||
client.on('drain', onDrain) | ||
stream (opts, factory, cb) { | ||
// needed because we need the return value from client.stream | ||
if (cb === undefined) { | ||
return new Promise((resolve, reject) => { | ||
this.stream(opts, factory, (err, data) => { | ||
return err ? reject(err) : resolve(data) | ||
}) | ||
}) | ||
} | ||
this.drained = [] | ||
this[current] = null | ||
getNext(this).stream(opts, factory, cb) | ||
} | ||
const that = this | ||
function onDrain () { | ||
// this is the client | ||
that.drained.push(this) | ||
} | ||
pipeline (opts, handler) { | ||
return getNext(this).pipeline(opts, handler) | ||
} | ||
@@ -46,34 +62,52 @@ | ||
if (this[current] === null) { | ||
if (this.drained.length > 0) { | ||
// LIFO QUEUE | ||
// we use the last one that drained, because that's the one | ||
// that is more probable to have an alive socket | ||
this[current] = this.drained.pop() | ||
} else { | ||
// if no one drained recently, let's just pick one randomly | ||
this[current] = this.clients[Math.floor(Math.random() * this.clients.length)] | ||
} | ||
getNext(this).request(opts, cb) | ||
} | ||
close (cb) { | ||
const promise = Promise.all(this[kClients].map(c => c.close())) | ||
if (cb) { | ||
promise.then(() => cb(null, null), (err) => cb(err, null)) | ||
} else { | ||
return promise | ||
} | ||
} | ||
const writeMore = this[current].request(opts, cb) | ||
destroy (err, cb) { | ||
if (typeof err === 'function') { | ||
cb = err | ||
err = null | ||
} | ||
if (!writeMore) { | ||
this[current] = null | ||
const promise = Promise.all(this[kClients].map(c => c.destroy(err))) | ||
if (cb) { | ||
promise.then(() => cb(null, null)) | ||
} else { | ||
return promise | ||
} | ||
} | ||
} | ||
close () { | ||
for (const client of this.clients) { | ||
client.close() | ||
function getNext (pool) { | ||
let next | ||
for (const client of pool[kClients]) { | ||
if (client.full) { | ||
continue | ||
} | ||
} | ||
destroy () { | ||
for (const client of this.clients) { | ||
client.destroy() | ||
if (!next) { | ||
next = client | ||
} | ||
if (client.connected) { | ||
return client | ||
} | ||
} | ||
if (next) { | ||
return next | ||
} | ||
return pool[kClients][Math.floor(Math.random() * pool[kClients].length)] | ||
} | ||
module.exports = Pool |
'use strict' | ||
const { AsyncResource } = require('async_hooks') | ||
const { | ||
InvalidArgumentError, | ||
RequestAbortedError, | ||
RequestTimeoutError | ||
} = require('./errors') | ||
const EE = require('events') | ||
const assert = require('assert') | ||
@@ -46,7 +53,4 @@ const methods = [ | ||
function isValidBody (body) { | ||
if (!body) { | ||
return true | ||
} | ||
return body instanceof Buffer || | ||
return body == null || | ||
body instanceof Uint8Array || | ||
typeof body === 'string' || | ||
@@ -57,58 +61,143 @@ typeof body.pipe === 'function' | ||
class Request extends AsyncResource { | ||
constructor (opts) { | ||
constructor ({ | ||
path, | ||
method, | ||
body, | ||
headers, | ||
idempotent, | ||
opaque, | ||
signal, | ||
requestTimeout | ||
}, callback) { | ||
super('UNDICI_REQ') | ||
if (!opts) { | ||
throw new Error('no options passed') | ||
assert(typeof callback === 'function') | ||
if (typeof path !== 'string' || path[0] !== '/') { | ||
throw new InvalidArgumentError('path must be a valid path') | ||
} | ||
if (!(typeof opts.path === 'string' && opts.path[0] === '/')) { | ||
throw new Error('path must be a valid path') | ||
if (typeof method !== 'string' || !methods[method]) { | ||
throw new InvalidArgumentError('method must be a valid method') | ||
} | ||
this.method = opts.method | ||
if (!(typeof opts.method === 'string' && methods[opts.method])) { | ||
throw new Error('method must be a valid method') | ||
if (requestTimeout != null && (!Number.isInteger(requestTimeout) || requestTimeout < 0)) { | ||
throw new InvalidArgumentError('requestTimeout must be a positive integer or zero') | ||
} | ||
this.path = opts.path | ||
// TODO we should validate that the http method accepts a body or not | ||
if (!isValidBody(opts.body)) { | ||
throw new Error('body must be a string, a Buffer or a Readable stream') | ||
if (signal && typeof signal.on !== 'function' && typeof signal.addEventListener !== 'function') { | ||
throw new InvalidArgumentError('signal must implement .on(name, callback)') | ||
} | ||
this.body = opts.body | ||
// should we validate the headers? | ||
this.headers = opts.headers | ||
} | ||
if (!isValidBody(body)) { | ||
throw new InvalidArgumentError('body must be a string, a Buffer or a Readable stream') | ||
} | ||
wrap (cb) { | ||
// happy path for Node 10+ | ||
if (this.runInAsyncScope) { | ||
return this.runInAsyncScope.bind(this, cb, undefined) | ||
this.timeout = null | ||
this.signal = null | ||
this.method = method | ||
this.path = path | ||
this.streaming = body && typeof body.pipe === 'function' | ||
this.body = typeof body === 'string' | ||
? Buffer.from(body) | ||
: body | ||
this.host = headers && Boolean(headers.host || headers.Host) | ||
this.chunked = !headers || headers['content-length'] === undefined | ||
this.callback = callback | ||
this.opaque = opaque | ||
this.idempotent = idempotent == null | ||
? method === 'HEAD' || method === 'GET' | ||
: idempotent | ||
{ | ||
// TODO (perf): Build directy into buffer instead of | ||
// using an intermediate string. | ||
let headersStr = '' | ||
if (headers) { | ||
const headerNames = Object.keys(headers) | ||
for (let i = 0; i < headerNames.length; i++) { | ||
const name = headerNames[i] | ||
headersStr += name + ': ' + headers[name] + '\r\n' | ||
} | ||
} | ||
if (this.body && this.chunked && !this.streaming) { | ||
headersStr += `content-length: ${Buffer.byteLength(this.body)}\r\n` | ||
} | ||
this.headers = headersStr ? Buffer.from(headersStr, 'ascii') : null | ||
} | ||
// old API for Node 8 | ||
return (err, val) => { | ||
this.emitBefore() | ||
cb(err, val) | ||
this.emitAfter() | ||
if (signal) { | ||
/* istanbul ignore else: can't happen but kept in case of refactoring */ | ||
if (!this.signal) { | ||
this.signal = new EE() | ||
} | ||
const onAbort = () => { | ||
this.signal.emit('error', new RequestAbortedError()) | ||
} | ||
if ('addEventListener' in signal) { | ||
signal.addEventListener('abort', onAbort) | ||
} else { | ||
signal.once('abort', onAbort) | ||
} | ||
} | ||
} | ||
wrapSimple (that, cb) { | ||
// happy path for Node 10+ | ||
if (this.runInAsyncScope) { | ||
return this.runInAsyncScope.bind(this, cb, that) | ||
if (requestTimeout) { | ||
if (!this.signal) { | ||
this.signal = new EE() | ||
} | ||
const onTimeout = () => { | ||
this.signal.emit('error', new RequestTimeoutError()) | ||
} | ||
this.timeout = setTimeout(onTimeout, requestTimeout) | ||
} | ||
// old API for Node 8 | ||
return (a) => { | ||
this.emitBefore() | ||
cb.call(that, a) | ||
this.emitAfter() | ||
if (this.signal) { | ||
this.signal.on('error', (err) => { | ||
assert(err) | ||
this.invoke(err, null) | ||
}) | ||
} | ||
} | ||
wrap (that, cb) { | ||
return this.runInAsyncScope.bind(this, cb, that) | ||
} | ||
invoke (err, val) { | ||
const { callback } = this | ||
if (!callback) { | ||
return | ||
} | ||
clearTimeout(this.timeout) | ||
this.timeout = null | ||
this.path = null | ||
this.body = null | ||
this.callback = null | ||
this.opaque = null | ||
this.headers = null | ||
return this.runInAsyncScope(callback, this, err, val) | ||
} | ||
} | ||
module.exports = Request |
{ | ||
"name": "undici", | ||
"version": "0.5.0", | ||
"version": "1.0.0", | ||
"description": "An HTTP/1.1 client, written from scratch for Node.js", | ||
"main": "index.js", | ||
"scripts": { | ||
"test": "standard | snazzy && tap test/*.js" | ||
"lint": "standard | snazzy", | ||
"test": "tap test/*.js --no-coverage", | ||
"coverage": "standard | snazzy && tap test/*.js" | ||
}, | ||
@@ -14,2 +16,9 @@ "repository": { | ||
"author": "Matteo Collina <hello@matteocollina.com>", | ||
"contributors": [ | ||
{ | ||
"name": "Robert Nagy", | ||
"url": "https://github.com/ronag", | ||
"author": true | ||
} | ||
], | ||
"license": "MIT", | ||
@@ -21,2 +30,5 @@ "bugs": { | ||
"devDependencies": { | ||
"@sinonjs/fake-timers": "^6.0.1", | ||
"abort-controller": "^3.0.0", | ||
"benchmark": "^2.1.4", | ||
"https-pem": "^2.0.0", | ||
@@ -30,9 +42,7 @@ "pre-commit": "^1.2.2", | ||
"dependencies": { | ||
"end-of-stream": "^1.4.1", | ||
"fastq": "^1.6.0", | ||
"readable-stream": "^3.0.0", | ||
"http-parser-js": "^0.5.2", | ||
"retimer": "^2.0.0", | ||
"syncthrough": "^0.5.0" | ||
} | ||
"http-parser-js": "^0.5.2" | ||
}, | ||
"pre-commit": [ | ||
"coverage" | ||
] | ||
} |
433
README.md
# undici | ||
[![Build | ||
Status](https://travis-ci.com/mcollina/undici.svg?branch=master)](https://travis-ci.com/mcollina/undici) | ||
![Node CI](https://github.com/mcollina/undici/workflows/Node%20CI/badge.svg) [![js-standard-style](https://img.shields.io/badge/code%20style-standard-brightgreen.svg?style=flat)](http://standardjs.com/) | ||
@@ -21,6 +20,20 @@ An HTTP/1.1 client, written from scratch for Node.js. | ||
## Benchmarks | ||
Machine: 2.7 GHz Quad-Core Intel Core i7<br/> | ||
Configuration: Node v14.2, HTTP/1.1 without TLS, 100 connections | ||
``` | ||
http - keepalive - pipe x 6,545 ops/sec ±12.47% (64 runs sampled) | ||
undici - pipeline - pipe x 9,560 ops/sec ±3.68% (77 runs sampled) | ||
undici - request - pipe x 9,797 ops/sec ±6.80% (77 runs sampled) | ||
undici - stream - pipe x 11,599 ops/sec ±0.89% (78 runs sampled) | ||
``` | ||
The benchmark is a simple `hello world` [example](benchmarks/index.js). | ||
## API | ||
<a name='client'></a> | ||
### new undici.Client(url, opts) | ||
### `new undici.Client(url, opts)` | ||
@@ -35,12 +48,26 @@ A basic HTTP/1.1 client, mapped on top a single TCP/TLS connection. | ||
- `timeout`, the timeout after which a request will time out, in | ||
milliseconds. Default: | ||
`30000` milliseconds. | ||
- `socketTimeout`, the timeout after which a socket will time out, in | ||
milliseconds. Monitors time between activity on a connected socket. | ||
Use `0` to disable it entirely. Default: `30e3` milliseconds (30s). | ||
- `requestTimeout`, the timeout after which a request will time out, in | ||
milliseconds. Monitors time between request being enqueued and receiving | ||
a response. Use `0` to disable it entirely. | ||
Default: `30e3` milliseconds (30s). | ||
- `maxAbortedPayload`, the maximum number of bytes read after which an | ||
aborted response will close the connection. Closing the connection | ||
will error other inflight requests in the pipeline. | ||
Default: `1e6` bytes (1MiB). | ||
- `pipelining`, the amount of concurrent requests to be sent over the | ||
single TCP/TLS connection according to | ||
[RFC7230](https://tools.ietf.org/html/rfc7230#section-6.3.2). Default: `1`. | ||
[RFC7230](https://tools.ietf.org/html/rfc7230#section-6.3.2). | ||
Default: `1`. | ||
- `tls`, an options object which in the case of `https` will be passed to | ||
[`tls.connect`](https://nodejs.org/api/tls.html#tls_tls_connect_options_callback). | ||
<a name='request'></a> | ||
#### `client.request(opts, cb(err, data))` | ||
#### `client.request(opts, callback(err, data))` | ||
@@ -53,4 +80,13 @@ Performs an HTTP request. | ||
* `method` | ||
* `body`, it can be a `String`, a `Buffer` or a `stream.Readable`. | ||
* `body`, it can be a `String`, a `Buffer`, `Uint8Array` or a `stream.Readable`. | ||
* `headers`, an object with header-value pairs. | ||
* `signal`, either an `AbortController` or an `EventEmitter`. | ||
* `requestTimeout`, the timeout after which a request will time out, in | ||
milliseconds. Monitors time between request being enqueued and receiving | ||
a response. Use `0` to disable it entirely. | ||
Default: `30e3` milliseconds (30s). | ||
* `idempotent`, whether the requests can be safely retried or not. | ||
If `false` the request won't be sent until all preceeding | ||
requests in the pipeline has completed. | ||
Default: `true` if `method` is `HEAD` or `GET`. | ||
@@ -71,9 +107,14 @@ Headers are represented by an object like this: | ||
The `data` parameter in the callback is defined as follow: | ||
The `data` parameter in `callback` is defined as follow: | ||
* `statusCode` | ||
* `headers` | ||
* `body`, a `stream.Readable` with the body to read. A user **must** | ||
call `data.body.resume()` | ||
* `body`, a `stream.Readable` with the body to read. A user **must** | ||
either fully consume or destroy the body unless there is an error, or no further requests | ||
will be processed. | ||
`headers` is an object where all keys have been lowercased. | ||
Returns a promise if no callback is provided. | ||
Example: | ||
@@ -110,57 +151,371 @@ | ||
Promises and async await are supported as well! | ||
Non-idempotent requests will not be pipelined in order | ||
to avoid indirect failures. | ||
Idempotent requests will be automatically retried if | ||
they fail due to indirect failure from the request | ||
at the head of the pipeline. This does not apply to | ||
idempotent requests with a stream request body. | ||
##### Aborting a request | ||
A request can may be aborted using either an `AbortController` or an `EventEmitter`. | ||
To use `AbortController`, you will need to `npm i abort-controller`. | ||
```js | ||
const { statusCode, headers, body } = await client.request({ | ||
const { AbortController } = require('abort-controller') | ||
const { Client } = require('undici') | ||
const client = new Client'http://localhost:3000') | ||
const abortController = new AbortController() | ||
client.request({ | ||
path: '/', | ||
method: 'GET' | ||
method: 'GET', | ||
signal: abortController.signal | ||
}, function (err, data) { | ||
console.log(err) // RequestAbortedError | ||
client.close() | ||
}) | ||
abortController.abort() | ||
``` | ||
#### `client.pipelining` | ||
Property to set the pipelining factor. | ||
Alternatively, any `EventEmitter` that emits an `'abort'` event may be used as an abort controller: | ||
#### `client.full` | ||
```js | ||
const EventEmitter = require('events') | ||
const { Client } = require('undici') | ||
True if the number of requests waiting to be sent is greater | ||
than the pipelining factor. Keeping a client full ensures that once the | ||
inflight set of requests finishes there is a full batch ready to go. | ||
const client = new Client'http://localhost:3000') | ||
const ee = new EventEmitter() | ||
client.request({ | ||
path: '/', | ||
method: 'GET', | ||
signal: ee | ||
}, function (err, data) { | ||
console.log(err) // RequestAbortedError | ||
client.close() | ||
}) | ||
ee.emit('abort') | ||
``` | ||
Destroying the request or response body will have the same effect. | ||
<a name='stream'></a> | ||
#### `client.stream(opts, factory(data), callback(err))` | ||
A faster version of [`request`][request]. | ||
Unlike [`request`][request] this method expects `factory` | ||
to return a [`Writable`](https://nodejs.org/api/stream.html#stream_class_stream_writable) which the response will be | ||
written to. This improves performance by avoiding | ||
creating an intermediate [`Readable`](https://nodejs.org/api/stream.html#stream_readable_streams) when the user | ||
expects to directly pipe the response body to a | ||
[`Writable`](https://nodejs.org/api/stream.html#stream_class_stream_writable). | ||
Options: | ||
* ... same as [`client.request(opts, callback)`][request]. | ||
* `opaque`, passed as `opaque` to `factory`. Used | ||
to avoid creating a closure. | ||
The `data` parameter in `factory` is defined as follow: | ||
* `statusCode` | ||
* `headers` | ||
* `opaque` | ||
`headers` is an object where all keys have been lowercased. | ||
Returns a promise if no callback is provided. | ||
```js | ||
const { Client } = require('undici') | ||
const client = new Client(`http://localhost:3000`) | ||
const fs = require('fs') | ||
client.stream({ | ||
path: '/', | ||
method: 'GET', | ||
opaque: filename | ||
}, ({ statusCode, headers, opaque: filename }) => { | ||
console.log('response received', statusCode) | ||
console.log('headers', headers) | ||
return fs.createWriteStream(filename) | ||
}, (err) => { | ||
if (err) { | ||
console.error('failure', err) | ||
} else { | ||
console.log('success') | ||
} | ||
}) | ||
``` | ||
`opaque` makes it possible to avoid creating a closure | ||
for the `factory` method: | ||
```js | ||
function (req, res) { | ||
return client.stream({ ...opts, opaque: res }, proxy) | ||
} | ||
``` | ||
Instead of: | ||
```js | ||
function (req, res) { | ||
return client.stream(opts, (data) => { | ||
// Creates closure to capture `res`. | ||
proxy({ ...data, opaque: res }) | ||
} | ||
} | ||
``` | ||
<a name='pipeline'></a> | ||
#### `client.pipeline(opts, handler(data))` | ||
For easy use with [`stream.pipeline`](https://nodejs.org/api/stream.html#stream_stream_pipeline_source_transforms_destination_callback). | ||
Options: | ||
* ... same as [`client.request(opts, callback)`][request]. | ||
* `opaque`, passed as `opaque` to `handler`. Used | ||
to avoid creating a closure. | ||
The `data` parameter in `handler` is defined as follow: | ||
* `statusCode` | ||
* `headers` | ||
* `opaque` | ||
* `body`, a `stream.Readable` with the body to read. A user **must** | ||
either fully consume or destroy the body unless there is an error, or no further requests | ||
will be processed. | ||
`handler` should return a [`Writable`](https://nodejs.org/api/stream.html#stream_class_stream_writable) to which the response will be | ||
written to. Usually it should just return the `body` argument unless | ||
some kind of transformation needs to be performed based on e.g. | ||
`headers` or `statusCode`. | ||
`headers` is an object where all keys have been lowercased. | ||
The `handler` should validate the response and save any | ||
required state. If there is an error it should be thrown. | ||
Returns a `Duplex` which writes to the request and reads from | ||
the response. | ||
```js | ||
const { Client } = require('undici') | ||
const client = new Client(`http://localhost:3000`) | ||
const fs = require('fs') | ||
const stream = require('stream') | ||
stream.pipeline( | ||
fs.createReadStream('source.raw'), | ||
client.pipeline({ | ||
path: '/', | ||
method: 'PUT', | ||
}, ({ statusCode, headers, body }) => { | ||
if (statusCode !== 201) { | ||
throw new Error('invalid response') | ||
} | ||
if (isZipped(headers)) { | ||
return pipeline(body, unzip(), () => {}) | ||
} | ||
return body | ||
}), | ||
fs.createReadStream('response.raw'), | ||
(err) => { | ||
if (err) { | ||
console.error('failed') | ||
} else { | ||
console.log('succeeded') | ||
} | ||
} | ||
) | ||
``` | ||
<a name='close'></a> | ||
#### `client.close()` | ||
#### `client.close([callback])` | ||
Close the client as soon as all the enqueued requests are completed | ||
Closes the client and gracefully waits fo enqueued requests to | ||
complete before invoking the callback. | ||
Returns a promise if no callback is provided. | ||
<a name='destroy'></a> | ||
#### `client.destroy(err)` | ||
#### `client.destroy([err][, callback])` | ||
Destroy the client abruptly with the given `err`. All the current and | ||
enqueued requests will error. | ||
Destroy the client abruptly with the given `err`. All the pending and running | ||
requests will be aborted and error. Waits until socket is closed before | ||
invoking the callback. | ||
Returns a promise if no callback is provided. | ||
#### `client.pipelining` | ||
Property to get and set the pipelining factor. | ||
#### `client.pending` | ||
Number of queued requests. | ||
#### `client.running` | ||
Number of inflight requests. | ||
#### `client.size` | ||
Number of pending and running requests. | ||
#### `client.connected` | ||
True if the client has an active connection. The client will lazily | ||
create a connection when it receives a request and will destroy it | ||
if there is no activity for the duration of the `timeout` value. | ||
#### `client.full` | ||
True if `client.size` is greater than the `client.pipelining` factor. | ||
Keeping a client full ensures that once a inflight requests finishes | ||
the the pipeline will schedule new one and keep the pipeline saturated. | ||
#### `client.closed` | ||
True after `client.close()` has been called. | ||
#### `client.destroyed` | ||
True after `client.destroyed()` has been called or `client.close()` has been | ||
called and the client shutdown has completed. | ||
#### Events | ||
* `'drain'`, emitted when the queue is empty. | ||
* `'connect'`, emitted when a socket has been created and | ||
connected. The client will connect once `client.size > 0`. | ||
### undici.Pool | ||
* `'disconnect'`, emitted when socket has disconnected. The | ||
first argument of the event is the error which caused the | ||
socket to disconnect. The client will reconnect if or once | ||
`client.size > 0`. | ||
<a name='pool'></a> | ||
### `new undici.Pool(url, opts)` | ||
A pool of [`Client`][] connected to the same upstream target. | ||
A pool creates a fixed number of [`Client`][] | ||
Options: | ||
* `connections`, the number of clients to create. Default `100`. | ||
* `pipelining`, the pipelining factor. Default `1`. | ||
* `timeout`, the timeout for each request. Default `30000` milliseconds. | ||
* ... same as [`Client`][]. | ||
* `connections`, the number of clients to create. | ||
Default `100`. | ||
#### `pool.request(req, cb)` | ||
#### `pool.request(opts, callback)` | ||
Calls [`client.request(req, cb)`][request] on one of the clients. | ||
Calls [`client.request(opts, callback)`][request] on one of the clients. | ||
#### `pool.close()` | ||
#### `pool.stream(opts, factory, callback)` | ||
Calls [`client.close()`](#close) on all the clients. | ||
Calls [`client.stream(opts, factory, callback)`][stream] on one of the clients. | ||
#### `pool.destroy()` | ||
#### `pool.pipeline(opts, handler)` | ||
Calls [`client.destroy()`](#destroy) on all the clients. | ||
Calls [`client.pipeline(opts, handler)`][pipeline] on one of the clients. | ||
#### `pool.close([callback])` | ||
Calls [`client.close(callback)`](#close) on all the clients. | ||
#### `pool.destroy([err][, callback])` | ||
Calls [`client.destroy(err, callback)`](#destroy) on all the clients. | ||
<a name='errors'></a> | ||
### `undici.errors` | ||
Undici exposes a variety of error objects that you can use to enhance your error handling. | ||
You can find all the error objects inside the `errors` key. | ||
```js | ||
const { errors } = require('undici') | ||
``` | ||
| Error | Error Codes | Description | | ||
| --------------------------|-----------------------------------|------------------------------------------------| | ||
| `InvalidArgumentError` | `UND_ERR_INVALID_ARG` | passed an invalid argument. | | ||
| `InvalidReturnValueError` | `UND_ERR_INVALID_RETURN_VALUE` | returned an invalid value. | | ||
| `SocketTimeoutError` | `UND_ERR_SOCKET_TIMEOUT` | a socket exceeds the `socketTimeout` option. | | ||
| `RequestTimeoutError` | `UND_ERR_REQUEST_TIMEOUT` | a request exceeds the `requestTimeout` option. | | ||
| `RequestAbortedError` | `UND_ERR_ABORTED` | the request has been aborted by the user | | ||
| `ClientDestroyedError` | `UND_ERR_DESTROYED` | trying to use a destroyed client. | | ||
| `ClientClosedError` | `UND_ERR_CLOSED` | trying to use a closed client. | | ||
| `SocketError` | `UND_ERR_SOCKET` | there is an error with the socket. | | ||
| `NotSupportedError` | `UND_ERR_NOT_SUPPORTED` | encountered unsupported functionality. | | ||
## Specification Compliance | ||
This section documents parts of the HTTP/1.1 specification which Undici does | ||
not support or does not fully implement. | ||
### Informational Responses | ||
Undici does not support 1xx informational responses and will either | ||
ignore or error them. | ||
#### Expect | ||
Undici does not support the `Expect` request header field. The request | ||
body is always immediately sent and the `100 Continue` response will be | ||
ignored. | ||
Refs: https://tools.ietf.org/html/rfc7231#section-5.1.1 | ||
#### Upgrade | ||
Undici does not support the the `Upgrade` request header field. A | ||
`101 Switching Protocols` response will cause an `UND_ERR_NOT_SUPPORTED` error. | ||
Refs: https://tools.ietf.org/html/rfc7230#section-6.7 | ||
#### Hints | ||
Undici does not support early hints. A `103 Early Hint` response will | ||
be ignored. | ||
Refs: https://tools.ietf.org/html/rfc8297 | ||
### Trailer | ||
Undici does not support the the `Trailer` response header field. Any response | ||
trailer headers will be ignored. | ||
Refs: https://tools.ietf.org/html/rfc7230#section-4.4 | ||
### Pipelining | ||
Uncidi will only use pipelining if configured with a `pipelining` factor | ||
greater than `1`. | ||
Undici always assumes that connections are persistent and will immediatly | ||
pipeline requests, without checking whether the connection is persistent. | ||
Hence, automatic fallback to HTTP/1.0 or HTTP/1.1 without pipelining is | ||
not supported. | ||
Undici will immediately pipeline when retrying requests afters a failed | ||
connection. However, Undici will not retry the first remaining requests in | ||
the prior pipeline and instead error the corresponding callback/promise/stream. | ||
Refs: https://tools.ietf.org/html/rfc2616#section-8.1.2.2<br/> | ||
Refs: https://tools.ietf.org/html/rfc7230#section-6.3.2 | ||
## Collaborators | ||
* [__Robert Nagy__](https://github.com/ronag), <https://www.npmjs.com/~ronag> | ||
## License | ||
@@ -172,1 +527,3 @@ | ||
[request]: #request | ||
[stream]: #stream | ||
[pipeline]: #pipeline |
'use strict' | ||
const t = require('tap') | ||
const { test } = require('tap') | ||
const { Client } = require('..') | ||
@@ -8,2 +8,3 @@ const { createServer } = require('http') | ||
const { readFile } = require('fs') | ||
const { PassThrough } = require('stream') | ||
@@ -35,67 +36,191 @@ const transactions = new Map() | ||
t.plan(16) | ||
test('async hooks', (t) => { | ||
t.plan(31) | ||
const server = createServer((req, res) => { | ||
res.setHeader('content-type', 'text/plain') | ||
readFile(__filename, (err, buf) => { | ||
t.error(err) | ||
const buf1 = buf.slice(0, buf.length / 2) | ||
const buf2 = buf.slice(buf.length / 2) | ||
// we split the file so that it's received in 2 chunks | ||
// and it should restore the state on the second | ||
res.write(buf1) | ||
setTimeout(() => { | ||
res.end(buf2) | ||
}, 10) | ||
const server = createServer((req, res) => { | ||
res.setHeader('content-type', 'text/plain') | ||
readFile(__filename, (err, buf) => { | ||
t.error(err) | ||
const buf1 = buf.slice(0, buf.length / 2) | ||
const buf2 = buf.slice(buf.length / 2) | ||
// we split the file so that it's received in 2 chunks | ||
// and it should restore the state on the second | ||
res.write(buf1) | ||
setTimeout(() => { | ||
res.end(buf2) | ||
}, 10) | ||
}) | ||
}) | ||
}) | ||
t.tearDown(server.close.bind(server)) | ||
t.tearDown(server.close.bind(server)) | ||
server.listen(0, () => { | ||
const client = new Client(`http://localhost:${server.address().port}`) | ||
t.tearDown(client.close.bind(client)) | ||
server.listen(0, () => { | ||
const client = new Client(`http://localhost:${server.address().port}`) | ||
t.tearDown(client.destroy.bind(client)) | ||
client.request({ path: '/', method: 'GET' }, (err, { statusCode, headers, body }) => { | ||
t.error(err) | ||
body.resume() | ||
t.strictDeepEqual(getCurrentTransaction(), null) | ||
client.request({ path: '/', method: 'GET' }, (err, { statusCode, headers, body }) => { | ||
t.error(err) | ||
body.resume() | ||
t.strictDeepEqual(getCurrentTransaction(), null) | ||
setCurrentTransaction({ hello: 'world2' }) | ||
setCurrentTransaction({ hello: 'world2' }) | ||
client.request({ path: '/', method: 'GET' }, (err, { statusCode, headers, body }) => { | ||
t.error(err) | ||
t.strictDeepEqual(getCurrentTransaction(), { hello: 'world2' }) | ||
body.once('data', () => { | ||
t.strictDeepEqual(getCurrentTransaction(), { hello: 'world2' }) | ||
body.resume() | ||
}) | ||
body.on('end', () => { | ||
t.strictDeepEqual(getCurrentTransaction(), { hello: 'world2' }) | ||
}) | ||
}) | ||
}) | ||
client.request({ path: '/', method: 'GET' }, (err, { statusCode, headers, body }) => { | ||
t.error(err) | ||
t.strictDeepEqual(getCurrentTransaction(), { hello: 'world2' }) | ||
body.resume() | ||
t.strictDeepEqual(getCurrentTransaction(), null) | ||
body.once('data', () => { | ||
t.strictDeepEqual(getCurrentTransaction(), { hello: 'world2' }) | ||
body.resume() | ||
setCurrentTransaction({ hello: 'world' }) | ||
client.request({ path: '/', method: 'GET' }, (err, { statusCode, headers, body }) => { | ||
t.error(err) | ||
t.strictDeepEqual(getCurrentTransaction(), { hello: 'world' }) | ||
body.once('data', () => { | ||
t.strictDeepEqual(getCurrentTransaction(), { hello: 'world' }) | ||
body.resume() | ||
}) | ||
body.on('end', () => { | ||
t.strictDeepEqual(getCurrentTransaction(), { hello: 'world' }) | ||
}) | ||
}) | ||
}) | ||
body.on('end', () => { | ||
t.strictDeepEqual(getCurrentTransaction(), { hello: 'world2' }) | ||
client.request({ path: '/', method: 'HEAD' }, (err, { statusCode, headers, body }) => { | ||
t.error(err) | ||
body.resume() | ||
t.strictDeepEqual(getCurrentTransaction(), null) | ||
setCurrentTransaction({ hello: 'world' }) | ||
client.request({ path: '/', method: 'HEAD' }, (err, { statusCode, headers, body }) => { | ||
t.error(err) | ||
t.strictDeepEqual(getCurrentTransaction(), { hello: 'world' }) | ||
body.once('data', () => { | ||
t.strictDeepEqual(getCurrentTransaction(), { hello: 'world' }) | ||
body.resume() | ||
}) | ||
body.on('end', () => { | ||
t.strictDeepEqual(getCurrentTransaction(), { hello: 'world' }) | ||
}) | ||
}) | ||
}) | ||
client.stream({ path: '/', method: 'GET' }, () => { | ||
t.strictDeepEqual(getCurrentTransaction(), null) | ||
return new PassThrough().resume() | ||
}, (err) => { | ||
t.error(err) | ||
t.strictDeepEqual(getCurrentTransaction(), null) | ||
setCurrentTransaction({ hello: 'world' }) | ||
client.stream({ path: '/', method: 'GET' }, () => { | ||
t.strictDeepEqual(getCurrentTransaction(), { hello: 'world' }) | ||
return new PassThrough().resume() | ||
}, (err) => { | ||
t.error(err) | ||
t.strictDeepEqual(getCurrentTransaction(), { hello: 'world' }) | ||
}) | ||
}) | ||
}) | ||
}) | ||
client.request({ path: '/', method: 'GET' }, (err, { statusCode, headers, body }) => { | ||
t.error(err) | ||
body.resume() | ||
t.strictDeepEqual(getCurrentTransaction(), null) | ||
test('async hooks client is destroyed', (t) => { | ||
t.plan(7) | ||
setCurrentTransaction({ hello: 'world' }) | ||
const server = createServer((req, res) => { | ||
res.setHeader('content-type', 'text/plain') | ||
readFile(__filename, (err, buf) => { | ||
t.error(err) | ||
const buf1 = buf.slice(0, buf.length / 2) | ||
const buf2 = buf.slice(buf.length / 2) | ||
// we split the file so that it's received in 2 chunks | ||
// and it should restore the state on the second | ||
res.write(buf1) | ||
setTimeout(() => { | ||
res.end(buf2) | ||
}, 10) | ||
}) | ||
}) | ||
t.tearDown(server.close.bind(server)) | ||
client.request({ path: '/', method: 'GET' }, (err, { statusCode, headers, body }) => { | ||
server.listen(0, () => { | ||
const client = new Client(`http://localhost:${server.address().port}`) | ||
t.tearDown(client.destroy.bind(client)) | ||
client.request({ path: '/', method: 'GET' }, (err, { body }) => { | ||
t.error(err) | ||
t.strictDeepEqual(getCurrentTransaction(), { hello: 'world' }) | ||
body.resume() | ||
body.on('error', (err) => { | ||
t.ok(err) | ||
}) | ||
t.strictDeepEqual(getCurrentTransaction(), null) | ||
body.once('data', () => { | ||
t.strictDeepEqual(getCurrentTransaction(), { hello: 'world' }) | ||
body.resume() | ||
setCurrentTransaction({ hello: 'world2' }) | ||
client.request({ path: '/', method: 'GET' }, (err) => { | ||
t.strictEqual(err.message, 'The client is destroyed') | ||
t.strictDeepEqual(getCurrentTransaction(), { hello: 'world2' }) | ||
}) | ||
client.destroy((err) => { | ||
t.error(err) | ||
}) | ||
}) | ||
}) | ||
}) | ||
body.on('end', () => { | ||
t.strictDeepEqual(getCurrentTransaction(), { hello: 'world' }) | ||
test('async hooks error and close', (t) => { | ||
t.plan(6) | ||
const server = createServer((req, res) => { | ||
res.write('asd') | ||
setImmediate(() => { | ||
res.destroy() | ||
}) | ||
}) | ||
t.tearDown(server.close.bind(server)) | ||
server.listen(0, () => { | ||
const client = new Client(`http://localhost:${server.address().port}`) | ||
t.tearDown(client.close.bind(client)) | ||
client.request({ path: '/', method: 'GET' }, (err, { body }) => { | ||
t.error(err) | ||
body.resume() | ||
body.on('error', (err) => { | ||
t.ok(err) | ||
}) | ||
t.strictDeepEqual(getCurrentTransaction(), null) | ||
setCurrentTransaction({ hello: 'world2' }) | ||
client.request({ path: '/', method: 'GET' }, (err, data) => { | ||
t.error(err) | ||
data.body.on('error', () => { | ||
t.strictDeepEqual(getCurrentTransaction(), { hello: 'world2' }) | ||
}) | ||
data.body.on('close', () => { | ||
t.strictDeepEqual(getCurrentTransaction(), { hello: 'world2' }) | ||
}) | ||
}) | ||
}) | ||
}) | ||
}) |
'use strict' | ||
const { test } = require('tap') | ||
const { Client } = require('..') | ||
const { Client, errors } = require('..') | ||
const { createServer } = require('http') | ||
const { Readable } = require('readable-stream') | ||
const net = require('net') | ||
const { Readable } = require('stream') | ||
const { | ||
kParser, | ||
kSocket, | ||
kEnqueue | ||
} = require('../lib/symbols') | ||
test('GET errors and reconnect with pipelining 1', (t) => { | ||
@@ -30,5 +37,5 @@ t.plan(9) | ||
}) | ||
t.tearDown(client.close.bind(client)) | ||
t.tearDown(client.destroy.bind(client)) | ||
client.request({ path: '/', method: 'GET' }, (err, data) => { | ||
client.request({ path: '/', method: 'GET', idempotent: false }, (err, data) => { | ||
t.ok(err instanceof Error) // we are expecting an error | ||
@@ -82,7 +89,7 @@ t.strictEqual(null, data) | ||
}) | ||
t.tearDown(client.close.bind(client)) | ||
t.tearDown(client.destroy.bind(client)) | ||
// all of these will error | ||
for (let i = 0; i < 3; i++) { | ||
client.request({ path: '/', method: 'GET' }, (err, data) => { | ||
client.request({ path: '/', method: 'GET', idempotent: false }, (err, data) => { | ||
t.ok(err instanceof Error) // we are expecting an error | ||
@@ -94,3 +101,3 @@ t.strictEqual(null, data) | ||
// this will be queued up | ||
client.request({ path: '/', method: 'GET' }, (err, { statusCode, headers, body }) => { | ||
client.request({ path: '/', method: 'GET', idempotent: false }, (err, { statusCode, headers, body }) => { | ||
t.error(err) | ||
@@ -123,3 +130,2 @@ t.strictEqual(statusCode, 200) | ||
}) | ||
// req.socket.on('end', console.log.bind(console, 'end')) | ||
@@ -143,3 +149,3 @@ req.on('aborted', () => { | ||
const client = new Client(`http://localhost:${server.address().port}`) | ||
t.tearDown(client.close.bind(client)) | ||
t.tearDown(client.destroy.bind(client)) | ||
@@ -165,3 +171,3 @@ client.request({ | ||
// this will be queued up | ||
client.request({ path: '/', method: 'GET' }, (err, { statusCode, headers, body }) => { | ||
client.request({ path: '/', method: 'GET', idempotent: false }, (err, { statusCode, headers, body }) => { | ||
t.error(err) | ||
@@ -194,3 +200,2 @@ t.strictEqual(statusCode, 200) | ||
}) | ||
// req.socket.on('end', console.log.bind(console, 'end')) | ||
@@ -214,3 +219,3 @@ req.on('aborted', () => { | ||
const client = new Client(`http://localhost:${server.address().port}`) | ||
t.tearDown(client.close.bind(client)) | ||
t.tearDown(client.destroy.bind(client)) | ||
@@ -246,1 +251,596 @@ client.request({ | ||
}) | ||
test('invalid options throws', (t) => { | ||
t.plan(24) | ||
try { | ||
new Client({ port: 'foobar' }) // eslint-disable-line | ||
} catch (err) { | ||
t.ok(err instanceof errors.InvalidArgumentError) | ||
t.strictEqual(err.message, 'invalid port') | ||
} | ||
try { | ||
new Client(new URL('http://asd:200/somepath')) // eslint-disable-line | ||
} catch (err) { | ||
t.ok(err instanceof errors.InvalidArgumentError) | ||
t.strictEqual(err.message, 'invalid url') | ||
} | ||
try { | ||
new Client(new URL('http://asd:200?q=asd')) // eslint-disable-line | ||
} catch (err) { | ||
t.ok(err instanceof errors.InvalidArgumentError) | ||
t.strictEqual(err.message, 'invalid url') | ||
} | ||
try { | ||
new Client(new URL('http://asd:200#asd')) // eslint-disable-line | ||
} catch (err) { | ||
t.ok(err instanceof errors.InvalidArgumentError) | ||
t.strictEqual(err.message, 'invalid url') | ||
} | ||
try { | ||
new Client(new URL('http://localhost:200'), { // eslint-disable-line | ||
maxAbortedPayload: 'asd' | ||
}) | ||
} catch (err) { | ||
t.ok(err instanceof errors.InvalidArgumentError) | ||
t.strictEqual(err.message, 'invalid maxAbortedPayload') | ||
} | ||
try { | ||
new Client(new URL('http://localhost:200'), { // eslint-disable-line | ||
socketTimeout: 'asd' | ||
}) | ||
} catch (err) { | ||
t.ok(err instanceof errors.InvalidArgumentError) | ||
t.strictEqual(err.message, 'invalid socketTimeout') | ||
} | ||
try { | ||
new Client(new URL('http://localhost:200'), { // eslint-disable-line | ||
requestTimeout: 'asd' | ||
}) | ||
} catch (err) { | ||
t.ok(err instanceof errors.InvalidArgumentError) | ||
t.strictEqual(err.message, 'invalid requestTimeout') | ||
} | ||
try { | ||
new Client({ // eslint-disable-line | ||
protocol: 'asd' | ||
}) | ||
} catch (err) { | ||
t.ok(err instanceof errors.InvalidArgumentError) | ||
t.strictEqual(err.message, 'invalid protocol') | ||
} | ||
try { | ||
new Client({ // eslint-disable-line | ||
hostname: 1 | ||
}) | ||
} catch (err) { | ||
t.ok(err instanceof errors.InvalidArgumentError) | ||
t.strictEqual(err.message, 'invalid hostname') | ||
} | ||
try { | ||
new Client(1) // eslint-disable-line | ||
} catch (err) { | ||
t.ok(err instanceof errors.InvalidArgumentError) | ||
t.strictEqual(err.message, 'invalid url') | ||
} | ||
try { | ||
const client = new Client(new URL('http://localhost:200')) | ||
client.destroy(null, null) | ||
} catch (err) { | ||
t.ok(err instanceof errors.InvalidArgumentError) | ||
t.strictEqual(err.message, 'invalid callback') | ||
} | ||
try { | ||
const client = new Client(new URL('http://localhost:200')) | ||
client.close(null) | ||
} catch (err) { | ||
t.ok(err instanceof errors.InvalidArgumentError) | ||
t.strictEqual(err.message, 'invalid callback') | ||
} | ||
}) | ||
test('POST which fails should error response', (t) => { | ||
t.plan(4) | ||
const server = createServer() | ||
server.on('request', (req, res) => { | ||
req.once('data', () => { | ||
res.destroy() | ||
}) | ||
}) | ||
t.tearDown(server.close.bind(server)) | ||
server.listen(0, () => { | ||
const client = new Client(`http://localhost:${server.address().port}`) | ||
t.tearDown(client.destroy.bind(client)) | ||
function checkError (err) { | ||
// Different platforms error with different codes... | ||
t.ok( | ||
err.code === 'EPIPE' || | ||
err.code === 'ECONNRESET' || | ||
err.message === 'other side closed' | ||
) | ||
} | ||
{ | ||
const body = new Readable({ read () {} }) | ||
body.push('asd') | ||
body.on('error', (err) => { | ||
checkError(err) | ||
}) | ||
client.request({ | ||
path: '/', | ||
method: 'POST', | ||
body | ||
}, (err) => { | ||
checkError(err) | ||
}) | ||
} | ||
{ | ||
const body = new Readable({ read () {} }) | ||
body.push('asd') | ||
body.on('error', (err) => { | ||
checkError(err) | ||
}) | ||
client.request({ | ||
path: '/', | ||
method: 'POST', | ||
headers: { | ||
'content-length': 100 | ||
}, | ||
body | ||
}, (err) => { | ||
checkError(err) | ||
}) | ||
} | ||
}) | ||
}) | ||
test('client destroy cleanup', (t) => { | ||
t.plan(3) | ||
const _err = new Error('kaboom') | ||
let client | ||
const server = createServer() | ||
server.once('request', (req, res) => { | ||
req.once('data', () => { | ||
client.destroy(_err, (err) => { | ||
t.error(err) | ||
}) | ||
}) | ||
}) | ||
t.tearDown(server.close.bind(server)) | ||
server.listen(0, () => { | ||
client = new Client(`http://localhost:${server.address().port}`) | ||
t.tearDown(client.destroy.bind(client)) | ||
const body = new Readable({ read () {} }) | ||
body.push('asd') | ||
body.on('error', (err) => { | ||
t.strictEqual(err, _err) | ||
}) | ||
client.request({ | ||
path: '/', | ||
method: 'POST', | ||
body | ||
}, (err, data) => { | ||
t.strictEqual(err, _err) | ||
}) | ||
}) | ||
}) | ||
test('GET errors body', (t) => { | ||
t.plan(2) | ||
const server = createServer() | ||
server.once('request', (req, res) => { | ||
res.write('asd') | ||
setTimeout(() => { | ||
res.destroy() | ||
}, 19) | ||
}) | ||
t.tearDown(server.close.bind(server)) | ||
server.listen(0, () => { | ||
const client = new Client(`http://localhost:${server.address().port}`) | ||
t.tearDown(client.destroy.bind(client)) | ||
client.request({ path: '/', method: 'GET' }, (err, { statusCode, headers, body }) => { | ||
t.error(err) | ||
body.resume() | ||
body.on('error', err => ( | ||
t.ok(err) | ||
)) | ||
}) | ||
}) | ||
}) | ||
test('reset parser', (t) => { | ||
t.plan(6) | ||
const server = createServer() | ||
let res2 | ||
server.on('request', (req, res) => { | ||
res2 = res | ||
res.write('asd') | ||
}) | ||
t.tearDown(server.close.bind(server)) | ||
server.listen(0, () => { | ||
const client = new Client(`http://localhost:${server.address().port}`) | ||
t.tearDown(client.destroy.bind(client)) | ||
client.request({ path: '/', method: 'GET' }, (err, { body }) => { | ||
t.error(err) | ||
res2.destroy() | ||
body.resume() | ||
body.on('error', err => { | ||
t.ok(err) | ||
}) | ||
}) | ||
client.once('disconnect', () => { | ||
client.request({ path: '/', method: 'GET' }, (err, { body }) => { | ||
t.error(err) | ||
res2.destroy() | ||
body.resume() | ||
body.on('error', err => { | ||
t.ok(err) | ||
}) | ||
}) | ||
client.on('connect', () => { | ||
t.ok(!client[kSocket][kParser].chunk) | ||
t.ok(!client[kSocket][kParser].offset) | ||
}) | ||
}) | ||
}) | ||
}) | ||
test('validate request body', (t) => { | ||
t.plan(6) | ||
const server = createServer((req, res) => { | ||
res.end('asd') | ||
}) | ||
t.tearDown(server.close.bind(server)) | ||
server.listen(0, () => { | ||
const client = new Client(`http://localhost:${server.address().port}`) | ||
t.tearDown(client.close.bind(client)) | ||
client.request({ | ||
path: '/', | ||
method: 'POST', | ||
body: /asdasd/ | ||
}, (err, data) => { | ||
t.ok(err instanceof errors.InvalidArgumentError) | ||
}) | ||
client.request({ | ||
path: '/', | ||
method: 'POST', | ||
body: 0 | ||
}, (err, data) => { | ||
t.ok(err instanceof errors.InvalidArgumentError) | ||
}) | ||
client.request({ | ||
path: '/', | ||
method: 'POST', | ||
body: false | ||
}, (err, data) => { | ||
t.ok(err instanceof errors.InvalidArgumentError) | ||
}) | ||
client.request({ | ||
path: '/', | ||
method: 'POST', | ||
body: '' | ||
}, (err, data) => { | ||
t.error(err instanceof errors.InvalidArgumentError) | ||
data.body.resume() | ||
}) | ||
client.request({ | ||
path: '/', | ||
method: 'POST', | ||
body: new Uint8Array() | ||
}, (err, data) => { | ||
t.error(err instanceof errors.InvalidArgumentError) | ||
data.body.resume() | ||
}) | ||
client.request({ | ||
path: '/', | ||
method: 'POST', | ||
body: Buffer.alloc(10) | ||
}, (err, data) => { | ||
t.error(err instanceof errors.InvalidArgumentError) | ||
data.body.resume() | ||
}) | ||
}) | ||
}) | ||
test('parser error', (t) => { | ||
t.plan(2) | ||
const server = net.createServer() | ||
server.once('connection', (socket) => { | ||
socket.write('asd\n\r213123') | ||
}) | ||
t.tearDown(server.close.bind(server)) | ||
server.listen(0, () => { | ||
const client = new Client(`http://localhost:${server.address().port}`) | ||
t.tearDown(client.destroy.bind(client)) | ||
client.request({ path: '/', method: 'GET' }, (err) => { | ||
t.ok(err) | ||
client.close((err) => { | ||
t.error(err) | ||
}) | ||
}) | ||
}) | ||
}) | ||
test('socket fail while writing request body', (t) => { | ||
t.plan(2) | ||
const server = createServer() | ||
server.once('request', (req, res) => { | ||
}) | ||
t.tearDown(server.close.bind(server)) | ||
server.listen(0, () => { | ||
const client = new Client(`http://localhost:${server.address().port}`) | ||
t.tearDown(client.destroy.bind(client)) | ||
const body = new Readable({ read () {} }) | ||
body.push('asd') | ||
client.on('connect', () => { | ||
process.nextTick(() => { | ||
client[kSocket].destroy('kaboom') | ||
}) | ||
}) | ||
client.request({ | ||
path: '/', | ||
method: 'POST', | ||
body | ||
}, (err) => { | ||
t.ok(err) | ||
}) | ||
client.close((err) => { | ||
t.error(err) | ||
}) | ||
}) | ||
}) | ||
test('socket fail while ending request body', (t) => { | ||
t.plan(3) | ||
const server = createServer() | ||
server.once('request', (req, res) => { | ||
res.end() | ||
}) | ||
t.tearDown(server.close.bind(server)) | ||
server.listen(0, () => { | ||
const client = new Client(`http://localhost:${server.address().port}`, { | ||
pipelining: 2 | ||
}) | ||
t.tearDown(client.destroy.bind(client)) | ||
const _err = new Error('kaboom') | ||
client.on('connect', () => { | ||
process.nextTick(() => { | ||
client[kSocket].destroy(_err) | ||
}) | ||
}) | ||
const body = new Readable({ read () {} }) | ||
body.push(null) | ||
client.request({ | ||
path: '/', | ||
method: 'POST', | ||
body | ||
}, (err) => { | ||
t.strictEqual(err, _err) | ||
}) | ||
client.close((err) => { | ||
t.error(err) | ||
client.close((err) => { | ||
t.ok(err instanceof errors.ClientDestroyedError) | ||
}) | ||
}) | ||
}) | ||
}) | ||
test('queued request should not fail on socket destroy', (t) => { | ||
t.plan(2) | ||
const server = createServer() | ||
server.on('request', (req, res) => { | ||
res.end() | ||
}) | ||
t.tearDown(server.close.bind(server)) | ||
server.listen(0, () => { | ||
const client = new Client(`http://localhost:${server.address().port}`, { | ||
pipelining: 1 | ||
}) | ||
t.tearDown(client.destroy.bind(client)) | ||
client.request({ | ||
path: '/', | ||
method: 'GET' | ||
}, (err, data) => { | ||
t.error(err) | ||
data.body.resume() | ||
client[kSocket].destroy() | ||
client.request({ | ||
path: '/', | ||
method: 'GET' | ||
}, (err, data) => { | ||
t.error(err) | ||
data.body.resume() | ||
}) | ||
}) | ||
}) | ||
}) | ||
test('queued request should fail on client destroy', (t) => { | ||
t.plan(5) | ||
const server = createServer() | ||
server.on('request', (req, res) => { | ||
res.end() | ||
}) | ||
t.tearDown(server.close.bind(server)) | ||
server.listen(0, () => { | ||
const client = new Client(`http://localhost:${server.address().port}`, { | ||
pipelining: 1 | ||
}) | ||
t.tearDown(client.destroy.bind(client)) | ||
let requestErrored = false | ||
client.request({ | ||
path: '/', | ||
method: 'GET' | ||
}, (err, data) => { | ||
t.error(err) | ||
data.body.resume() | ||
client.destroy((err) => { | ||
t.error(err) | ||
t.strictEqual(requestErrored, true) | ||
}) | ||
}) | ||
client.request({ | ||
path: '/', | ||
method: 'GET' | ||
}, (err, data) => { | ||
requestErrored = true | ||
t.ok(err) | ||
t.strictEqual(data, null) | ||
}) | ||
}) | ||
}) | ||
test('retry idempotent inflight', (t) => { | ||
t.plan(3) | ||
const server = createServer() | ||
server.on('request', (req, res) => { | ||
res.end() | ||
}) | ||
t.tearDown(server.close.bind(server)) | ||
server.listen(0, () => { | ||
const client = new Client(`http://localhost:${server.address().port}`, { | ||
pipelining: 3 | ||
}) | ||
t.tearDown(client.destroy.bind(client)) | ||
client.request({ | ||
path: '/', | ||
method: 'POST', | ||
body: new Readable({ | ||
read () { | ||
this.destroy(new Error('kaboom')) | ||
} | ||
}) | ||
}, (err) => { | ||
t.ok(err) | ||
}) | ||
client.request({ | ||
path: '/', | ||
method: 'GET' | ||
}, (err, data) => { | ||
t.error(err) | ||
data.body.resume() | ||
}) | ||
client.request({ | ||
path: '/', | ||
method: 'GET' | ||
}, (err, data) => { | ||
t.error(err) | ||
data.body.resume() | ||
}) | ||
}) | ||
}) | ||
test('invalid opts', (t) => { | ||
t.plan(6) | ||
const client = new Client('http://localhost:5000') | ||
client.request(null, (err) => { | ||
t.ok(err instanceof errors.InvalidArgumentError) | ||
}) | ||
client.pipeline(null).on('error', (err) => { | ||
t.ok(err instanceof errors.InvalidArgumentError) | ||
}) | ||
client[kEnqueue](null, (err) => { | ||
t.ok(err instanceof errors.InvalidArgumentError) | ||
}) | ||
client[kEnqueue]({ path: '/', method: 'GET', signal: 1 }, (err) => { | ||
t.ok(err instanceof errors.InvalidArgumentError) | ||
}) | ||
client[kEnqueue]({ path: '/', method: 'GET', signal: {} }, (err) => { | ||
t.ok(err instanceof errors.InvalidArgumentError) | ||
}) | ||
try { | ||
client[kEnqueue]({ path: '/', method: 'GET', signal: {} }, null) | ||
} catch (err) { | ||
t.ok(err instanceof errors.InvalidArgumentError) | ||
} | ||
}) | ||
test('default port for http and https', (t) => { | ||
t.plan(4) | ||
try { | ||
new Client(new URL('http://localhost:80')) // eslint-disable-line | ||
t.pass('Should not throw') | ||
} catch (err) { | ||
t.fail(err) | ||
} | ||
try { | ||
new Client(new URL('http://localhost')) // eslint-disable-line | ||
t.pass('Should not throw') | ||
} catch (err) { | ||
t.fail(err) | ||
} | ||
try { | ||
new Client(new URL('https://localhost:443')) // eslint-disable-line | ||
t.pass('Should not throw') | ||
} catch (err) { | ||
t.fail(err) | ||
} | ||
try { | ||
new Client(new URL('https://localhost')) // eslint-disable-line | ||
t.pass('Should not throw') | ||
} catch (err) { | ||
t.fail(err) | ||
} | ||
}) |
'use strict' | ||
const { test } = require('tap') | ||
const { Client } = require('..') | ||
const { Client, errors } = require('..') | ||
const { createServer } = require('http') | ||
const { readFileSync, createReadStream } = require('fs') | ||
const { Readable } = require('stream') | ||
@@ -15,3 +16,3 @@ test('basic get', (t) => { | ||
t.strictEqual('localhost', req.headers.host) | ||
res.setHeader('content-type', 'text/plain') | ||
res.setHeader('Content-Type', 'text/plain') | ||
res.end('hello') | ||
@@ -60,3 +61,7 @@ }) | ||
t.strictEqual(headers['content-type'], 'text/plain') | ||
t.strictEqual(body, null) | ||
body | ||
.resume() | ||
.on('end', () => { | ||
t.pass() | ||
}) | ||
}) | ||
@@ -117,3 +122,7 @@ }) | ||
t.strictEqual(headers['content-type'], 'text/plain') | ||
t.strictEqual(body, null) | ||
body | ||
.resume() | ||
.on('end', () => { | ||
t.pass() | ||
}) | ||
}) | ||
@@ -267,2 +276,3 @@ }) | ||
}, | ||
requestTimeout: 0, | ||
body: createReadStream(__filename) | ||
@@ -368,3 +378,7 @@ }, (err, { statusCode, headers, body }) => { | ||
t.strictEqual(statusCode, 200) | ||
t.strictEqual(body, null) | ||
body | ||
.resume() | ||
.on('end', () => { | ||
t.pass() | ||
}) | ||
}) | ||
@@ -375,99 +389,178 @@ } | ||
test('20 times GET with pipelining 10', (t) => { | ||
const num = 20 | ||
t.plan(3 * num + 1) | ||
test('Set-Cookie', (t) => { | ||
t.plan(4) | ||
let count = 0 | ||
let countGreaterThanOne = false | ||
const server = createServer((req, res) => { | ||
count++ | ||
setTimeout(function () { | ||
countGreaterThanOne = countGreaterThanOne || count > 1 | ||
res.end(req.url) | ||
}, 10) | ||
res.setHeader('content-type', 'text/plain') | ||
res.setHeader('Set-Cookie', ['a cookie', 'another cookie', 'more cookies']) | ||
res.end('hello') | ||
}) | ||
t.tearDown(server.close.bind(server)) | ||
// needed to check for a warning on the maxListeners on the socket | ||
process.on('warning', t.fail) | ||
t.tearDown(() => { | ||
process.removeListener('warning', t.fail) | ||
server.listen(0, () => { | ||
const client = new Client(`http://localhost:${server.address().port}`) | ||
t.tearDown(client.close.bind(client)) | ||
client.request({ path: '/', method: 'GET' }, (err, { statusCode, headers, body }) => { | ||
t.error(err) | ||
t.strictEqual(statusCode, 200) | ||
t.strictDeepEqual(headers['set-cookie'], ['a cookie', 'another cookie', 'more cookies']) | ||
const bufs = [] | ||
body.on('data', (buf) => { | ||
bufs.push(buf) | ||
}) | ||
body.on('end', () => { | ||
t.strictEqual('hello', Buffer.concat(bufs).toString('utf8')) | ||
}) | ||
}) | ||
}) | ||
}) | ||
test('ignore request header mutations', (t) => { | ||
t.plan(2) | ||
const server = createServer((req, res) => { | ||
t.strictEqual(req.headers.test, 'test') | ||
res.end() | ||
}) | ||
t.tearDown(server.close.bind(server)) | ||
server.listen(0, () => { | ||
const client = new Client(`http://localhost:${server.address().port}`, { | ||
pipelining: 10 | ||
}) | ||
const client = new Client(`http://localhost:${server.address().port}`) | ||
t.tearDown(client.close.bind(client)) | ||
for (var i = 0; i < num; i++) { | ||
makeRequest(i) | ||
} | ||
const headers = { test: 'test' } | ||
client.request({ | ||
path: '/', | ||
method: 'GET', | ||
headers | ||
}, (err, { body }) => { | ||
t.error(err) | ||
body.resume() | ||
}) | ||
headers.test = 'asd' | ||
}) | ||
}) | ||
function makeRequest (i) { | ||
makeRequestAndExpectUrl(client, i, t, () => { | ||
count-- | ||
test('url-like url', (t) => { | ||
t.plan(1) | ||
if (i === num - 1) { | ||
t.ok(countGreaterThanOne, 'seen more than one parallel request') | ||
} | ||
}) | ||
} | ||
const server = createServer((req, res) => { | ||
res.end() | ||
}) | ||
}) | ||
t.tearDown(server.close.bind(server)) | ||
function makeRequestAndExpectUrl (client, i, t, cb) { | ||
return client.request({ path: '/' + i, method: 'GET' }, (err, { statusCode, headers, body }) => { | ||
cb() | ||
t.error(err) | ||
t.strictEqual(statusCode, 200) | ||
const bufs = [] | ||
body.on('data', (buf) => { | ||
bufs.push(buf) | ||
server.listen(0, () => { | ||
const client = new Client({ | ||
hostname: 'localhost', | ||
port: server.address().port, | ||
protocol: 'http' | ||
}) | ||
body.on('end', () => { | ||
t.strictEqual('/' + i, Buffer.concat(bufs).toString('utf8')) | ||
t.tearDown(client.close.bind(client)) | ||
client.request({ path: '/', method: 'GET' }, (err, data) => { | ||
t.error(err) | ||
data.body.resume() | ||
}) | ||
}) | ||
} | ||
}) | ||
test('20 times HEAD with pipelining 10', (t) => { | ||
const num = 20 | ||
t.plan(3 * num + 1) | ||
test('multiple destroy callback', (t) => { | ||
t.plan(3) | ||
let count = 0 | ||
let countGreaterThanOne = false | ||
const server = createServer((req, res) => { | ||
count++ | ||
setTimeout(function () { | ||
countGreaterThanOne = countGreaterThanOne || count > 1 | ||
res.end(req.url) | ||
}, 10) | ||
res.end() | ||
}) | ||
t.tearDown(server.close.bind(server)) | ||
// needed to check for a warning on the maxListeners on the socket | ||
process.on('warning', t.fail) | ||
t.tearDown(() => { | ||
process.removeListener('warning', t.fail) | ||
server.listen(0, () => { | ||
const client = new Client({ | ||
hostname: 'localhost', | ||
port: server.address().port, | ||
protocol: 'http' | ||
}) | ||
t.tearDown(client.destroy.bind(client)) | ||
client.request({ path: '/', method: 'GET' }, (err, data) => { | ||
t.error(err) | ||
data.body.resume() | ||
client.destroy(new Error(), (err) => { | ||
t.error(err) | ||
}) | ||
client.destroy(new Error(), (err) => { | ||
t.error(err) | ||
}) | ||
}) | ||
}) | ||
}) | ||
test('only one streaming req at a time', (t) => { | ||
t.plan(4) | ||
const server = createServer((req, res) => { | ||
req.pipe(res) | ||
}) | ||
t.tearDown(server.close.bind(server)) | ||
server.listen(0, () => { | ||
const client = new Client(`http://localhost:${server.address().port}`, { | ||
pipelining: 10 | ||
pipelining: 4 | ||
}) | ||
t.tearDown(client.close.bind(client)) | ||
t.tearDown(client.destroy.bind(client)) | ||
for (let i = 0; i < num; i++) { | ||
makeRequest(i) | ||
} | ||
client.request({ | ||
path: '/', | ||
method: 'GET' | ||
}, (err, data) => { | ||
t.error(err) | ||
data.body.resume() | ||
function makeRequest (i) { | ||
makeHeadRequestAndExpectUrl(client, i, t, () => { | ||
count-- | ||
client.request({ | ||
path: '/', | ||
method: 'GET' | ||
}, (err, data) => { | ||
t.error(err) | ||
data.body.resume() | ||
}) | ||
if (i === num - 1) { | ||
t.ok(countGreaterThanOne, 'seen more than one parallel request') | ||
} | ||
client.request({ | ||
path: '/', | ||
method: 'PUT', | ||
idempotent: true, | ||
body: new Readable({ | ||
read () { | ||
t.strictEqual(client.size, 1) | ||
this.push(null) | ||
} | ||
}) | ||
}, (err, data) => { | ||
t.error(err) | ||
data.body.resume() | ||
}) | ||
}) | ||
}) | ||
}) | ||
test('300 requests succeed', (t) => { | ||
t.plan(300 * 2) | ||
const server = createServer((req, res) => { | ||
res.end('asd') | ||
}) | ||
t.tearDown(server.close.bind(server)) | ||
server.listen(0, () => { | ||
const client = new Client(`http://localhost:${server.address().port}`) | ||
t.tearDown(client.destroy.bind(client)) | ||
for (let n = 0; n < 300; ++n) { | ||
client.request({ | ||
path: '/', | ||
method: 'GET' | ||
}, (err, data) => { | ||
t.error(err) | ||
data.body.on('data', (chunk) => { | ||
t.strictEqual(chunk.toString(), 'asd') | ||
}) | ||
}) | ||
} | ||
@@ -477,25 +570,33 @@ }) | ||
function makeHeadRequestAndExpectUrl (client, i, t, cb) { | ||
return client.request({ path: '/' + i, method: 'HEAD' }, (err, { statusCode, headers, body }) => { | ||
cb() | ||
t.error(err) | ||
t.strictEqual(statusCode, 200) | ||
t.strictEqual(body, null) | ||
test('request args validation', (t) => { | ||
t.plan(2) | ||
const client = new Client('http://localhost:5000') | ||
client.request(null, (err) => { | ||
t.ok(err instanceof errors.InvalidArgumentError) | ||
}) | ||
} | ||
test('A client should enqueue as much as twice its pipelining factor', (t) => { | ||
const num = 10 | ||
let sent = 0 | ||
t.plan(6 * num + 5) | ||
try { | ||
client.request(null, 'asd') | ||
} catch (err) { | ||
t.ok(err instanceof errors.InvalidArgumentError) | ||
} | ||
}) | ||
let count = 0 | ||
let countGreaterThanOne = false | ||
test('request args validation promise', (t) => { | ||
t.plan(1) | ||
const client = new Client('http://localhost:5000') | ||
client.request(null).catch((err) => { | ||
t.ok(err instanceof errors.InvalidArgumentError) | ||
}) | ||
}) | ||
test('increase pipelining', (t) => { | ||
t.plan(4) | ||
const server = createServer((req, res) => { | ||
count++ | ||
t.ok(count <= 5) | ||
setTimeout(function () { | ||
countGreaterThanOne = countGreaterThanOne || count > 1 | ||
res.end(req.url) | ||
}, 10) | ||
req.resume() | ||
}) | ||
@@ -505,38 +606,42 @@ t.tearDown(server.close.bind(server)) | ||
server.listen(0, () => { | ||
const client = new Client(`http://localhost:${server.address().port}`, { | ||
pipelining: 2 | ||
const client = new Client(`http://localhost:${server.address().port}`) | ||
t.tearDown(client.destroy.bind(client)) | ||
client.request({ | ||
path: '/', | ||
method: 'GET' | ||
}, () => { | ||
if (!client.destroyed) { | ||
t.fail() | ||
} | ||
}) | ||
t.tearDown(client.close.bind(client)) | ||
for (; sent < 2;) { | ||
t.notOk(client.full, 'client is not full') | ||
t.ok(makeRequest(), 'we can send more requests') | ||
} | ||
t.notOk(client.full, 'client is full') | ||
t.notOk(makeRequest(), 'we must stop now') | ||
t.ok(client.full, 'client is full') | ||
client.on('drain', () => { | ||
t.ok(countGreaterThanOne, 'seen more than one parallel request') | ||
const start = sent | ||
for (; sent < start + 3 && sent < num;) { | ||
t.notOk(client.full, 'client is not full') | ||
t.ok(makeRequest()) | ||
client.request({ | ||
path: '/', | ||
method: 'GET' | ||
}, () => { | ||
if (!client.destroyed) { | ||
t.fail() | ||
} | ||
}) | ||
function makeRequest () { | ||
return makeRequestAndExpectUrl(client, sent++, t, () => count--) | ||
} | ||
t.strictEqual(client.running, 0) | ||
client.on('connect', () => { | ||
t.strictEqual(client.running, 0) | ||
process.nextTick(() => { | ||
t.strictEqual(client.running, 1) | ||
client.pipelining = 3 | ||
t.strictEqual(client.running, 2) | ||
}) | ||
}) | ||
}) | ||
}) | ||
test('Set-Cookie', (t) => { | ||
test('destroy in push', (t) => { | ||
t.plan(4) | ||
let _res | ||
const server = createServer((req, res) => { | ||
res.setHeader('content-type', 'text/plain') | ||
res.setHeader('Set-Cookie', ['a cookie', 'another cookie']) | ||
res.end('hello') | ||
res.write('asd') | ||
_res = res | ||
}) | ||
@@ -549,12 +654,23 @@ t.tearDown(server.close.bind(server)) | ||
client.request({ path: '/', method: 'GET' }, (err, { statusCode, headers, body }) => { | ||
client.request({ path: '/', method: 'GET' }, (err, { body }) => { | ||
t.error(err) | ||
t.strictEqual(statusCode, 200) | ||
t.strictDeepEqual(headers['Set-Cookie'], ['a cookie', 'another cookie']) | ||
const bufs = [] | ||
body.on('data', (buf) => { | ||
bufs.push(buf) | ||
body.once('data', () => { | ||
_res.write('asd') | ||
body.on('data', (buf) => { | ||
body.destroy() | ||
_res.end() | ||
}).on('error', (err) => { | ||
t.ok(err) | ||
}) | ||
}) | ||
body.on('end', () => { | ||
t.strictEqual('hello', Buffer.concat(bufs).toString('utf8')) | ||
}) | ||
client.request({ path: '/', method: 'GET' }, (err, { body }) => { | ||
t.error(err) | ||
let buf = '' | ||
body.on('data', (chunk) => { | ||
buf = chunk.toString() | ||
_res.end() | ||
}).on('end', () => { | ||
t.strictEqual('asd', buf) | ||
}) | ||
@@ -561,0 +677,0 @@ }) |
'use strict' | ||
const { test } = require('tap') | ||
const { Client } = require('..') | ||
const { Client, errors } = require('..') | ||
const { createServer } = require('http') | ||
const { kSocket } = require('../lib/symbols') | ||
test('close waits for the in-flight requests to finish', (t) => { | ||
t.plan(10) | ||
test('close waits for queued requests to finish', (t) => { | ||
t.plan(16) | ||
@@ -19,5 +20,4 @@ const server = createServer() | ||
server.listen(0, () => { | ||
const client = new Client(`http://localhost:${server.address().port}`, { | ||
pipelining: 1 | ||
}) | ||
const client = new Client(`http://localhost:${server.address().port}`) | ||
t.tearDown(client.destroy.bind(client)) | ||
@@ -28,4 +28,4 @@ client.request({ path: '/', method: 'GET' }, function (err, data) { | ||
client.request({ path: '/', method: 'GET' }, onRequest) | ||
client.request({ path: '/', method: 'GET' }, reqClosed) | ||
client.request({ path: '/', method: 'GET' }, reqClosed) | ||
client.request({ path: '/', method: 'GET' }, onRequest) | ||
client.request({ path: '/', method: 'GET' }, onRequest) | ||
@@ -51,6 +51,168 @@ // needed because the next element in the queue will be called | ||
} | ||
}) | ||
function reqClosed (err) { | ||
t.equal(err.message, 'The client is closed') | ||
} | ||
test('destroy invoked all pending callbacks', (t) => { | ||
t.plan(4) | ||
const server = createServer() | ||
server.on('request', (req, res) => { | ||
res.write('hello') | ||
}) | ||
t.tearDown(server.close.bind(server)) | ||
server.listen(0, () => { | ||
const client = new Client(`http://localhost:${server.address().port}`, { | ||
pipelining: 2 | ||
}) | ||
t.tearDown(client.destroy.bind(client)) | ||
client.request({ path: '/', method: 'GET' }, (err, data) => { | ||
t.error(err) | ||
data.body.on('error', (err) => { | ||
t.ok(err) | ||
}).resume() | ||
client.destroy() | ||
}) | ||
client.request({ path: '/', method: 'GET' }, (err) => { | ||
t.ok(err instanceof errors.ClientDestroyedError) | ||
}) | ||
client.request({ path: '/', method: 'GET' }, (err) => { | ||
t.ok(err instanceof errors.ClientDestroyedError) | ||
}) | ||
}) | ||
}) | ||
test('close waits until socket is destroyed', (t) => { | ||
t.plan(4) | ||
const server = createServer((req, res) => { | ||
res.end(req.url) | ||
}) | ||
t.tearDown(server.close.bind(server)) | ||
server.listen(0, () => { | ||
const client = new Client(`http://localhost:${server.address().port}`) | ||
t.tearDown(client.destroy.bind(client)) | ||
makeRequest() | ||
client.once('connect', () => { | ||
let done = false | ||
client[kSocket].on('close', () => { | ||
done = true | ||
}) | ||
client.close((err) => { | ||
t.error(err) | ||
t.strictEqual(client.closed, true) | ||
t.strictEqual(done, true) | ||
}) | ||
}) | ||
function makeRequest () { | ||
client.request({ path: '/', method: 'GET' }, (err, data) => { | ||
t.error(err instanceof errors.ClientClosedError) | ||
}) | ||
return !client.full | ||
} | ||
}) | ||
}) | ||
test('close should still reconnect', (t) => { | ||
t.plan(6) | ||
const server = createServer((req, res) => { | ||
res.end(req.url) | ||
}) | ||
t.tearDown(server.close.bind(server)) | ||
server.listen(0, () => { | ||
const client = new Client(`http://localhost:${server.address().port}`) | ||
t.tearDown(client.destroy.bind(client)) | ||
t.ok(makeRequest()) | ||
t.ok(!makeRequest()) | ||
client.close((err) => { | ||
t.strictEqual(err, null) | ||
t.strictEqual(client.closed, true) | ||
}) | ||
client[kSocket].destroy() | ||
function makeRequest () { | ||
client.request({ path: '/', method: 'GET' }, (err, data) => { | ||
data.body.resume() | ||
t.error(err) | ||
}) | ||
return !client.full | ||
} | ||
}) | ||
}) | ||
test('close should call callback once finished', (t) => { | ||
t.plan(6) | ||
const server = createServer((req, res) => { | ||
setImmediate(function () { | ||
res.end(req.url) | ||
}) | ||
}) | ||
t.tearDown(server.close.bind(server)) | ||
server.listen(0, () => { | ||
const client = new Client(`http://localhost:${server.address().port}`) | ||
t.tearDown(client.destroy.bind(client)) | ||
t.ok(makeRequest()) | ||
t.ok(!makeRequest()) | ||
client.close((err) => { | ||
t.strictEqual(err, null) | ||
t.strictEqual(client.closed, true) | ||
}) | ||
function makeRequest () { | ||
client.request({ path: '/', method: 'GET' }, (err, data) => { | ||
t.error(err) | ||
data.body.resume() | ||
}) | ||
return !client.full | ||
} | ||
}) | ||
}) | ||
test('closed and destroyed errors', (t) => { | ||
t.plan(4) | ||
const client = new Client('http://localhost:4000') | ||
t.tearDown(client.destroy.bind(client)) | ||
client.request({ path: '/', method: 'GET' }, (err) => { | ||
t.ok(err) | ||
}) | ||
client.close((err) => { | ||
t.error(err) | ||
}) | ||
client.request({}, (err) => { | ||
t.ok(err instanceof errors.ClientClosedError) | ||
client.destroy() | ||
client.request({}, (err) => { | ||
t.ok(err instanceof errors.ClientDestroyedError) | ||
}) | ||
}) | ||
}) | ||
test('close after and destroy should error', (t) => { | ||
t.plan(2) | ||
const client = new Client('http://localhost:4000') | ||
t.tearDown(client.destroy.bind(client)) | ||
client.destroy() | ||
client.close((err) => { | ||
t.ok(err instanceof errors.ClientDestroyedError) | ||
}) | ||
client.close().catch((err) => { | ||
t.ok(err instanceof errors.ClientDestroyedError) | ||
}) | ||
}) |
@@ -41,35 +41,1 @@ 'use strict' | ||
}) | ||
test('https get with https opts', (t) => { | ||
t.plan(6) | ||
const server = createServer(pem, (req, res) => { | ||
t.strictEqual('/', req.url) | ||
t.strictEqual('GET', req.method) | ||
res.setHeader('content-type', 'text/plain') | ||
res.end('hello') | ||
}) | ||
t.tearDown(server.close.bind(server)) | ||
server.listen(0, () => { | ||
const client = new Client(`https://localhost:${server.address().port}`, { | ||
https: { | ||
rejectUnauthorized: false | ||
} | ||
}) | ||
t.tearDown(client.close.bind(client)) | ||
client.request({ path: '/', method: 'GET' }, (err, { statusCode, headers, body }) => { | ||
t.error(err) | ||
t.strictEqual(statusCode, 200) | ||
t.strictEqual(headers['content-type'], 'text/plain') | ||
const bufs = [] | ||
body.on('data', (buf) => { | ||
bufs.push(buf) | ||
}) | ||
body.on('end', () => { | ||
t.strictEqual('hello', Buffer.concat(bufs).toString('utf8')) | ||
}) | ||
}) | ||
}) | ||
}) |
235
test/pool.js
@@ -5,10 +5,11 @@ 'use strict' | ||
const { test } = require('tap') | ||
const { Pool } = require('..') | ||
const undici = require('..') | ||
const { Pool, errors } = require('..') | ||
const { createServer } = require('http') | ||
const { EventEmitter } = require('events') | ||
const { promisify } = require('util') | ||
const eos = require('end-of-stream') | ||
const eos = require('stream').finished | ||
test('basic get', (t) => { | ||
t.plan(6) | ||
t.plan(9) | ||
@@ -23,5 +24,5 @@ const server = createServer((req, res) => { | ||
server.listen(0, () => { | ||
const client = new Pool(`http://localhost:${server.address().port}`) | ||
t.tearDown(client.close.bind(client)) | ||
server.listen(0, async () => { | ||
const client = undici(`http://localhost:${server.address().port}`) | ||
t.tearDown(client.destroy.bind(client)) | ||
@@ -40,5 +41,40 @@ client.request({ path: '/', method: 'GET' }, (err, { statusCode, headers, body }) => { | ||
}) | ||
client.close((err) => { | ||
t.error(err) | ||
client.destroy((err) => { | ||
t.error(err) | ||
client.close((err) => { | ||
t.ok(err instanceof errors.ClientDestroyedError) | ||
}) | ||
}) | ||
}) | ||
}) | ||
}) | ||
test('basic get error async/await', (t) => { | ||
t.plan(2) | ||
const server = createServer((req, res) => { | ||
res.destroy() | ||
}) | ||
t.tearDown(server.close.bind(server)) | ||
server.listen(0, async () => { | ||
const client = undici(`http://localhost:${server.address().port}`) | ||
t.tearDown(client.destroy.bind(client)) | ||
await client.request({ path: '/', method: 'GET' }) | ||
.catch((err) => { | ||
t.ok(err) | ||
}) | ||
await client.destroy() | ||
await client.close().catch((err) => { | ||
t.ok(err instanceof errors.ClientDestroyedError) | ||
}) | ||
}) | ||
}) | ||
test('basic get with async/await', async (t) => { | ||
@@ -55,3 +91,3 @@ const server = createServer((req, res) => { | ||
const client = new Pool(`http://localhost:${server.address().port}`) | ||
t.tearDown(client.close.bind(client)) | ||
t.tearDown(client.destroy.bind(client)) | ||
@@ -64,8 +100,81 @@ const { statusCode, headers, body } = await client.request({ path: '/', method: 'GET' }) | ||
await promisify(eos)(body) | ||
await client.close() | ||
await client.destroy() | ||
}) | ||
test('stream get async/await', async (t) => { | ||
const server = createServer((req, res) => { | ||
t.strictEqual('/', req.url) | ||
t.strictEqual('GET', req.method) | ||
res.setHeader('content-type', 'text/plain') | ||
res.end('hello') | ||
}) | ||
t.tearDown(server.close.bind(server)) | ||
await promisify(server.listen.bind(server))(0) | ||
const client = new Pool(`http://localhost:${server.address().port}`) | ||
t.tearDown(client.destroy.bind(client)) | ||
await client.stream({ path: '/', method: 'GET' }, ({ statusCode, headers }) => { | ||
t.strictEqual(statusCode, 200) | ||
t.strictEqual(headers['content-type'], 'text/plain') | ||
}) | ||
}) | ||
test('stream get error async/await', (t) => { | ||
t.plan(1) | ||
const server = createServer((req, res) => { | ||
res.destroy() | ||
}) | ||
t.tearDown(server.close.bind(server)) | ||
server.listen(0, async () => { | ||
const client = undici(`http://localhost:${server.address().port}`) | ||
t.tearDown(client.destroy.bind(client)) | ||
await client.stream({ path: '/', method: 'GET' }, () => { | ||
}) | ||
.catch((err) => { | ||
t.ok(err) | ||
}) | ||
}) | ||
}) | ||
test('pipeline get', (t) => { | ||
t.plan(5) | ||
const server = createServer((req, res) => { | ||
t.strictEqual('/', req.url) | ||
t.strictEqual('GET', req.method) | ||
res.setHeader('content-type', 'text/plain') | ||
res.end('hello') | ||
}) | ||
t.tearDown(server.close.bind(server)) | ||
server.listen(0, async () => { | ||
const client = undici(`http://localhost:${server.address().port}`) | ||
t.tearDown(client.destroy.bind(client)) | ||
const bufs = [] | ||
client.pipeline({ path: '/', method: 'GET' }, ({ statusCode, headers, body }) => { | ||
t.strictEqual(statusCode, 200) | ||
t.strictEqual(headers['content-type'], 'text/plain') | ||
return body | ||
}) | ||
.end() | ||
.on('data', (buf) => { | ||
bufs.push(buf) | ||
}) | ||
.on('end', () => { | ||
t.strictEqual('hello', Buffer.concat(bufs).toString('utf8')) | ||
}) | ||
}) | ||
}) | ||
test('backpressure algorithm', (t) => { | ||
const seen = [] | ||
let total = 0 | ||
let writeMore = false | ||
@@ -77,7 +186,15 @@ class FakeClient extends EventEmitter { | ||
this.id = total++ | ||
this._full = false | ||
} | ||
get full () { | ||
return this._full | ||
} | ||
get connected () { | ||
return true | ||
} | ||
request (req, cb) { | ||
seen.push({ req, cb, client: this }) | ||
return writeMore | ||
seen.push({ req, cb, client: this, id: this.id }) | ||
} | ||
@@ -94,37 +211,39 @@ } | ||
writeMore = true | ||
pool.request({}, noop) | ||
pool.request({}, noop) | ||
const d1 = seen.shift() | ||
const d2 = seen.shift() | ||
const d1 = seen.shift() // d1 = c0 | ||
t.strictEqual(d1.id, 0) | ||
const d2 = seen.shift() // d1 = c0 | ||
t.strictEqual(d1.id, 0) | ||
t.strictEqual(d1.client, d2.client) | ||
t.strictEqual(d1.id, d2.id) | ||
writeMore = false | ||
pool.request({}, noop) | ||
pool.request({}, noop) // d3 = c0 | ||
writeMore = true | ||
pool.request({}, noop) | ||
d1.client._full = true | ||
pool.request({}, noop) // d4 = c1 | ||
const d3 = seen.shift() | ||
t.strictEqual(d3.id, 0) | ||
const d4 = seen.shift() | ||
t.strictEqual(d4.id, 1) | ||
t.strictEqual(d3.client, d2.client) | ||
t.notStrictEqual(d3.client, d4.client) | ||
t.strictEqual(d3.id, d2.id) | ||
t.notStrictEqual(d3.id, d4.id) | ||
d3.client.emit('drain') | ||
pool.request({}, noop) // d5 = c1 | ||
writeMore = false | ||
pool.request({}, noop) | ||
d1.client._full = false | ||
writeMore = true | ||
pool.request({}, noop) | ||
pool.request({}, noop) // d6 = c0 | ||
const d5 = seen.shift() | ||
t.strictEqual(d5.id, 1) | ||
const d6 = seen.shift() | ||
t.strictEqual(d6.id, 0) | ||
t.strictEqual(d5.client, d4.client) | ||
t.strictEqual(d3.client, d6.client) | ||
t.strictEqual(d5.id, d4.id) | ||
t.strictEqual(d3.id, d6.id) | ||
@@ -135,1 +254,61 @@ t.end() | ||
function noop () {} | ||
test('full', (t) => { | ||
t.plan(8 * 6) | ||
const server = createServer((req, res) => { | ||
t.strictEqual('/', req.url) | ||
t.strictEqual('GET', req.method) | ||
res.setHeader('content-type', 'text/plain') | ||
res.end('hello') | ||
}) | ||
t.tearDown(server.close.bind(server)) | ||
server.listen(0, async () => { | ||
const client = undici(`http://localhost:${server.address().port}`, { | ||
connections: 2, | ||
pipelining: 2 | ||
}) | ||
t.tearDown(client.destroy.bind(client)) | ||
for (let n = 0; n < 8; ++n) { | ||
client.request({ path: '/', method: 'GET' }, (err, { statusCode, headers, body }) => { | ||
t.error(err) | ||
t.strictEqual(statusCode, 200) | ||
t.strictEqual(headers['content-type'], 'text/plain') | ||
const bufs = [] | ||
body.on('data', (buf) => { | ||
bufs.push(buf) | ||
}) | ||
body.on('end', () => { | ||
t.strictEqual('hello', Buffer.concat(bufs).toString('utf8')) | ||
}) | ||
}) | ||
} | ||
}) | ||
}) | ||
test('invalid options throws', (t) => { | ||
t.plan(6) | ||
try { | ||
new Pool(null, { connections: 0 }) // eslint-disable-line | ||
} catch (err) { | ||
t.ok(err instanceof errors.InvalidArgumentError) | ||
t.strictEqual(err.message, 'invalid connections') | ||
} | ||
try { | ||
new Pool(null, { connections: -1 }) // eslint-disable-line | ||
} catch (err) { | ||
t.ok(err instanceof errors.InvalidArgumentError) | ||
t.strictEqual(err.message, 'invalid connections') | ||
} | ||
try { | ||
new Pool(null, { connections: true }) // eslint-disable-line | ||
} catch (err) { | ||
t.ok(err instanceof errors.InvalidArgumentError) | ||
t.strictEqual(err.message, 'invalid connections') | ||
} | ||
}) |
@@ -6,3 +6,3 @@ 'use strict' | ||
const { createServer } = require('http') | ||
const { Client } = require('..') | ||
const { Client, errors } = require('..') | ||
@@ -26,3 +26,3 @@ const server = createServer((req, res) => { | ||
client.request({ path: null, method: 'GET' }, (err, res) => { | ||
t.ok(err) | ||
t.ok(err instanceof errors.InvalidArgumentError) | ||
t.strictEqual(err.message, 'path must be a valid path') | ||
@@ -33,3 +33,3 @@ t.strictEqual(res, null) | ||
client.request({ path: 'aaa', method: 'GET' }, (err, res) => { | ||
t.ok(err) | ||
t.ok(err instanceof errors.InvalidArgumentError) | ||
t.strictEqual(err.message, 'path must be a valid path') | ||
@@ -47,3 +47,3 @@ t.strictEqual(res, null) | ||
client.request({ path: '/', method: null }, (err, res) => { | ||
t.ok(err) | ||
t.ok(err instanceof errors.InvalidArgumentError) | ||
t.strictEqual(err.message, 'method must be a valid method') | ||
@@ -54,3 +54,3 @@ t.strictEqual(res, null) | ||
client.request({ path: '/', method: 'WOOW' }, (err, res) => { | ||
t.ok(err) | ||
t.ok(err instanceof errors.InvalidArgumentError) | ||
t.strictEqual(err.message, 'method must be a valid method') | ||
@@ -68,3 +68,3 @@ t.strictEqual(res, null) | ||
client.request({ path: '/', method: 'POST', body: 42 }, (err, res) => { | ||
t.ok(err) | ||
t.ok(err instanceof errors.InvalidArgumentError) | ||
t.strictEqual(err.message, 'body must be a string, a Buffer or a Readable stream') | ||
@@ -75,3 +75,3 @@ t.strictEqual(res, null) | ||
client.request({ path: '/', method: 'POST', body: { hello: 'world' } }, (err, res) => { | ||
t.ok(err) | ||
t.ok(err instanceof errors.InvalidArgumentError) | ||
t.strictEqual(err.message, 'body must be a string, a Buffer or a Readable stream') | ||
@@ -78,0 +78,0 @@ t.strictEqual(res, null) |
License Policy Violation
LicenseThis package is not allowed per your license policy. Review the package's license to ensure compliance.
Found 1 instance in 1 package
Major refactor
Supply chain riskPackage has recently undergone a major refactor. It may be unstable or indicate significant internal changes. Use caution when updating to versions that include significant changes.
Found 1 instance in 1 package
New author
Supply chain riskA new npm collaborator published a version of the package for the first time. New collaborators are usually benign additions to a project, but do indicate a change to the security surface area of a package.
Found 1 instance in 1 package
License Policy Violation
LicenseThis package is not allowed per your license policy. Review the package's license to ensure compliance.
Found 1 instance in 1 package
No v1
QualityPackage is not semver >=1. This means it is not stable and does not support ^ ranges.
Found 1 instance in 1 package
191000
1
32
6048
0
524
9
7
28
- Removedend-of-stream@^1.4.1
- Removedfastq@^1.6.0
- Removedreadable-stream@^3.0.0
- Removedretimer@^2.0.0
- Removedsyncthrough@^0.5.0
- Removedend-of-stream@1.4.4(transitive)
- Removedfastq@1.17.1(transitive)
- Removedinherits@2.0.4(transitive)
- Removedlistenercount@1.0.1(transitive)
- Removedonce@1.4.0(transitive)
- Removedprocess-nextick-args@1.0.7(transitive)
- Removedreadable-stream@3.6.2(transitive)
- Removedretimer@2.0.0(transitive)
- Removedreusify@1.0.4(transitive)
- Removedsafe-buffer@5.2.1(transitive)
- Removedstring_decoder@1.3.0(transitive)
- Removedsyncthrough@0.5.0(transitive)
- Removedutil-deprecate@1.0.2(transitive)
- Removedwrappy@1.0.2(transitive)