undici-thread-interceptor
Advanced tools
Comparing version 0.10.3 to 0.11.0
142
index.js
@@ -12,5 +12,8 @@ 'use strict' | ||
const WrapHandler = require('./lib/wrap-handler') | ||
const { MessagePortWritable, MessagePortReadable } = require('./lib/message-port-streams') | ||
const kAddress = Symbol('undici-thread-interceptor.address') | ||
const MAX_BODY = 32 * 1024 | ||
function createThreadInterceptor (opts) { | ||
@@ -76,13 +79,14 @@ const routes = new Map() | ||
if (newOpts.body?.[Symbol.asyncIterator]) { | ||
collectBodyAndDispatch(newOpts, handler).then(() => { | ||
port.postMessage({ type: 'request', id, opts: newOpts, threadId }) | ||
}, (err) => { | ||
clearTimeout(handle) | ||
hooks.fireOnClientError(newOpts, null, err) | ||
handler.onResponseError(controller, err) | ||
if (typeof newOpts.body?.resume === 'function' || newOpts.body?.[Symbol.asyncIterator]) { | ||
const body = newOpts.body | ||
delete newOpts.body | ||
const transferable = MessagePortWritable.asTransferable({ | ||
body | ||
}) | ||
port.postMessage({ type: 'request', id, opts: newOpts, port: transferable.port, threadId }, transferable.transferList) | ||
} else { | ||
port.postMessage({ type: 'request', id, opts: newOpts, threadId }) | ||
} | ||
const inflights = portInflights.get(port) | ||
@@ -124,4 +128,5 @@ | ||
// but we should consider adding a test for this in the future | ||
/* c8 ignore next 4 */ | ||
/* c8 ignore next 6 */ | ||
if (controller.aborted) { | ||
res.port?.close() | ||
handler.onResponseError(controller, controller.reason) | ||
@@ -131,2 +136,4 @@ return | ||
} catch (err) { | ||
// No need to close the transferable port here, because it cannot happen | ||
// for requests with a body | ||
handler.onResponseError(controller, err) | ||
@@ -136,6 +143,32 @@ return | ||
handler.onResponseData(controller, res.rawPayload) | ||
handler.onResponseEnd(controller, []) | ||
if (res.port) { | ||
const body = new MessagePortReadable({ | ||
port: res.port | ||
}) | ||
hooks.fireOnClientResponseEnd(newOpts, res, clientCtx) | ||
controller.on('resume', () => { | ||
body.resume() | ||
}) | ||
controller.on('pause', () => { | ||
body.pause() | ||
}) | ||
body.on('data', (chunk) => { | ||
handler.onResponseData(controller, chunk) | ||
}) | ||
body.on('end', () => { | ||
handler.onResponseEnd(controller, []) | ||
hooks.fireOnClientResponseEnd(newOpts, res, clientCtx) | ||
}) | ||
body.on('error', (err) => { | ||
handler.onResponseError(controller, err) | ||
}) | ||
} else { | ||
handler.onResponseData(controller, res.body) | ||
handler.onResponseEnd(controller, []) | ||
hooks.fireOnClientResponseEnd(newOpts, res, clientCtx) | ||
} | ||
})) | ||
@@ -290,4 +323,11 @@ | ||
if (msg.type === 'request') { | ||
const { id, opts } = msg | ||
const { id, opts, port: bodyPort } = msg | ||
let bodyReadable | ||
if (bodyPort) { | ||
bodyReadable = new MessagePortReadable({ | ||
port: bodyPort | ||
}) | ||
} | ||
const headers = {} | ||
@@ -306,6 +346,7 @@ | ||
query: opts.query, | ||
body: opts.body instanceof Uint8Array ? Buffer.from(opts.body) : opts.body, | ||
body: opts.body || bodyReadable, | ||
payloadAsStream: true | ||
} | ||
interceptor.hooks.fireOnServerRequest(injectOpts, () => { | ||
const onInject = (err, res) => { | ||
const onInject = async (err, res) => { | ||
if (err) { | ||
@@ -317,20 +358,48 @@ interceptor.hooks.fireOnServerError(injectOpts, res, err) | ||
const newRes = { | ||
headers: res.headers, | ||
statusCode: res.statusCode, | ||
} | ||
const length = res.headers['content-length'] | ||
const parsedLength = length === undefined ? MAX_BODY : Number(length) | ||
if (res.headers['content-type']?.indexOf('application/json') === 0) { | ||
// TODO(mcollina): maybe use a fast path also for HTML | ||
// fast path because it's utf-8, use a string | ||
newRes.rawPayload = res.payload | ||
let newRes | ||
let forwardRes | ||
let transferList | ||
if (parsedLength < MAX_BODY) { | ||
try { | ||
const body = await collectBody(res.stream()) | ||
newRes = { | ||
headers: res.headers, | ||
statusCode: res.statusCode, | ||
body | ||
} | ||
forwardRes = { | ||
type: 'response', | ||
id, | ||
res: newRes, | ||
} | ||
} catch (err) { | ||
forwardRes = { | ||
type: 'response', | ||
id, | ||
err | ||
} | ||
} | ||
} else { | ||
// slow path, buffer | ||
newRes.rawPayload = res.rawPayload | ||
} | ||
const transferable = MessagePortWritable.asTransferable({ | ||
body: res.stream() | ||
}) | ||
transferList = transferable.transferList | ||
const forwardRes = { | ||
type: 'response', | ||
id, | ||
res: newRes, | ||
newRes = { | ||
headers: res.headers, | ||
statusCode: res.statusCode, | ||
port: transferable.port, | ||
} | ||
forwardRes = { | ||
type: 'response', | ||
id, | ||
res: newRes, | ||
} | ||
} | ||
@@ -342,3 +411,3 @@ | ||
// that sent the request | ||
this.postMessage(forwardRes) | ||
this.postMessage(forwardRes, transferList) | ||
} | ||
@@ -375,15 +444,14 @@ | ||
async function collectBodyAndDispatch (opts) { | ||
async function collectBody (stream) { | ||
const data = [] | ||
for await (const chunk of opts.body) { | ||
for await (const chunk of stream) { | ||
data.push(chunk) | ||
} | ||
if (typeof data[0] === 'string') { | ||
opts.body = data.join('') | ||
} else if (data[0] instanceof Buffer || data[0] instanceof Uint8Array) { | ||
opts.body = Buffer.concat(data) | ||
/* c8 ignore next 7 */ | ||
if (data[0] instanceof Buffer || data[0] instanceof Uint8Array) { | ||
return Buffer.concat(data) | ||
} else { | ||
throw new Error('Cannot transfer streams of objects') | ||
throw new Error('Cannot transfer streams of strings or objects') | ||
} | ||
@@ -390,0 +458,0 @@ } |
'use strict' | ||
class DispatchController { | ||
const { EventEmitter } = require('node:events') | ||
class DispatchController extends EventEmitter { | ||
#paused = false | ||
@@ -22,2 +24,3 @@ #reason = null | ||
this.#paused = true | ||
this.emit('pause') | ||
} | ||
@@ -27,2 +30,3 @@ | ||
this.#paused = false | ||
this.emit('resume') | ||
} | ||
@@ -29,0 +33,0 @@ |
{ | ||
"name": "undici-thread-interceptor", | ||
"version": "0.10.3", | ||
"version": "0.11.0", | ||
"description": "An Undici interceptor that routes requests over a worker thread", | ||
@@ -29,5 +29,5 @@ "main": "index.js", | ||
"hyperid": "^3.2.0", | ||
"light-my-request": "^6.0.0", | ||
"light-my-request": "^6.5.1", | ||
"undici": "^7.0.0" | ||
} | ||
} |
@@ -435,1 +435,65 @@ 'use strict' | ||
}) | ||
test('big stream using backpressure', async (t) => { | ||
const worker = new Worker(join(__dirname, 'fixtures', 'worker1.js')) | ||
t.after(() => worker.terminate()) | ||
const interceptor = createThreadInterceptor({ | ||
domain: '.local', | ||
}) | ||
interceptor.route('myserver', worker) | ||
const agent = new Agent().compose(interceptor) | ||
const { statusCode, body } = await request('http://myserver.local/big', { | ||
dispatcher: agent, | ||
}) | ||
strictEqual(statusCode, 200) | ||
let size = 0 | ||
body.on('readable', () => { | ||
let chunk | ||
while ((chunk = body.read()) !== null) { | ||
size += chunk.length | ||
} | ||
}) | ||
await once(body, 'end') | ||
strictEqual(size, 1024 * 1024 * 100) | ||
}) | ||
test('handles an error within a stream response with a content length', async (t) => { | ||
const worker = new Worker(join(__dirname, 'fixtures', 'worker1.js')) | ||
t.after(() => worker.terminate()) | ||
const interceptor = createThreadInterceptor({ | ||
domain: '.local', | ||
}) | ||
interceptor.route('myserver', worker) | ||
const agent = new Agent().compose(interceptor) | ||
await rejects(request('http://myserver.local/stream-error', { | ||
dispatcher: agent, | ||
})) | ||
}) | ||
test('handle an error with a stream response response without content length', async (t) => { | ||
const worker = new Worker(join(__dirname, 'fixtures', 'worker1.js')) | ||
t.after(() => worker.terminate()) | ||
const interceptor = createThreadInterceptor({ | ||
domain: '.local', | ||
}) | ||
interceptor.route('myserver', worker) | ||
const agent = new Agent().compose(interceptor) | ||
const res = await request('http://myserver.local/stream-error-2', { | ||
dispatcher: agent, | ||
}) | ||
strictEqual(res.statusCode, 200) | ||
await rejects(res.body.text()) | ||
}) |
@@ -18,4 +18,3 @@ 'use strict' | ||
cb() | ||
// cb(null, 'done') | ||
} | ||
}) |
@@ -15,6 +15,5 @@ 'use strict' | ||
port: parentPort, | ||
onServerResponse: (_req, res) => { | ||
const payload = Buffer.from(res.rawPayload).toString() | ||
console.log('onServerResponse called', JSON.stringify(payload)) | ||
onServerResponse: (req) => { | ||
console.log('onServerResponse called', req.url) | ||
} | ||
}) |
@@ -47,2 +47,49 @@ 'use strict' | ||
app.get('/big', (req, reply) => { | ||
let i = 0 | ||
const big = new Readable({ | ||
read () { | ||
if (++i > 100) { | ||
this.push(null) | ||
return | ||
} | ||
this.push(Buffer.alloc(1024 * 1024, 'x')) | ||
}, | ||
}) | ||
return big | ||
}) | ||
app.get('/stream-error', (req, reply) => { | ||
// The content-lengh header is necessary to make sure that | ||
// the mesh network collects the whole body | ||
reply.header('content-length', 1024) | ||
let called = false | ||
reply.send(new Readable({ | ||
read () { | ||
if (!called) { | ||
called = true | ||
this.push('hello') | ||
setTimeout(() => { | ||
this.destroy(new Error('kaboom')) | ||
}, 1000) | ||
} | ||
}, | ||
})) | ||
}) | ||
app.get('/stream-error-2', (req, reply) => { | ||
let called = false | ||
reply.send(new Readable({ | ||
read () { | ||
if (!called) { | ||
called = true | ||
this.push('hello') | ||
setTimeout(() => { | ||
this.destroy(new Error('kaboom')) | ||
}, 1000) | ||
} | ||
}, | ||
})) | ||
}) | ||
app.post('/echo-body', (req, reply) => { | ||
@@ -49,0 +96,0 @@ reply.send(req.body) |
@@ -94,4 +94,4 @@ 'use strict' | ||
domain: '.local', | ||
onClientResponse: (_req, res) => { | ||
hookCalled = Buffer.from(res.rawPayload).toString() | ||
onClientResponse: (req) => { | ||
hookCalled = req.path | ||
} | ||
@@ -107,3 +107,3 @@ }) | ||
strictEqual(statusCode, 200) | ||
deepStrictEqual(hookCalled, '{"hello":"world"}') | ||
strictEqual(hookCalled, '/') | ||
}) | ||
@@ -119,3 +119,3 @@ | ||
onClientResponseEnd: (_req, res) => { | ||
hookCalled = Buffer.from(res.rawPayload).toString() | ||
hookCalled = true | ||
} | ||
@@ -131,3 +131,3 @@ }) | ||
strictEqual(statusCode, 200) | ||
deepStrictEqual(hookCalled, '{"hello":"world"}') | ||
strictEqual(hookCalled, true) | ||
}) | ||
@@ -182,3 +182,3 @@ | ||
await sleep(300) | ||
deepStrictEqual(lines, ['onServerRequest called {"method":"GET","url":"/","headers":{"host":"myserver.local"}}']) | ||
deepStrictEqual(lines, ['onServerRequest called {"method":"GET","url":"/","headers":{"host":"myserver.local"},"payloadAsStream":true}']) | ||
}) | ||
@@ -208,3 +208,3 @@ | ||
await sleep(300) | ||
deepStrictEqual(lines, ['onServerResponse called "{\\"hello\\":\\"world\\"}"']) | ||
deepStrictEqual(lines, ['onServerResponse called /']) | ||
}) | ||
@@ -257,3 +257,3 @@ | ||
await sleep(300) | ||
deepStrictEqual(lines, ['onServerRequest called {"method":"GET","url":"/","headers":{"host":"myserver.local"}}', 'onServerResponse called: propagated']) | ||
deepStrictEqual(lines, ['onServerRequest called {"method":"GET","url":"/","headers":{"host":"myserver.local"},"payloadAsStream":true}', 'onServerResponse called: propagated']) | ||
}) | ||
@@ -260,0 +260,0 @@ |
@@ -70,3 +70,3 @@ 'use strict' | ||
await rejects(request('http://myserver.local/echo-body', { | ||
const res = await request('http://myserver.local/echo-body', { | ||
dispatcher: agent, | ||
@@ -82,3 +82,10 @@ method: 'POST', | ||
}), | ||
})) | ||
}) | ||
strictEqual(res.statusCode, 400) | ||
deepStrictEqual(await res.body.json(), { | ||
statusCode: 400, | ||
error: 'Bad Request', | ||
message: 'kaboom', | ||
}) | ||
}) | ||
@@ -110,3 +117,4 @@ | ||
test('POST errors with streams of objects', async (t) => { | ||
// Unskip when https://github.com/nodejs/node/pull/55270 is released | ||
test.skip('POST errors with streams of objects', async (t) => { | ||
const worker = new Worker(join(__dirname, 'fixtures', 'worker1.js')) | ||
@@ -128,4 +136,29 @@ t.after(() => worker.terminate()) | ||
}, | ||
body: Readable.from([{ hello: 'world' }]), | ||
body: Readable.from([{ hello: 'world' }]) | ||
})) | ||
}) | ||
test('correctly handles aborted requests', async (t) => { | ||
const worker = new Worker(join(__dirname, 'fixtures', 'worker1.js')) | ||
t.after(() => worker.terminate()) | ||
const interceptor = createThreadInterceptor({ | ||
domain: '.local', | ||
}) | ||
interceptor.route('myserver', worker) | ||
const agent = new Agent().compose(interceptor) | ||
const abortController = new AbortController() | ||
setImmediate(() => abortController.abort()) | ||
await rejects(request('http://myserver.local/unfinished-business', { | ||
dispatcher: agent, | ||
signal: abortController.signal, | ||
method: 'POST', | ||
headers: { | ||
'content-type': 'application/json', | ||
}, | ||
body: JSON.stringify({ hello: 'world' }) | ||
})) | ||
}) |
Sorry, the diff of this file is not supported yet
89677
49
2493
Updatedlight-my-request@^6.5.1