undici-thread-interceptor
Advanced tools
| 'use strict' | ||
| const { Writable, Readable, pipeline } = require('node:stream') | ||
class MessagePortWritable extends Writable {
  // Set when the remote Readable reported an error; suppresses the
  // destroy notification we would otherwise echo back over the port.
  #otherSideDestroyed = false

  /**
   * Writable stream that forwards its chunks over a MessagePort.
   *
   * Wire protocol (mirrored by MessagePortReadable):
   *  - sends    { chunks: [...] } for data,
   *             { fin: true }     on clean finish,
   *             { err }           when destroyed with an error;
   *  - receives { more: true }    as the pull/backpressure signal and
   *             { err }           when the remote side was destroyed.
   *
   * @param {object} options
   * @param {MessagePort} options.port - port connected to a MessagePortReadable
   */
  constructor ({ port }) {
    super({ decodeStrings: false })
    this.messagePort = port
    this._callback = null

    this.messagePort.on('message', (control) => {
      if (control.more) {
        // The remote side asked for more data: release the pending write.
        const callback = this._callback
        this._callback = null
        if (callback) {
          callback()
        }
      } else if (control.err) {
        this.#otherSideDestroyed = true
        this.destroy(control.err)
      }
    })

    this.messagePort.on('close', () => {
      // The other thread went away (e.g. crashed) before we finished.
      if (!this.destroyed && !this.writableFinished) {
        this.destroy(new Error('message port closed'))
      }
    })
  }

  _write (chunk, encoding, callback) {
    this.messagePort.postMessage({ chunks: [chunk] })
    // Hold the callback until the remote side sends { more: true }.
    this._callback = callback
  }

  _writev (chunks, callback) {
    // Batch every buffered chunk into a single message.
    const toWrite = new Array(chunks.length)
    for (let i = 0; i < chunks.length; i++) {
      toWrite[i] = chunks[i].chunk
    }
    this.messagePort.postMessage({ chunks: toWrite })
    this._callback = callback
  }

  _destroy (err, callback) {
    if (!this.#otherSideDestroyed) {
      if (err) {
        this.messagePort.postMessage({ err })
      } else {
        this.messagePort.postMessage({ fin: true })
      }
    }
    // Close on a later turn so the { fin }/{ err } message is flushed first.
    setImmediate(() => {
      this.messagePort.close()
      callback(err)
    })
  }

  /**
   * Pipe `body` into a fresh MessageChannel and return the transferable end.
   * (The previously-destructured `worker` option was unused and has been
   * dropped; extra properties passed by callers are simply ignored.)
   *
   * @param {object} options
   * @param {import('node:stream').Readable} options.body - stream to forward
   * @returns {{ port: MessagePort, transferList: MessagePort[], stream: MessagePortWritable }}
   */
  static asTransferable ({ body }) {
    const channel = new MessageChannel()
    const stream = new MessagePortWritable({
      port: channel.port1
    })
    // We cork the writable side so that we can fill the stream with all data ready to be read
    stream.cork()
    pipeline(body, stream, () => {
      // nothing to do here, we consume the stream and ignore errors
    })
    process.nextTick(() => {
      stream.uncork()
    })
    return { port: channel.port2, transferList: [channel.port2], stream }
  }
}
// Public export: the producer half of the message-port stream pair.
module.exports.MessagePortWritable = MessagePortWritable
class MessagePortReadable extends Readable {
  // Set when the remote Writable reported an error; suppresses the
  // error notification we would otherwise echo back over the port.
  #otherSideDestroyed = false

  /**
   * Readable stream fed by a MessagePort (the consumer half of the pair).
   *
   * Wire protocol (mirrored by MessagePortWritable):
   *  - receives { chunks: [...] } for data,
   *             { fin: true }     for end-of-stream,
   *             { err }           when the remote side was destroyed;
   *  - sends    { more: true }    as the pull/backpressure signal and
   *             { err }           when destroyed with an error.
   *
   * Note: the original passed `{ decodeStrings: false }` to Readable, but
   * that is a Writable-only option and was silently ignored, so it has been
   * removed.
   *
   * @param {object} options
   * @param {MessagePort} options.port - port connected to a MessagePortWritable
   */
  constructor ({ port }) {
    super()
    this.messagePort = port

    this.messagePort.on('message', (msg) => {
      if (Array.isArray(msg.chunks)) {
        for (const c of msg.chunks) {
          this.push(c)
        }
      } else if (msg.fin) {
        this.push(null)
      } else if (msg.err) {
        this.#otherSideDestroyed = true
        this.destroy(msg.err)
      }
    })

    this.messagePort.on('close', () => {
      // The other thread went away (e.g. crashed) before the stream ended.
      if (!this.destroyed && !this.readableEnded) {
        this.destroy(new Error('message port closed'))
      }
    })
  }

  _read () {
    // Ask the remote writable to release its pending write callback.
    this.messagePort.postMessage({ more: true })
  }

  _destroy (err, callback) {
    if (err && !this.#otherSideDestroyed) {
      this.messagePort.postMessage({ err })
    }
    // Close on a later turn so the { err } message is flushed first.
    setImmediate(() => {
      this.messagePort.close()
      callback(err)
    })
  }
}
// Public export: the consumer half of the message-port stream pair.
module.exports.MessagePortReadable = MessagePortReadable
'use strict'

// Worker fixture: destroys the readable side with an error right away so the
// parent thread can observe the failure being propagated over the port.
const { workerData } = require('node:worker_threads')
const { MessagePortReadable } = require('../../../lib/message-port-streams')

const stream = new MessagePortReadable({ port: workerData.port })

// Swallow the local 'error' event; the parent asserts on its own copy.
stream.on('error', () => {})
stream.destroy(new Error('kaboom'))
'use strict'

// Worker fixture for MessagePortWritable.asTransferable(): waits for the
// parent to hand over a port, drains it, then echoes the chunks back.
const { parentPort } = require('node:worker_threads')
const { MessagePortReadable } = require('../../../lib/message-port-streams')

parentPort.once('message', function ({ port }) {
  const incoming = new MessagePortReadable({ port })
  const chunks = []
  incoming.on('data', function (piece) {
    chunks.push(piece)
  })
  incoming.on('end', function () {
    parentPort.postMessage({ chunks })
  })
})
'use strict'

// Worker fixture: drains the readable side of a message-port stream and,
// once it ends, reports every received chunk back to the parent thread.
const { workerData, parentPort } = require('node:worker_threads')
const { MessagePortReadable } = require('../../../lib/message-port-streams')

const source = new MessagePortReadable({ port: workerData.port })

const chunks = []
source.on('data', function (piece) {
  chunks.push(piece)
})
source.on('end', function () {
  parentPort.postMessage({ chunks })
})
'use strict'
// Worker fixture that crashes immediately on startup; used to assert that an
// abrupt thread exit surfaces as a 'message port closed' error on the other
// side of the channel.
throw new Error('kaboom')
'use strict'

// Worker fixture: destroys the writable side with an error right away so the
// parent thread can observe the failure being propagated over the port.
const { workerData } = require('node:worker_threads')
const { MessagePortWritable } = require('../../../lib/message-port-streams')

const sink = new MessagePortWritable({ port: workerData.port })

// Swallow the local 'error' event; the parent asserts on its own copy.
sink.on('error', () => {})
sink.destroy(new Error('kaboom'))
'use strict'

// Worker fixture exercising _writev: chunks written while the stream is
// corked are delivered to the other side as a single batched message.
const { workerData } = require('node:worker_threads')
const { MessagePortWritable } = require('../../../lib/message-port-streams')

const writable = new MessagePortWritable({ port: workerData.port })

writable.cork()
for (const name of ['A', 'B', 'C']) {
  writable.write(`Hello, ${name}!`)
}
writable.uncork()

writable.write('Hello, D!')

writable.cork()
for (const name of ['E', 'F', 'G']) {
  writable.write(`Hello, ${name}!`)
}
writable.uncork()

writable.end()
'use strict'

// Worker fixture: sends a single chunk across the port and finishes.
const { workerData } = require('node:worker_threads')
const { MessagePortWritable } = require('../../../lib/message-port-streams')

const stream = new MessagePortWritable({ port: workerData.port })

// end(chunk) is equivalent to write(chunk) followed by end().
stream.end('Hello, World!')
| 'use strict' | ||
| const { test } = require('node:test') | ||
| const { join } = require('node:path') | ||
| const { Worker, MessageChannel } = require('node:worker_threads') | ||
| const { MessagePortWritable, MessagePortReadable } = require('../lib/message-port-streams') | ||
| const { once } = require('node:events') | ||
| const { Readable } = require('node:stream') | ||
test('producer to consumer', async (t) => {
  // The main thread reads; the worker (fixtures/streams/producer.js) writes
  // a single 'Hello, World!' chunk through the transferred port.
  const channel = new MessageChannel()
  const readable = new MessagePortReadable({
    port: channel.port1
  })
  const worker = new Worker(join(__dirname, 'fixtures', 'streams', 'producer.js'), {
    workerData: { port: channel.port2 },
    transferList: [channel.port2]
  })
  // Register the exit listener before consuming so the event cannot be missed.
  const exited = once(worker, 'exit')
  for await (const chunk of readable) {
    t.assert.equal(chunk.toString(), 'Hello, World!')
  }
  await exited
})
test('consumer to producer', async (t) => {
  // Reversed direction: the main thread writes, the worker
  // (fixtures/streams/consumer.js) collects and echoes the chunks back.
  const channel = new MessageChannel()
  const worker = new Worker(join(__dirname, 'fixtures', 'streams', 'consumer.js'), {
    workerData: { port: channel.port1 },
    transferList: [channel.port1]
  })
  // NOTE(review): the `worker` option is ignored by the constructor — the
  // stream only uses `port`.
  const writable = new MessagePortWritable({ port: channel.port2, worker })
  writable.write('Hello, World!')
  writable.end()
  const [{ chunks }] = await once(worker, 'message')
  t.assert.deepEqual(chunks, [Buffer.from('Hello, World!')])
  await once(worker, 'exit')
})
test('writev', async (t) => {
  // fixtures/streams/producer-writev.js corks the stream twice, so the
  // batched chunks must arrive coalesced exactly as below.
  const channel = new MessageChannel()
  const worker = new Worker(join(__dirname, 'fixtures', 'streams', 'producer-writev.js'), {
    workerData: { port: channel.port2 },
    transferList: [channel.port2]
  })
  const readable = new MessagePortReadable({
    port: channel.port1
  })
  const expected = [
    'Hello, A!Hello, B!Hello, C!',
    'Hello, D!Hello, E!Hello, F!Hello, G!'
  ]
  const exited = once(worker, 'exit')
  for await (const chunk of readable) {
    t.assert.equal(chunk.toString(), expected.shift())
  }
  await exited
})
test('producer error', async (t) => {
  // The worker destroys its writable end with 'kaboom'; the error must cross
  // the port and both 'error' and 'close' must fire on the local readable.
  const channel = new MessageChannel()
  const readable = new MessagePortReadable({
    port: channel.port1
  })
  const worker = new Worker(join(__dirname, 'fixtures', 'streams', 'producer-error.js'), {
    workerData: { port: channel.port2 },
    transferList: [channel.port2]
  })
  let closeEmitted = false
  readable.on('close', () => {
    closeEmitted = true
  })
  const exited = once(worker, 'exit')
  const [err] = await once(readable, 'error')
  t.assert.equal(err.message, 'kaboom')
  t.assert.equal(closeEmitted, true)
  await exited
})
test('consumer error', async (t) => {
  // Mirror of the previous test: the worker destroys its readable end, and
  // the error must propagate back to the local writable.
  const channel = new MessageChannel()
  const worker = new Worker(join(__dirname, 'fixtures', 'streams', 'consumer-error.js'), {
    workerData: { port: channel.port1 },
    transferList: [channel.port1]
  })
  // NOTE(review): the `worker` option is ignored by the constructor.
  const writable = new MessagePortWritable({ port: channel.port2, worker })
  let closeEmitted = false
  writable.on('close', () => {
    closeEmitted = true
  })
  const exited = once(worker, 'exit')
  const [err] = await once(writable, 'error')
  t.assert.equal(err.message, 'kaboom')
  t.assert.equal(closeEmitted, true)
  await exited
})
test('readable crash', async (t) => {
  // fixtures/streams/crash.js throws on startup; the transferred port is
  // closed by the thread's death, which must surface as a local error.
  const channel = new MessageChannel()
  const worker = new Worker(join(__dirname, 'fixtures', 'streams', 'crash.js'), {
    workerData: { port: channel.port2 },
    transferList: [channel.port2]
  })
  const readable = new MessagePortReadable({
    port: channel.port1
  })
  // The worker's uncaught throw rejects once(worker, 'exit'); swallow it
  // after checking the message.
  const exited = once(worker, 'exit').catch((err) => {
    t.assert.strictEqual(err.message, 'kaboom')
  })
  const err = await once(readable, 'error')
  t.assert.strictEqual(err[0].message, 'message port closed')
  await exited
})
test('writable crash', async (t) => {
  // Same as above, but the local side holds the writable half.
  const channel = new MessageChannel()
  const worker = new Worker(join(__dirname, 'fixtures', 'streams', 'crash.js'), {
    workerData: { port: channel.port2 },
    transferList: [channel.port2]
  })
  const writable = new MessagePortWritable({
    port: channel.port1
  })
  const exited = once(worker, 'exit').catch((err) => {
    t.assert.strictEqual(err.message, 'kaboom')
  })
  const err = await once(writable, 'error')
  t.assert.strictEqual(err[0].message, 'message port closed')
  await exited
})
test('MessagePortWritable.asTransferable(stream, worker)', async (t) => {
  // End-to-end check of the asTransferable() helper: the returned port is
  // posted to the worker, which reads the stream and echoes the chunks back.
  const worker = new Worker(join(__dirname, 'fixtures', 'streams', 'consumer-transferable.js'))
  const body = new Readable({
    read () {
      this.push('Hello, World!')
      this.push(null)
    }
  })
  const { port, transferList } = MessagePortWritable.asTransferable({
    body,
    worker
  })
  worker.postMessage({ port }, transferList)
  const [{ chunks }] = await once(worker, 'message')
  t.assert.deepEqual(chunks, [Buffer.from('Hello, World!')])
  await once(worker, 'exit')
})
+9
-3
@@ -17,8 +17,14 @@ import { join } from 'path' | ||
| async function performRequest () { | ||
| const res = await request('http://myserver.local', { | ||
| dispatcher: agent, | ||
| }) | ||
| await res.body.text() | ||
| } | ||
| console.time('request') | ||
| const responses = [] | ||
| for (let i = 0; i < 100000; i++) { | ||
| responses.push(request('http://myserver.local', { | ||
| dispatcher: agent, | ||
| })) | ||
| responses.push(performRequest()) | ||
| } | ||
@@ -25,0 +31,0 @@ await Promise.all(responses) |
+105
-37
@@ -12,5 +12,8 @@ 'use strict' | ||
| const WrapHandler = require('./lib/wrap-handler') | ||
| const { MessagePortWritable, MessagePortReadable } = require('./lib/message-port-streams') | ||
| const kAddress = Symbol('undici-thread-interceptor.address') | ||
| const MAX_BODY = 32 * 1024 | ||
| function createThreadInterceptor (opts) { | ||
@@ -76,13 +79,14 @@ const routes = new Map() | ||
| if (newOpts.body?.[Symbol.asyncIterator]) { | ||
| collectBodyAndDispatch(newOpts, handler).then(() => { | ||
| port.postMessage({ type: 'request', id, opts: newOpts, threadId }) | ||
| }, (err) => { | ||
| clearTimeout(handle) | ||
| hooks.fireOnClientError(newOpts, null, err) | ||
| handler.onResponseError(controller, err) | ||
| if (typeof newOpts.body?.resume === 'function' || newOpts.body?.[Symbol.asyncIterator]) { | ||
| const body = newOpts.body | ||
| delete newOpts.body | ||
| const transferable = MessagePortWritable.asTransferable({ | ||
| body | ||
| }) | ||
| port.postMessage({ type: 'request', id, opts: newOpts, port: transferable.port, threadId }, transferable.transferList) | ||
| } else { | ||
| port.postMessage({ type: 'request', id, opts: newOpts, threadId }) | ||
| } | ||
| const inflights = portInflights.get(port) | ||
@@ -124,4 +128,5 @@ | ||
| // but we should consider adding a test for this in the future | ||
| /* c8 ignore next 4 */ | ||
| /* c8 ignore next 6 */ | ||
| if (controller.aborted) { | ||
| res.port?.close() | ||
| handler.onResponseError(controller, controller.reason) | ||
@@ -131,2 +136,4 @@ return | ||
| } catch (err) { | ||
| // No need to close the transferable port here, because it cannot happen | ||
| // for requests with a body | ||
| handler.onResponseError(controller, err) | ||
@@ -136,6 +143,32 @@ return | ||
| handler.onResponseData(controller, res.rawPayload) | ||
| handler.onResponseEnd(controller, []) | ||
| if (res.port) { | ||
| const body = new MessagePortReadable({ | ||
| port: res.port | ||
| }) | ||
| hooks.fireOnClientResponseEnd(newOpts, res, clientCtx) | ||
| controller.on('resume', () => { | ||
| body.resume() | ||
| }) | ||
| controller.on('pause', () => { | ||
| body.pause() | ||
| }) | ||
| body.on('data', (chunk) => { | ||
| handler.onResponseData(controller, chunk) | ||
| }) | ||
| body.on('end', () => { | ||
| handler.onResponseEnd(controller, []) | ||
| hooks.fireOnClientResponseEnd(newOpts, res, clientCtx) | ||
| }) | ||
| body.on('error', (err) => { | ||
| handler.onResponseError(controller, err) | ||
| }) | ||
| } else { | ||
| handler.onResponseData(controller, res.body) | ||
| handler.onResponseEnd(controller, []) | ||
| hooks.fireOnClientResponseEnd(newOpts, res, clientCtx) | ||
| } | ||
| })) | ||
@@ -290,4 +323,11 @@ | ||
| if (msg.type === 'request') { | ||
| const { id, opts } = msg | ||
| const { id, opts, port: bodyPort } = msg | ||
| let bodyReadable | ||
| if (bodyPort) { | ||
| bodyReadable = new MessagePortReadable({ | ||
| port: bodyPort | ||
| }) | ||
| } | ||
| const headers = {} | ||
@@ -306,6 +346,7 @@ | ||
| query: opts.query, | ||
| body: opts.body instanceof Uint8Array ? Buffer.from(opts.body) : opts.body, | ||
| body: opts.body || bodyReadable, | ||
| payloadAsStream: true | ||
| } | ||
| interceptor.hooks.fireOnServerRequest(injectOpts, () => { | ||
| const onInject = (err, res) => { | ||
| const onInject = async (err, res) => { | ||
| if (err) { | ||
@@ -317,20 +358,48 @@ interceptor.hooks.fireOnServerError(injectOpts, res, err) | ||
| const newRes = { | ||
| headers: res.headers, | ||
| statusCode: res.statusCode, | ||
| } | ||
| const length = res.headers['content-length'] | ||
| const parsedLength = length === undefined ? MAX_BODY : Number(length) | ||
| if (res.headers['content-type']?.indexOf('application/json') === 0) { | ||
| // TODO(mcollina): maybe use a fast path also for HTML | ||
| // fast path because it's utf-8, use a string | ||
| newRes.rawPayload = res.payload | ||
| let newRes | ||
| let forwardRes | ||
| let transferList | ||
| if (parsedLength < MAX_BODY) { | ||
| try { | ||
| const body = await collectBody(res.stream()) | ||
| newRes = { | ||
| headers: res.headers, | ||
| statusCode: res.statusCode, | ||
| body | ||
| } | ||
| forwardRes = { | ||
| type: 'response', | ||
| id, | ||
| res: newRes, | ||
| } | ||
| } catch (err) { | ||
| forwardRes = { | ||
| type: 'response', | ||
| id, | ||
| err | ||
| } | ||
| } | ||
| } else { | ||
| // slow path, buffer | ||
| newRes.rawPayload = res.rawPayload | ||
| } | ||
| const transferable = MessagePortWritable.asTransferable({ | ||
| body: res.stream() | ||
| }) | ||
| transferList = transferable.transferList | ||
| const forwardRes = { | ||
| type: 'response', | ||
| id, | ||
| res: newRes, | ||
| newRes = { | ||
| headers: res.headers, | ||
| statusCode: res.statusCode, | ||
| port: transferable.port, | ||
| } | ||
| forwardRes = { | ||
| type: 'response', | ||
| id, | ||
| res: newRes, | ||
| } | ||
| } | ||
@@ -342,3 +411,3 @@ | ||
| // that sent the request | ||
| this.postMessage(forwardRes) | ||
| this.postMessage(forwardRes, transferList) | ||
| } | ||
@@ -375,15 +444,14 @@ | ||
| async function collectBodyAndDispatch (opts) { | ||
| async function collectBody (stream) { | ||
| const data = [] | ||
| for await (const chunk of opts.body) { | ||
| for await (const chunk of stream) { | ||
| data.push(chunk) | ||
| } | ||
| if (typeof data[0] === 'string') { | ||
| opts.body = data.join('') | ||
| } else if (data[0] instanceof Buffer || data[0] instanceof Uint8Array) { | ||
| opts.body = Buffer.concat(data) | ||
| /* c8 ignore next 7 */ | ||
| if (data[0] instanceof Buffer || data[0] instanceof Uint8Array) { | ||
| return Buffer.concat(data) | ||
| } else { | ||
| throw new Error('Cannot transfer streams of objects') | ||
| throw new Error('Cannot transfer streams of strings or objects') | ||
| } | ||
@@ -390,0 +458,0 @@ } |
| 'use strict' | ||
| class DispatchController { | ||
| const { EventEmitter } = require('node:events') | ||
| class DispatchController extends EventEmitter { | ||
| #paused = false | ||
@@ -22,2 +24,3 @@ #reason = null | ||
| this.#paused = true | ||
| this.emit('pause') | ||
| } | ||
@@ -27,2 +30,3 @@ | ||
| this.#paused = false | ||
| this.emit('resume') | ||
| } | ||
@@ -29,0 +33,0 @@ |
+2
-2
| { | ||
| "name": "undici-thread-interceptor", | ||
| "version": "0.10.3", | ||
| "version": "0.11.0", | ||
| "description": "An Undici interceptor that routes requests over a worker thread", | ||
@@ -29,5 +29,5 @@ "main": "index.js", | ||
| "hyperid": "^3.2.0", | ||
| "light-my-request": "^6.0.0", | ||
| "light-my-request": "^6.5.1", | ||
| "undici": "^7.0.0" | ||
| } | ||
| } |
+64
-0
@@ -435,1 +435,65 @@ 'use strict' | ||
| }) | ||
| test('big stream using backpressure', async (t) => { | ||
| const worker = new Worker(join(__dirname, 'fixtures', 'worker1.js')) | ||
| t.after(() => worker.terminate()) | ||
| const interceptor = createThreadInterceptor({ | ||
| domain: '.local', | ||
| }) | ||
| interceptor.route('myserver', worker) | ||
| const agent = new Agent().compose(interceptor) | ||
| const { statusCode, body } = await request('http://myserver.local/big', { | ||
| dispatcher: agent, | ||
| }) | ||
| strictEqual(statusCode, 200) | ||
| let size = 0 | ||
| body.on('readable', () => { | ||
| let chunk | ||
| while ((chunk = body.read()) !== null) { | ||
| size += chunk.length | ||
| } | ||
| }) | ||
| await once(body, 'end') | ||
| strictEqual(size, 1024 * 1024 * 100) | ||
| }) | ||
| test('handles an error within a stream response with a content length', async (t) => { | ||
| const worker = new Worker(join(__dirname, 'fixtures', 'worker1.js')) | ||
| t.after(() => worker.terminate()) | ||
| const interceptor = createThreadInterceptor({ | ||
| domain: '.local', | ||
| }) | ||
| interceptor.route('myserver', worker) | ||
| const agent = new Agent().compose(interceptor) | ||
| await rejects(request('http://myserver.local/stream-error', { | ||
| dispatcher: agent, | ||
| })) | ||
| }) | ||
| test('handles an error with a stream response without content length', async (t) => { | ||
| const worker = new Worker(join(__dirname, 'fixtures', 'worker1.js')) | ||
| t.after(() => worker.terminate()) | ||
| const interceptor = createThreadInterceptor({ | ||
| domain: '.local', | ||
| }) | ||
| interceptor.route('myserver', worker) | ||
| const agent = new Agent().compose(interceptor) | ||
| const res = await request('http://myserver.local/stream-error-2', { | ||
| dispatcher: agent, | ||
| }) | ||
| strictEqual(res.statusCode, 200) | ||
| await rejects(res.body.text()) | ||
| }) |
@@ -18,4 +18,3 @@ 'use strict' | ||
| cb() | ||
| // cb(null, 'done') | ||
| } | ||
| }) |
@@ -15,6 +15,5 @@ 'use strict' | ||
| port: parentPort, | ||
| onServerResponse: (_req, res) => { | ||
| const payload = Buffer.from(res.rawPayload).toString() | ||
| console.log('onServerResponse called', JSON.stringify(payload)) | ||
| onServerResponse: (req) => { | ||
| console.log('onServerResponse called', req.url) | ||
| } | ||
| }) |
@@ -47,2 +47,49 @@ 'use strict' | ||
| app.get('/big', (req, reply) => { | ||
| let i = 0 | ||
| const big = new Readable({ | ||
| read () { | ||
| if (++i > 100) { | ||
| this.push(null) | ||
| return | ||
| } | ||
| this.push(Buffer.alloc(1024 * 1024, 'x')) | ||
| }, | ||
| }) | ||
| return big | ||
| }) | ||
| app.get('/stream-error', (req, reply) => { | ||
| // The content-length header is necessary to make sure that | ||
| // the mesh network collects the whole body | ||
| reply.header('content-length', 1024) | ||
| let called = false | ||
| reply.send(new Readable({ | ||
| read () { | ||
| if (!called) { | ||
| called = true | ||
| this.push('hello') | ||
| setTimeout(() => { | ||
| this.destroy(new Error('kaboom')) | ||
| }, 1000) | ||
| } | ||
| }, | ||
| })) | ||
| }) | ||
| app.get('/stream-error-2', (req, reply) => { | ||
| let called = false | ||
| reply.send(new Readable({ | ||
| read () { | ||
| if (!called) { | ||
| called = true | ||
| this.push('hello') | ||
| setTimeout(() => { | ||
| this.destroy(new Error('kaboom')) | ||
| }, 1000) | ||
| } | ||
| }, | ||
| })) | ||
| }) | ||
| app.post('/echo-body', (req, reply) => { | ||
@@ -49,0 +96,0 @@ reply.send(req.body) |
@@ -94,4 +94,4 @@ 'use strict' | ||
| domain: '.local', | ||
| onClientResponse: (_req, res) => { | ||
| hookCalled = Buffer.from(res.rawPayload).toString() | ||
| onClientResponse: (req) => { | ||
| hookCalled = req.path | ||
| } | ||
@@ -107,3 +107,3 @@ }) | ||
| strictEqual(statusCode, 200) | ||
| deepStrictEqual(hookCalled, '{"hello":"world"}') | ||
| strictEqual(hookCalled, '/') | ||
| }) | ||
@@ -119,3 +119,3 @@ | ||
| onClientResponseEnd: (_req, res) => { | ||
| hookCalled = Buffer.from(res.rawPayload).toString() | ||
| hookCalled = true | ||
| } | ||
@@ -131,3 +131,3 @@ }) | ||
| strictEqual(statusCode, 200) | ||
| deepStrictEqual(hookCalled, '{"hello":"world"}') | ||
| strictEqual(hookCalled, true) | ||
| }) | ||
@@ -182,3 +182,3 @@ | ||
| await sleep(300) | ||
| deepStrictEqual(lines, ['onServerRequest called {"method":"GET","url":"/","headers":{"host":"myserver.local"}}']) | ||
| deepStrictEqual(lines, ['onServerRequest called {"method":"GET","url":"/","headers":{"host":"myserver.local"},"payloadAsStream":true}']) | ||
| }) | ||
@@ -208,3 +208,3 @@ | ||
| await sleep(300) | ||
| deepStrictEqual(lines, ['onServerResponse called "{\\"hello\\":\\"world\\"}"']) | ||
| deepStrictEqual(lines, ['onServerResponse called /']) | ||
| }) | ||
@@ -257,3 +257,3 @@ | ||
| await sleep(300) | ||
| deepStrictEqual(lines, ['onServerRequest called {"method":"GET","url":"/","headers":{"host":"myserver.local"}}', 'onServerResponse called: propagated']) | ||
| deepStrictEqual(lines, ['onServerRequest called {"method":"GET","url":"/","headers":{"host":"myserver.local"},"payloadAsStream":true}', 'onServerResponse called: propagated']) | ||
| }) | ||
@@ -260,0 +260,0 @@ |
+37
-4
@@ -70,3 +70,3 @@ 'use strict' | ||
| await rejects(request('http://myserver.local/echo-body', { | ||
| const res = await request('http://myserver.local/echo-body', { | ||
| dispatcher: agent, | ||
@@ -82,3 +82,10 @@ method: 'POST', | ||
| }), | ||
| })) | ||
| }) | ||
| strictEqual(res.statusCode, 400) | ||
| deepStrictEqual(await res.body.json(), { | ||
| statusCode: 400, | ||
| error: 'Bad Request', | ||
| message: 'kaboom', | ||
| }) | ||
| }) | ||
@@ -110,3 +117,4 @@ | ||
| test('POST errors with streams of objects', async (t) => { | ||
| // Unskip when https://github.com/nodejs/node/pull/55270 is released | ||
| test.skip('POST errors with streams of objects', async (t) => { | ||
| const worker = new Worker(join(__dirname, 'fixtures', 'worker1.js')) | ||
@@ -128,4 +136,29 @@ t.after(() => worker.terminate()) | ||
| }, | ||
| body: Readable.from([{ hello: 'world' }]), | ||
| body: Readable.from([{ hello: 'world' }]) | ||
| })) | ||
| }) | ||
| test('correctly handles aborted requests', async (t) => { | ||
| const worker = new Worker(join(__dirname, 'fixtures', 'worker1.js')) | ||
| t.after(() => worker.terminate()) | ||
| const interceptor = createThreadInterceptor({ | ||
| domain: '.local', | ||
| }) | ||
| interceptor.route('myserver', worker) | ||
| const agent = new Agent().compose(interceptor) | ||
| const abortController = new AbortController() | ||
| setImmediate(() => abortController.abort()) | ||
| await rejects(request('http://myserver.local/unfinished-business', { | ||
| dispatcher: agent, | ||
| signal: abortController.signal, | ||
| method: 'POST', | ||
| headers: { | ||
| 'content-type': 'application/json', | ||
| }, | ||
| body: JSON.stringify({ hello: 'world' }) | ||
| })) | ||
| }) |
Network access
Supply chain riskThis module accesses the network.
Found 1 instance in 1 package
URL strings
Supply chain riskPackage contains fragments of external URLs or IP addresses, which the package may be accessing at runtime.
Found 1 instance in 1 package
Network access
Supply chain riskThis module accesses the network.
Found 1 instance in 1 package
Long strings
Supply chain riskContains long string literals, which may be a sign of obfuscated or packed code.
Found 1 instance in 1 package
URL strings
Supply chain riskPackage contains fragments of external URLs or IP addresses, which the package may be accessing at runtime.
Found 1 instance in 1 package
89677
20.54%49
22.5%2493
24.84%2
-33.33%Updated