Huge News!Announcing our $40M Series B led by Abstract Ventures.Learn More
Socket
Sign inDemoInstall
Socket

undici-thread-interceptor

Package Overview
Dependencies
Maintainers
8
Versions
22
Alerts
File Explorer

Advanced tools

Socket logo

Install Socket

Detect and block malicious and high-risk dependencies

Install

undici-thread-interceptor - npm Package Compare versions

Comparing version 0.10.3 to 0.11.0

lib/message-port-streams.js

142

index.js

@@ -12,5 +12,8 @@ 'use strict'

const WrapHandler = require('./lib/wrap-handler')
const { MessagePortWritable, MessagePortReadable } = require('./lib/message-port-streams')
const kAddress = Symbol('undici-thread-interceptor.address')
const MAX_BODY = 32 * 1024
function createThreadInterceptor (opts) {

@@ -76,13 +79,14 @@ const routes = new Map()

if (newOpts.body?.[Symbol.asyncIterator]) {
collectBodyAndDispatch(newOpts, handler).then(() => {
port.postMessage({ type: 'request', id, opts: newOpts, threadId })
}, (err) => {
clearTimeout(handle)
hooks.fireOnClientError(newOpts, null, err)
handler.onResponseError(controller, err)
if (typeof newOpts.body?.resume === 'function' || newOpts.body?.[Symbol.asyncIterator]) {
const body = newOpts.body
delete newOpts.body
const transferable = MessagePortWritable.asTransferable({
body
})
port.postMessage({ type: 'request', id, opts: newOpts, port: transferable.port, threadId }, transferable.transferList)
} else {
port.postMessage({ type: 'request', id, opts: newOpts, threadId })
}
const inflights = portInflights.get(port)

@@ -124,4 +128,5 @@

// but we should consider adding a test for this in the future
/* c8 ignore next 4 */
/* c8 ignore next 6 */
if (controller.aborted) {
res.port?.close()
handler.onResponseError(controller, controller.reason)

@@ -131,2 +136,4 @@ return

} catch (err) {
// No need to close the transferable port here, because it cannot happen
// for requests with a body
handler.onResponseError(controller, err)

@@ -136,6 +143,32 @@ return

handler.onResponseData(controller, res.rawPayload)
handler.onResponseEnd(controller, [])
if (res.port) {
const body = new MessagePortReadable({
port: res.port
})
hooks.fireOnClientResponseEnd(newOpts, res, clientCtx)
controller.on('resume', () => {
body.resume()
})
controller.on('pause', () => {
body.pause()
})
body.on('data', (chunk) => {
handler.onResponseData(controller, chunk)
})
body.on('end', () => {
handler.onResponseEnd(controller, [])
hooks.fireOnClientResponseEnd(newOpts, res, clientCtx)
})
body.on('error', (err) => {
handler.onResponseError(controller, err)
})
} else {
handler.onResponseData(controller, res.body)
handler.onResponseEnd(controller, [])
hooks.fireOnClientResponseEnd(newOpts, res, clientCtx)
}
}))

@@ -290,4 +323,11 @@

if (msg.type === 'request') {
const { id, opts } = msg
const { id, opts, port: bodyPort } = msg
let bodyReadable
if (bodyPort) {
bodyReadable = new MessagePortReadable({
port: bodyPort
})
}
const headers = {}

@@ -306,6 +346,7 @@

query: opts.query,
body: opts.body instanceof Uint8Array ? Buffer.from(opts.body) : opts.body,
body: opts.body || bodyReadable,
payloadAsStream: true
}
interceptor.hooks.fireOnServerRequest(injectOpts, () => {
const onInject = (err, res) => {
const onInject = async (err, res) => {
if (err) {

@@ -317,20 +358,48 @@ interceptor.hooks.fireOnServerError(injectOpts, res, err)

const newRes = {
headers: res.headers,
statusCode: res.statusCode,
}
const length = res.headers['content-length']
const parsedLength = length === undefined ? MAX_BODY : Number(length)
if (res.headers['content-type']?.indexOf('application/json') === 0) {
// TODO(mcollina): maybe use a fast path also for HTML
// fast path because it's utf-8, use a string
newRes.rawPayload = res.payload
let newRes
let forwardRes
let transferList
if (parsedLength < MAX_BODY) {
try {
const body = await collectBody(res.stream())
newRes = {
headers: res.headers,
statusCode: res.statusCode,
body
}
forwardRes = {
type: 'response',
id,
res: newRes,
}
} catch (err) {
forwardRes = {
type: 'response',
id,
err
}
}
} else {
// slow path, buffer
newRes.rawPayload = res.rawPayload
}
const transferable = MessagePortWritable.asTransferable({
body: res.stream()
})
transferList = transferable.transferList
const forwardRes = {
type: 'response',
id,
res: newRes,
newRes = {
headers: res.headers,
statusCode: res.statusCode,
port: transferable.port,
}
forwardRes = {
type: 'response',
id,
res: newRes,
}
}

@@ -342,3 +411,3 @@

// that sent the request
this.postMessage(forwardRes)
this.postMessage(forwardRes, transferList)
}

@@ -375,15 +444,14 @@

async function collectBodyAndDispatch (opts) {
async function collectBody (stream) {
const data = []
for await (const chunk of opts.body) {
for await (const chunk of stream) {
data.push(chunk)
}
if (typeof data[0] === 'string') {
opts.body = data.join('')
} else if (data[0] instanceof Buffer || data[0] instanceof Uint8Array) {
opts.body = Buffer.concat(data)
/* c8 ignore next 7 */
if (data[0] instanceof Buffer || data[0] instanceof Uint8Array) {
return Buffer.concat(data)
} else {
throw new Error('Cannot transfer streams of objects')
throw new Error('Cannot transfer streams of strings or objects')
}

@@ -390,0 +458,0 @@ }

'use strict'
class DispatchController {
const { EventEmitter } = require('node:events')
class DispatchController extends EventEmitter {
#paused = false

@@ -22,2 +24,3 @@ #reason = null

this.#paused = true
this.emit('pause')
}

@@ -27,2 +30,3 @@

this.#paused = false
this.emit('resume')
}

@@ -29,0 +33,0 @@

{
"name": "undici-thread-interceptor",
"version": "0.10.3",
"version": "0.11.0",
"description": "An Undici interceptor that routes requests over a worker thread",

@@ -29,5 +29,5 @@ "main": "index.js",

"hyperid": "^3.2.0",
"light-my-request": "^6.0.0",
"light-my-request": "^6.5.1",
"undici": "^7.0.0"
}
}

@@ -435,1 +435,65 @@ 'use strict'

})
test('big stream using backpressure', async (t) => {
const worker = new Worker(join(__dirname, 'fixtures', 'worker1.js'))
t.after(() => worker.terminate())
const interceptor = createThreadInterceptor({
domain: '.local',
})
interceptor.route('myserver', worker)
const agent = new Agent().compose(interceptor)
const { statusCode, body } = await request('http://myserver.local/big', {
dispatcher: agent,
})
strictEqual(statusCode, 200)
let size = 0
body.on('readable', () => {
let chunk
while ((chunk = body.read()) !== null) {
size += chunk.length
}
})
await once(body, 'end')
strictEqual(size, 1024 * 1024 * 100)
})
test('handles an error within a stream response with a content length', async (t) => {
const worker = new Worker(join(__dirname, 'fixtures', 'worker1.js'))
t.after(() => worker.terminate())
const interceptor = createThreadInterceptor({
domain: '.local',
})
interceptor.route('myserver', worker)
const agent = new Agent().compose(interceptor)
await rejects(request('http://myserver.local/stream-error', {
dispatcher: agent,
}))
})
test('handle an error with a stream response response without content length', async (t) => {
const worker = new Worker(join(__dirname, 'fixtures', 'worker1.js'))
t.after(() => worker.terminate())
const interceptor = createThreadInterceptor({
domain: '.local',
})
interceptor.route('myserver', worker)
const agent = new Agent().compose(interceptor)
const res = await request('http://myserver.local/stream-error-2', {
dispatcher: agent,
})
strictEqual(res.statusCode, 200)
await rejects(res.body.text())
})

@@ -18,4 +18,3 @@ 'use strict'

cb()
// cb(null, 'done')
}
})

@@ -15,6 +15,5 @@ 'use strict'

port: parentPort,
onServerResponse: (_req, res) => {
const payload = Buffer.from(res.rawPayload).toString()
console.log('onServerResponse called', JSON.stringify(payload))
onServerResponse: (req) => {
console.log('onServerResponse called', req.url)
}
})

@@ -47,2 +47,49 @@ 'use strict'

app.get('/big', (req, reply) => {
let i = 0
const big = new Readable({
read () {
if (++i > 100) {
this.push(null)
return
}
this.push(Buffer.alloc(1024 * 1024, 'x'))
},
})
return big
})
app.get('/stream-error', (req, reply) => {
// The content-lengh header is necessary to make sure that
// the mesh network collects the whole body
reply.header('content-length', 1024)
let called = false
reply.send(new Readable({
read () {
if (!called) {
called = true
this.push('hello')
setTimeout(() => {
this.destroy(new Error('kaboom'))
}, 1000)
}
},
}))
})
app.get('/stream-error-2', (req, reply) => {
let called = false
reply.send(new Readable({
read () {
if (!called) {
called = true
this.push('hello')
setTimeout(() => {
this.destroy(new Error('kaboom'))
}, 1000)
}
},
}))
})
app.post('/echo-body', (req, reply) => {

@@ -49,0 +96,0 @@ reply.send(req.body)

@@ -94,4 +94,4 @@ 'use strict'

domain: '.local',
onClientResponse: (_req, res) => {
hookCalled = Buffer.from(res.rawPayload).toString()
onClientResponse: (req) => {
hookCalled = req.path
}

@@ -107,3 +107,3 @@ })

strictEqual(statusCode, 200)
deepStrictEqual(hookCalled, '{"hello":"world"}')
strictEqual(hookCalled, '/')
})

@@ -119,3 +119,3 @@

onClientResponseEnd: (_req, res) => {
hookCalled = Buffer.from(res.rawPayload).toString()
hookCalled = true
}

@@ -131,3 +131,3 @@ })

strictEqual(statusCode, 200)
deepStrictEqual(hookCalled, '{"hello":"world"}')
strictEqual(hookCalled, true)
})

@@ -182,3 +182,3 @@

await sleep(300)
deepStrictEqual(lines, ['onServerRequest called {"method":"GET","url":"/","headers":{"host":"myserver.local"}}'])
deepStrictEqual(lines, ['onServerRequest called {"method":"GET","url":"/","headers":{"host":"myserver.local"},"payloadAsStream":true}'])
})

@@ -208,3 +208,3 @@

await sleep(300)
deepStrictEqual(lines, ['onServerResponse called "{\\"hello\\":\\"world\\"}"'])
deepStrictEqual(lines, ['onServerResponse called /'])
})

@@ -257,3 +257,3 @@

await sleep(300)
deepStrictEqual(lines, ['onServerRequest called {"method":"GET","url":"/","headers":{"host":"myserver.local"}}', 'onServerResponse called: propagated'])
deepStrictEqual(lines, ['onServerRequest called {"method":"GET","url":"/","headers":{"host":"myserver.local"},"payloadAsStream":true}', 'onServerResponse called: propagated'])
})

@@ -260,0 +260,0 @@

@@ -70,3 +70,3 @@ 'use strict'

await rejects(request('http://myserver.local/echo-body', {
const res = await request('http://myserver.local/echo-body', {
dispatcher: agent,

@@ -82,3 +82,10 @@ method: 'POST',

}),
}))
})
strictEqual(res.statusCode, 400)
deepStrictEqual(await res.body.json(), {
statusCode: 400,
error: 'Bad Request',
message: 'kaboom',
})
})

@@ -110,3 +117,4 @@

test('POST errors with streams of objects', async (t) => {
// Unskip when https://github.com/nodejs/node/pull/55270 is released
test.skip('POST errors with streams of objects', async (t) => {
const worker = new Worker(join(__dirname, 'fixtures', 'worker1.js'))

@@ -128,4 +136,29 @@ t.after(() => worker.terminate())

},
body: Readable.from([{ hello: 'world' }]),
body: Readable.from([{ hello: 'world' }])
}))
})
test('correctly handles aborted requests', async (t) => {
const worker = new Worker(join(__dirname, 'fixtures', 'worker1.js'))
t.after(() => worker.terminate())
const interceptor = createThreadInterceptor({
domain: '.local',
})
interceptor.route('myserver', worker)
const agent = new Agent().compose(interceptor)
const abortController = new AbortController()
setImmediate(() => abortController.abort())
await rejects(request('http://myserver.local/unfinished-business', {
dispatcher: agent,
signal: abortController.signal,
method: 'POST',
headers: {
'content-type': 'application/json',
},
body: JSON.stringify({ hello: 'world' })
}))
})

Sorry, the diff of this file is not supported yet

SocketSocket SOC 2 Logo

Product

  • Package Alerts
  • Integrations
  • Docs
  • Pricing
  • FAQ
  • Roadmap
  • Changelog

Packages

npm

Stay in touch

Get open source security insights delivered straight into your inbox.


  • Terms
  • Privacy
  • Security

Made with ⚡️ by Socket Inc