Socket
Socket
Sign inDemoInstall

undici

Package Overview
Dependencies
Maintainers
2
Versions
212
Alerts
File Explorer

Advanced tools

Socket logo

Install Socket

Detect and block malicious and high-risk dependencies

Install

undici - npm Package Compare versions

Comparing version 1.0.3 to 1.1.0

test/headers-timeout.js

26

benchmarks/index.js
'use strict'
const { PassThrough } = require('stream')
const { Writable } = require('stream')
const http = require('http')

@@ -47,3 +47,7 @@ const Benchmark = require('benchmark')

http.get(httpOptions, response => {
const stream = new PassThrough()
const stream = new Writable({
write (chunk, encoding, callback) {
callback()
}
})
stream.once('finish', () => {

@@ -65,3 +69,7 @@ deferred.resolve()

const stream = new PassThrough()
const stream = new Writable({
write (chunk, encoding, callback) {
callback()
}
})
stream.once('finish', () => {

@@ -86,3 +94,7 @@ deferred.resolve()

.end()
.pipe(new PassThrough())
.pipe(new Writable({
write (chunk, encoding, callback) {
callback()
}
}))
.on('error', (err) => {

@@ -100,3 +112,7 @@ throw err

pool.stream(undiciOptions, () => {
const stream = new PassThrough()
const stream = new Writable({
write (chunk, encoding, callback) {
callback()
}
})
stream.once('finish', () => {

@@ -103,0 +119,0 @@ deferred.resolve()

436

lib/client-base.js

@@ -6,3 +6,4 @@ 'use strict'

const tls = require('tls')
const { HTTPParser } = require('http-parser-js')
// TODO: This is not really allowed by Node but it works for now.
const { HTTPParser } = process.binding('http_parser') // eslint-disable-line
const EventEmitter = require('events')

@@ -17,2 +18,3 @@ const Request = require('./request')

ClientClosedError,
HeadersTimeoutError,
SocketError,

@@ -39,6 +41,7 @@ NotSupportedError

kMaxAbortedPayload,
kParser,
kSocket,
kSocketPath,
kEnqueue,
kClient
kMaxHeadersSize,
kHeadersTimeout
} = require('./symbols')

@@ -52,6 +55,12 @@

const nodeMajorVersion = parseInt(process.version.split('.')[0].slice(1))
const insecureHTTPParser = process.execArgv.includes('--insecure-http-parser')
class ClientBase extends EventEmitter {
constructor (url, {
maxAbortedPayload,
maxHeaderSize,
headersTimeout,
socketTimeout,
socketPath,
requestTimeout,

@@ -75,2 +84,6 @@ pipelining,

if (socketPath != null && typeof socketPath !== 'string') {
throw new InvalidArgumentError('invalid socketPath')
}
if (url.hostname != null && typeof url.hostname !== 'string') {

@@ -92,2 +105,6 @@ throw new InvalidArgumentError('invalid hostname')

if (maxHeaderSize != null && !Number.isFinite(maxHeaderSize)) {
throw new InvalidArgumentError('invalid maxHeaderSize')
}
if (socketTimeout != null && !Number.isFinite(socketTimeout)) {

@@ -101,5 +118,12 @@ throw new InvalidArgumentError('invalid socketTimeout')

if (headersTimeout != null && !Number.isFinite(headersTimeout)) {
throw new InvalidArgumentError('invalid headersTimeout')
}
this[kSocket] = null
this[kPipelining] = pipelining || 1
this[kMaxHeadersSize] = maxHeaderSize || 16384
this[kHeadersTimeout] = headersTimeout == null ? 30e3 : headersTimeout
this[kUrl] = url
this[kSocketPath] = socketPath
this[kSocketTimeout] = socketTimeout == null ? 30e3 : socketTimeout

@@ -115,3 +139,3 @@ this[kRequestTimeout] = requestTimeout == null ? 30e3 : requestTimeout

this[kWriting] = false
this[kMaxAbortedPayload] = maxAbortedPayload || 1e6
this[kMaxAbortedPayload] = maxAbortedPayload || 1048576

@@ -165,4 +189,15 @@ // kQueue is built up of 3 sections separated by

get full () {
return this.size > this[kPipelining]
get busy () {
if (this.size >= this[kPipelining]) {
return true
}
for (let n = this[kRunningIdx]; n < this[kQueue].length; ++n) {
const { idempotent, streaming } = this[kQueue][n]
if (!idempotent || streaming) {
return true
}
}
return false
}

@@ -183,35 +218,30 @@

if (!opts || typeof opts !== 'object') {
process.nextTick(callback, new InvalidArgumentError('invalid opts'), null)
return
}
try {
if (!opts || typeof opts !== 'object') {
throw new InvalidArgumentError('invalid opts')
}
if (this[kDestroyed]) {
process.nextTick(callback, new ClientDestroyedError(), null)
return
}
if (this[kDestroyed]) {
throw new ClientDestroyedError()
}
if (this[kClosed]) {
process.nextTick(callback, new ClientClosedError(), null)
return
}
if (this[kClosed]) {
throw new ClientClosedError()
}
if (opts.requestTimeout == null && this[kRequestTimeout]) {
// TODO: Avoid copy.
opts = { ...opts, requestTimeout: this[kRequestTimeout] }
}
if (opts.requestTimeout == null && this[kRequestTimeout]) {
// TODO: Avoid copy.
opts = { ...opts, requestTimeout: this[kRequestTimeout] }
}
let request
try {
request = new Request(opts, this[kUrl].hostname, callback)
const request = new Request(opts, this[kUrl].hostname, callback)
this[kQueue].push(request)
resume(this)
return request
} catch (err) {
process.nextTick(callback, err, null)
return
}
this[kQueue].push(request)
resume(this)
return request
}

@@ -287,2 +317,5 @@

} else {
// There is a delay between socket.destroy() and socket emitting 'close'.
// This means that some progress progress is still possible in the time
// between.
this[kSocket]

@@ -301,3 +334,15 @@ .on('close', onDestroyed)

constructor (client, socket) {
super(HTTPParser.RESPONSE)
/* istanbul ignore next */
if (nodeMajorVersion >= 12) {
super()
this.initialize(
HTTPParser.RESPONSE,
{},
client[kMaxHeadersSize],
insecureHTTPParser,
client[kHeadersTimeout]
)
} else {
super(HTTPParser.RESPONSE, false)
}

@@ -307,65 +352,72 @@ this.client = client

this.resumeSocket = () => socket.resume()
this.statusCode = null
this.headers = null
this.read = 0
this.body = null
}
/* istanbul ignore next: we don't support trailers yet */
[HTTPParser.kOnHeaders] () {
// TODO: Handle trailers.
[HTTPParser.kOnTimeout] () {
const { socket } = this
socket.destroy(new HeadersTimeoutError())
}
[HTTPParser.kOnHeadersComplete] ({ statusCode, headers }) {
const { client, resumeSocket } = this
const request = client[kQueue][client[kRunningIdx]]
const { signal, opaque } = request
const skipBody = request.method === 'HEAD'
[HTTPParser.kOnHeaders] (rawHeaders) {
this.headers = parseHeaders(rawHeaders, this.headers)
}
assert(!this.read)
assert(!this.body)
[HTTPParser.kOnExecute] (ret) {
const { socket } = this
if (statusCode === 101) {
request.invoke(new NotSupportedError('101 response not supported'))
return true
if (ret instanceof Error) {
const err = ret
if (typeof err.reason === 'string') {
err.message = `Parse Error: ${err.reason}`
}
socket.destroy(err)
} else {
// When the underlying `net.Socket` instance is consumed - no
// `data` events are emitted, and thus `socket.setTimeout` fires the
// callback even if the data is constantly flowing into the socket.
// See, https://github.com/nodejs/node/commit/ec2822adaad76b126b5cccdeaa1addf2376c9aa6
socket._unrefTimer()
}
}
if (statusCode < 200) {
// TODO: Informational response.
return true
}
[HTTPParser.kOnHeadersComplete] (versionMajor, versionMinor, rawHeaders, method,
url, statusCode, statusMessage, upgrade, shouldKeepAlive) {
const { client, socket, resumeSocket, headers } = this
const request = client[kQueue][client[kRunningIdx]]
let body = request.invoke(null, {
statusCode,
headers: parseHeaders(headers),
opaque,
resume: resumeSocket
})
// TODO: What if !shouldKeepAlive?
// TODO: What if upgrade?
// TODO: What if request.method === 'CONNECT'?
if (body && skipBody) {
body(null, null)
body = null
}
assert(this.statusCode < 200)
if (body) {
this.body = body
this.headers = null
this.statusCode = statusCode
if (signal) {
signal.once('error', body)
}
} else {
this.next()
if (statusCode === 101) {
// TODO: Switching Protocols.
socket.destroy(new NotSupportedError('101 response not supported'))
return true
}
return skipBody
request.headers(statusCode, parseHeaders(rawHeaders, headers), resumeSocket)
return request.method === 'HEAD' || statusCode < 200
}
[HTTPParser.kOnBody] (chunk, offset, length) {
this.read += length
const { client, socket, statusCode } = this
const request = client[kQueue][client[kRunningIdx]]
const { client, socket, body, read } = this
assert(statusCode >= 200)
const ret = body
? body(null, chunk.slice(offset, offset + length))
: null
this.read += length
if (ret == null && read > client[kMaxAbortedPayload]) {
const ret = request.push(chunk, offset, length)
if (ret == null && this.read > client[kMaxAbortedPayload]) {
// TODO: Provide a descriptive error?
socket.destroy()

@@ -378,69 +430,49 @@ } else if (ret === false) {

[HTTPParser.kOnMessageComplete] () {
const { body } = this
const { client, socket, statusCode, headers } = this
const request = client[kQueue][client[kRunningIdx]]
this.read = 0
this.body = null
this.statusCode = null
this.headers = null
if (body) {
body(null, null)
this.next()
request.complete(headers)
if (statusCode >= 200) {
this.read = 0
client[kQueue][client[kRunningIdx]++] = null
resume(client)
}
}
next () {
const { client, resumeSocket } = this
resumeSocket()
client[kQueue][client[kRunningIdx]++] = null
resume(client)
socket.resume()
}
destroy (err) {
const { client, body } = this
const { client } = this
assert(err)
this.unconsume()
// Make sure the parser's stack has unwound before deleting the
// corresponding C++ object through .close().
setImmediate(() => this.close())
if (client[kRunningIdx] >= client[kPendingIdx]) {
assert(!body)
return
}
this.read = 0
this.body = null
// Retry all idempotent requests except for the one
// at the head of the pipeline.
client[kQueue][client[kRunningIdx]++].error(err)
const retryRequests = []
const errorRequests = []
errorRequests.push(client[kQueue][client[kRunningIdx]++])
for (const request of client[kQueue].slice(client[kRunningIdx], client[kPendingIdx])) {
const { idempotent, body } = request
/* istanbul ignore else: can't happen because of guard in resume */
/* istanbul ignore next: can't happen because of guard in resume */
if (idempotent && (!body || typeof body.pipe !== 'function')) {
retryRequests.push(request)
} else {
errorRequests.push(request)
}
const { idempotent, streaming } = request
assert(idempotent && !streaming)
retryRequests.push(request)
}
client[kQueue].splice(0, client[kPendingIdx], ...retryRequests)
client[kPendingIdx] = 0
client[kRunningIdx] = 0
if (body) {
body(err, null)
}
for (const request of errorRequests) {
request.invoke(err, null)
}
resume(client)
}

@@ -455,13 +487,28 @@ }

const servername = client[kServerName] || (client[kTLSOpts] && client[kTLSOpts].servername)
const socket = protocol === 'https:'
? tls.connect(port || /* istanbul ignore next */ 443, hostname, {
...client[kTLSOpts],
servername
})
: net.connect(port || /* istanbul ignore next */ 80, hostname)
let socket
if (protocol === 'https:') {
const tlsOpts = { ...client[kTLSOpts], servername }
socket = client[kSocketPath]
? tls.connect(client[kSocketPath], tlsOpts)
: tls.connect(port || /* istanbul ignore next */ 443, hostname, tlsOpts)
} else {
socket = client[kSocketPath]
? net.connect(client[kSocketPath])
: net.connect(port || /* istanbul ignore next */ 80, hostname)
}
client[kSocket] = socket
socket[kClient] = client
socket[kParser] = new Parser(client, socket)
const parser = new Parser(client, socket)
/* istanbul ignore next */
if (nodeMajorVersion >= 12) {
assert(socket._handle)
parser.consume(socket._handle)
} else {
assert(socket._handle && socket._handle._externalStream)
parser.consume(socket._handle._externalStream)
}
socket[kClosed] = false

@@ -475,4 +522,2 @@ socket[kError] = null

.on(protocol === 'https:' ? 'secureConnect' : 'connect', function () {
const client = this[kClient]
client[kRetryDelay] = 0

@@ -482,9 +527,4 @@ client.emit('connect')

})
.on('data', function (chunk) {
const parser = this[kParser]
const err = parser.execute(chunk)
if (err instanceof Error && !this.destroyed) {
this.destroy(err)
}
.on('data', function () {
assert(false)
})

@@ -495,3 +535,3 @@ .on('error', function (err) {

while (client.pending && client[kQueue][client[kPendingIdx]].servername === servername) {
client[kQueue][client[kPendingIdx]++].invoke(err, null)
client[kQueue][client[kPendingIdx]++].error(err)
}

@@ -506,6 +546,7 @@ } else if (

) {
assert(client[kPendingIdx] === client[kRunningIdx])
// Error is not caused by running request and not a recoverable
// socket error.
for (const request of client[kQueue].splice(client[kPendingIdx])) {
request.invoke(err, null)
for (const request of client[kQueue].splice(client[kRunningIdx])) {
request.error(err)
}

@@ -520,15 +561,10 @@ }

.on('close', function () {
const client = this[kClient]
const parser = this[kParser]
this[kClosed] = true
if (!socket[kError]) {
socket[kError] = new SocketError('closed')
if (!this[kError]) {
this[kError] = new SocketError('closed')
}
const err = socket[kError]
parser.destroy(this[kError])
parser.destroy(err)
if (client.destroyed) {

@@ -539,10 +575,2 @@ resume(client)

// reset events
client[kSocket]
.removeAllListeners('data')
.removeAllListeners('end')
.removeAllListeners('finish')
.removeAllListeners('error')
client[kSocket]
.on('error', nop)
client[kSocket] = null

@@ -563,3 +591,5 @@

client.emit('disconnect', err)
client.emit('disconnect', this[kError])
resume(client)
})

@@ -571,5 +601,4 @@ }

if (client[kDestroyed]) {
const requests = client[kQueue].splice(client[kPendingIdx])
for (const request of requests) {
request.invoke(new ClientDestroyedError(), null)
for (const request of client[kQueue].splice(client[kPendingIdx])) {
request.error(new ClientDestroyedError())
}

@@ -607,3 +636,3 @@ return

if (!request.callback) {
if (request.finished) {
// Request was aborted.

@@ -623,3 +652,5 @@ // TODO: Avoid splice one by one.

if (client[kSocket]) {
// TODO: Provide a descriptive error?
client[kSocket].destroy()
return
}

@@ -663,18 +694,15 @@ }

write(client, request)
// Release memory for no longer required properties.
request.headers = null
request.body = null
}
}
function write (client, {
header,
body,
streaming,
chunked,
signal
}) {
const socket = client[kSocket]
function write (client, request) {
const {
header,
body,
streaming,
chunked
} = request
let socket = client[kSocket]
socket.cork()

@@ -685,3 +713,2 @@ socket.write(header)

socket.write(CRLF)
socket.uncork()
} else if (!streaming) {

@@ -691,15 +718,14 @@ socket.write(CRLF)

socket.write(CRLF)
socket.uncork()
} else {
if (chunked) {
socket.write(TE_CHUNKED)
} else {
socket.write(CRLF)
}
socket.write(chunked ? TE_CHUNKED : CRLF)
const onData = (chunk) => {
if (chunked) {
if (socket && chunked) {
socket.write(`\r\n${Buffer.byteLength(chunk).toString(16)}\r\n`, 'ascii')
}
if (!socket.write(chunk)) {
// TODO: If body.pause doesn't exists or doesn't stop 'data' events, it might cause
// excessive memory usage.
if (socket && !socket.write(chunk) && body.pause) {
body.pause()

@@ -709,3 +735,5 @@ }

const onDrain = () => {
body.resume()
if (body.resume) {
body.resume()
}
}

@@ -716,8 +744,8 @@ const onAbort = () => {

const onFinished = (err) => {
if (!socket) {
return
}
err = err || socket[kError]
if (signal) {
signal.removeListener('error', onFinished)
}
socket

@@ -732,21 +760,15 @@ .removeListener('drain', onDrain)

.removeListener('close', onAbort)
.on('error', nop)
if (err) {
if (typeof body.destroy === 'function' && !body.destroyed) {
body.destroy(err)
}
if (typeof body.destroy === 'function' && !body.destroyed) {
body.destroy(err)
}
if (!socket.destroyed) {
assert(client.running)
socket.destroy(err)
}
} else {
if (chunked) {
socket.write(TE_CHUNKED_EOF)
} else {
socket.write(CRLF)
}
if (!err) {
socket.write(chunked ? TE_CHUNKED_EOF : CRLF)
} else if (!socket.destroyed) {
socket.destroy(err)
}
socket = null
client[kWriting] = false

@@ -756,6 +778,2 @@ resume(client)

if (signal) {
signal.on('error', onFinished)
}
body

@@ -771,10 +789,14 @@ .on('data', onData)

.on('close', onFinished)
.uncork()
client[kWriting] = true
}
socket.uncork()
}
function parseHeaders (headers) {
const obj = {}
function parseHeaders (headers, obj) {
obj = obj || {}
if (!headers) {
return obj
}
for (var i = 0; i < headers.length; i += 2) {

@@ -781,0 +803,0 @@ var key = headers[i].toLowerCase()

@@ -118,4 +118,7 @@ const {

} else {
assert(this._readableState.endEmitted)
assert(!this[kResume])
if (!this._readableState.endEmitted) {
// This can happen if the server doesn't care
// about the entire request body.
// TODO: Is this fine to ignore?
}
}

@@ -215,3 +218,3 @@

// TODO: Should we allow !body?
if (!body || typeof body.pipe !== 'function') {
if (!body || typeof body.on !== 'function') {
if (!ret.destroyed) {

@@ -322,2 +325,4 @@ ret.destroy(new InvalidReturnValueError('expected Readable'))

typeof body.write !== 'function' ||
typeof body.end !== 'function' ||
typeof body.on !== 'function' ||
typeof body.destroy !== 'function' ||

@@ -324,0 +329,0 @@ typeof body.destroyed !== 'boolean'

@@ -11,2 +11,12 @@ 'use strict'

class HeadersTimeoutError extends UndiciError {
constructor (message) {
super(message)
Error.captureStackTrace(this, HeadersTimeoutError)
this.name = 'HeadersTimeoutError'
this.message = message || 'Headers Timeout Error'
this.code = 'UND_ERR_HEADERS_TIMEOUT'
}
}
class SocketTimeoutError extends UndiciError {

@@ -105,2 +115,3 @@ constructor (message) {

SocketTimeoutError,
HeadersTimeoutError,
RequestTimeoutError,

@@ -107,0 +118,0 @@ InvalidArgumentError,

@@ -92,3 +92,3 @@ 'use strict'

for (const client of pool[kClients]) {
if (client.full) {
if (client.busy) {
continue

@@ -95,0 +95,0 @@ }

@@ -6,16 +6,9 @@ 'use strict'

InvalidArgumentError,
NotSupportedError,
RequestAbortedError,
RequestTimeoutError
} = require('./errors')
const EE = require('events')
const assert = require('assert')
const net = require('net')
function isValidBody (body) {
return body == null ||
body instanceof Uint8Array ||
typeof body === 'string' ||
typeof body.pipe === 'function'
}
class Request extends AsyncResource {

@@ -54,3 +47,12 @@ constructor ({

if (!isValidBody(body)) {
if (method === 'CONNECT') {
throw new NotSupportedError('CONNECT method not supported')
}
if (
body != null &&
!(body instanceof Uint8Array) &&
typeof body !== 'string' &&
typeof body.on !== 'function'
) {
throw new InvalidArgumentError('body must be a string, a Buffer or a Readable stream')

@@ -61,7 +63,5 @@ }

this.signal = null
this.method = method
this.streaming = body && typeof body.pipe === 'function'
this.streaming = body && typeof body.on === 'function'

@@ -83,2 +83,4 @@ this.body = typeof body === 'string'

this.finished = false
this.opaque = opaque

@@ -92,3 +94,3 @@

this.body.on('error', (err) => {
this.invoke(err, null)
this.error(err)
})

@@ -125,9 +127,4 @@ }

if (signal) {
/* istanbul ignore else: can't happen but kept in case of refactoring */
if (!this.signal) {
this.signal = new EE()
}
const onAbort = () => {
this.signal.emit('error', new RequestAbortedError())
this.error(new RequestAbortedError())
}

@@ -143,49 +140,74 @@

if (requestTimeout) {
if (!this.signal) {
this.signal = new EE()
}
this.timeout = setTimeout((self) => {
self.error(new RequestTimeoutError())
}, requestTimeout, this)
}
}
const onTimeout = () => {
this.signal.emit('error', new RequestTimeoutError())
}
wrap (that, cb) {
return this.runInAsyncScope.bind(this, cb, that)
}
this.timeout = setTimeout(onTimeout, requestTimeout)
headers (statusCode, headers, resume) {
if (statusCode < 200) {
// TODO: Informational response.
return
}
if (this.signal) {
this.signal.on('error', (err) => {
assert(err)
this.invoke(err, null)
})
if (this.finished) {
return
}
this.finished = true
clearTimeout(this.timeout)
this.timeout = null
this.res = this.runInAsyncScope(this.callback, this, null, {
statusCode,
headers,
opaque: this.opaque,
resume
})
assert(!this.res || typeof this.res === 'function')
}
wrap (that, cb) {
return this.runInAsyncScope.bind(this, cb, that)
push (chunk, offset, length) {
if (this.res) {
return this.res(null, chunk.slice(offset, offset + length))
}
}
invoke (err, val) {
const { callback } = this
complete (trailers) {
// TODO: Trailers?
if (!callback) {
return
if (this.res) {
this.res(null, null)
this.res = null
}
}
if (
this.body &&
typeof this.body.destroy === 'function' &&
!this.body.destroyed
) {
this.body.destroy(err)
error (err) {
if (this.body) {
// TODO: If this.body.destroy doesn't exists or doesn't emit 'error' or
// 'close', it can halt execution in client.
if (typeof this.body.destroy === 'function' && !this.body.destroyed) {
this.body.destroy(err)
}
this.body = null
}
if (this.res) {
this.res(err, null)
this.res = null
}
if (this.finished) {
return
}
this.finished = true
clearTimeout(this.timeout)
this.timeout = null
this.body = null
this.servername = null
this.callback = null
this.opaque = null
this.headers = null
return this.runInAsyncScope(callback, this, err, val)
this.runInAsyncScope(this.callback, this, err, null)
}

@@ -192,0 +214,0 @@ }

@@ -11,2 +11,4 @@ module.exports = {

kDestroyed: Symbol('destroyed'),
kMaxHeadersSize: Symbol('maxHeaderSize'),
kHeadersTimeout: Symbol('maxHeaderSize'),
kRunningIdx: Symbol('running index'),

@@ -19,9 +21,8 @@ kPendingIdx: Symbol('pending index'),

kRetryDelay: Symbol('retry delay'),
kSocketPath: Symbol('socket path'),
kSocket: Symbol('socket'),
kParser: Symbol('parser'),
kClients: Symbol('clients'),
kRetryTimeout: Symbol('retry timeout'),
kClient: Symbol('client'),
kEnqueue: Symbol('enqueue'),
kMaxAbortedPayload: Symbol('max aborted payload')
}
{
"name": "undici",
"version": "1.0.3",
"version": "1.1.0",
"description": "An HTTP/1.1 client, written from scratch for Node.js",

@@ -39,5 +39,2 @@ "main": "index.js",

},
"dependencies": {
"http-parser-js": "^0.5.2"
},
"pre-commit": [

@@ -44,0 +41,0 @@ "coverage"

@@ -5,3 +5,3 @@ # undici

An HTTP/1.1 client, written from scratch for Node.js.
A HTTP/1.1 client, written from scratch for Node.js.

@@ -23,10 +23,10 @@ > Undici means eleven in Italian. 1.1 -> 11 -> Eleven -> Undici.

Machine: 2.7 GHz Quad-Core Intel Core i7<br/>
Configuration: Node v14.2, HTTP/1.1 without TLS, 100 connections
Machine: 2.8GHz AMD EPYC 7402P<br/>
Configuration: Node v14.4, HTTP/1.1 without TLS, 100 connections, Linux 5.4.12-1-lts
```
http - keepalive - pipe x 6,545 ops/sec ±12.47% (64 runs sampled)
undici - pipeline - pipe x 9,560 ops/sec ±3.68% (77 runs sampled)
undici - request - pipe x 9,797 ops/sec ±6.80% (77 runs sampled)
undici - stream - pipe x 11,599 ops/sec ±0.89% (78 runs sampled)
http - keepalive - pipe x 5,768 ops/sec ±4.17% (71 runs sampled)
undici - pipeline - pipe x 7,151 ops/sec ±2.59% (80 runs sampled)
undici - request - pipe x 11,618 ops/sec ±4.43% (72 runs sampled)
undici - stream - pipe x 12,592 ops/sec ±1.03% (81 runs sampled)
```

@@ -46,2 +46,3 @@

It should only include the protocol, hostname, and the port.
Options:

@@ -53,2 +54,5 @@

- `socketPath`, an IPC endpoint, either Unix domain socket or Windows named pipe.
Default: `null`,
- `requestTimeout`, the timeout after which a request will time out, in

@@ -62,3 +66,3 @@ milliseconds. Monitors time between request being enqueued and receiving

will error other inflight requests in the pipeline.
Default: `1e6` bytes (1MiB).
Default: `1048576` bytes (1MiB).

@@ -72,7 +76,15 @@ - `pipelining`, the amount of concurrent requests to be sent over the

[`tls.connect`](https://nodejs.org/api/tls.html#tls_tls_connect_options_callback).
Default: `null`,
- `maxHeaderSize`, the maximum length of request headers in bytes.
Default: `16384` (16KiB).
- `headersTimeout`, the amount of time the parser will wait to receive the complete
HTTP headers (Node 14 and above only).
Default: `30e3` milliseconds (30s).
<a name='request'></a>
#### `client.request(opts, callback(err, data))`
Performs an HTTP request.
Performs a HTTP request.

@@ -169,3 +181,3 @@ Options:

const client = new Client'http://localhost:3000')
const client = new Client('http://localhost:3000')
const abortController = new AbortController()

@@ -357,4 +369,5 @@

Destroy the client abruptly with the given `err`. All the pending and running
requests will be aborted and error. Waits until socket is closed before
invoking the callback.
requests will be asynchronously aborted and error. Waits until socket is closed
before invoking the callback. Since this operation is asynchronously dispatched
there might still be some progress on dispatched requests.

@@ -385,7 +398,6 @@ Returns a promise if no callback is provided.

#### `client.full`
#### `client.busy`
True if `client.size` is greater than the `client.pipelining` factor.
Keeping a client full ensures that once a inflight requests finishes
the the pipeline will schedule new one and keep the pipeline saturated.
True if pipeline is saturated or blocked. Indicicates whether dispatching
further requests is meaningful.

@@ -482,3 +494,3 @@ #### `client.closed`

#### Upgrade
#### Switching Protocols

@@ -504,2 +516,9 @@ Undici does not support the the `Upgrade` request header field. A

### CONNECT
Undici doea not support the http `CONNECT` method. Dispatching a `CONNECT`
request will cause an `UND_ERR_NOT_SUPPORTED` error.
Refs: https://tools.ietf.org/html/rfc7231#section-4.3.6
### Pipelining

@@ -506,0 +525,0 @@

@@ -152,10 +152,3 @@ 'use strict'

t.error(err)
const buf1 = buf.slice(0, buf.length / 2)
const buf2 = buf.slice(buf.length / 2)
// we split the file so that it's received in 2 chunks
// and it should restore the state on the second
res.write(buf1)
setTimeout(() => {
res.end(buf2)
}, 10)
res.write('asd')
})

@@ -162,0 +155,0 @@ })

@@ -10,3 +10,2 @@ 'use strict'

const {
kParser,
kSocket,

@@ -246,3 +245,3 @@ kEnqueue

test('invalid options throws', (t) => {
t.plan(24)
t.plan(28)

@@ -288,2 +287,11 @@ try {

new Client(new URL('http://localhost:200'), { // eslint-disable-line
headersTimeout: 'asd'
}) // eslint-disable-line
} catch (err) {
t.ok(err instanceof errors.InvalidArgumentError)
t.strictEqual(err.message, 'invalid headersTimeout')
}
try {
new Client(new URL('http://localhost:200'), { // eslint-disable-line
socketTimeout: 'asd'

@@ -324,2 +332,11 @@ })

try {
new Client(new URL('http://localhost:200'), { // eslint-disable-line
maxHeaderSize: 'asd'
})
} catch (err) {
t.ok(err instanceof errors.InvalidArgumentError)
t.strictEqual(err.message, 'invalid maxHeaderSize')
}
try {
new Client(1) // eslint-disable-line

@@ -470,43 +487,2 @@ } catch (err) {

test('reset parser', (t) => {
t.plan(6)
const server = createServer()
let res2
server.on('request', (req, res) => {
res2 = res
res.write('asd')
})
t.tearDown(server.close.bind(server))
server.listen(0, () => {
const client = new Client(`http://localhost:${server.address().port}`)
t.tearDown(client.destroy.bind(client))
client.request({ path: '/', method: 'GET' }, (err, { body }) => {
t.error(err)
res2.destroy()
body.resume()
body.on('error', err => {
t.ok(err)
})
})
client.once('disconnect', () => {
client.request({ path: '/', method: 'GET' }, (err, { body }) => {
t.error(err)
res2.destroy()
body.resume()
body.on('error', err => {
t.ok(err)
})
})
client.on('connect', () => {
t.ok(!client[kSocket][kParser].chunk)
t.ok(!client[kSocket][kParser].offset)
})
})
})
})
test('validate request body', (t) => {

@@ -513,0 +489,0 @@ t.plan(6)

@@ -42,5 +42,52 @@ 'use strict'

return pipeline(body, new PassThrough(), () => {})
}, (err) => {
}),
new Writable({
write (chunk, encoding, callback) {
res += chunk.toString()
callback()
},
final (callback) {
t.strictEqual(res, buf1 + buf2)
callback()
}
}),
(err) => {
t.error(err)
}
)
})
})
test('pipeline ignore request body', (t) => {
t.plan(2)
let done
const server = createServer((req, res) => {
res.write('asd')
res.end()
done()
})
t.tearDown(server.close.bind(server))
server.listen(0, () => {
const client = new Client(`http://localhost:${server.address().port}`)
t.tearDown(client.close.bind(client))
let res = ''
const buf1 = Buffer.alloc(1e3).toString()
const buf2 = Buffer.alloc(1e6).toString()
pipeline(
new Readable({
read () {
this.push(buf1)
this.push(buf2)
done = () => this.push(null)
}
}),
client.pipeline({
path: '/',
method: 'PUT'
}, ({ body }) => {
return pipeline(body, new PassThrough(), () => {})
}),
new Writable({

@@ -52,3 +99,3 @@ write (chunk, encoding, callback) {

final (callback) {
t.strictEqual(buf1 + buf2, res)
t.strictEqual(res, 'asd')
callback()

@@ -138,4 +185,2 @@ }

return pipeline(body, pt, () => {})
}, (err) => {
t.error(err)
}),

@@ -181,4 +226,2 @@ new PassThrough(),

return pipeline(body, pt, () => {})
}, (err) => {
t.error(err)
}),

@@ -436,3 +479,3 @@ new PassThrough(),

if (process.versions.node.split('.')[0] < 13) {
t.ok(err instanceof errors.RequestAbortedError)
t.ok(err)
} else {

@@ -439,0 +482,0 @@ t.strictEqual(err.code, 'ERR_STREAM_PREMATURE_CLOSE')

@@ -105,3 +105,3 @@ 'use strict'

})
return !client.full
return !client.busy
}

@@ -128,3 +128,3 @@ })

// x * 6 + 1 t.ok + 5 drain
t.plan(num * 6 + 1 + 5)
t.plan(num * 6 + 1 + 5 + 2)

@@ -150,11 +150,13 @@ let count = 0

for (; sent < 2;) {
t.notOk(client.full, 'client is not full')
t.notOk(client.size > client.pipelining, 'client is not full')
makeRequest()
t.ok(!client.full, 'we can send more requests')
t.ok(client.size <= client.pipelining, 'we can send more requests')
}
t.notOk(client.full, 'client is full')
t.ok(client.busy, 'client is busy')
t.notOk(client.size > client.pipelining, 'client is full')
makeRequest()
t.ok(client.full, 'we must stop now')
t.ok(client.full, 'client is full')
t.ok(client.busy, 'we must stop now')
t.ok(client.busy, 'client is busy')
t.ok(client.size > client.pipelining, 'client is full')

@@ -164,3 +166,3 @@ function makeRequest () {

count--
process.nextTick(() => {
setImmediate(() => {
if (client.size === 0) {

@@ -170,3 +172,3 @@ t.ok(countGreaterThanOne, 'seen more than one parallel request')

for (; sent < start + 2 && sent < num;) {
t.notOk(client.full, 'client is not full')
t.notOk(client.size > client.pipelining, 'client is not full')
t.ok(makeRequest())

@@ -177,3 +179,3 @@ }

})
return !client.full
return client.size <= client.pipelining
}

@@ -184,3 +186,3 @@ })

test('pipeline 1 is 1 active request', (t) => {
t.plan(8)
t.plan(9)

@@ -222,3 +224,4 @@ let res2

})
t.ok(!client.full)
t.ok(client.size <= client.pipelining)
t.ok(client.busy)
t.strictEqual(client.size, 1)

@@ -225,0 +228,0 @@ })

@@ -9,5 +9,6 @@ 'use strict'

const { kSocket } = require('../lib/symbols')
const EE = require('events')
test('basic get', (t) => {
t.plan(7)
t.plan(14)

@@ -39,2 +40,14 @@ const server = createServer((req, res) => {

})
client.request({ path: '/', method: 'GET' }, (err, { statusCode, headers, body }) => {
t.error(err)
t.strictEqual(statusCode, 200)
t.strictEqual(headers['content-type'], 'text/plain')
const bufs = []
body.on('data', (buf) => {
bufs.push(buf)
})
body.on('end', () => {
t.strictEqual('hello', Buffer.concat(bufs).toString('utf8'))
})
})
})

@@ -44,3 +57,3 @@ })

test('basic head', (t) => {
t.plan(7)
t.plan(14)

@@ -70,2 +83,13 @@ const server = createServer((req, res) => {

})
client.request({ path: '/', method: 'HEAD' }, (err, { statusCode, headers, body }) => {
t.error(err)
t.strictEqual(statusCode, 200)
t.strictEqual(headers['content-type'], 'text/plain')
body
.resume()
.on('end', () => {
t.pass()
})
})
})

@@ -295,2 +319,50 @@ })

test('basic POST with custom stream', (t) => {
t.plan(7)
const expected = readFileSync(__filename, 'utf8')
const server = createServer(postServer(t, expected))
t.tearDown(server.close.bind(server))
server.listen(0, () => {
const client = new Client(`http://localhost:${server.address().port}`)
t.tearDown(client.close.bind(client))
const body = new EE()
body.pipe = () => {}
client.request({
path: '/',
method: 'POST',
headers: {
'content-length': Buffer.byteLength(expected)
},
requestTimeout: 0,
body
}, (err, data) => {
t.error(err)
t.strictEqual(data.statusCode, 200)
const bufs = []
data.body.on('data', (buf) => {
bufs.push(buf)
})
data.body.on('end', () => {
t.strictEqual('hello', Buffer.concat(bufs).toString('utf8'))
})
})
t.strictDeepEqual(client.busy, true)
body.on('close', () => {
body.emit('end')
})
client.on('connect', () => {
setImmediate(() => {
body.emit('data', expected)
body.emit('close')
})
})
})
})
test('basic POST with transfer encoding: chunked', (t) => {

@@ -499,3 +571,3 @@ t.plan(6)

test('only one streaming req at a time', (t) => {
t.plan(4)
t.plan(5)

@@ -542,2 +614,3 @@ const server = createServer((req, res) => {

})
t.strictEqual(client.busy, true)
})

@@ -544,0 +617,0 @@ })

@@ -113,3 +113,3 @@ 'use strict'

})
return !client.full
return client.size <= client.pipelining
}

@@ -145,3 +145,3 @@ })

})
return !client.full
return client.size <= client.pipelining
}

@@ -178,3 +178,3 @@ })

})
return !client.full
return client.size <= client.pipelining
}

@@ -181,0 +181,0 @@ })

@@ -40,3 +40,3 @@ 'use strict'

test('error 101', (t) => {
t.plan(1)
t.plan(2)

@@ -57,3 +57,6 @@ const server = net.createServer((socket) => {

})
client.on('disconnect', () => {
t.pass()
})
})
})

@@ -181,7 +181,7 @@ 'use strict'

this.id = total++
this._full = false
this._busy = false
}
get full () {
return this._full
get busy () {
return this._busy
}

@@ -218,3 +218,3 @@

d1.client._full = true
d1.client._busy = true

@@ -233,3 +233,3 @@ pool.request({}, noop) // d4 = c1

d1.client._full = false
d1.client._busy = false

@@ -251,3 +251,3 @@ pool.request({}, noop) // d6 = c0

test('full', (t) => {
test('busy', (t) => {
t.plan(8 * 6)

@@ -254,0 +254,0 @@

@@ -54,3 +54,3 @@ 'use strict'

const client = new Client('https://140.82.118.3')
t.tearDown(client.close.bind(client))
t.tearDown(client.destroy.bind(client))

@@ -57,0 +57,0 @@ let didDisconnect = false

SocketSocket SOC 2 Logo

Product

  • Package Alerts
  • Integrations
  • Docs
  • Pricing
  • FAQ
  • Roadmap
  • Changelog

Packages

npm

Stay in touch

Get open source security insights delivered straight into your inbox.


  • Terms
  • Privacy
  • Security

Made with ⚡️ by Socket Inc