undici-thread-interceptor
Advanced tools
Comparing version
142
index.js
@@ -12,5 +12,8 @@ 'use strict' | ||
const WrapHandler = require('./lib/wrap-handler') | ||
const { MessagePortWritable, MessagePortReadable } = require('./lib/message-port-streams') | ||
const kAddress = Symbol('undici-thread-interceptor.address') | ||
const MAX_BODY = 32 * 1024 | ||
function createThreadInterceptor (opts) { | ||
@@ -76,13 +79,14 @@ const routes = new Map() | ||
if (newOpts.body?.[Symbol.asyncIterator]) { | ||
collectBodyAndDispatch(newOpts, handler).then(() => { | ||
port.postMessage({ type: 'request', id, opts: newOpts, threadId }) | ||
}, (err) => { | ||
clearTimeout(handle) | ||
hooks.fireOnClientError(newOpts, null, err) | ||
handler.onResponseError(controller, err) | ||
if (typeof newOpts.body?.resume === 'function' || newOpts.body?.[Symbol.asyncIterator]) { | ||
const body = newOpts.body | ||
delete newOpts.body | ||
const transferable = MessagePortWritable.asTransferable({ | ||
body | ||
}) | ||
port.postMessage({ type: 'request', id, opts: newOpts, port: transferable.port, threadId }, transferable.transferList) | ||
} else { | ||
port.postMessage({ type: 'request', id, opts: newOpts, threadId }) | ||
} | ||
const inflights = portInflights.get(port) | ||
@@ -124,4 +128,5 @@ | ||
// but we should consider adding a test for this in the future | ||
/* c8 ignore next 4 */ | ||
/* c8 ignore next 6 */ | ||
if (controller.aborted) { | ||
res.port?.close() | ||
handler.onResponseError(controller, controller.reason) | ||
@@ -131,2 +136,4 @@ return | ||
} catch (err) { | ||
// No need to close the transferable port here, because it cannot happen | ||
// for requests with a body | ||
handler.onResponseError(controller, err) | ||
@@ -136,6 +143,32 @@ return | ||
handler.onResponseData(controller, res.rawPayload) | ||
handler.onResponseEnd(controller, []) | ||
if (res.port) { | ||
const body = new MessagePortReadable({ | ||
port: res.port | ||
}) | ||
hooks.fireOnClientResponseEnd(newOpts, res, clientCtx) | ||
controller.on('resume', () => { | ||
body.resume() | ||
}) | ||
controller.on('pause', () => { | ||
body.pause() | ||
}) | ||
body.on('data', (chunk) => { | ||
handler.onResponseData(controller, chunk) | ||
}) | ||
body.on('end', () => { | ||
handler.onResponseEnd(controller, []) | ||
hooks.fireOnClientResponseEnd(newOpts, res, clientCtx) | ||
}) | ||
body.on('error', (err) => { | ||
handler.onResponseError(controller, err) | ||
}) | ||
} else { | ||
handler.onResponseData(controller, res.body) | ||
handler.onResponseEnd(controller, []) | ||
hooks.fireOnClientResponseEnd(newOpts, res, clientCtx) | ||
} | ||
})) | ||
@@ -290,4 +323,11 @@ | ||
if (msg.type === 'request') { | ||
const { id, opts } = msg | ||
const { id, opts, port: bodyPort } = msg | ||
let bodyReadable | ||
if (bodyPort) { | ||
bodyReadable = new MessagePortReadable({ | ||
port: bodyPort | ||
}) | ||
} | ||
const headers = {} | ||
@@ -306,6 +346,7 @@ | ||
query: opts.query, | ||
body: opts.body instanceof Uint8Array ? Buffer.from(opts.body) : opts.body, | ||
body: opts.body || bodyReadable, | ||
payloadAsStream: true | ||
} | ||
interceptor.hooks.fireOnServerRequest(injectOpts, () => { | ||
const onInject = (err, res) => { | ||
const onInject = async (err, res) => { | ||
if (err) { | ||
@@ -317,20 +358,48 @@ interceptor.hooks.fireOnServerError(injectOpts, res, err) | ||
const newRes = { | ||
headers: res.headers, | ||
statusCode: res.statusCode, | ||
} | ||
const length = res.headers['content-length'] | ||
const parsedLength = length === undefined ? MAX_BODY : Number(length) | ||
if (res.headers['content-type']?.indexOf('application/json') === 0) { | ||
// TODO(mcollina): maybe use a fast path also for HTML | ||
// fast path because it's utf-8, use a string | ||
newRes.rawPayload = res.payload | ||
let newRes | ||
let forwardRes | ||
let transferList | ||
if (parsedLength < MAX_BODY) { | ||
try { | ||
const body = await collectBody(res.stream()) | ||
newRes = { | ||
headers: res.headers, | ||
statusCode: res.statusCode, | ||
body | ||
} | ||
forwardRes = { | ||
type: 'response', | ||
id, | ||
res: newRes, | ||
} | ||
} catch (err) { | ||
forwardRes = { | ||
type: 'response', | ||
id, | ||
err | ||
} | ||
} | ||
} else { | ||
// slow path, buffer | ||
newRes.rawPayload = res.rawPayload | ||
} | ||
const transferable = MessagePortWritable.asTransferable({ | ||
body: res.stream() | ||
}) | ||
transferList = transferable.transferList | ||
const forwardRes = { | ||
type: 'response', | ||
id, | ||
res: newRes, | ||
newRes = { | ||
headers: res.headers, | ||
statusCode: res.statusCode, | ||
port: transferable.port, | ||
} | ||
forwardRes = { | ||
type: 'response', | ||
id, | ||
res: newRes, | ||
} | ||
} | ||
@@ -342,3 +411,3 @@ | ||
// that sent the request | ||
this.postMessage(forwardRes) | ||
this.postMessage(forwardRes, transferList) | ||
} | ||
@@ -375,15 +444,14 @@ | ||
async function collectBodyAndDispatch (opts) { | ||
async function collectBody (stream) { | ||
const data = [] | ||
for await (const chunk of opts.body) { | ||
for await (const chunk of stream) { | ||
data.push(chunk) | ||
} | ||
if (typeof data[0] === 'string') { | ||
opts.body = data.join('') | ||
} else if (data[0] instanceof Buffer || data[0] instanceof Uint8Array) { | ||
opts.body = Buffer.concat(data) | ||
/* c8 ignore next 7 */ | ||
if (data[0] instanceof Buffer || data[0] instanceof Uint8Array) { | ||
return Buffer.concat(data) | ||
} else { | ||
throw new Error('Cannot transfer streams of objects') | ||
throw new Error('Cannot transfer streams of strings or objects') | ||
} | ||
@@ -390,0 +458,0 @@ } |
'use strict' | ||
class DispatchController { | ||
const { EventEmitter } = require('node:events') | ||
class DispatchController extends EventEmitter { | ||
#paused = false | ||
@@ -22,2 +24,3 @@ #reason = null | ||
this.#paused = true | ||
this.emit('pause') | ||
} | ||
@@ -27,2 +30,3 @@ | ||
this.#paused = false | ||
this.emit('resume') | ||
} | ||
@@ -29,0 +33,0 @@ |
{ | ||
"name": "undici-thread-interceptor", | ||
"version": "0.10.3", | ||
"version": "0.11.0", | ||
"description": "An Undici interceptor that routes requests over a worker thread", | ||
@@ -29,5 +29,5 @@ "main": "index.js", | ||
"hyperid": "^3.2.0", | ||
"light-my-request": "^6.0.0", | ||
"light-my-request": "^6.5.1", | ||
"undici": "^7.0.0" | ||
} | ||
} |
@@ -435,1 +435,65 @@ 'use strict' | ||
}) | ||
test('big stream using backpressure', async (t) => { | ||
const worker = new Worker(join(__dirname, 'fixtures', 'worker1.js')) | ||
t.after(() => worker.terminate()) | ||
const interceptor = createThreadInterceptor({ | ||
domain: '.local', | ||
}) | ||
interceptor.route('myserver', worker) | ||
const agent = new Agent().compose(interceptor) | ||
const { statusCode, body } = await request('http://myserver.local/big', { | ||
dispatcher: agent, | ||
}) | ||
strictEqual(statusCode, 200) | ||
let size = 0 | ||
body.on('readable', () => { | ||
let chunk | ||
while ((chunk = body.read()) !== null) { | ||
size += chunk.length | ||
} | ||
}) | ||
await once(body, 'end') | ||
strictEqual(size, 1024 * 1024 * 100) | ||
}) | ||
test('handles an error within a stream response with a content length', async (t) => { | ||
const worker = new Worker(join(__dirname, 'fixtures', 'worker1.js')) | ||
t.after(() => worker.terminate()) | ||
const interceptor = createThreadInterceptor({ | ||
domain: '.local', | ||
}) | ||
interceptor.route('myserver', worker) | ||
const agent = new Agent().compose(interceptor) | ||
await rejects(request('http://myserver.local/stream-error', { | ||
dispatcher: agent, | ||
})) | ||
}) | ||
test('handle an error with a stream response response without content length', async (t) => { | ||
const worker = new Worker(join(__dirname, 'fixtures', 'worker1.js')) | ||
t.after(() => worker.terminate()) | ||
const interceptor = createThreadInterceptor({ | ||
domain: '.local', | ||
}) | ||
interceptor.route('myserver', worker) | ||
const agent = new Agent().compose(interceptor) | ||
const res = await request('http://myserver.local/stream-error-2', { | ||
dispatcher: agent, | ||
}) | ||
strictEqual(res.statusCode, 200) | ||
await rejects(res.body.text()) | ||
}) |
@@ -18,4 +18,3 @@ 'use strict' | ||
cb() | ||
// cb(null, 'done') | ||
} | ||
}) |
@@ -15,6 +15,5 @@ 'use strict' | ||
port: parentPort, | ||
onServerResponse: (_req, res) => { | ||
const payload = Buffer.from(res.rawPayload).toString() | ||
console.log('onServerResponse called', JSON.stringify(payload)) | ||
onServerResponse: (req) => { | ||
console.log('onServerResponse called', req.url) | ||
} | ||
}) |
@@ -47,2 +47,49 @@ 'use strict' | ||
app.get('/big', (req, reply) => { | ||
let i = 0 | ||
const big = new Readable({ | ||
read () { | ||
if (++i > 100) { | ||
this.push(null) | ||
return | ||
} | ||
this.push(Buffer.alloc(1024 * 1024, 'x')) | ||
}, | ||
}) | ||
return big | ||
}) | ||
app.get('/stream-error', (req, reply) => { | ||
// The content-lengh header is necessary to make sure that | ||
// the mesh network collects the whole body | ||
reply.header('content-length', 1024) | ||
let called = false | ||
reply.send(new Readable({ | ||
read () { | ||
if (!called) { | ||
called = true | ||
this.push('hello') | ||
setTimeout(() => { | ||
this.destroy(new Error('kaboom')) | ||
}, 1000) | ||
} | ||
}, | ||
})) | ||
}) | ||
app.get('/stream-error-2', (req, reply) => { | ||
let called = false | ||
reply.send(new Readable({ | ||
read () { | ||
if (!called) { | ||
called = true | ||
this.push('hello') | ||
setTimeout(() => { | ||
this.destroy(new Error('kaboom')) | ||
}, 1000) | ||
} | ||
}, | ||
})) | ||
}) | ||
app.post('/echo-body', (req, reply) => { | ||
@@ -49,0 +96,0 @@ reply.send(req.body) |
@@ -94,4 +94,4 @@ 'use strict' | ||
domain: '.local', | ||
onClientResponse: (_req, res) => { | ||
hookCalled = Buffer.from(res.rawPayload).toString() | ||
onClientResponse: (req) => { | ||
hookCalled = req.path | ||
} | ||
@@ -107,3 +107,3 @@ }) | ||
strictEqual(statusCode, 200) | ||
deepStrictEqual(hookCalled, '{"hello":"world"}') | ||
strictEqual(hookCalled, '/') | ||
}) | ||
@@ -119,3 +119,3 @@ | ||
onClientResponseEnd: (_req, res) => { | ||
hookCalled = Buffer.from(res.rawPayload).toString() | ||
hookCalled = true | ||
} | ||
@@ -131,3 +131,3 @@ }) | ||
strictEqual(statusCode, 200) | ||
deepStrictEqual(hookCalled, '{"hello":"world"}') | ||
strictEqual(hookCalled, true) | ||
}) | ||
@@ -182,3 +182,3 @@ | ||
await sleep(300) | ||
deepStrictEqual(lines, ['onServerRequest called {"method":"GET","url":"/","headers":{"host":"myserver.local"}}']) | ||
deepStrictEqual(lines, ['onServerRequest called {"method":"GET","url":"/","headers":{"host":"myserver.local"},"payloadAsStream":true}']) | ||
}) | ||
@@ -208,3 +208,3 @@ | ||
await sleep(300) | ||
deepStrictEqual(lines, ['onServerResponse called "{\\"hello\\":\\"world\\"}"']) | ||
deepStrictEqual(lines, ['onServerResponse called /']) | ||
}) | ||
@@ -257,3 +257,3 @@ | ||
await sleep(300) | ||
deepStrictEqual(lines, ['onServerRequest called {"method":"GET","url":"/","headers":{"host":"myserver.local"}}', 'onServerResponse called: propagated']) | ||
deepStrictEqual(lines, ['onServerRequest called {"method":"GET","url":"/","headers":{"host":"myserver.local"},"payloadAsStream":true}', 'onServerResponse called: propagated']) | ||
}) | ||
@@ -260,0 +260,0 @@ |
@@ -70,3 +70,3 @@ 'use strict' | ||
await rejects(request('http://myserver.local/echo-body', { | ||
const res = await request('http://myserver.local/echo-body', { | ||
dispatcher: agent, | ||
@@ -82,3 +82,10 @@ method: 'POST', | ||
}), | ||
})) | ||
}) | ||
strictEqual(res.statusCode, 400) | ||
deepStrictEqual(await res.body.json(), { | ||
statusCode: 400, | ||
error: 'Bad Request', | ||
message: 'kaboom', | ||
}) | ||
}) | ||
@@ -110,3 +117,4 @@ | ||
test('POST errors with streams of objects', async (t) => { | ||
// Unskip when https://github.com/nodejs/node/pull/55270 is released | ||
test.skip('POST errors with streams of objects', async (t) => { | ||
const worker = new Worker(join(__dirname, 'fixtures', 'worker1.js')) | ||
@@ -128,4 +136,29 @@ t.after(() => worker.terminate()) | ||
}, | ||
body: Readable.from([{ hello: 'world' }]), | ||
body: Readable.from([{ hello: 'world' }]) | ||
})) | ||
}) | ||
test('correctly handles aborted requests', async (t) => { | ||
const worker = new Worker(join(__dirname, 'fixtures', 'worker1.js')) | ||
t.after(() => worker.terminate()) | ||
const interceptor = createThreadInterceptor({ | ||
domain: '.local', | ||
}) | ||
interceptor.route('myserver', worker) | ||
const agent = new Agent().compose(interceptor) | ||
const abortController = new AbortController() | ||
setImmediate(() => abortController.abort()) | ||
await rejects(request('http://myserver.local/unfinished-business', { | ||
dispatcher: agent, | ||
signal: abortController.signal, | ||
method: 'POST', | ||
headers: { | ||
'content-type': 'application/json', | ||
}, | ||
body: JSON.stringify({ hello: 'world' }) | ||
})) | ||
}) |
Sorry, the diff of this file is not supported yet
URL strings
Supply chain riskPackage contains fragments of external URLs or IP addresses, which the package may be accessing at runtime.
Found 1 instance in 1 package
URL strings
Supply chain riskPackage contains fragments of external URLs or IP addresses, which the package may be accessing at runtime.
Found 1 instance in 1 package
89677
20.54%49
22.5%2493
24.84%Updated