Port Agent
A RPC-like facility for making inter-thread function calls.
Introduction
Port Agent provides a simple and intuitive interface that makes inter-thread function calls easy.
Features
- Port Agent will marshal the return value or
Error
from the other thread back to the caller. - The other thread may be the main thread or a worker thread.
- Registered functions (i.e.,
Agent.register
) persist until deregistered (i.e., Agent.deregister
) . - Late binding registrants will be called with previously awaited invocations.
Table of Contents
- Concepts
- API
- Usage
- Examples
Concepts
Agent
An instance of an Agent
facilitates communication across threads. The Agent
can be used in order to register a function in one thread and call it from another thread. Calls may be made from the main thread to a worker thread, and conversely from a worker thread to the main thread.
Late binding registrants will be called with previously awaited invocations; thus preventing a race condition. This means that you may await a call to a function that has not yet been registered. Once the function is registered in the other thread it will be called and its return value or Error
will be marshalled back to the caller.
Please see the Examples for variations on the Agent
's usage.
API
The Agent
Class
port_agent.Agent(port)
- port
<threads.MessagePort>
or <threads.Worker>
The message port.
agent.call<T>(name, ...args)
agent.register(name, fn)
agent.deregister(name)
Usage
How to create an Agent
instance.
You can create a new Agent
by passing a parentPort
or a Worker
instance to the Agent
constructor:
In the main thread,
const worker = new Worker(fileURLToPath(import.meta.url));
const agent = new Agent(worker);
or, in a worker thread,
const agent = new Agent(worker_threads.parentPort);
How to use an Agent
instance.
You can register a function in the main thread or in a worker thread using the Agent.register
method:
agent.register('hello_world', (value: string): string => `Hello, ${value} world!`);
You can call a function registered in another thread (i.e., the main thread or a worker thread) using the Agent.call
method:
const greeting = await agent.call<string>('hello_world', 'happy');
Examples
A Simple Example
In this example you will:
- Instantiate a worker thread.
- Instantiate an
Agent
in the main thread. - Use the
Agent
to call the hello_world
function. - Instantiate an
Agent
in the worker thread. - Use the
Agent
in order to register a function to handle calls to the hello_world
function. - Resolve (3) and log the result to the console.
examples/simple/index.js
import { Worker, isMainThread, parentPort } from 'node:worker_threads';
import { fileURLToPath } from 'node:url';
import { Agent } from 'port_agent';
if (isMainThread) {
void (async () => {
const worker = new Worker(fileURLToPath(import.meta.url));
const agent = new Agent(worker);
try {
const greeting = await agent.call('hello_world', 'another');
console.log(greeting);
}
catch (err) {
console.error(err);
}
finally {
worker.terminate();
}
})();
}
else {
if (parentPort) {
const agent = new Agent(parentPort);
agent.register('hello_world', (value) => `Hello, ${value} world!`);
}
}
The example should log to the console:
Hello, another world!
Please see the Simple Example for a working implementation.
A Test
In this test you will:
- Instantiate a worker thread.
- Instantiate an Agent in the main thread.
- Use the Agent to call the
hello_world
function and await resolution.
- At this point the
hello_world
function has not yet been registered in the worker thread. The function will be called once it is registered.
- Wait for the worker to come online.
- Instantiate an Agent in the worker thread.
- Use the Agent to register the
hello_world
function in the worker. - Use the Agent to register the
a_reasonable_assertion
function in the worker. - Use the Agent to call a
magic
function in the main thread that is not yet registered. - Use the Agent to call the function registered as
hello_world
and await resolution. - Resolve (3) and log the return value.
- Resolve (8) and log the return value.
- Use the Agent to call the function registered as
a_reasonable_assertion
and await resolution. - Resolve (11) and catch the Error and log the stack trace in the main thread.
- The Error was marshalled from the Error produced by the reasonable assertion that was made in the
nowThrowAnError
function in the worker thread.
- Terminate the worker thread asynchronously.
- Await abends.
- The worker thread exited; hence, log the exit code.
- If an unhandled exception had occurred in the worker thread it would have been handled accordingly.
- Use the Agent to register a
magic
function in the main thread and log the long disposed thread's ID.
Please see the comments in the code that specify each of the steps above. The output of the test is printed below.
./tests/test/index.ts
import { Worker, isMainThread, parentPort, threadId } from 'node:worker_threads';
import { fileURLToPath } from 'node:url';
import { strict as assert } from 'node:assert';
import { Agent } from 'port_agent';
if (isMainThread) {
void (async () => {
const worker = new Worker(fileURLToPath(import.meta.url));
const agent = new Agent(worker);
worker.on('online', async () => {
try {
const greeting = await agent.call<string>('hello_world', 'again, another');
console.log(greeting);
await agent.call('a_reasonable_assertion', 'To err is Human.');
}
catch (err) {
console.error(`Now, back in the main thread, we will handle the`, err);
}
finally {
void worker.terminate();
setTimeout(async () => {
try {
await agent.call<string>('hello_world', 'no more...');
}
catch (err) {
if (err instanceof Error) {
console.error(err);
}
else if (typeof err == 'number') {
console.log(`Exit code: ${err.toString()}`);
}
}
agent.register('magic', (value: number): void => console.log(`Seriously, the worker's thread ID was ${value}.`));
}, 4);
}
});
try {
const greeting = await agent.call<string>('hello_world', 'another');
console.log(greeting);
}
catch (err) {
console.error(err);
}
})();
} else {
function nowThrowAnError(message: string) {
assert.notEqual(typeof new Object(), typeof null, message);
}
function callAFunction(message: string) {
nowThrowAnError(message);
}
if (parentPort) {
const agent = new Agent(parentPort);
agent.register('hello_world', (value: string): string => `Hello, ${value} world!`);
agent.register('a_reasonable_assertion', callAFunction);
const result = await agent.call('magic', threadId);
}
}
This test should log to the console something that looks similar to this:
Hello, another world!
Hello, again, another world!
Now, back in the Main Thread, we will handle the AssertionError [ERR_ASSERTION]: To err is Human.
at nowThrowAnError (file:///port_agent/tests/test/dist/index.js:31:16)
at callAFunction (file:///port_agent/tests/test/dist/index.js:34:9)
at Agent.tryPost (/port_agent/dist/index.js:92:33)
at MessagePort.<anonymous> (/port_agent/dist/index.js:62:36)
at [nodejs.internal.kHybridDispatch] (node:internal/event_target:762:20)
at exports.emitMessage (node:internal/per_context/messageport:23:28) {
generatedMessage: false,
code: 'ERR_ASSERTION',
actual: 'object',
expected: 'object',
operator: 'notStrictEqual'
}
Exit code: 1
Seriously, the worker's thread ID was 1.
Run the Test
Clone the repository.
git clone https://github.com/faranalytics/port_agent.git
Change directory into the root of the repository.
cd port_agent
Run the test.
npm run test