Big News: Socket Selected for OpenAI's Cybersecurity Grant Program.Details
Socket
Book a DemoSign in
Socket

undici-thread-interceptor

Package Overview
Dependencies
Maintainers
8
Versions
44
Alerts
File Explorer

Advanced tools

Socket logo

Install Socket

Detect and block malicious and high-risk dependencies

Install

undici-thread-interceptor - npm Package Compare versions

Comparing version
0.10.3
to
0.11.0
+123
lib/message-port-streams.js
'use strict'
const { Writable, Readable, pipeline } = require('node:stream')
class MessagePortWritable extends Writable {
#otherSideDestroyed = false
constructor ({ port }) {
super({ decodeStrings: false })
this.messagePort = port
this._callback = null
this.messagePort.on('message', (control) => {
if (control.more) {
const callback = this._callback
this._callback = null
if (callback) {
callback()
}
} else if (control.err) {
this.#otherSideDestroyed = true
this.destroy(control.err)
}
})
this.messagePort.on('close', () => {
if (!this.destroyed && !this.writableFinished) {
this.destroy(new Error('message port closed'))
}
})
}
_write (chunk, encoding, callback) {
this.messagePort.postMessage({ chunks: [chunk] })
this._callback = callback
}
_writev (chunks, callback) {
const toWrite = new Array(chunks.length)
for (let i = 0; i < chunks.length; i++) {
toWrite[i] = chunks[i].chunk
}
this.messagePort.postMessage({ chunks: toWrite })
this._callback = callback
}
_destroy (err, callback) {
if (!this.#otherSideDestroyed) {
if (err) {
this.messagePort.postMessage({ err })
} else {
this.messagePort.postMessage({ fin: true })
}
}
setImmediate(() => {
this.messagePort.close()
callback(err)
})
}
static asTransferable ({ body, worker }) {
const channel = new MessageChannel()
const stream = new MessagePortWritable({
port: channel.port1
})
// We cork the writable side so that we can fill the stream with all data ready to be read
stream.cork()
pipeline(body, stream, () => {
// nothing do do here, we consume the stream and ignore errors
})
process.nextTick(() => {
stream.uncork()
})
return { port: channel.port2, transferList: [channel.port2], stream }
}
}
module.exports.MessagePortWritable = MessagePortWritable
class MessagePortReadable extends Readable {
#otherSideDestroyed = false
constructor ({ port }) {
super({ decodeStrings: false })
this.messagePort = port
this.messagePort.on('message', (msg) => {
if (Array.isArray(msg.chunks)) {
for (const c of msg.chunks) {
this.push(c)
}
} else if (msg.fin) {
this.push(null)
} else if (msg.err) {
this.#otherSideDestroyed = true
this.destroy(msg.err)
}
})
this.messagePort.on('close', () => {
if (!this.destroyed && !this.readableEnded) {
this.destroy(new Error('message port closed'))
}
})
}
_read () {
this.messagePort.postMessage({ more: true })
}
_destroy (err, callback) {
if (err && !this.#otherSideDestroyed) {
this.messagePort.postMessage({ err })
}
setImmediate(() => {
this.messagePort.close()
callback(err)
})
}
}
module.exports.MessagePortReadable = MessagePortReadable
'use strict'
const { workerData } = require('node:worker_threads')
const { MessagePortReadable } = require('../../../lib/message-port-streams')
const readable = new MessagePortReadable({
port: workerData.port
})
readable.destroy(new Error('kaboom'))
readable.on('error', () => {})
'use strict'
const { parentPort } = require('node:worker_threads')
const { MessagePortReadable } = require('../../../lib/message-port-streams')
parentPort.once('message', ({ port }) => {
const readable = new MessagePortReadable({
port
})
const chunks = []
readable.on('data', (chunk) => {
chunks.push(chunk)
})
readable.on('end', () => {
parentPort.postMessage({ chunks })
})
})
'use strict'
const { workerData, parentPort } = require('node:worker_threads')
const { MessagePortReadable } = require('../../../lib/message-port-streams')
const readable = new MessagePortReadable({
port: workerData.port
})
const chunks = []
readable.on('data', (chunk) => {
chunks.push(chunk)
})
readable.on('end', () => {
parentPort.postMessage({ chunks })
})
'use strict'
throw new Error('kaboom')
'use strict'
const { workerData } = require('node:worker_threads')
const { MessagePortWritable } = require('../../../lib/message-port-streams')
const writable = new MessagePortWritable({
port: workerData.port
})
writable.destroy(new Error('kaboom'))
writable.on('error', () => {})
'use strict'
const { workerData } = require('node:worker_threads')
const { MessagePortWritable } = require('../../../lib/message-port-streams')
const writable = new MessagePortWritable({
port: workerData.port
})
writable.cork()
writable.write('Hello, A!')
writable.write('Hello, B!')
writable.write('Hello, C!')
writable.uncork()
writable.write('Hello, D!')
writable.cork()
writable.write('Hello, E!')
writable.write('Hello, F!')
writable.write('Hello, G!')
writable.uncork()
writable.end()
'use strict'
const { workerData } = require('node:worker_threads')
const { MessagePortWritable } = require('../../../lib/message-port-streams')
const writable = new MessagePortWritable({
port: workerData.port
})
writable.write('Hello, World!')
writable.end()
'use strict'
const { test } = require('node:test')
const { join } = require('node:path')
const { Worker, MessageChannel } = require('node:worker_threads')
const { MessagePortWritable, MessagePortReadable } = require('../lib/message-port-streams')
const { once } = require('node:events')
const { Readable } = require('node:stream')
test('producer to consumer', async (t) => {
const channel = new MessageChannel()
const readable = new MessagePortReadable({
port: channel.port1
})
const worker = new Worker(join(__dirname, 'fixtures', 'streams', 'producer.js'), {
workerData: { port: channel.port2 },
transferList: [channel.port2]
})
const exited = once(worker, 'exit')
for await (const chunk of readable) {
t.assert.equal(chunk.toString(), 'Hello, World!')
}
await exited
})
test('consumer to producer', async (t) => {
const channel = new MessageChannel()
const worker = new Worker(join(__dirname, 'fixtures', 'streams', 'consumer.js'), {
workerData: { port: channel.port1 },
transferList: [channel.port1]
})
const writable = new MessagePortWritable({ port: channel.port2, worker })
writable.write('Hello, World!')
writable.end()
const [{ chunks }] = await once(worker, 'message')
t.assert.deepEqual(chunks, [Buffer.from('Hello, World!')])
await once(worker, 'exit')
})
test('writev', async (t) => {
const channel = new MessageChannel()
const worker = new Worker(join(__dirname, 'fixtures', 'streams', 'producer-writev.js'), {
workerData: { port: channel.port2 },
transferList: [channel.port2]
})
const readable = new MessagePortReadable({
port: channel.port1
})
const expected = [
'Hello, A!Hello, B!Hello, C!',
'Hello, D!Hello, E!Hello, F!Hello, G!'
]
const exited = once(worker, 'exit')
for await (const chunk of readable) {
t.assert.equal(chunk.toString(), expected.shift())
}
await exited
})
test('producer error', async (t) => {
const channel = new MessageChannel()
const readable = new MessagePortReadable({
port: channel.port1
})
const worker = new Worker(join(__dirname, 'fixtures', 'streams', 'producer-error.js'), {
workerData: { port: channel.port2 },
transferList: [channel.port2]
})
let closeEmitted = false
readable.on('close', () => {
closeEmitted = true
})
const exited = once(worker, 'exit')
const [err] = await once(readable, 'error')
t.assert.equal(err.message, 'kaboom')
t.assert.equal(closeEmitted, true)
await exited
})
test('consumer error', async (t) => {
const channel = new MessageChannel()
const worker = new Worker(join(__dirname, 'fixtures', 'streams', 'consumer-error.js'), {
workerData: { port: channel.port1 },
transferList: [channel.port1]
})
const writable = new MessagePortWritable({ port: channel.port2, worker })
let closeEmitted = false
writable.on('close', () => {
closeEmitted = true
})
const exited = once(worker, 'exit')
const [err] = await once(writable, 'error')
t.assert.equal(err.message, 'kaboom')
t.assert.equal(closeEmitted, true)
await exited
})
test('readable crash', async (t) => {
const channel = new MessageChannel()
const worker = new Worker(join(__dirname, 'fixtures', 'streams', 'crash.js'), {
workerData: { port: channel.port2 },
transferList: [channel.port2]
})
const readable = new MessagePortReadable({
port: channel.port1
})
const exited = once(worker, 'exit').catch((err) => {
t.assert.strictEqual(err.message, 'kaboom')
})
const err = await once(readable, 'error')
t.assert.strictEqual(err[0].message, 'message port closed')
await exited
})
test('writable crash', async (t) => {
const channel = new MessageChannel()
const worker = new Worker(join(__dirname, 'fixtures', 'streams', 'crash.js'), {
workerData: { port: channel.port2 },
transferList: [channel.port2]
})
const writable = new MessagePortWritable({
port: channel.port1
})
const exited = once(worker, 'exit').catch((err) => {
t.assert.strictEqual(err.message, 'kaboom')
})
const err = await once(writable, 'error')
t.assert.strictEqual(err[0].message, 'message port closed')
await exited
})
test('MessagePortWritable.asTransferable(stream, worker)', async (t) => {
const worker = new Worker(join(__dirname, 'fixtures', 'streams', 'consumer-transferable.js'))
const body = new Readable({
read () {
this.push('Hello, World!')
this.push(null)
}
})
const { port, transferList } = MessagePortWritable.asTransferable({
body,
worker
})
worker.postMessage({ port }, transferList)
const [{ chunks }] = await once(worker, 'message')
t.assert.deepEqual(chunks, [Buffer.from('Hello, World!')])
await once(worker, 'exit')
})
+9
-3

@@ -17,8 +17,14 @@ import { join } from 'path'

async function performRequest () {
const res = await request('http://myserver.local', {
dispatcher: agent,
})
await res.body.text()
}
console.time('request')
const responses = []
for (let i = 0; i < 100000; i++) {
responses.push(request('http://myserver.local', {
dispatcher: agent,
}))
responses.push(performRequest())
}

@@ -25,0 +31,0 @@ await Promise.all(responses)

+105
-37

@@ -12,5 +12,8 @@ 'use strict'

const WrapHandler = require('./lib/wrap-handler')
const { MessagePortWritable, MessagePortReadable } = require('./lib/message-port-streams')
const kAddress = Symbol('undici-thread-interceptor.address')
const MAX_BODY = 32 * 1024
function createThreadInterceptor (opts) {

@@ -76,13 +79,14 @@ const routes = new Map()

if (newOpts.body?.[Symbol.asyncIterator]) {
collectBodyAndDispatch(newOpts, handler).then(() => {
port.postMessage({ type: 'request', id, opts: newOpts, threadId })
}, (err) => {
clearTimeout(handle)
hooks.fireOnClientError(newOpts, null, err)
handler.onResponseError(controller, err)
if (typeof newOpts.body?.resume === 'function' || newOpts.body?.[Symbol.asyncIterator]) {
const body = newOpts.body
delete newOpts.body
const transferable = MessagePortWritable.asTransferable({
body
})
port.postMessage({ type: 'request', id, opts: newOpts, port: transferable.port, threadId }, transferable.transferList)
} else {
port.postMessage({ type: 'request', id, opts: newOpts, threadId })
}
const inflights = portInflights.get(port)

@@ -124,4 +128,5 @@

// but we should consider adding a test for this in the future
/* c8 ignore next 4 */
/* c8 ignore next 6 */
if (controller.aborted) {
res.port?.close()
handler.onResponseError(controller, controller.reason)

@@ -131,2 +136,4 @@ return

} catch (err) {
// No need to close the transferable port here, because it cannot happen
// for requests with a body
handler.onResponseError(controller, err)

@@ -136,6 +143,32 @@ return

handler.onResponseData(controller, res.rawPayload)
handler.onResponseEnd(controller, [])
if (res.port) {
const body = new MessagePortReadable({
port: res.port
})
hooks.fireOnClientResponseEnd(newOpts, res, clientCtx)
controller.on('resume', () => {
body.resume()
})
controller.on('pause', () => {
body.pause()
})
body.on('data', (chunk) => {
handler.onResponseData(controller, chunk)
})
body.on('end', () => {
handler.onResponseEnd(controller, [])
hooks.fireOnClientResponseEnd(newOpts, res, clientCtx)
})
body.on('error', (err) => {
handler.onResponseError(controller, err)
})
} else {
handler.onResponseData(controller, res.body)
handler.onResponseEnd(controller, [])
hooks.fireOnClientResponseEnd(newOpts, res, clientCtx)
}
}))

@@ -290,4 +323,11 @@

if (msg.type === 'request') {
const { id, opts } = msg
const { id, opts, port: bodyPort } = msg
let bodyReadable
if (bodyPort) {
bodyReadable = new MessagePortReadable({
port: bodyPort
})
}
const headers = {}

@@ -306,6 +346,7 @@

query: opts.query,
body: opts.body instanceof Uint8Array ? Buffer.from(opts.body) : opts.body,
body: opts.body || bodyReadable,
payloadAsStream: true
}
interceptor.hooks.fireOnServerRequest(injectOpts, () => {
const onInject = (err, res) => {
const onInject = async (err, res) => {
if (err) {

@@ -317,20 +358,48 @@ interceptor.hooks.fireOnServerError(injectOpts, res, err)

const newRes = {
headers: res.headers,
statusCode: res.statusCode,
}
const length = res.headers['content-length']
const parsedLength = length === undefined ? MAX_BODY : Number(length)
if (res.headers['content-type']?.indexOf('application/json') === 0) {
// TODO(mcollina): maybe use a fast path also for HTML
// fast path because it's utf-8, use a string
newRes.rawPayload = res.payload
let newRes
let forwardRes
let transferList
if (parsedLength < MAX_BODY) {
try {
const body = await collectBody(res.stream())
newRes = {
headers: res.headers,
statusCode: res.statusCode,
body
}
forwardRes = {
type: 'response',
id,
res: newRes,
}
} catch (err) {
forwardRes = {
type: 'response',
id,
err
}
}
} else {
// slow path, buffer
newRes.rawPayload = res.rawPayload
}
const transferable = MessagePortWritable.asTransferable({
body: res.stream()
})
transferList = transferable.transferList
const forwardRes = {
type: 'response',
id,
res: newRes,
newRes = {
headers: res.headers,
statusCode: res.statusCode,
port: transferable.port,
}
forwardRes = {
type: 'response',
id,
res: newRes,
}
}

@@ -342,3 +411,3 @@

// that sent the request
this.postMessage(forwardRes)
this.postMessage(forwardRes, transferList)
}

@@ -375,15 +444,14 @@

async function collectBodyAndDispatch (opts) {
async function collectBody (stream) {
const data = []
for await (const chunk of opts.body) {
for await (const chunk of stream) {
data.push(chunk)
}
if (typeof data[0] === 'string') {
opts.body = data.join('')
} else if (data[0] instanceof Buffer || data[0] instanceof Uint8Array) {
opts.body = Buffer.concat(data)
/* c8 ignore next 7 */
if (data[0] instanceof Buffer || data[0] instanceof Uint8Array) {
return Buffer.concat(data)
} else {
throw new Error('Cannot transfer streams of objects')
throw new Error('Cannot transfer streams of strings or objects')
}

@@ -390,0 +458,0 @@ }

'use strict'
class DispatchController {
const { EventEmitter } = require('node:events')
class DispatchController extends EventEmitter {
#paused = false

@@ -22,2 +24,3 @@ #reason = null

this.#paused = true
this.emit('pause')
}

@@ -27,2 +30,3 @@

this.#paused = false
this.emit('resume')
}

@@ -29,0 +33,0 @@

{
"name": "undici-thread-interceptor",
"version": "0.10.3",
"version": "0.11.0",
"description": "An Undici interceptor that routes requests over a worker thread",

@@ -29,5 +29,5 @@ "main": "index.js",

"hyperid": "^3.2.0",
"light-my-request": "^6.0.0",
"light-my-request": "^6.5.1",
"undici": "^7.0.0"
}
}

@@ -435,1 +435,65 @@ 'use strict'

})
test('big stream using backpressure', async (t) => {
const worker = new Worker(join(__dirname, 'fixtures', 'worker1.js'))
t.after(() => worker.terminate())
const interceptor = createThreadInterceptor({
domain: '.local',
})
interceptor.route('myserver', worker)
const agent = new Agent().compose(interceptor)
const { statusCode, body } = await request('http://myserver.local/big', {
dispatcher: agent,
})
strictEqual(statusCode, 200)
let size = 0
body.on('readable', () => {
let chunk
while ((chunk = body.read()) !== null) {
size += chunk.length
}
})
await once(body, 'end')
strictEqual(size, 1024 * 1024 * 100)
})
test('handles an error within a stream response with a content length', async (t) => {
const worker = new Worker(join(__dirname, 'fixtures', 'worker1.js'))
t.after(() => worker.terminate())
const interceptor = createThreadInterceptor({
domain: '.local',
})
interceptor.route('myserver', worker)
const agent = new Agent().compose(interceptor)
await rejects(request('http://myserver.local/stream-error', {
dispatcher: agent,
}))
})
test('handle an error with a stream response response without content length', async (t) => {
const worker = new Worker(join(__dirname, 'fixtures', 'worker1.js'))
t.after(() => worker.terminate())
const interceptor = createThreadInterceptor({
domain: '.local',
})
interceptor.route('myserver', worker)
const agent = new Agent().compose(interceptor)
const res = await request('http://myserver.local/stream-error-2', {
dispatcher: agent,
})
strictEqual(res.statusCode, 200)
await rejects(res.body.text())
})

@@ -18,4 +18,3 @@ 'use strict'

cb()
// cb(null, 'done')
}
})

@@ -15,6 +15,5 @@ 'use strict'

port: parentPort,
onServerResponse: (_req, res) => {
const payload = Buffer.from(res.rawPayload).toString()
console.log('onServerResponse called', JSON.stringify(payload))
onServerResponse: (req) => {
console.log('onServerResponse called', req.url)
}
})

@@ -47,2 +47,49 @@ 'use strict'

app.get('/big', (req, reply) => {
let i = 0
const big = new Readable({
read () {
if (++i > 100) {
this.push(null)
return
}
this.push(Buffer.alloc(1024 * 1024, 'x'))
},
})
return big
})
app.get('/stream-error', (req, reply) => {
// The content-lengh header is necessary to make sure that
// the mesh network collects the whole body
reply.header('content-length', 1024)
let called = false
reply.send(new Readable({
read () {
if (!called) {
called = true
this.push('hello')
setTimeout(() => {
this.destroy(new Error('kaboom'))
}, 1000)
}
},
}))
})
app.get('/stream-error-2', (req, reply) => {
let called = false
reply.send(new Readable({
read () {
if (!called) {
called = true
this.push('hello')
setTimeout(() => {
this.destroy(new Error('kaboom'))
}, 1000)
}
},
}))
})
app.post('/echo-body', (req, reply) => {

@@ -49,0 +96,0 @@ reply.send(req.body)

@@ -94,4 +94,4 @@ 'use strict'

domain: '.local',
onClientResponse: (_req, res) => {
hookCalled = Buffer.from(res.rawPayload).toString()
onClientResponse: (req) => {
hookCalled = req.path
}

@@ -107,3 +107,3 @@ })

strictEqual(statusCode, 200)
deepStrictEqual(hookCalled, '{"hello":"world"}')
strictEqual(hookCalled, '/')
})

@@ -119,3 +119,3 @@

onClientResponseEnd: (_req, res) => {
hookCalled = Buffer.from(res.rawPayload).toString()
hookCalled = true
}

@@ -131,3 +131,3 @@ })

strictEqual(statusCode, 200)
deepStrictEqual(hookCalled, '{"hello":"world"}')
strictEqual(hookCalled, true)
})

@@ -182,3 +182,3 @@

await sleep(300)
deepStrictEqual(lines, ['onServerRequest called {"method":"GET","url":"/","headers":{"host":"myserver.local"}}'])
deepStrictEqual(lines, ['onServerRequest called {"method":"GET","url":"/","headers":{"host":"myserver.local"},"payloadAsStream":true}'])
})

@@ -208,3 +208,3 @@

await sleep(300)
deepStrictEqual(lines, ['onServerResponse called "{\\"hello\\":\\"world\\"}"'])
deepStrictEqual(lines, ['onServerResponse called /'])
})

@@ -257,3 +257,3 @@

await sleep(300)
deepStrictEqual(lines, ['onServerRequest called {"method":"GET","url":"/","headers":{"host":"myserver.local"}}', 'onServerResponse called: propagated'])
deepStrictEqual(lines, ['onServerRequest called {"method":"GET","url":"/","headers":{"host":"myserver.local"},"payloadAsStream":true}', 'onServerResponse called: propagated'])
})

@@ -260,0 +260,0 @@

@@ -70,3 +70,3 @@ 'use strict'

await rejects(request('http://myserver.local/echo-body', {
const res = await request('http://myserver.local/echo-body', {
dispatcher: agent,

@@ -82,3 +82,10 @@ method: 'POST',

}),
}))
})
strictEqual(res.statusCode, 400)
deepStrictEqual(await res.body.json(), {
statusCode: 400,
error: 'Bad Request',
message: 'kaboom',
})
})

@@ -110,3 +117,4 @@

test('POST errors with streams of objects', async (t) => {
// Unskip when https://github.com/nodejs/node/pull/55270 is released
test.skip('POST errors with streams of objects', async (t) => {
const worker = new Worker(join(__dirname, 'fixtures', 'worker1.js'))

@@ -128,4 +136,29 @@ t.after(() => worker.terminate())

},
body: Readable.from([{ hello: 'world' }]),
body: Readable.from([{ hello: 'world' }])
}))
})
test('correctly handles aborted requests', async (t) => {
const worker = new Worker(join(__dirname, 'fixtures', 'worker1.js'))
t.after(() => worker.terminate())
const interceptor = createThreadInterceptor({
domain: '.local',
})
interceptor.route('myserver', worker)
const agent = new Agent().compose(interceptor)
const abortController = new AbortController()
setImmediate(() => abortController.abort())
await rejects(request('http://myserver.local/unfinished-business', {
dispatcher: agent,
signal: abortController.signal,
method: 'POST',
headers: {
'content-type': 'application/json',
},
body: JSON.stringify({ hello: 'world' })
}))
})