Socket
Socket
Sign inDemoInstall

undici

Package Overview
Dependencies
Maintainers
2
Versions
212
Alerts
File Explorer

Advanced tools

Socket logo

Install Socket

Detect and block malicious and high-risk dependencies

Install

undici - npm Package Compare versions

Comparing version 1.2.2 to 1.2.3

.github/workflows/bench.yml

81

benchmarks/index.js

@@ -6,2 +6,4 @@ 'use strict'

const undici = require('..')
const { kEnqueue, kGetNext } = require('../lib/symbols')
const Request = require('../lib/request')

@@ -44,3 +46,3 @@ // # Start the h2o server (in h2o repository)

suite
.add('http - keepalive - pipe', {
.add('http - keepalive', {
defer: true,

@@ -62,5 +64,17 @@ fn: deferred => {

})
.add('undici - pipeline - pipe', {
.add('http - noop', {
defer: true,
fn: deferred => {
http.get(httpOptions, response => {
response
.resume()
.on('end', () => {
deferred.resolve()
})
})
}
})
.add('undici - pipeline', {
defer: true,
fn: deferred => {
pool

@@ -81,3 +95,3 @@ .pipeline(undiciOptions, data => {

})
.add('undici - request - pipe', {
.add('undici - request', {
defer: true,

@@ -103,3 +117,3 @@ fn: deferred => {

})
.add('undici - stream - pipe', {
.add('undici - stream', {
defer: true,

@@ -125,2 +139,24 @@ fn: deferred => {

})
.add('undici - simple', {
defer: true,
fn: deferred => {
const stream = new Writable({
write (chunk, encoding, callback) {
callback()
}
})
stream.once('finish', () => {
deferred.resolve()
})
const client = pool[kGetNext]()
client[kEnqueue](new SimpleRequest(client, undiciOptions, stream))
}
})
.add('undici - noop', {
defer: true,
fn: deferred => {
const client = pool[kGetNext]()
client[kEnqueue](new NoopRequest(client, undiciOptions, deferred))
}
})
.on('cycle', event => {

@@ -130,1 +166,38 @@ console.log(String(event.target))

.run()
class NoopRequest extends Request {
constructor (client, opts, deferred) {
super(opts, client)
this.deferred = deferred
}
_onHeaders () {}
_onBody () {}
_onComplete () {
this.deferred.resolve()
}
}
class SimpleRequest extends Request {
constructor (client, opts, dst) {
super(opts, client)
this.dst = dst
this.dst.on('drain', () => {
this.resume()
})
}
_onHeaders (statusCode, headers, resume) {
this.resume = resume
}
_onBody (chunk, offset, length) {
return this.dst.write(chunk.slice(offset, offset + length))
}
_onComplete () {
this.dst.end()
}
}

1114

lib/client.js

@@ -0,25 +1,272 @@

'use strict'
const { URL } = require('url')
const net = require('net')
const tls = require('tls')
// TODO: This is not really allowed by Node but it works for now.
const { HTTPParser } = process.binding('http_parser') // eslint-disable-line
const EventEmitter = require('events')
const assert = require('assert')
const util = require('./util')
const {
Readable,
Duplex,
PassThrough,
finished
} = require('stream')
const {
ContentLengthMismatchError,
SocketTimeoutError,
InvalidArgumentError,
InvalidReturnValueError,
RequestAbortedError
RequestAbortedError,
ClientDestroyedError,
ClientClosedError,
HeadersTimeoutError,
SocketError,
InformationalError,
NotSupportedError
} = require('./errors')
const {
kUrl,
kReset,
kResume,
kConnect,
kResuming,
kWriting,
kQueue,
kServerName,
kSocketTimeout,
kRequestTimeout,
kTLSOpts,
kClosed,
kDestroyed,
kPendingIdx,
kRunningIdx,
kError,
kOnDestroyed,
kPipelining,
kRetryDelay,
kRetryTimeout,
kMaxAbortedPayload,
kSocket,
kSocketPath,
kEnqueue,
kResume
kMaxHeadersSize,
kHeadersTimeout
} = require('./symbols')
const ClientBase = require('./client-base')
const assert = require('assert')
const util = require('./util')
const makeStream = require('./client-stream')
const makeRequest = require('./client-request')
const makePipeline = require('./client-pipeline')
class Client extends ClientBase {
request (opts, callback) {
const CRLF = Buffer.from('\r\n', 'ascii')
const nodeMajorVersion = parseInt(process.version.split('.')[0].slice(1))
const insecureHTTPParser = process.execArgv.includes('--insecure-http-parser')
class Client extends EventEmitter {
constructor (url, {
maxAbortedPayload,
maxHeaderSize,
headersTimeout,
socketTimeout,
socketPath,
requestTimeout,
pipelining,
tls
} = {}) {
super()
if (typeof url === 'string') {
url = new URL(url)
}
if (!url || typeof url !== 'object') {
throw new InvalidArgumentError('invalid url')
}
if (url.port != null && url.port !== '' && !Number.isFinite(parseInt(url.port))) {
throw new InvalidArgumentError('invalid port')
}
if (socketPath != null && typeof socketPath !== 'string') {
throw new InvalidArgumentError('invalid socketPath')
}
if (url.hostname != null && typeof url.hostname !== 'string') {
throw new InvalidArgumentError('invalid hostname')
}
if (!/https?/.test(url.protocol)) {
throw new InvalidArgumentError('invalid protocol')
}
if (/\/.+/.test(url.pathname) || url.search || url.hash) {
throw new InvalidArgumentError('invalid url')
}
if (maxAbortedPayload != null && !Number.isFinite(maxAbortedPayload)) {
throw new InvalidArgumentError('invalid maxAbortedPayload')
}
if (maxHeaderSize != null && !Number.isFinite(maxHeaderSize)) {
throw new InvalidArgumentError('invalid maxHeaderSize')
}
if (socketTimeout != null && !Number.isFinite(socketTimeout)) {
throw new InvalidArgumentError('invalid socketTimeout')
}
if (requestTimeout != null && !Number.isFinite(requestTimeout)) {
throw new InvalidArgumentError('invalid requestTimeout')
}
if (headersTimeout != null && !Number.isFinite(headersTimeout)) {
throw new InvalidArgumentError('invalid headersTimeout')
}
this[kSocket] = null
this[kReset] = false
this[kPipelining] = pipelining || 1
this[kMaxHeadersSize] = maxHeaderSize || 16384
this[kHeadersTimeout] = headersTimeout == null ? 30e3 : headersTimeout
this[kUrl] = url
this[kSocketPath] = socketPath
this[kSocketTimeout] = socketTimeout == null ? 30e3 : socketTimeout
this[kRequestTimeout] = requestTimeout == null ? 30e3 : requestTimeout
this[kClosed] = false
this[kDestroyed] = false
this[kServerName] = null
this[kTLSOpts] = tls
this[kRetryDelay] = 0
this[kRetryTimeout] = null
this[kOnDestroyed] = []
this[kWriting] = false
this[kResuming] = false
this[kMaxAbortedPayload] = maxAbortedPayload || 1048576
// kQueue is built up of 3 sections separated by
// the kRunningIdx and kPendingIdx indices.
// | complete | running | pending |
// ^ kRunningIdx ^ kPendingIdx ^ kQueue.length
// kRunningIdx points to the first running element.
// kPendingIdx points to the first pending element.
// This implements a fast queue with an amortized
// time of O(1).
this[kQueue] = []
this[kRunningIdx] = 0
this[kPendingIdx] = 0
}
get pipelining () {
return this[kPipelining]
}
set pipelining (value) {
this[kPipelining] = value
resume(this)
}
get connected () {
return (
this[kSocket] &&
this[kSocket].connecting !== true &&
// Older versions of Node don't set secureConnecting to false.
(this[kSocket].authorized !== false ||
this[kSocket].authorizationError
) &&
!this[kSocket].destroyed
)
}
get pending () {
return this[kQueue].length - this[kPendingIdx]
}
get running () {
return this[kPendingIdx] - this[kRunningIdx]
}
get size () {
return this[kQueue].length - this[kRunningIdx]
}
get busy () {
if (this.running >= this[kPipelining]) {
return true
}
if (this.size >= this[kPipelining]) {
return true
}
if (this[kReset] || this[kWriting]) {
return true
}
if (this[kResuming]) {
for (let n = this[kPendingIdx]; n < this[kQueue].length; n++) {
const { idempotent, body, method } = this[kQueue][n]
if (!idempotent) {
return true
}
if (method === 'HEAD') {
return true
}
if (util.isStream(body) && util.bodyLength(body) !== 0) {
return true
}
}
} else if (this.pending > 0) {
return true
}
return false
}
get destroyed () {
return this[kDestroyed]
}
get closed () {
return this[kClosed]
}
[kResume] () {
resume(this)
}
[kConnect] (cb) {
connect(this)
if (cb) {
if (this.connected) {
process.nextTick(cb)
} else {
this.once('connect', cb)
}
}
}
[kEnqueue] (request) {
try {
if (this[kDestroyed]) {
throw new ClientDestroyedError()
}
if (this[kClosed]) {
throw new ClientClosedError()
}
this[kQueue].push(request)
if (this[kResuming]) {
// Do nothing.
} else if (util.isStream(request.body)) {
this[kResuming] = true
process.nextTick(resume, this)
} else {
resume(this)
}
} catch (err) {
request.onError(err)
}
}
close (callback) {
if (callback === undefined) {
return new Promise((resolve, reject) => {
this.request(opts, (err, data) => {
this.close((err, data) => {
return err ? reject(err) : resolve(data)

@@ -34,318 +281,665 @@ })

if (!opts || typeof opts !== 'object') {
process.nextTick(callback, new InvalidArgumentError('invalid opts'), null)
if (this[kDestroyed]) {
process.nextTick(callback, new ClientDestroyedError(), null)
return
}
this[kEnqueue](opts, function (err, data) {
if (err) {
process.nextTick(callback, err, null)
return
}
this[kClosed] = true
const {
statusCode,
headers,
opaque,
resume
} = data
if (this.size === 0) {
this.destroy(callback)
} else {
this[kOnDestroyed].push(callback)
}
}
const body = new Readable({
autoDestroy: true,
read: resume,
destroy (err, callback) {
if (!err && !this._readableState.endEmitted) {
err = new RequestAbortedError()
}
if (err) {
resume()
}
callback(err, null)
}
})
body.destroy = this.wrap(body, body.destroy)
destroy (err, callback) {
if (typeof err === 'function') {
callback = err
err = null
}
callback(null, {
statusCode,
headers,
opaque,
body
if (callback === undefined) {
return new Promise((resolve, reject) => {
this.destroy(err, (err, data) => {
return err ? /* istanbul ignore next: should never error */ reject(err) : resolve(data)
})
})
}
return this.wrap(body, function (err, chunk) {
if (this.destroyed) {
return null
} else if (err) {
this.destroy(err)
} else {
const ret = this.push(chunk)
return this.destroyed ? null : ret
}
})
})
if (typeof callback !== 'function') {
throw new InvalidArgumentError('invalid callback')
}
if (this[kDestroyed]) {
if (this[kOnDestroyed]) {
this[kOnDestroyed].push(callback)
} else {
process.nextTick(callback, null, null)
}
return
}
clearTimeout(this[kRetryTimeout])
this[kRetryTimeout] = null
this[kClosed] = true
this[kDestroyed] = true
this[kOnDestroyed].push(callback)
const onDestroyed = () => {
const callbacks = this[kOnDestroyed]
this[kOnDestroyed] = null
for (const callback of callbacks) {
callback(null, null)
}
}
if (!this[kSocket]) {
process.nextTick(onDestroyed)
} else {
util.destroy(this[kSocket].on('close', onDestroyed), err)
}
resume(this)
}
request (opts, callback) {
return makeRequest(this, opts, callback)
}
stream (opts, factory, callback) {
return makeStream(this, opts, factory, callback)
}
pipeline (opts, handler) {
if (!opts || typeof opts !== 'object') {
return new PassThrough().destroy(new InvalidArgumentError('invalid opts'))
return makePipeline(this, opts, handler)
}
}
class Parser extends HTTPParser {
constructor (client, socket) {
/* istanbul ignore next */
if (nodeMajorVersion >= 12) {
super()
this.initialize(
HTTPParser.RESPONSE,
{},
client[kMaxHeadersSize],
insecureHTTPParser,
client[kHeadersTimeout]
)
} else {
super(HTTPParser.RESPONSE, false)
}
if (typeof handler !== 'function') {
return new PassThrough().destroy(new InvalidArgumentError('invalid handler'))
this.client = client
this.socket = socket
this.resumeSocket = () => socket.resume()
this.statusCode = null
this.headers = null
this.read = 0
}
[HTTPParser.kOnTimeout] () {
util.destroy(this.socket, new HeadersTimeoutError())
}
[HTTPParser.kOnHeaders] (rawHeaders) {
this.headers = util.parseHeaders(rawHeaders, this.headers)
}
[HTTPParser.kOnExecute] (ret) {
if (ret instanceof Error) {
const err = ret
if (typeof err.reason === 'string') {
err.message = `Parse Error: ${err.reason}`
}
util.destroy(this.socket, err)
} else {
// When the underlying `net.Socket` instance is consumed - no
// `data` events are emitted, and thus `socket.setTimeout` fires the
// callback even if the data is constantly flowing into the socket.
// See, https://github.com/nodejs/node/commit/ec2822adaad76b126b5cccdeaa1addf2376c9aa6
this.socket._unrefTimer()
}
}
const req = new Readable({
autoDestroy: true,
read () {
if (this[kResume]) {
const resume = this[kResume]
this[kResume] = null
resume()
[HTTPParser.kOnHeadersComplete] (versionMajor, versionMinor, rawHeaders, method,
url, statusCode, statusMessage, upgrade, shouldKeepAlive) {
const { client, socket, resumeSocket, headers } = this
const request = client[kQueue][client[kRunningIdx]]
// TODO: Check for content-length mismatch?
assert(this.statusCode < 200)
this.headers = null
this.statusCode = statusCode
if (!shouldKeepAlive) {
client[kReset] = true
}
if (upgrade) {
util.destroy(socket, new NotSupportedError('upgrade not supported'))
return true
}
// TODO: More statusCode validation?
if (statusCode < 200) {
request.onInfo(statusCode, util.parseHeaders(rawHeaders, headers))
} else {
request.onHeaders(statusCode, util.parseHeaders(rawHeaders, headers), resumeSocket)
}
return request.method === 'HEAD' || statusCode < 200
}
[HTTPParser.kOnBody] (chunk, offset, length) {
const { client, socket, statusCode } = this
if (statusCode < 200) {
util.destroy(socket, new SocketError('unexpected request body'))
return
}
const request = client[kQueue][client[kRunningIdx]]
this.read += length
const ret = request.onBody(chunk, offset, length)
if (ret == null && this.read > client[kMaxAbortedPayload]) {
util.destroy(socket, new InformationalError('max aborted payload'))
} else if (ret === false) {
socket.pause()
}
}
[HTTPParser.kOnMessageComplete] () {
const { client, socket, statusCode, headers, resumeSocket } = this
const request = client[kQueue][client[kRunningIdx]]
assert(statusCode >= 100)
if (statusCode < 200) {
return
}
this.statusCode = null
this.headers = null
request.onComplete(headers)
this.read = 0
client[kQueue][client[kRunningIdx]++] = null
if (client[kReset]) {
// https://tools.ietf.org/html/rfc7231#section-4.3.1
// https://tools.ietf.org/html/rfc7231#section-4.3.2
// https://tools.ietf.org/html/rfc7231#section-4.3.5
// Sending a payload body on a request that does not
// expect it can cause undefined behavior on some
// servers and corrupt connection state. Do not
// re-use the connection for further requests.
util.destroy(socket, new InformationalError('request reset'))
} else {
resume(client)
}
resumeSocket()
}
destroy (err) {
const { client, socket } = this
assert(err)
assert(socket.destroyed)
this.unconsume()
// Make sure the parser's stack has unwound before deleting the
// corresponding C++ object through .close().
setImmediate(() => this.close())
if (!client.running) {
return
}
// Retry all idempotent requests except for the one
// at the head of the pipeline.
client[kQueue][client[kRunningIdx]++].onError(err)
const retryRequests = []
for (const request of client[kQueue].slice(client[kRunningIdx], client[kPendingIdx])) {
const { idempotent, body } = request
assert(idempotent && !util.isStream(body))
retryRequests.push(request)
}
client[kQueue].splice(0, client[kPendingIdx], ...retryRequests)
client[kPendingIdx] = 0
client[kRunningIdx] = 0
}
}
function connect (client) {
assert(!client[kSocket])
assert(!client[kRetryTimeout])
const { protocol, port, hostname } = client[kUrl]
const servername = client[kServerName] || (client[kTLSOpts] && client[kTLSOpts].servername)
let socket
if (protocol === 'https:') {
const tlsOpts = { ...client[kTLSOpts], servername }
socket = client[kSocketPath]
? tls.connect(client[kSocketPath], tlsOpts)
: tls.connect(port || /* istanbul ignore next */ 443, hostname, tlsOpts)
} else {
socket = client[kSocketPath]
? net.connect(client[kSocketPath])
: net.connect(port || /* istanbul ignore next */ 80, hostname)
}
client[kSocket] = socket
const parser = new Parser(client, socket)
/* istanbul ignore next */
if (nodeMajorVersion >= 12) {
assert(socket._handle)
parser.consume(socket._handle)
} else {
assert(socket._handle && socket._handle._externalStream)
parser.consume(socket._handle._externalStream)
}
socket[kError] = null
socket.setTimeout(client[kSocketTimeout], function () {
util.destroy(this, new SocketTimeoutError())
})
socket
.setNoDelay(true)
.on(protocol === 'https:' ? 'secureConnect' : 'connect', function () {
client[kReset] = false
client[kRetryDelay] = 0
client.emit('connect')
resume(client)
})
.on('data', function () {
/* istanbul ignore next */
assert(false)
})
.on('error', function (err) {
this[kError] = err
if (err.code === 'ERR_TLS_CERT_ALTNAME_INVALID') {
assert(!client.running)
while (client.pending && client[kQueue][client[kPendingIdx]].servername === servername) {
const request = client[kQueue][client[kPendingIdx]++]
request.onError(err)
}
},
destroy (err, callback) {
if (err) {
if (this[kResume]) {
const resume = this[kResume]
this[kResume] = null
resume(err)
} else {
// Stop ret from scheduling more writes.
util.destroy(ret, err)
}
} else {
if (!this._readableState.endEmitted) {
// This can happen if the server doesn't care
// about the entire request body.
// TODO: Is this fine to ignore?
}
} else if (
!client.running &&
err.code !== 'ECONNRESET' &&
err.code !== 'ECONNREFUSED' &&
err.code !== 'EHOSTUNREACH' &&
err.code !== 'EHOSTDOWN' &&
err.code !== 'UND_ERR_SOCKET' &&
err.code !== 'UND_ERR_INFO'
) {
assert(client[kPendingIdx] === client[kRunningIdx])
// Error is not caused by running request and not a recoverable
// socket error.
for (const request of client[kQueue].splice(client[kRunningIdx])) {
request.onError(err)
}
callback(err)
}
})
let res
let body
.on('end', function () {
util.destroy(this, new SocketError('other side closed'))
})
.on('close', function () {
if (!this[kError]) {
this[kError] = new SocketError('closed')
}
const ret = new Duplex({
readableObjectMode: opts.objectMode,
autoDestroy: true,
read () {
if (body && body.resume) {
body.resume()
}
},
write (chunk, encoding, callback) {
assert(!req.destroyed)
if (req.push(chunk, encoding)) {
callback()
client[kSocket] = null
parser.destroy(this[kError])
if (client.destroyed) {
resume(client)
return
}
if (client.pending > 0) {
if (client[kRetryDelay]) {
client[kRetryTimeout] = setTimeout(() => {
client[kRetryTimeout] = null
connect(client)
}, client[kRetryDelay])
client[kRetryDelay] = Math.min(client[kRetryDelay] * 2, client[kSocketTimeout])
} else {
req[kResume] = callback
connect(client)
client[kRetryDelay] = 1e3
}
},
destroy (err, callback) {
if (!err && !this._readableState.endEmitted) {
err = new RequestAbortedError()
}
util.destroy(req, err)
util.destroy(res, err)
callback(err)
}
}).on('prefinish', () => {
// Node < 15 does not call _final in same tick.
req.push(null)
this[kResume]()
client.emit('disconnect', this[kError])
resume(client)
})
}
// TODO: Avoid copy.
opts = { ...opts, body: req }
function resume (client) {
client[kResuming] = false
while (true) {
if (client[kDestroyed]) {
const err = new ClientDestroyedError()
for (const request of client[kQueue].splice(client[kPendingIdx])) {
request.onError(err)
}
return
}
const request = this[kEnqueue](opts, function (err, data) {
if (err) {
util.destroy(ret, err)
return
if (client.size === 0) {
if (client[kClosed]) {
client.destroy(util.nop)
}
if (client[kRunningIdx] > 0) {
client[kQueue].length = 0
client[kPendingIdx] = 0
client[kRunningIdx] = 0
}
return
}
const {
statusCode,
headers,
opaque,
resume
} = data
if (client[kRunningIdx] > 256) {
client[kQueue].splice(0, client[kRunningIdx])
client[kPendingIdx] -= client[kRunningIdx]
client[kRunningIdx] = 0
}
res = new Readable({
autoDestroy: true,
read: resume,
destroy (err, callback) {
if (!err && !this._readableState.endEmitted) {
err = new RequestAbortedError()
}
if (err) {
util.destroy(ret, err)
resume()
}
callback(err, null)
}
})
res.destroy = this.wrap(res, res.destroy)
if (client.running >= client[kPipelining]) {
return
}
try {
body = handler({
statusCode,
headers,
opaque,
body: res
})
} catch (err) {
res.on('error', util.nop)
if (!ret.destroyed) {
ret.destroy(err)
}
if (!client.pending) {
return
}
const request = client[kQueue][client[kPendingIdx]]
if (request.aborted) {
// Request was aborted.
// TODO: Avoid splice one by one.
client[kQueue].splice(client[kPendingIdx], 1)
continue
}
if (client[kServerName] !== request.servername) {
if (client.running) {
return
}
// TODO: Should we allow !body?
if (!body || typeof body.on !== 'function') {
util.destroy(ret, new InvalidReturnValueError('expected Readable'))
client[kServerName] = request.servername
if (client[kSocket]) {
util.destroy(client[kSocket], new InformationalError('servername changed'))
return
}
}
// TODO: If body === res then avoid intermediate
// and write directly to ret.push? Or should this
// happen when body is null?
if (!client[kSocket] && !client[kRetryTimeout]) {
connect(client)
return
}
// TODO: body.destroy?
if (!client.connected) {
return
}
let ended = false
body
.on('data', function (chunk) {
if (!ret.push(chunk) && this.pause) {
this.pause()
}
if (client[kReset]) {
return
}
if (client[kWriting]) {
return
}
if (client.running && !request.idempotent) {
// Non-idempotent request cannot be retried.
// Ensure that no other requests are inflight and
// could cause failure.
return
}
if (util.isStream(request.body) && util.bodyLength(request.body) === 0) {
request.body
.on('data', function () {
assert(false)
})
.on('error', function (err) {
util.destroy(ret, err)
util.destroy(this)
request.onError(err)
})
.on('end', function () {
ended = true
ret.push(null)
util.destroy(this)
})
.on('close', function () {
if (!ended) {
util.destroy(ret, new RequestAbortedError())
}
})
request.body = null
}
return this.wrap(res, function (err, chunk) {
if (this.destroyed) {
return null
} else if (err) {
this.destroy(err)
} else {
const ret = this.push(chunk)
return this.destroyed ? null : ret
}
})
})
if (client.running && util.isStream(request.body)) {
// Request with stream body can error while other requests
// are inflight and indirectly error those as well.
// Ensure this doesn't happen by waiting for inflight
// to complete before dispatching.
ret.destroy = request.wrap(ret, ret.destroy)
// Request with stream body cannot be retried.
// Ensure that no other requests are inflight and
// could cause failure.
return
}
return ret
try {
write(client, request)
client[kPendingIdx]++
} catch (err) {
request.onError(err)
}
}
}
stream (opts, factory, callback) {
if (callback === undefined) {
return new Promise((resolve, reject) => {
this.stream(opts, factory, (err, data) => {
return err ? reject(err) : resolve(data)
})
})
}
function write (client, request) {
const { method } = request
if (typeof callback !== 'function') {
throw new InvalidArgumentError('invalid callback')
}
let contentLength = util.bodyLength(request.body, true)
if (!opts || typeof opts !== 'object') {
process.nextTick(callback, new InvalidArgumentError('invalid opts'), null)
return
}
if (contentLength === undefined) {
contentLength = request.contentLength
}
if (typeof factory !== 'function') {
process.nextTick(callback, new InvalidArgumentError('invalid factory'), null)
return
// TODO: What other methods expect a payload?
const expectsPayload = (
method === 'PUT' ||
method === 'POST' ||
method === 'PATCH'
)
if (contentLength === 0 && !expectsPayload) {
// https://tools.ietf.org/html/rfc7230#section-3.3.2
// A user agent SHOULD NOT send a Content-Length header field when
// the request message does not contain a payload body and the method
// semantics do not anticipate such a body.
contentLength = undefined
}
if (request.contentLength !== undefined && request.contentLength !== contentLength) {
throw new ContentLengthMismatchError()
}
const { body, header } = request
const socket = client[kSocket]
socket.cork()
socket.write(header)
if (contentLength !== undefined) {
socket.write(`content-length: ${contentLength}\r\n`, 'ascii')
}
if (method === 'HEAD') {
// https://github.com/mcollina/undici/issues/258
// Close after a HEAD request to interop with misbehaving servers
// that may send a body in the response.
client[kReset] = true
}
// TODO: An HTTP/1.1 user agent MUST NOT preface
// or follow a request with an extra CRLF.
// https://tools.ietf.org/html/rfc7230#section-3.5
if (!body) {
socket.write(CRLF)
if (contentLength === 0) {
socket.write(CRLF)
} else {
assert(contentLength === undefined, 'no body must not have content length')
}
} else if (body instanceof Uint8Array) {
assert(contentLength !== undefined, 'buffer body must have content length')
this[kEnqueue](opts, function (err, data) {
if (err) {
process.nextTick(callback, err, null)
socket.write(CRLF)
socket.write(body)
socket.write(CRLF)
request.body = null
client[kReset] = !expectsPayload
} else if (util.isStream(body)) {
assert(contentLength !== 0 || !client.running, 'stream body cannot be pipelined')
let finished = false
let bytesWritten = 0
const onData = function (chunk) {
assert(!finished)
const len = Buffer.byteLength(chunk)
if (!len) {
return
}
const {
statusCode,
headers,
opaque,
resume
} = data
let body
try {
body = factory({
statusCode,
headers,
opaque
})
} catch (err) {
callback(err, null)
// TODO: What if not ended and bytesWritten === contentLength?
// We should defer writing chunks.
if (contentLength !== undefined && bytesWritten + len > contentLength) {
util.destroy(this, new ContentLengthMismatchError())
return
}
if (!body) {
callback(null, null)
if (bytesWritten === 0) {
socket.write(contentLength === undefined ? 'transfer-encoding: chunked\r\n' : '\r\n', 'ascii')
client[kReset] = !expectsPayload
}
if (contentLength === undefined) {
socket.write(`\r\n${len.toString(16)}\r\n`, 'ascii')
}
bytesWritten += len
if (!socket.write(chunk) && this.pause) {
this.pause()
}
}
const onDrain = function () {
assert(!finished)
if (body.resume) {
body.resume()
}
}
const onAbort = function () {
onFinished(new RequestAbortedError())
}
const onFinished = function (err) {
if (finished) {
return
}
if (
typeof body.write !== 'function' ||
typeof body.end !== 'function' ||
typeof body.on !== 'function'
) {
callback(new InvalidReturnValueError('expected Writable'), null)
finished = true
assert(client[kWriting] && client.running <= 1)
client[kWriting] = false
if (!err) {
err = socket[kError]
}
if (!err && contentLength !== undefined && bytesWritten !== contentLength) {
err = new ContentLengthMismatchError()
}
socket
.removeListener('drain', onDrain)
.removeListener('error', onFinished)
.removeListener('close', onFinished)
body
.removeListener('data', onData)
.removeListener('end', onFinished)
.removeListener('error', onFinished)
.removeListener('close', onAbort)
request.body = null
util.destroy(body, err)
if (err) {
util.destroy(socket, err)
return
}
body.on('drain', resume)
// TODO: Avoid finished. It registers an unecessary amount of listeners.
finished(body, { readable: false }, (err) => {
body.removeListener('drain', resume)
if (err) {
util.destroy(body, err)
resume()
} else {
// TODO: destroy if body is not Readable?
if (bytesWritten === 0) {
if (contentLength === undefined && expectsPayload) {
// https://tools.ietf.org/html/rfc7230#section-3.3.2
// A user agent SHOULD send a Content-Length in a request message when
// no Transfer-Encoding is sent and the request method defines a meaning
// for an enclosed payload body.
socket.write('content-length: 0\r\n\r\n', 'ascii')
}
callback(err, null)
})
if (typeof body.destroy === 'function') {
body.destroy = this.wrap(body, body.destroy)
} else if (contentLength === undefined) {
socket.write('\r\n0\r\n', 'ascii')
}
return this.wrap(body, function (err, chunk) {
if (util.isDestroyed(this)) {
return null
} else if (err) {
util.destroy(this, err)
} else if (chunk == null) {
this.end()
} else {
const ret = this.write(chunk)
return util.isDestroyed(this) ? null : ret
}
})
})
socket.write(CRLF)
resume(client)
}
body
.on('data', onData)
.on('end', onFinished)
.on('error', onFinished)
.on('close', onAbort)
socket
.on('drain', onDrain)
.on('error', onFinished)
.on('close', onFinished)
client[kWriting] = true
} else {
/* istanbul ignore next */
assert(false)
}
socket.uncork()
}
module.exports = Client

@@ -8,3 +8,4 @@ 'use strict'

const {
kClients
kClients,
kGetNext
} = require('./symbols')

@@ -36,2 +37,7 @@

/* istanbul ignore next: use by benchmark */
[kGetNext] () {
return getNext(this)
}
stream (opts, factory, cb) {

@@ -38,0 +44,0 @@ // needed because we need the return value from client.stream

@@ -12,3 +12,9 @@ 'use strict'

const util = require('./util')
const { kRequestTimeout, kUrl } = require('./symbols')
const kAbort = Symbol('abort')
const kTimeout = Symbol('timeout')
const kResume = Symbol('resume')
const kSignal = Symbol('signal')
class Request extends AsyncResource {

@@ -25,8 +31,8 @@ constructor ({

requestTimeout
}, hostname, callback) {
}, {
[kRequestTimeout]: defaultRequestTimeout,
[kUrl]: { hostname: defaultHostname }
}) {
super('UNDICI_REQ')
assert(typeof hostname === 'string')
assert(typeof callback === 'function')
if (typeof path !== 'string' || path[0] !== '/') {

@@ -40,2 +46,10 @@ throw new InvalidArgumentError('path must be a valid path')

if (signal && typeof signal.on !== 'function' && typeof signal.addEventListener !== 'function') {
throw new InvalidArgumentError('signal must be an EventEmitter or EventTarget')
}
requestTimeout = requestTimeout == null && defaultRequestTimeout
? defaultRequestTimeout
: requestTimeout
if (requestTimeout != null && (!Number.isInteger(requestTimeout) || requestTimeout < 0)) {

@@ -45,6 +59,2 @@ throw new InvalidArgumentError('requestTimeout must be a positive integer or zero')

if (signal && typeof signal.on !== 'function' && typeof signal.addEventListener !== 'function') {
throw new InvalidArgumentError('signal must implement .on(name, callback)')
}
this.method = method

@@ -54,8 +64,4 @@

this.body = null
} else if (typeof body.on === 'function') {
this.body = body.on('error', (err) => {
// TODO: Ignore error if body has ended?
this.onError(err)
})
assert(this.body === body)
} else if (util.isStream(body)) {
this.body = body
} else if (body instanceof Uint8Array) {

@@ -72,11 +78,9 @@ this.body = body.length ? body : null

this.servername = servername || hostHeader || hostname
if (net.isIP(this.servername) || this.servername.startsWith('[')) {
this.servername = servername || hostHeader || null
if (net.isIP(this.servername) || /^\[/.test(this.servername)) {
this.servername = null
}
this.callback = callback
this.aborted = false
this.finished = false
this.opaque = opaque

@@ -96,2 +100,6 @@

if (!hostHeader) {
header += `host: ${defaultHostname}\r\n`
}
if (headers) {

@@ -106,3 +114,3 @@ for (const [key, val] of Object.entries(headers)) {

if (
this.contentLength == null &&
this.contentLength === undefined &&
key.length === 14 &&

@@ -126,11 +134,16 @@ key.toLowerCase() === 'content-length'

if (!hostHeader) {
header += `host: ${hostname}\r\n`
}
this.header = Buffer.from(header, 'ascii')
}
if (util.isStream(this.body)) {
// TODO: Cleanup listeners?
this.body.on('error', (err) => {
// TODO: Ignore error if body has ended?
this.onError(err)
})
}
if (signal) {
const onAbort = () => {
this[kSignal] = signal
this[kAbort] = () => {
this.onError(new RequestAbortedError())

@@ -140,9 +153,12 @@ }

if ('addEventListener' in signal) {
signal.addEventListener('abort', onAbort)
signal.addEventListener('abort', this[kAbort])
} else {
signal.once('abort', onAbort)
signal.addListener('abort', this[kAbort])
}
} else {
this[kSignal] = null
this[kAbort] = null
}
this.timeout = requestTimeout
this[kTimeout] = requestTimeout
? setTimeout((self) => {

@@ -154,69 +170,98 @@ self.onError(new RequestTimeoutError())

wrap (that, cb) {
return this.runInAsyncScope.bind(this, cb, that)
onInfo (statusCode, headers) {
if (this.aborted) {
return
}
if (this._onInfo) {
this.runInAsyncScope(this._onInfo, this, statusCode, headers)
}
}
onHeaders (statusCode, headers, resume) {
if (statusCode < 200) {
// TODO: Informational response.
if (this.aborted) {
return
}
if (this.finished) {
return
const { [kTimeout]: timeout } = this
if (timeout) {
this[kTimeout] = null
clearTimeout(timeout)
}
this.finished = true
clearTimeout(this.timeout)
this.timeout = null
this[kResume] = resume
this.res = this.runInAsyncScope(this.callback, this, null, {
statusCode,
headers,
opaque: this.opaque,
resume
})
assert(!this.res || typeof this.res === 'function')
this.runInAsyncScope(this._onHeaders, this, statusCode, headers, resume)
}
onBody (chunk, offset, length) {
if (this.res) {
return this.res(null, chunk.slice(offset, offset + length))
if (this.aborted) {
return null
}
return this.runInAsyncScope(this._onBody, this, chunk, offset, length)
}
onComplete (trailers) {
// TODO: Trailers?
if (this.res) {
const res = this.res
this.res = null
res(null, null)
if (this.aborted) {
return
}
}
onError (err, sync) {
if (util.isStream(this.body)) {
// TODO: If this.body.destroy doesn't exists or doesn't emit 'error' or
// 'close', it can halt execution in client.
const body = this.body
const { [kSignal]: signal, body } = this
if (body) {
this.body = null
util.destroy(body, err)
util.destroy(body)
}
if (this.res) {
const res = this.res
this.res = null
res(err, null)
if (signal) {
this[kSignal] = null
if ('removeEventListener' in signal) {
signal.removeEventListener('abort', this[kAbort])
} else {
signal.removeListener('abort', this[kAbort])
}
}
if (this.finished) {
this.runInAsyncScope(this._onComplete, this, trailers)
}
onError (err) {
if (this.aborted) {
return
}
this.finished = true
this.aborted = true
clearTimeout(this.timeout)
this.timeout = null
const {
body,
[kTimeout]: timeout,
[kSignal]: signal,
[kResume]: resume
} = this
this.runInAsyncScope(this.callback, this, err, null)
if (resume) {
resume()
}
if (timeout) {
this[kTimeout] = null
clearTimeout(timeout)
}
if (body) {
this.body = null
util.destroy(body, err)
}
if (signal) {
this[kSignal] = null
if ('removeEventListener' in signal) {
signal.removeEventListener('abort', this[kAbort])
} else {
signal.removeListener('abort', this[kAbort])
}
}
this.runInAsyncScope(this._onError, this, err)
}

@@ -223,0 +268,0 @@ }

module.exports = {
kUrl: Symbol('url'),
kWriting: Symbol('writing'),
kResuming: Symbol('resuming'),
kQueue: Symbol('queue'),
kConnect: Symbol('connect'),
kSocketTimeout: Symbol('socket timeout'),

@@ -11,2 +13,3 @@ kRequestTimeout: Symbol('request timeout'),

kReset: Symbol('reset'),
kGetNext: Symbol('getNext'),
kDestroyed: Symbol('destroyed'),

@@ -18,3 +21,2 @@ kMaxHeadersSize: Symbol('maxHeaderSize'),

kResume: Symbol('resume'),
kEnd: Symbol('end'),
kError: Symbol('error'),

@@ -21,0 +23,0 @@ kOnDestroyed: Symbol('destroy callbacks'),

@@ -18,3 +18,5 @@ 'use strict'

const state = body._readableState
return state && state.ended ? state.length : undefined
return state && state.ended === true && Number.isFinite(state.length)
? state.length
: undefined
}

@@ -32,12 +34,6 @@

function destroy (stream, err) {
if (!stream) {
return stream
if (!isStream(stream) || isDestroyed(stream)) {
return
}
assert(typeof stream.on === 'function')
if (isDestroyed(stream)) {
return stream
}
if (typeof stream.destroy === 'function') {

@@ -54,4 +50,2 @@ stream.destroy(err)

}
return stream
}

@@ -58,0 +52,0 @@

{
"name": "undici",
"version": "1.2.2",
"version": "1.2.3",
"description": "An HTTP/1.1 client, written from scratch for Node.js",

@@ -9,3 +9,4 @@ "main": "index.js",

"test": "tap test/*.js --no-coverage",
"coverage": "standard | snazzy && tap test/*.js"
"coverage": "standard | snazzy && tap test/*.js",
"bench": "npx concurrently -k -s first \"node benchmarks/server.js\" \"node -e 'setTimeout(() => {}, 1000)' && node benchmarks\""
},

@@ -33,2 +34,3 @@ "repository": {

"benchmark": "^2.1.4",
"concurrently": "^5.2.0",
"https-pem": "^2.0.0",

@@ -35,0 +37,0 @@ "pre-commit": "^1.2.2",

@@ -26,6 +26,6 @@ # undici

```
http - keepalive - pipe x 5,521 ops/sec ±3.37% (73 runs sampled)
undici - pipeline - pipe x 9,292 ops/sec ±4.28% (79 runs sampled)
undici - request - pipe x 11,949 ops/sec ±0.99% (85 runs sampled)
undici - stream - pipe x 12,223 ops/sec ±0.76% (85 runs sampled)
http - keepalive x 5,521 ops/sec ±3.37% (73 runs sampled)
undici - pipeline x 9,292 ops/sec ±4.28% (79 runs sampled)
undici - request x 11,949 ops/sec ±0.99% (85 runs sampled)
undici - stream x 12,223 ops/sec ±0.76% (85 runs sampled)
```

@@ -32,0 +32,0 @@

@@ -203,1 +203,27 @@ 'use strict'

writeBodyStartedWithBody(new Uint8Array([42]), 'Uint8Array')
test('cleanup listener', (t) => {
t.plan(4)
const abortController = new AbortController()
abortController.signal.addEventListener = (name) => t.strictEqual(name, 'abort')
abortController.signal.removeEventListener = (name) => t.strictEqual(name, 'abort')
const server = createServer((req, res) => {
res.writeHead(200, { 'content-type': 'text/plain' })
res.write('hello')
res.end('world')
})
t.teardown(server.close.bind(server))
server.listen(0, () => {
const client = new Client(`http://localhost:${server.address().port}`)
t.teardown(client.destroy.bind(client))
client.request({ path: '/', method: 'GET', signal: abortController.signal }, (err, response) => {
t.error(err)
response.body.on('end', () => {
t.pass()
}).resume()
})
})
})

@@ -214,1 +214,27 @@ 'use strict'

writeBodyStartedWithBody(new Uint8Array([42]), 'Uint8Array')
test('cleanup listener', (t) => {
t.plan(4)
const ee = new EventEmitter()
ee.addListener = (name) => t.strictEqual(name, 'abort')
ee.removeListener = (name) => t.strictEqual(name, 'abort')
const server = createServer((req, res) => {
res.writeHead(200, { 'content-type': 'text/plain' })
res.write('hello')
res.end('world')
})
t.teardown(server.close.bind(server))
server.listen(0, () => {
const client = new Client(`http://localhost:${server.address().port}`)
t.teardown(client.destroy.bind(client))
client.request({ path: '/', method: 'GET', signal: ee }, (err, response) => {
t.error(err)
response.body.on('end', () => {
t.pass()
}).resume()
})
})
})

@@ -9,6 +9,4 @@ 'use strict'

const {
kSocket,
kEnqueue
} = require('../lib/symbols')
const { kSocket } = require('../lib/symbols')
const { InvalidArgumentError } = require('../lib/errors')

@@ -769,3 +767,3 @@ test('GET errors and reconnect with pipelining 1', (t) => {

test('invalid opts', (t) => {
t.plan(6)
t.plan(2)

@@ -779,16 +777,2 @@ const client = new Client('http://localhost:5000')

})
client[kEnqueue](null, (err) => {
t.ok(err instanceof errors.InvalidArgumentError)
})
client[kEnqueue]({ path: '/', method: 'GET', signal: 1 }, (err) => {
t.ok(err instanceof errors.InvalidArgumentError)
})
client[kEnqueue]({ path: '/', method: 'GET', signal: {} }, (err) => {
t.ok(err instanceof errors.InvalidArgumentError)
})
try {
client[kEnqueue]({ path: '/', method: 'GET', signal: {} }, null)
} catch (err) {
t.ok(err instanceof errors.InvalidArgumentError)
}
})

@@ -862,1 +846,19 @@

})
test('invalid signal', (t) => {
t.plan(3)
const client = new Client('http://localhost:3333')
t.teardown(client.destroy.bind(client))
client.request({ path: '/', method: 'GET', signal: {} }, (err) => {
t.ok(err instanceof InvalidArgumentError)
})
client.pipeline({ path: '/', method: 'GET', signal: {} }, () => {})
.on('error', (err) => {
t.ok(err instanceof InvalidArgumentError)
})
client.stream({ path: '/', method: 'GET', signal: {} }, () => {}, (err) => {
t.ok(err instanceof InvalidArgumentError)
})
})

@@ -817,1 +817,26 @@ 'use strict'

})
test('pipeline invalid opts', (t) => {
t.plan(2)
const server = createServer((req, res) => {
res.end(JSON.stringify({ asd: 1 }))
})
t.tearDown(server.close.bind(server))
server.listen(0, () => {
const client = new Client(`http://localhost:${server.address().port}`)
t.tearDown(client.destroy.bind(client))
client.close((err) => {
t.error(err)
})
client
.pipeline({ path: '/', method: 'GET', objectMode: true }, ({ body }) => {
t.fail()
})
.on('error', (err) => {
t.ok(err)
})
})
})

@@ -66,59 +66,2 @@ 'use strict'

test('20 times HEAD with pipelining 10', (t) => {
const num = 20
t.plan(3 * num + 1)
let count = 0
let countGreaterThanOne = false
const server = createServer((req, res) => {
count++
setTimeout(function () {
countGreaterThanOne = countGreaterThanOne || count > 1
res.end(req.url)
}, 10)
})
t.tearDown(server.close.bind(server))
// needed to check for a warning on the maxListeners on the socket
process.on('warning', t.fail)
t.tearDown(() => {
process.removeListener('warning', t.fail)
})
server.listen(0, () => {
const client = new Client(`http://localhost:${server.address().port}`, {
pipelining: 10
})
t.tearDown(client.close.bind(client))
for (let i = 0; i < num; i++) {
makeRequest(i)
}
function makeRequest (i) {
makeHeadRequestAndExpectUrl(client, i, t, () => {
count--
if (i === num - 1) {
t.ok(countGreaterThanOne, 'seen more than one parallel request')
}
})
return !client.busy
}
})
})
function makeHeadRequestAndExpectUrl (client, i, t, cb) {
return client.request({ path: '/' + i, method: 'HEAD' }, (err, { statusCode, headers, body }) => {
cb()
t.error(err)
t.strictEqual(statusCode, 200)
body
.resume()
.on('end', () => {
t.pass()
})
})
}
test('A client should enqueue as much as twice its pipelining factor', (t) => {

@@ -125,0 +68,0 @@ const num = 10

@@ -7,2 +7,3 @@ 'use strict'

const { PassThrough } = require('stream')
const EE = require('events')

@@ -190,3 +191,3 @@ test('stream get', (t) => {

test('stream response resume back pressure and non standard error', (t) => {
t.plan(5)
t.plan(6)

@@ -206,2 +207,3 @@ const server = createServer((req, res) => {

const pt = new PassThrough()
client.stream({

@@ -212,7 +214,6 @@ path: '/',

}, () => {
const pt = new PassThrough()
pt.on('data', () => {
pt.emit('error', new Error('kaboom'))
}).once('error', (err) => {
t.ok(err)
t.strictEqual(err.message, 'kaboom')
})

@@ -222,2 +223,3 @@ return pt

t.ok(err)
t.strictEqual(pt.destroyed, true)
})

@@ -255,3 +257,3 @@

const pt = new PassThrough()
const pt = new PassThrough({ autoDestroy: false })
client.stream({

@@ -305,4 +307,4 @@ path: '/',

test('stream waits only for writable side', (t) => {
t.plan(1)
test('stream destroy if not readable', (t) => {
t.plan(2)

@@ -314,2 +316,4 @@ const server = createServer((req, res) => {

const pt = new PassThrough()
pt.readable = false
server.listen(0, () => {

@@ -323,5 +327,6 @@ const client = new Client(`http://localhost:${server.address().port}`)

}, () => {
throw new Error('kaboom')
return pt
}, (err) => {
t.strictEqual(err.message, 'kaboom')
t.error(err)
t.strictEqual(pt.destroyed, true)
})

@@ -402,1 +407,94 @@ })

})
test('stream factory abort', (t) => {
t.plan(1)
const server = createServer((req, res) => {
res.end('asd')
})
t.tearDown(server.close.bind(server))
server.listen(0, () => {
const client = new Client(`http://localhost:${server.address().port}`)
t.tearDown(client.destroy.bind(client))
const signal = new EE()
client.stream({
path: '/',
method: 'GET',
signal
}, () => {
signal.emit('abort')
return new PassThrough()
}, (err) => {
t.ok(err instanceof errors.RequestAbortedError)
})
})
})
test('stream factory throw', (t) => {
t.plan(3)
const server = createServer((req, res) => {
res.end('asd')
})
t.tearDown(server.close.bind(server))
server.listen(0, () => {
const client = new Client(`http://localhost:${server.address().port}`)
t.tearDown(client.destroy.bind(client))
client.stream({
path: '/',
method: 'GET'
}, () => {
throw new Error('asd')
}, (err) => {
t.strictEqual(err.message, 'asd')
})
client.stream({
path: '/',
method: 'GET'
}, () => {
throw new Error('asd')
}, (err) => {
t.strictEqual(err.message, 'asd')
})
client.stream({
path: '/',
method: 'GET'
}, () => {
return new PassThrough()
}, (err) => {
t.error(err)
})
client.on('disconnect', () => {
t.fail()
})
})
})
test('stream CONNECT throw', (t) => {
t.plan(1)
const server = createServer((req, res) => {
res.end('asd')
})
t.tearDown(server.close.bind(server))
server.listen(0, () => {
const client = new Client(`http://localhost:${server.address().port}`)
t.tearDown(client.destroy.bind(client))
client.stream({
path: '/',
method: 'CONNECT'
}, () => {
}, (err) => {
t.ok(err instanceof errors.NotSupportedError)
})
client.on('disconnect', () => {
t.fail()
})
})
})

@@ -39,4 +39,5 @@ 'use strict'

headers: reqHeaders
}, (err, { statusCode, headers, body }) => {
}, (err, data) => {
t.error(err)
const { statusCode, headers, body } = data
t.strictEqual(statusCode, 200)

@@ -43,0 +44,0 @@ t.strictEqual(headers['content-type'], 'text/plain')

@@ -223,6 +223,6 @@ 'use strict'

})
client.request({}, (err) => {
client.request({ path: '/', method: 'GET' }, (err) => {
t.ok(err instanceof errors.ClientClosedError)
client.destroy()
client.request({}, (err) => {
client.request({ path: '/', method: 'GET' }, (err) => {
t.ok(err instanceof errors.ClientDestroyedError)

@@ -229,0 +229,0 @@ })

@@ -7,2 +7,3 @@ 'use strict'

const { Readable } = require('stream')
const { kConnect } = require('../lib/symbols')

@@ -96,1 +97,49 @@ test('GET and HEAD with body should reset connection', (t) => {

})
test('HEAD should reset connection', (t) => {
t.plan(9)
const server = createServer((req, res) => {
res.end('asd')
})
t.teardown(server.close.bind(server))
server.listen(0, () => {
const client = new Client(`http://localhost:${server.address().port}`)
t.teardown(client.destroy.bind(client))
client.on('disconnect', () => {
t.pass()
})
client.request({
path: '/',
method: 'HEAD'
}, (err, data) => {
t.error(err)
data.body.resume()
})
t.strictEqual(client.busy, true)
client.request({
path: '/',
method: 'HEAD'
}, (err, data) => {
t.error(err)
data.body.resume()
client.on('disconnect', () => {
client[kConnect](() => {
client.request({
path: '/',
method: 'HEAD'
}, (err, data) => {
t.error(err)
data.body.resume()
})
t.strictEqual(client.busy, true)
})
})
})
t.strictEqual(client.busy, true)
})
})

@@ -43,3 +43,6 @@ 'use strict'

const server = net.createServer((socket) => {
socket.write('HTTP/1.1 101 Switching Protocols\r\n\r\n')
socket.write('HTTP/1.1 101 Switching Protocols\r\n')
socket.write('Upgrade: TLS/1.0, HTTP/1.1\r\n')
socket.write('Connection: Upgrade\r\n')
socket.write('\r\n')
})

@@ -53,3 +56,7 @@ t.teardown(server.close.bind(server))

path: '/',
method: 'GET'
method: 'GET',
headers: {
Connection: 'upgrade',
Upgrade: 'example/1, foo/2'
}
}, (err) => {

@@ -56,0 +63,0 @@ t.ok(err instanceof errors.NotSupportedError)

@@ -6,5 +6,6 @@ 'use strict'

const { createServer } = require('http')
const { kConnect } = require('../lib/symbols')
test('pipeline pipelining', (t) => {
t.plan(7)
t.plan(6)

@@ -27,23 +28,74 @@ const server = createServer((req, res) => {

client.pipeline({
method: 'GET',
path: '/'
}, ({ body }) => body)
.end()
.resume()
.on('end', () => {
t.strictEqual(client.running, 0)
client.pipeline({
method: 'GET',
path: '/'
}, ({ body }) => body).end().resume()
t.strictEqual(client.busy, false)
client.pipeline({
method: 'GET',
path: '/'
}, ({ body }) => body).end().resume()
t.strictEqual(client.busy, true)
t.strictEqual(client.running, 2)
client[kConnect](() => {
t.strictEqual(client.running, 0)
client.pipeline({
method: 'GET',
path: '/'
}, ({ body }) => body).end().resume()
t.strictEqual(client.busy, false)
client.pipeline({
method: 'GET',
path: '/'
}, ({ body }) => body).end().resume()
t.strictEqual(client.busy, true)
t.strictEqual(client.running, 2)
})
})
})
test('pipeline pipelining retry', (t) => {
t.plan(6)
let count = 0
const server = createServer((req, res) => {
if (count++ === 0) {
res.destroy()
} else {
res.end()
}
})
t.teardown(server.close.bind(server))
server.listen(0, () => {
const client = new Client(`http://localhost:${server.address().port}`, {
pipelining: 3
})
t.teardown(client.destroy.bind(client))
client.once('disconnect', () => {
t.pass()
client.on('disconnect', () => {
t.fail()
})
})
client[kConnect](() => {
client.pipeline({
method: 'GET',
path: '/'
}, ({ body }) => body)
.on('error', (err) => {
t.ok(err)
})
.end()
.resume()
t.strictDeepEqual(client.running, 1)
client.pipeline({
method: 'GET',
path: '/'
}, ({ body }) => body).end().resume()
t.strictDeepEqual(client.running, 2)
client.pipeline({
method: 'GET',
path: '/'
}, ({ body }) => body).end().resume()
t.strictDeepEqual(client.running, 3)
client.close(() => {
t.pass()
})
})
})
})
SocketSocket SOC 2 Logo

Product

  • Package Alerts
  • Integrations
  • Docs
  • Pricing
  • FAQ
  • Roadmap
  • Changelog

Packages

npm

Stay in touch

Get open source security insights delivered straight into your inbox.


  • Terms
  • Privacy
  • Security

Made with ⚡️ by Socket Inc