New Case Study:See how Anthropic automated 95% of dependency reviews with Socket.Learn More
Socket
Sign inDemoInstall
Socket

port_agent

Package Overview
Dependencies
Maintainers
1
Versions
33
Alerts
File Explorer

Advanced tools

Socket logo

Install Socket

Detect and block malicious and high-risk dependencies

Install

port_agent

A RPC-like facility for making inter-thread function calls.

1.0.0
Source
npm
Version published
Weekly downloads
19
58.33%
Maintainers
1
Weekly downloads
 
Created
Source

Port Agent

A RPC-like facility for making inter-thread function calls.

Features

  • Port Agent will marshall return values and Errors (stack traces) back to the caller.
  • Registered functions (i.e., Agent.register) are persistent.
  • Late binding registrants will be called with previously awaited invocations.

Examples

A Simple Example

index.js

import { Worker, isMainThread, parentPort } from 'node:worker_threads';
import { fileURLToPath } from 'node:url';
import { Agent } from 'port_agent';

if (isMainThread) { // This is the Main Thread.
    (async () => {
        const worker = new Worker(fileURLToPath(import.meta.url));
        const agent = new Agent(worker);
        worker.on('online', async () => {
            try {
                let greeting = await agent.call('hello_world', 'again, another');
                console.log(greeting);
                await agent.call('error', 'again, another');
            }
            catch (err) {
                console.error(err);
            }
            finally {
                worker.terminate();
            }
        });
        let greeting = await agent.call('hello_world', 'another');
        console.log(greeting);
    })();
} else { // This is a Worker Thread.
    function nowThrowAnError() {
        throw new Error('To err is Human.');
    }
    function callAFunction() {
        nowThrowAnError();
    }
    (async () => {
        const agent = new Agent(parentPort);
        await agent.register('hello_world', (value) => `Hello ${value} world!`);
        await agent.register('error', callAFunction);
    })();
} 

This example should log to the console:

Hello another world!
Hello again, another world!
Error: To err is Human.
    at nowThrowAnError (file:///index.js:30:15)
    at callAFunction (file:///home/adpatter/repositories/faranalytics/port_agent/test/index.js:33:9)
    at Agent.tryPost (/index.js:82:33)
    at MessagePort.<anonymous> (index.js:56:36)
    at [nodejs.internal.kHybridDispatch] (node:internal/event_target:762:20)
    at exports.emitMessage (node:internal/per_context/messageport:23:28)
Run Test

You can run the test using:

npm run test

Subclassing Agent

An Agent can be used effectively as shown in the Simple Example or it can be subclassed in order to act as a "wrapper" around Worker threads. In this example the Agent class is subclassed by WorkerAgent in order to provide the state (i.e., online, ready, and exited) of a Worker thread, which may be useful in some contexts.

import * as threads from 'node:worker_threads';
import * as net from 'node:net';
import { Agent } from 'port_agent';

interface WorkerAgentOptions {
    worker: threads.Worker;
    workerOnlineTimeout: number;
}

export class WorkerAgent extends Agent {
    public connections: number;
    public online: boolean;
    public exited: boolean;
    public proxyServerConnectOptions?: net.SocketConnectOpts;
    public ready: Promise<boolean>;
    public workerOnlineTimeout: number;

    constructor({
        worker,
        workerOnlineTimeout = 10000
    }: WorkerAgentOptions
    ) {
        super(worker);
        this.connections = 0;
        this.online = false;
        this.exited = false;
        this.workerOnlineTimeout = workerOnlineTimeout;

        this.ready = new Promise<boolean>((r, j) => {
            const timeout = setTimeout(() => {
                j(new Error(`Worker failed to come online in ${this.workerOnlineTimeout} milliseconds.`));
            }, this.workerOnlineTimeout);
            worker.on('online', () => {
                clearTimeout(timeout);
                this.online = true;
                r(true);
            });
        });

        worker.on('error', (err: Error) => {
            this.ready = Promise.reject(err);
        });

        void (async () => {
            try {
                await this.register('worker_exit', () => this.exited = true);
            }
            catch (err) {
                console.error(err);
            }
        })();
    }

    public async call(name: string, ...args: any): Promise<any> {
        await this.ready;
        return super.call(name, ...args);
    }

    public async register(name: string, fn: (...args: any) => any): Promise<any> {
        await this.ready;
        return super.register(name, fn);
    }
}

FAQs

Package last updated on 02 Aug 2023

Did you know?

Socket

Socket for GitHub automatically highlights issues in each pull request and monitors the health of all your open source dependencies. Discover the contents of your packages and block harmful activity before you install or update your dependencies.

Install

Related posts