Comparing version 1.0.3 to 1.1.0
'use strict' | ||
const { PassThrough } = require('stream') | ||
const { Writable } = require('stream') | ||
const http = require('http') | ||
@@ -47,3 +47,7 @@ const Benchmark = require('benchmark') | ||
http.get(httpOptions, response => { | ||
const stream = new PassThrough() | ||
const stream = new Writable({ | ||
write (chunk, encoding, callback) { | ||
callback() | ||
} | ||
}) | ||
stream.once('finish', () => { | ||
@@ -65,3 +69,7 @@ deferred.resolve() | ||
const stream = new PassThrough() | ||
const stream = new Writable({ | ||
write (chunk, encoding, callback) { | ||
callback() | ||
} | ||
}) | ||
stream.once('finish', () => { | ||
@@ -86,3 +94,7 @@ deferred.resolve() | ||
.end() | ||
.pipe(new PassThrough()) | ||
.pipe(new Writable({ | ||
write (chunk, encoding, callback) { | ||
callback() | ||
} | ||
})) | ||
.on('error', (err) => { | ||
@@ -100,3 +112,7 @@ throw err | ||
pool.stream(undiciOptions, () => { | ||
const stream = new PassThrough() | ||
const stream = new Writable({ | ||
write (chunk, encoding, callback) { | ||
callback() | ||
} | ||
}) | ||
stream.once('finish', () => { | ||
@@ -103,0 +119,0 @@ deferred.resolve() |
@@ -6,3 +6,4 @@ 'use strict' | ||
const tls = require('tls') | ||
const { HTTPParser } = require('http-parser-js') | ||
// TODO: This is not really allowed by Node but it works for now. | ||
const { HTTPParser } = process.binding('http_parser') // eslint-disable-line | ||
const EventEmitter = require('events') | ||
@@ -17,2 +18,3 @@ const Request = require('./request') | ||
ClientClosedError, | ||
HeadersTimeoutError, | ||
SocketError, | ||
@@ -39,6 +41,7 @@ NotSupportedError | ||
kMaxAbortedPayload, | ||
kParser, | ||
kSocket, | ||
kSocketPath, | ||
kEnqueue, | ||
kClient | ||
kMaxHeadersSize, | ||
kHeadersTimeout | ||
} = require('./symbols') | ||
@@ -52,6 +55,12 @@ | ||
const nodeMajorVersion = parseInt(process.version.split('.')[0].slice(1)) | ||
const insecureHTTPParser = process.execArgv.includes('--insecure-http-parser') | ||
class ClientBase extends EventEmitter { | ||
constructor (url, { | ||
maxAbortedPayload, | ||
maxHeaderSize, | ||
headersTimeout, | ||
socketTimeout, | ||
socketPath, | ||
requestTimeout, | ||
@@ -75,2 +84,6 @@ pipelining, | ||
if (socketPath != null && typeof socketPath !== 'string') { | ||
throw new InvalidArgumentError('invalid socketPath') | ||
} | ||
if (url.hostname != null && typeof url.hostname !== 'string') { | ||
@@ -92,2 +105,6 @@ throw new InvalidArgumentError('invalid hostname') | ||
if (maxHeaderSize != null && !Number.isFinite(maxHeaderSize)) { | ||
throw new InvalidArgumentError('invalid maxHeaderSize') | ||
} | ||
if (socketTimeout != null && !Number.isFinite(socketTimeout)) { | ||
@@ -101,5 +118,12 @@ throw new InvalidArgumentError('invalid socketTimeout') | ||
if (headersTimeout != null && !Number.isFinite(headersTimeout)) { | ||
throw new InvalidArgumentError('invalid headersTimeout') | ||
} | ||
this[kSocket] = null | ||
this[kPipelining] = pipelining || 1 | ||
this[kMaxHeadersSize] = maxHeaderSize || 16384 | ||
this[kHeadersTimeout] = headersTimeout == null ? 30e3 : headersTimeout | ||
this[kUrl] = url | ||
this[kSocketPath] = socketPath | ||
this[kSocketTimeout] = socketTimeout == null ? 30e3 : socketTimeout | ||
@@ -115,3 +139,3 @@ this[kRequestTimeout] = requestTimeout == null ? 30e3 : requestTimeout | ||
this[kWriting] = false | ||
this[kMaxAbortedPayload] = maxAbortedPayload || 1e6 | ||
this[kMaxAbortedPayload] = maxAbortedPayload || 1048576 | ||
@@ -165,4 +189,15 @@ // kQueue is built up of 3 sections separated by | ||
get full () { | ||
return this.size > this[kPipelining] | ||
get busy () { | ||
if (this.size >= this[kPipelining]) { | ||
return true | ||
} | ||
for (let n = this[kRunningIdx]; n < this[kQueue].length; ++n) { | ||
const { idempotent, streaming } = this[kQueue][n] | ||
if (!idempotent || streaming) { | ||
return true | ||
} | ||
} | ||
return false | ||
} | ||
@@ -183,35 +218,30 @@ | ||
if (!opts || typeof opts !== 'object') { | ||
process.nextTick(callback, new InvalidArgumentError('invalid opts'), null) | ||
return | ||
} | ||
try { | ||
if (!opts || typeof opts !== 'object') { | ||
throw new InvalidArgumentError('invalid opts') | ||
} | ||
if (this[kDestroyed]) { | ||
process.nextTick(callback, new ClientDestroyedError(), null) | ||
return | ||
} | ||
if (this[kDestroyed]) { | ||
throw new ClientDestroyedError() | ||
} | ||
if (this[kClosed]) { | ||
process.nextTick(callback, new ClientClosedError(), null) | ||
return | ||
} | ||
if (this[kClosed]) { | ||
throw new ClientClosedError() | ||
} | ||
if (opts.requestTimeout == null && this[kRequestTimeout]) { | ||
// TODO: Avoid copy. | ||
opts = { ...opts, requestTimeout: this[kRequestTimeout] } | ||
} | ||
if (opts.requestTimeout == null && this[kRequestTimeout]) { | ||
// TODO: Avoid copy. | ||
opts = { ...opts, requestTimeout: this[kRequestTimeout] } | ||
} | ||
let request | ||
try { | ||
request = new Request(opts, this[kUrl].hostname, callback) | ||
const request = new Request(opts, this[kUrl].hostname, callback) | ||
this[kQueue].push(request) | ||
resume(this) | ||
return request | ||
} catch (err) { | ||
process.nextTick(callback, err, null) | ||
return | ||
} | ||
this[kQueue].push(request) | ||
resume(this) | ||
return request | ||
} | ||
@@ -287,2 +317,5 @@ | ||
} else { | ||
// There is a delay between socket.destroy() and socket emitting 'close'. | ||
// This means that some progress progress is still possible in the time | ||
// between. | ||
this[kSocket] | ||
@@ -301,3 +334,15 @@ .on('close', onDestroyed) | ||
constructor (client, socket) { | ||
super(HTTPParser.RESPONSE) | ||
/* istanbul ignore next */ | ||
if (nodeMajorVersion >= 12) { | ||
super() | ||
this.initialize( | ||
HTTPParser.RESPONSE, | ||
{}, | ||
client[kMaxHeadersSize], | ||
insecureHTTPParser, | ||
client[kHeadersTimeout] | ||
) | ||
} else { | ||
super(HTTPParser.RESPONSE, false) | ||
} | ||
@@ -307,65 +352,72 @@ this.client = client | ||
this.resumeSocket = () => socket.resume() | ||
this.statusCode = null | ||
this.headers = null | ||
this.read = 0 | ||
this.body = null | ||
} | ||
/* istanbul ignore next: we don't support trailers yet */ | ||
[HTTPParser.kOnHeaders] () { | ||
// TODO: Handle trailers. | ||
[HTTPParser.kOnTimeout] () { | ||
const { socket } = this | ||
socket.destroy(new HeadersTimeoutError()) | ||
} | ||
[HTTPParser.kOnHeadersComplete] ({ statusCode, headers }) { | ||
const { client, resumeSocket } = this | ||
const request = client[kQueue][client[kRunningIdx]] | ||
const { signal, opaque } = request | ||
const skipBody = request.method === 'HEAD' | ||
[HTTPParser.kOnHeaders] (rawHeaders) { | ||
this.headers = parseHeaders(rawHeaders, this.headers) | ||
} | ||
assert(!this.read) | ||
assert(!this.body) | ||
[HTTPParser.kOnExecute] (ret) { | ||
const { socket } = this | ||
if (statusCode === 101) { | ||
request.invoke(new NotSupportedError('101 response not supported')) | ||
return true | ||
if (ret instanceof Error) { | ||
const err = ret | ||
if (typeof err.reason === 'string') { | ||
err.message = `Parse Error: ${err.reason}` | ||
} | ||
socket.destroy(err) | ||
} else { | ||
// When the underlying `net.Socket` instance is consumed - no | ||
// `data` events are emitted, and thus `socket.setTimeout` fires the | ||
// callback even if the data is constantly flowing into the socket. | ||
// See, https://github.com/nodejs/node/commit/ec2822adaad76b126b5cccdeaa1addf2376c9aa6 | ||
socket._unrefTimer() | ||
} | ||
} | ||
if (statusCode < 200) { | ||
// TODO: Informational response. | ||
return true | ||
} | ||
[HTTPParser.kOnHeadersComplete] (versionMajor, versionMinor, rawHeaders, method, | ||
url, statusCode, statusMessage, upgrade, shouldKeepAlive) { | ||
const { client, socket, resumeSocket, headers } = this | ||
const request = client[kQueue][client[kRunningIdx]] | ||
let body = request.invoke(null, { | ||
statusCode, | ||
headers: parseHeaders(headers), | ||
opaque, | ||
resume: resumeSocket | ||
}) | ||
// TODO: What if !shouldKeepAlive? | ||
// TODO: What if upgrade? | ||
// TODO: What if request.method === 'CONNECT'? | ||
if (body && skipBody) { | ||
body(null, null) | ||
body = null | ||
} | ||
assert(this.statusCode < 200) | ||
if (body) { | ||
this.body = body | ||
this.headers = null | ||
this.statusCode = statusCode | ||
if (signal) { | ||
signal.once('error', body) | ||
} | ||
} else { | ||
this.next() | ||
if (statusCode === 101) { | ||
// TODO: Switching Protocols. | ||
socket.destroy(new NotSupportedError('101 response not supported')) | ||
return true | ||
} | ||
return skipBody | ||
request.headers(statusCode, parseHeaders(rawHeaders, headers), resumeSocket) | ||
return request.method === 'HEAD' || statusCode < 200 | ||
} | ||
[HTTPParser.kOnBody] (chunk, offset, length) { | ||
this.read += length | ||
const { client, socket, statusCode } = this | ||
const request = client[kQueue][client[kRunningIdx]] | ||
const { client, socket, body, read } = this | ||
assert(statusCode >= 200) | ||
const ret = body | ||
? body(null, chunk.slice(offset, offset + length)) | ||
: null | ||
this.read += length | ||
if (ret == null && read > client[kMaxAbortedPayload]) { | ||
const ret = request.push(chunk, offset, length) | ||
if (ret == null && this.read > client[kMaxAbortedPayload]) { | ||
// TODO: Provide a descriptive error? | ||
socket.destroy() | ||
@@ -378,69 +430,49 @@ } else if (ret === false) { | ||
[HTTPParser.kOnMessageComplete] () { | ||
const { body } = this | ||
const { client, socket, statusCode, headers } = this | ||
const request = client[kQueue][client[kRunningIdx]] | ||
this.read = 0 | ||
this.body = null | ||
this.statusCode = null | ||
this.headers = null | ||
if (body) { | ||
body(null, null) | ||
this.next() | ||
request.complete(headers) | ||
if (statusCode >= 200) { | ||
this.read = 0 | ||
client[kQueue][client[kRunningIdx]++] = null | ||
resume(client) | ||
} | ||
} | ||
next () { | ||
const { client, resumeSocket } = this | ||
resumeSocket() | ||
client[kQueue][client[kRunningIdx]++] = null | ||
resume(client) | ||
socket.resume() | ||
} | ||
destroy (err) { | ||
const { client, body } = this | ||
const { client } = this | ||
assert(err) | ||
this.unconsume() | ||
// Make sure the parser's stack has unwound before deleting the | ||
// corresponding C++ object through .close(). | ||
setImmediate(() => this.close()) | ||
if (client[kRunningIdx] >= client[kPendingIdx]) { | ||
assert(!body) | ||
return | ||
} | ||
this.read = 0 | ||
this.body = null | ||
// Retry all idempotent requests except for the one | ||
// at the head of the pipeline. | ||
client[kQueue][client[kRunningIdx]++].error(err) | ||
const retryRequests = [] | ||
const errorRequests = [] | ||
errorRequests.push(client[kQueue][client[kRunningIdx]++]) | ||
for (const request of client[kQueue].slice(client[kRunningIdx], client[kPendingIdx])) { | ||
const { idempotent, body } = request | ||
/* istanbul ignore else: can't happen because of guard in resume */ | ||
/* istanbul ignore next: can't happen because of guard in resume */ | ||
if (idempotent && (!body || typeof body.pipe !== 'function')) { | ||
retryRequests.push(request) | ||
} else { | ||
errorRequests.push(request) | ||
} | ||
const { idempotent, streaming } = request | ||
assert(idempotent && !streaming) | ||
retryRequests.push(request) | ||
} | ||
client[kQueue].splice(0, client[kPendingIdx], ...retryRequests) | ||
client[kPendingIdx] = 0 | ||
client[kRunningIdx] = 0 | ||
if (body) { | ||
body(err, null) | ||
} | ||
for (const request of errorRequests) { | ||
request.invoke(err, null) | ||
} | ||
resume(client) | ||
} | ||
@@ -455,13 +487,28 @@ } | ||
const servername = client[kServerName] || (client[kTLSOpts] && client[kTLSOpts].servername) | ||
const socket = protocol === 'https:' | ||
? tls.connect(port || /* istanbul ignore next */ 443, hostname, { | ||
...client[kTLSOpts], | ||
servername | ||
}) | ||
: net.connect(port || /* istanbul ignore next */ 80, hostname) | ||
let socket | ||
if (protocol === 'https:') { | ||
const tlsOpts = { ...client[kTLSOpts], servername } | ||
socket = client[kSocketPath] | ||
? tls.connect(client[kSocketPath], tlsOpts) | ||
: tls.connect(port || /* istanbul ignore next */ 443, hostname, tlsOpts) | ||
} else { | ||
socket = client[kSocketPath] | ||
? net.connect(client[kSocketPath]) | ||
: net.connect(port || /* istanbul ignore next */ 80, hostname) | ||
} | ||
client[kSocket] = socket | ||
socket[kClient] = client | ||
socket[kParser] = new Parser(client, socket) | ||
const parser = new Parser(client, socket) | ||
/* istanbul ignore next */ | ||
if (nodeMajorVersion >= 12) { | ||
assert(socket._handle) | ||
parser.consume(socket._handle) | ||
} else { | ||
assert(socket._handle && socket._handle._externalStream) | ||
parser.consume(socket._handle._externalStream) | ||
} | ||
socket[kClosed] = false | ||
@@ -475,4 +522,2 @@ socket[kError] = null | ||
.on(protocol === 'https:' ? 'secureConnect' : 'connect', function () { | ||
const client = this[kClient] | ||
client[kRetryDelay] = 0 | ||
@@ -482,9 +527,4 @@ client.emit('connect') | ||
}) | ||
.on('data', function (chunk) { | ||
const parser = this[kParser] | ||
const err = parser.execute(chunk) | ||
if (err instanceof Error && !this.destroyed) { | ||
this.destroy(err) | ||
} | ||
.on('data', function () { | ||
assert(false) | ||
}) | ||
@@ -495,3 +535,3 @@ .on('error', function (err) { | ||
while (client.pending && client[kQueue][client[kPendingIdx]].servername === servername) { | ||
client[kQueue][client[kPendingIdx]++].invoke(err, null) | ||
client[kQueue][client[kPendingIdx]++].error(err) | ||
} | ||
@@ -506,6 +546,7 @@ } else if ( | ||
) { | ||
assert(client[kPendingIdx] === client[kRunningIdx]) | ||
// Error is not caused by running request and not a recoverable | ||
// socket error. | ||
for (const request of client[kQueue].splice(client[kPendingIdx])) { | ||
request.invoke(err, null) | ||
for (const request of client[kQueue].splice(client[kRunningIdx])) { | ||
request.error(err) | ||
} | ||
@@ -520,15 +561,10 @@ } | ||
.on('close', function () { | ||
const client = this[kClient] | ||
const parser = this[kParser] | ||
this[kClosed] = true | ||
if (!socket[kError]) { | ||
socket[kError] = new SocketError('closed') | ||
if (!this[kError]) { | ||
this[kError] = new SocketError('closed') | ||
} | ||
const err = socket[kError] | ||
parser.destroy(this[kError]) | ||
parser.destroy(err) | ||
if (client.destroyed) { | ||
@@ -539,10 +575,2 @@ resume(client) | ||
// reset events | ||
client[kSocket] | ||
.removeAllListeners('data') | ||
.removeAllListeners('end') | ||
.removeAllListeners('finish') | ||
.removeAllListeners('error') | ||
client[kSocket] | ||
.on('error', nop) | ||
client[kSocket] = null | ||
@@ -563,3 +591,5 @@ | ||
client.emit('disconnect', err) | ||
client.emit('disconnect', this[kError]) | ||
resume(client) | ||
}) | ||
@@ -571,5 +601,4 @@ } | ||
if (client[kDestroyed]) { | ||
const requests = client[kQueue].splice(client[kPendingIdx]) | ||
for (const request of requests) { | ||
request.invoke(new ClientDestroyedError(), null) | ||
for (const request of client[kQueue].splice(client[kPendingIdx])) { | ||
request.error(new ClientDestroyedError()) | ||
} | ||
@@ -607,3 +636,3 @@ return | ||
if (!request.callback) { | ||
if (request.finished) { | ||
// Request was aborted. | ||
@@ -623,3 +652,5 @@ // TODO: Avoid splice one by one. | ||
if (client[kSocket]) { | ||
// TODO: Provide a descriptive error? | ||
client[kSocket].destroy() | ||
return | ||
} | ||
@@ -663,18 +694,15 @@ } | ||
write(client, request) | ||
// Release memory for no longer required properties. | ||
request.headers = null | ||
request.body = null | ||
} | ||
} | ||
function write (client, { | ||
header, | ||
body, | ||
streaming, | ||
chunked, | ||
signal | ||
}) { | ||
const socket = client[kSocket] | ||
function write (client, request) { | ||
const { | ||
header, | ||
body, | ||
streaming, | ||
chunked | ||
} = request | ||
let socket = client[kSocket] | ||
socket.cork() | ||
@@ -685,3 +713,2 @@ socket.write(header) | ||
socket.write(CRLF) | ||
socket.uncork() | ||
} else if (!streaming) { | ||
@@ -691,15 +718,14 @@ socket.write(CRLF) | ||
socket.write(CRLF) | ||
socket.uncork() | ||
} else { | ||
if (chunked) { | ||
socket.write(TE_CHUNKED) | ||
} else { | ||
socket.write(CRLF) | ||
} | ||
socket.write(chunked ? TE_CHUNKED : CRLF) | ||
const onData = (chunk) => { | ||
if (chunked) { | ||
if (socket && chunked) { | ||
socket.write(`\r\n${Buffer.byteLength(chunk).toString(16)}\r\n`, 'ascii') | ||
} | ||
if (!socket.write(chunk)) { | ||
// TODO: If body.pause doesn't exists or doesn't stop 'data' events, it might cause | ||
// excessive memory usage. | ||
if (socket && !socket.write(chunk) && body.pause) { | ||
body.pause() | ||
@@ -709,3 +735,5 @@ } | ||
const onDrain = () => { | ||
body.resume() | ||
if (body.resume) { | ||
body.resume() | ||
} | ||
} | ||
@@ -716,8 +744,8 @@ const onAbort = () => { | ||
const onFinished = (err) => { | ||
if (!socket) { | ||
return | ||
} | ||
err = err || socket[kError] | ||
if (signal) { | ||
signal.removeListener('error', onFinished) | ||
} | ||
socket | ||
@@ -732,21 +760,15 @@ .removeListener('drain', onDrain) | ||
.removeListener('close', onAbort) | ||
.on('error', nop) | ||
if (err) { | ||
if (typeof body.destroy === 'function' && !body.destroyed) { | ||
body.destroy(err) | ||
} | ||
if (typeof body.destroy === 'function' && !body.destroyed) { | ||
body.destroy(err) | ||
} | ||
if (!socket.destroyed) { | ||
assert(client.running) | ||
socket.destroy(err) | ||
} | ||
} else { | ||
if (chunked) { | ||
socket.write(TE_CHUNKED_EOF) | ||
} else { | ||
socket.write(CRLF) | ||
} | ||
if (!err) { | ||
socket.write(chunked ? TE_CHUNKED_EOF : CRLF) | ||
} else if (!socket.destroyed) { | ||
socket.destroy(err) | ||
} | ||
socket = null | ||
client[kWriting] = false | ||
@@ -756,6 +778,2 @@ resume(client) | ||
if (signal) { | ||
signal.on('error', onFinished) | ||
} | ||
body | ||
@@ -771,10 +789,14 @@ .on('data', onData) | ||
.on('close', onFinished) | ||
.uncork() | ||
client[kWriting] = true | ||
} | ||
socket.uncork() | ||
} | ||
function parseHeaders (headers) { | ||
const obj = {} | ||
function parseHeaders (headers, obj) { | ||
obj = obj || {} | ||
if (!headers) { | ||
return obj | ||
} | ||
for (var i = 0; i < headers.length; i += 2) { | ||
@@ -781,0 +803,0 @@ var key = headers[i].toLowerCase() |
@@ -118,4 +118,7 @@ const { | ||
} else { | ||
assert(this._readableState.endEmitted) | ||
assert(!this[kResume]) | ||
if (!this._readableState.endEmitted) { | ||
// This can happen if the server doesn't care | ||
// about the entire request body. | ||
// TODO: Is this fine to ignore? | ||
} | ||
} | ||
@@ -215,3 +218,3 @@ | ||
// TODO: Should we allow !body? | ||
if (!body || typeof body.pipe !== 'function') { | ||
if (!body || typeof body.on !== 'function') { | ||
if (!ret.destroyed) { | ||
@@ -322,2 +325,4 @@ ret.destroy(new InvalidReturnValueError('expected Readable')) | ||
typeof body.write !== 'function' || | ||
typeof body.end !== 'function' || | ||
typeof body.on !== 'function' || | ||
typeof body.destroy !== 'function' || | ||
@@ -324,0 +329,0 @@ typeof body.destroyed !== 'boolean' |
@@ -11,2 +11,12 @@ 'use strict' | ||
class HeadersTimeoutError extends UndiciError { | ||
constructor (message) { | ||
super(message) | ||
Error.captureStackTrace(this, HeadersTimeoutError) | ||
this.name = 'HeadersTimeoutError' | ||
this.message = message || 'Headers Timeout Error' | ||
this.code = 'UND_ERR_HEADERS_TIMEOUT' | ||
} | ||
} | ||
class SocketTimeoutError extends UndiciError { | ||
@@ -105,2 +115,3 @@ constructor (message) { | ||
SocketTimeoutError, | ||
HeadersTimeoutError, | ||
RequestTimeoutError, | ||
@@ -107,0 +118,0 @@ InvalidArgumentError, |
@@ -92,3 +92,3 @@ 'use strict' | ||
for (const client of pool[kClients]) { | ||
if (client.full) { | ||
if (client.busy) { | ||
continue | ||
@@ -95,0 +95,0 @@ } |
@@ -6,16 +6,9 @@ 'use strict' | ||
InvalidArgumentError, | ||
NotSupportedError, | ||
RequestAbortedError, | ||
RequestTimeoutError | ||
} = require('./errors') | ||
const EE = require('events') | ||
const assert = require('assert') | ||
const net = require('net') | ||
function isValidBody (body) { | ||
return body == null || | ||
body instanceof Uint8Array || | ||
typeof body === 'string' || | ||
typeof body.pipe === 'function' | ||
} | ||
class Request extends AsyncResource { | ||
@@ -54,3 +47,12 @@ constructor ({ | ||
if (!isValidBody(body)) { | ||
if (method === 'CONNECT') { | ||
throw new NotSupportedError('CONNECT method not supported') | ||
} | ||
if ( | ||
body != null && | ||
!(body instanceof Uint8Array) && | ||
typeof body !== 'string' && | ||
typeof body.on !== 'function' | ||
) { | ||
throw new InvalidArgumentError('body must be a string, a Buffer or a Readable stream') | ||
@@ -61,7 +63,5 @@ } | ||
this.signal = null | ||
this.method = method | ||
this.streaming = body && typeof body.pipe === 'function' | ||
this.streaming = body && typeof body.on === 'function' | ||
@@ -83,2 +83,4 @@ this.body = typeof body === 'string' | ||
this.finished = false | ||
this.opaque = opaque | ||
@@ -92,3 +94,3 @@ | ||
this.body.on('error', (err) => { | ||
this.invoke(err, null) | ||
this.error(err) | ||
}) | ||
@@ -125,9 +127,4 @@ } | ||
if (signal) { | ||
/* istanbul ignore else: can't happen but kept in case of refactoring */ | ||
if (!this.signal) { | ||
this.signal = new EE() | ||
} | ||
const onAbort = () => { | ||
this.signal.emit('error', new RequestAbortedError()) | ||
this.error(new RequestAbortedError()) | ||
} | ||
@@ -143,49 +140,74 @@ | ||
if (requestTimeout) { | ||
if (!this.signal) { | ||
this.signal = new EE() | ||
} | ||
this.timeout = setTimeout((self) => { | ||
self.error(new RequestTimeoutError()) | ||
}, requestTimeout, this) | ||
} | ||
} | ||
const onTimeout = () => { | ||
this.signal.emit('error', new RequestTimeoutError()) | ||
} | ||
wrap (that, cb) { | ||
return this.runInAsyncScope.bind(this, cb, that) | ||
} | ||
this.timeout = setTimeout(onTimeout, requestTimeout) | ||
headers (statusCode, headers, resume) { | ||
if (statusCode < 200) { | ||
// TODO: Informational response. | ||
return | ||
} | ||
if (this.signal) { | ||
this.signal.on('error', (err) => { | ||
assert(err) | ||
this.invoke(err, null) | ||
}) | ||
if (this.finished) { | ||
return | ||
} | ||
this.finished = true | ||
clearTimeout(this.timeout) | ||
this.timeout = null | ||
this.res = this.runInAsyncScope(this.callback, this, null, { | ||
statusCode, | ||
headers, | ||
opaque: this.opaque, | ||
resume | ||
}) | ||
assert(!this.res || typeof this.res === 'function') | ||
} | ||
wrap (that, cb) { | ||
return this.runInAsyncScope.bind(this, cb, that) | ||
push (chunk, offset, length) { | ||
if (this.res) { | ||
return this.res(null, chunk.slice(offset, offset + length)) | ||
} | ||
} | ||
invoke (err, val) { | ||
const { callback } = this | ||
complete (trailers) { | ||
// TODO: Trailers? | ||
if (!callback) { | ||
return | ||
if (this.res) { | ||
this.res(null, null) | ||
this.res = null | ||
} | ||
} | ||
if ( | ||
this.body && | ||
typeof this.body.destroy === 'function' && | ||
!this.body.destroyed | ||
) { | ||
this.body.destroy(err) | ||
error (err) { | ||
if (this.body) { | ||
// TODO: If this.body.destroy doesn't exists or doesn't emit 'error' or | ||
// 'close', it can halt execution in client. | ||
if (typeof this.body.destroy === 'function' && !this.body.destroyed) { | ||
this.body.destroy(err) | ||
} | ||
this.body = null | ||
} | ||
if (this.res) { | ||
this.res(err, null) | ||
this.res = null | ||
} | ||
if (this.finished) { | ||
return | ||
} | ||
this.finished = true | ||
clearTimeout(this.timeout) | ||
this.timeout = null | ||
this.body = null | ||
this.servername = null | ||
this.callback = null | ||
this.opaque = null | ||
this.headers = null | ||
return this.runInAsyncScope(callback, this, err, val) | ||
this.runInAsyncScope(this.callback, this, err, null) | ||
} | ||
@@ -192,0 +214,0 @@ } |
@@ -11,2 +11,4 @@ module.exports = { | ||
kDestroyed: Symbol('destroyed'), | ||
kMaxHeadersSize: Symbol('maxHeaderSize'), | ||
kHeadersTimeout: Symbol('maxHeaderSize'), | ||
kRunningIdx: Symbol('running index'), | ||
@@ -19,9 +21,8 @@ kPendingIdx: Symbol('pending index'), | ||
kRetryDelay: Symbol('retry delay'), | ||
kSocketPath: Symbol('socket path'), | ||
kSocket: Symbol('socket'), | ||
kParser: Symbol('parser'), | ||
kClients: Symbol('clients'), | ||
kRetryTimeout: Symbol('retry timeout'), | ||
kClient: Symbol('client'), | ||
kEnqueue: Symbol('enqueue'), | ||
kMaxAbortedPayload: Symbol('max aborted payload') | ||
} |
{ | ||
"name": "undici", | ||
"version": "1.0.3", | ||
"version": "1.1.0", | ||
"description": "An HTTP/1.1 client, written from scratch for Node.js", | ||
@@ -39,5 +39,2 @@ "main": "index.js", | ||
}, | ||
"dependencies": { | ||
"http-parser-js": "^0.5.2" | ||
}, | ||
"pre-commit": [ | ||
@@ -44,0 +41,0 @@ "coverage" |
@@ -5,3 +5,3 @@ # undici | ||
An HTTP/1.1 client, written from scratch for Node.js. | ||
A HTTP/1.1 client, written from scratch for Node.js. | ||
@@ -23,10 +23,10 @@ > Undici means eleven in Italian. 1.1 -> 11 -> Eleven -> Undici. | ||
Machine: 2.7 GHz Quad-Core Intel Core i7<br/> | ||
Configuration: Node v14.2, HTTP/1.1 without TLS, 100 connections | ||
Machine: 2.8GHz AMD EPYC 7402P<br/> | ||
Configuration: Node v14.4, HTTP/1.1 without TLS, 100 connections, Linux 5.4.12-1-lts | ||
``` | ||
http - keepalive - pipe x 6,545 ops/sec ±12.47% (64 runs sampled) | ||
undici - pipeline - pipe x 9,560 ops/sec ±3.68% (77 runs sampled) | ||
undici - request - pipe x 9,797 ops/sec ±6.80% (77 runs sampled) | ||
undici - stream - pipe x 11,599 ops/sec ±0.89% (78 runs sampled) | ||
http - keepalive - pipe x 5,768 ops/sec ±4.17% (71 runs sampled) | ||
undici - pipeline - pipe x 7,151 ops/sec ±2.59% (80 runs sampled) | ||
undici - request - pipe x 11,618 ops/sec ±4.43% (72 runs sampled) | ||
undici - stream - pipe x 12,592 ops/sec ±1.03% (81 runs sampled) | ||
``` | ||
@@ -46,2 +46,3 @@ | ||
It should only include the protocol, hostname, and the port. | ||
Options: | ||
@@ -53,2 +54,5 @@ | ||
- `socketPath`, an IPC endpoint, either Unix domain socket or Windows named pipe. | ||
Default: `null`, | ||
- `requestTimeout`, the timeout after which a request will time out, in | ||
@@ -62,3 +66,3 @@ milliseconds. Monitors time between request being enqueued and receiving | ||
will error other inflight requests in the pipeline. | ||
Default: `1e6` bytes (1MiB). | ||
Default: `1048576` bytes (1MiB). | ||
@@ -72,7 +76,15 @@ - `pipelining`, the amount of concurrent requests to be sent over the | ||
[`tls.connect`](https://nodejs.org/api/tls.html#tls_tls_connect_options_callback). | ||
Default: `null`, | ||
- `maxHeaderSize`, the maximum length of request headers in bytes. | ||
Default: `16384` (16KiB). | ||
- `headersTimeout`, the amount of time the parser will wait to receive the complete | ||
HTTP headers (Node 14 and above only). | ||
Default: `30e3` milliseconds (30s). | ||
<a name='request'></a> | ||
#### `client.request(opts, callback(err, data))` | ||
Performs an HTTP request. | ||
Performs a HTTP request. | ||
@@ -169,3 +181,3 @@ Options: | ||
const client = new Client'http://localhost:3000') | ||
const client = new Client('http://localhost:3000') | ||
const abortController = new AbortController() | ||
@@ -357,4 +369,5 @@ | ||
Destroy the client abruptly with the given `err`. All the pending and running | ||
requests will be aborted and error. Waits until socket is closed before | ||
invoking the callback. | ||
requests will be asynchronously aborted and error. Waits until socket is closed | ||
before invoking the callback. Since this operation is asynchronously dispatched | ||
there might still be some progress on dispatched requests. | ||
@@ -385,7 +398,6 @@ Returns a promise if no callback is provided. | ||
#### `client.full` | ||
#### `client.busy` | ||
True if `client.size` is greater than the `client.pipelining` factor. | ||
Keeping a client full ensures that once a inflight requests finishes | ||
the the pipeline will schedule new one and keep the pipeline saturated. | ||
True if pipeline is saturated or blocked. Indicicates whether dispatching | ||
further requests is meaningful. | ||
@@ -482,3 +494,3 @@ #### `client.closed` | ||
#### Upgrade | ||
#### Switching Protocols | ||
@@ -504,2 +516,9 @@ Undici does not support the the `Upgrade` request header field. A | ||
### CONNECT | ||
Undici doea not support the http `CONNECT` method. Dispatching a `CONNECT` | ||
request will cause an `UND_ERR_NOT_SUPPORTED` error. | ||
Refs: https://tools.ietf.org/html/rfc7231#section-4.3.6 | ||
### Pipelining | ||
@@ -506,0 +525,0 @@ |
@@ -152,10 +152,3 @@ 'use strict' | ||
t.error(err) | ||
const buf1 = buf.slice(0, buf.length / 2) | ||
const buf2 = buf.slice(buf.length / 2) | ||
// we split the file so that it's received in 2 chunks | ||
// and it should restore the state on the second | ||
res.write(buf1) | ||
setTimeout(() => { | ||
res.end(buf2) | ||
}, 10) | ||
res.write('asd') | ||
}) | ||
@@ -162,0 +155,0 @@ }) |
@@ -10,3 +10,2 @@ 'use strict' | ||
const { | ||
kParser, | ||
kSocket, | ||
@@ -246,3 +245,3 @@ kEnqueue | ||
test('invalid options throws', (t) => { | ||
t.plan(24) | ||
t.plan(28) | ||
@@ -288,2 +287,11 @@ try { | ||
new Client(new URL('http://localhost:200'), { // eslint-disable-line | ||
headersTimeout: 'asd' | ||
}) // eslint-disable-line | ||
} catch (err) { | ||
t.ok(err instanceof errors.InvalidArgumentError) | ||
t.strictEqual(err.message, 'invalid headersTimeout') | ||
} | ||
try { | ||
new Client(new URL('http://localhost:200'), { // eslint-disable-line | ||
socketTimeout: 'asd' | ||
@@ -324,2 +332,11 @@ }) | ||
try { | ||
new Client(new URL('http://localhost:200'), { // eslint-disable-line | ||
maxHeaderSize: 'asd' | ||
}) | ||
} catch (err) { | ||
t.ok(err instanceof errors.InvalidArgumentError) | ||
t.strictEqual(err.message, 'invalid maxHeaderSize') | ||
} | ||
try { | ||
new Client(1) // eslint-disable-line | ||
@@ -470,43 +487,2 @@ } catch (err) { | ||
test('reset parser', (t) => { | ||
t.plan(6) | ||
const server = createServer() | ||
let res2 | ||
server.on('request', (req, res) => { | ||
res2 = res | ||
res.write('asd') | ||
}) | ||
t.tearDown(server.close.bind(server)) | ||
server.listen(0, () => { | ||
const client = new Client(`http://localhost:${server.address().port}`) | ||
t.tearDown(client.destroy.bind(client)) | ||
client.request({ path: '/', method: 'GET' }, (err, { body }) => { | ||
t.error(err) | ||
res2.destroy() | ||
body.resume() | ||
body.on('error', err => { | ||
t.ok(err) | ||
}) | ||
}) | ||
client.once('disconnect', () => { | ||
client.request({ path: '/', method: 'GET' }, (err, { body }) => { | ||
t.error(err) | ||
res2.destroy() | ||
body.resume() | ||
body.on('error', err => { | ||
t.ok(err) | ||
}) | ||
}) | ||
client.on('connect', () => { | ||
t.ok(!client[kSocket][kParser].chunk) | ||
t.ok(!client[kSocket][kParser].offset) | ||
}) | ||
}) | ||
}) | ||
}) | ||
test('validate request body', (t) => { | ||
@@ -513,0 +489,0 @@ t.plan(6) |
@@ -42,5 +42,52 @@ 'use strict' | ||
return pipeline(body, new PassThrough(), () => {}) | ||
}, (err) => { | ||
}), | ||
new Writable({ | ||
write (chunk, encoding, callback) { | ||
res += chunk.toString() | ||
callback() | ||
}, | ||
final (callback) { | ||
t.strictEqual(res, buf1 + buf2) | ||
callback() | ||
} | ||
}), | ||
(err) => { | ||
t.error(err) | ||
} | ||
) | ||
}) | ||
}) | ||
test('pipeline ignore request body', (t) => { | ||
t.plan(2) | ||
let done | ||
const server = createServer((req, res) => { | ||
res.write('asd') | ||
res.end() | ||
done() | ||
}) | ||
t.tearDown(server.close.bind(server)) | ||
server.listen(0, () => { | ||
const client = new Client(`http://localhost:${server.address().port}`) | ||
t.tearDown(client.close.bind(client)) | ||
let res = '' | ||
const buf1 = Buffer.alloc(1e3).toString() | ||
const buf2 = Buffer.alloc(1e6).toString() | ||
pipeline( | ||
new Readable({ | ||
read () { | ||
this.push(buf1) | ||
this.push(buf2) | ||
done = () => this.push(null) | ||
} | ||
}), | ||
client.pipeline({ | ||
path: '/', | ||
method: 'PUT' | ||
}, ({ body }) => { | ||
return pipeline(body, new PassThrough(), () => {}) | ||
}), | ||
new Writable({ | ||
@@ -52,3 +99,3 @@ write (chunk, encoding, callback) { | ||
final (callback) { | ||
t.strictEqual(buf1 + buf2, res) | ||
t.strictEqual(res, 'asd') | ||
callback() | ||
@@ -138,4 +185,2 @@ } | ||
return pipeline(body, pt, () => {}) | ||
}, (err) => { | ||
t.error(err) | ||
}), | ||
@@ -181,4 +226,2 @@ new PassThrough(), | ||
return pipeline(body, pt, () => {}) | ||
}, (err) => { | ||
t.error(err) | ||
}), | ||
@@ -436,3 +479,3 @@ new PassThrough(), | ||
if (process.versions.node.split('.')[0] < 13) { | ||
t.ok(err instanceof errors.RequestAbortedError) | ||
t.ok(err) | ||
} else { | ||
@@ -439,0 +482,0 @@ t.strictEqual(err.code, 'ERR_STREAM_PREMATURE_CLOSE') |
@@ -105,3 +105,3 @@ 'use strict' | ||
}) | ||
return !client.full | ||
return !client.busy | ||
} | ||
@@ -128,3 +128,3 @@ }) | ||
// x * 6 + 1 t.ok + 5 drain | ||
t.plan(num * 6 + 1 + 5) | ||
t.plan(num * 6 + 1 + 5 + 2) | ||
@@ -150,11 +150,13 @@ let count = 0 | ||
for (; sent < 2;) { | ||
t.notOk(client.full, 'client is not full') | ||
t.notOk(client.size > client.pipelining, 'client is not full') | ||
makeRequest() | ||
t.ok(!client.full, 'we can send more requests') | ||
t.ok(client.size <= client.pipelining, 'we can send more requests') | ||
} | ||
t.notOk(client.full, 'client is full') | ||
t.ok(client.busy, 'client is busy') | ||
t.notOk(client.size > client.pipelining, 'client is full') | ||
makeRequest() | ||
t.ok(client.full, 'we must stop now') | ||
t.ok(client.full, 'client is full') | ||
t.ok(client.busy, 'we must stop now') | ||
t.ok(client.busy, 'client is busy') | ||
t.ok(client.size > client.pipelining, 'client is full') | ||
@@ -164,3 +166,3 @@ function makeRequest () { | ||
count-- | ||
process.nextTick(() => { | ||
setImmediate(() => { | ||
if (client.size === 0) { | ||
@@ -170,3 +172,3 @@ t.ok(countGreaterThanOne, 'seen more than one parallel request') | ||
for (; sent < start + 2 && sent < num;) { | ||
t.notOk(client.full, 'client is not full') | ||
t.notOk(client.size > client.pipelining, 'client is not full') | ||
t.ok(makeRequest()) | ||
@@ -177,3 +179,3 @@ } | ||
}) | ||
return !client.full | ||
return client.size <= client.pipelining | ||
} | ||
@@ -184,3 +186,3 @@ }) | ||
test('pipeline 1 is 1 active request', (t) => { | ||
t.plan(8) | ||
t.plan(9) | ||
@@ -222,3 +224,4 @@ let res2 | ||
}) | ||
t.ok(!client.full) | ||
t.ok(client.size <= client.pipelining) | ||
t.ok(client.busy) | ||
t.strictEqual(client.size, 1) | ||
@@ -225,0 +228,0 @@ }) |
@@ -9,5 +9,6 @@ 'use strict' | ||
const { kSocket } = require('../lib/symbols') | ||
const EE = require('events') | ||
test('basic get', (t) => { | ||
t.plan(7) | ||
t.plan(14) | ||
@@ -39,2 +40,14 @@ const server = createServer((req, res) => { | ||
}) | ||
client.request({ path: '/', method: 'GET' }, (err, { statusCode, headers, body }) => { | ||
t.error(err) | ||
t.strictEqual(statusCode, 200) | ||
t.strictEqual(headers['content-type'], 'text/plain') | ||
const bufs = [] | ||
body.on('data', (buf) => { | ||
bufs.push(buf) | ||
}) | ||
body.on('end', () => { | ||
t.strictEqual('hello', Buffer.concat(bufs).toString('utf8')) | ||
}) | ||
}) | ||
}) | ||
@@ -44,3 +57,3 @@ }) | ||
test('basic head', (t) => { | ||
t.plan(7) | ||
t.plan(14) | ||
@@ -70,2 +83,13 @@ const server = createServer((req, res) => { | ||
}) | ||
client.request({ path: '/', method: 'HEAD' }, (err, { statusCode, headers, body }) => { | ||
t.error(err) | ||
t.strictEqual(statusCode, 200) | ||
t.strictEqual(headers['content-type'], 'text/plain') | ||
body | ||
.resume() | ||
.on('end', () => { | ||
t.pass() | ||
}) | ||
}) | ||
}) | ||
@@ -295,2 +319,50 @@ }) | ||
test('basic POST with custom stream', (t) => { | ||
t.plan(7) | ||
const expected = readFileSync(__filename, 'utf8') | ||
const server = createServer(postServer(t, expected)) | ||
t.tearDown(server.close.bind(server)) | ||
server.listen(0, () => { | ||
const client = new Client(`http://localhost:${server.address().port}`) | ||
t.tearDown(client.close.bind(client)) | ||
const body = new EE() | ||
body.pipe = () => {} | ||
client.request({ | ||
path: '/', | ||
method: 'POST', | ||
headers: { | ||
'content-length': Buffer.byteLength(expected) | ||
}, | ||
requestTimeout: 0, | ||
body | ||
}, (err, data) => { | ||
t.error(err) | ||
t.strictEqual(data.statusCode, 200) | ||
const bufs = [] | ||
data.body.on('data', (buf) => { | ||
bufs.push(buf) | ||
}) | ||
data.body.on('end', () => { | ||
t.strictEqual('hello', Buffer.concat(bufs).toString('utf8')) | ||
}) | ||
}) | ||
t.strictDeepEqual(client.busy, true) | ||
body.on('close', () => { | ||
body.emit('end') | ||
}) | ||
client.on('connect', () => { | ||
setImmediate(() => { | ||
body.emit('data', expected) | ||
body.emit('close') | ||
}) | ||
}) | ||
}) | ||
}) | ||
test('basic POST with transfer encoding: chunked', (t) => { | ||
@@ -499,3 +571,3 @@ t.plan(6) | ||
test('only one streaming req at a time', (t) => { | ||
t.plan(4) | ||
t.plan(5) | ||
@@ -542,2 +614,3 @@ const server = createServer((req, res) => { | ||
}) | ||
t.strictEqual(client.busy, true) | ||
}) | ||
@@ -544,0 +617,0 @@ }) |
@@ -113,3 +113,3 @@ 'use strict' | ||
}) | ||
return !client.full | ||
return client.size <= client.pipelining | ||
} | ||
@@ -145,3 +145,3 @@ }) | ||
}) | ||
return !client.full | ||
return client.size <= client.pipelining | ||
} | ||
@@ -178,3 +178,3 @@ }) | ||
}) | ||
return !client.full | ||
return client.size <= client.pipelining | ||
} | ||
@@ -181,0 +181,0 @@ }) |
@@ -40,3 +40,3 @@ 'use strict' | ||
test('error 101', (t) => { | ||
t.plan(1) | ||
t.plan(2) | ||
@@ -57,3 +57,6 @@ const server = net.createServer((socket) => { | ||
}) | ||
client.on('disconnect', () => { | ||
t.pass() | ||
}) | ||
}) | ||
}) |
@@ -181,7 +181,7 @@ 'use strict' | ||
this.id = total++ | ||
this._full = false | ||
this._busy = false | ||
} | ||
get full () { | ||
return this._full | ||
get busy () { | ||
return this._busy | ||
} | ||
@@ -218,3 +218,3 @@ | ||
d1.client._full = true | ||
d1.client._busy = true | ||
@@ -233,3 +233,3 @@ pool.request({}, noop) // d4 = c1 | ||
d1.client._full = false | ||
d1.client._busy = false | ||
@@ -251,3 +251,3 @@ pool.request({}, noop) // d6 = c0 | ||
test('full', (t) => { | ||
test('busy', (t) => { | ||
t.plan(8 * 6) | ||
@@ -254,0 +254,0 @@ |
@@ -54,3 +54,3 @@ 'use strict' | ||
const client = new Client('https://140.82.118.3') | ||
t.tearDown(client.close.bind(client)) | ||
t.tearDown(client.destroy.bind(client)) | ||
@@ -57,0 +57,0 @@ let didDisconnect = false |
License Policy Violation
LicenseThis package is not allowed per your license policy. Review the package's license to ensure compliance.
Found 1 instance in 1 package
License Policy Violation
LicenseThis package is not allowed per your license policy. Review the package's license to ensure compliance.
Found 1 instance in 1 package
Network access
Supply chain riskThis module accesses the network.
Found 1 instance in 1 package
210214
0
36
6640
542
30
- Removedhttp-parser-js@^0.5.2
- Removedhttp-parser-js@0.5.8(transitive)