undici-thread-interceptor
An Undici agent that routes requests to a worker thread.
Supports:
- load balancing (round robin)
- mesh networking between the worker threads
Installation
npm install undici undici-thread-interceptor
Usage
Main (main.js)
import { Worker } from "node:worker_threads";
import { join } from "node:path";
import { createThreadInterceptor } from "undici-thread-interceptor";
import { Agent, request } from "undici";
const worker = new Worker(join(import.meta.dirname, "worker.js"));
const interceptor = createThreadInterceptor({
domain: ".local",
});
interceptor.route("myserver", worker);
const agent = new Agent().compose(interceptor);
const { statusCode, body } = await request("http://myserver.local", {
dispatcher: agent,
});
console.log(statusCode, await body.json());
Worker (worker.js)
Generic node HTTP application
import { wire } from "undici-thread-interceptor";
import { parentPort } from "node:worker_threads";
function app(req, res) {
res.writeHead(200, { "Content-Type": "application/json" });
res.end(JSON.stringify({ hello: "world" }));
}
wire({ server: app, port: parentPort });
Fastify
import { wire } from "undici-thread-interceptor";
import { parentPort } from "node:worker_threads";
import fastify from "fastify";
const app = fastify();
app.get("/", (req, reply) => {
reply.send({ hello: "world" });
});
wire({ server: app, port: parentPort });
Express
import { wire } from "undici-thread-interceptor";
import { parentPort } from "node:worker_threads";
import express from "express";
const app = express();
app.get("/", (req, res) => {
res.send({ hello: "world" });
});
wire({ server: app, port: parentPort });
Koa
import { wire } from "undici-thread-interceptor";
import { parentPort, workerData } from "node:worker_threads";
import Koa from "koa";
const app = new Koa();
app.use((ctx) => {
ctx.body = { hello: workerData?.message || "world" };
});
wire({ server: app.callback(), port: parentPort });
Replace the server at runtime
import { wire } from "undici-thread-interceptor";
import { parentPort } from "node:worker_threads";
import fastify from "fastify";
const app1 = fastify();
app1.get("/", (req, reply) => {
reply.send({ hello: "this is app 1" });
});
const app2 = fastify();
app2.get("/", (req, reply) => {
reply.send({ hello: "this is app 2" });
});
const { replaceServer } = wire({ server: app1, port: parentPort });
setTimeout(() => {
replaceServer(app2);
}, 5000);
Gracefully close the worker thread
If you want to gracefully close the worker thread, remember to call the close
function of the interceptor.
import { wire } from "undici-thread-interceptor";
const { interceptor } = wire({ server: app, port: parentPort });
interceptor.close();
API
Hooks
It's possible to set some simple synchronous functions as hooks:
onServerRequest(req, cb)
onServerResponse(req, res)
onServerError(req, res, error)
onClientRequest(req, clientCtx)
onClientResponse(req, res, clientCtx)
onClientResponseEnd(req, res, clientCtx)
onClientError(req, res, clientCtx, error)
The clientCtx
object is used to pass objects between hook calls when they cannot be set on the request
(the request is sent through postMessage
, and those objects might not be serializable).
Client hooks
These are set on the agent dispatcher.
const interceptor = createThreadInterceptor({
domain: ".local",
onClientRequest: (req) => console.log("onClientRequest called", req),
});
interceptor.route("myserver", worker);
const agent = new Agent().compose(interceptor);
const { statusCode } = await request("http://myserver.local", {
dispatcher: agent,
});
Server hooks
These can be passed to the wire
function in workers. For example, with Fastify:
import { wire } from "undici-thread-interceptor";
import { parentPort } from "node:worker_threads";
import fastify from "fastify";
const app = fastify();
app.get("/", (req, reply) => {
reply.send({ hello: "world" });
});
wire({
server: app,
port: parentPort,
onServerRequest: (req, cb) => {
console.log("onServerRequest called", req);
cb();
},
});
License
MIT