Huge News!Announcing our $40M Series B led by Abstract Ventures.Learn More
Socket
Sign inDemoInstall
Socket

@matrixai/rpc

Package Overview
Dependencies
Maintainers
3
Versions
28
Alerts
File Explorer

Advanced tools

Socket logo

Install Socket

Detect and block malicious and high-risk dependencies

Install

@matrixai/rpc - npm Package Compare versions

Comparing version 0.1.6 to 0.1.8

2

dist/errors/errors.d.ts

@@ -28,3 +28,3 @@ import type { Class } from '@matrixai/errors';

interface RPCError extends Error {
code?: number;
code: number;
}

@@ -31,0 +31,0 @@ declare class ErrorRPC<T> extends AbstractError<T> implements RPCError {

import type { ClientHandlerImplementation, DuplexHandlerImplementation, JSONRPCRequest, JSONRPCResponse, JSONRPCResponseResult, ServerManifest, RawHandlerImplementation, ServerHandlerImplementation, UnaryHandlerImplementation, RPCStream, MiddlewareFactory } from './types';
import type { JSONValue } from './types';
import type { IdGen } from './types';
import type { ErrorRPC, ErrorRPCRemote } from './errors';
import Logger from '@matrixai/logger';

@@ -35,4 +36,2 @@ import { PromiseCancellable } from '@matrixai/async-cancellable';

* path and `JSONRPCResponse` to `Uint8Array` on the reverse path.
* @param obj.sensitive - If true, sanitises any rpc error messages of any
* sensitive information.
* @param obj.streamKeepAliveTimeoutTime - Time before a connection is cleaned up due to no activity. This is the

@@ -47,12 +46,12 @@ * value used if the handler doesn't specify its own timeout time. This timeout is advisory and only results in a

*/
static createRPCServer({ manifest, middlewareFactory, sensitive, handlerTimeoutTime, // 1 minute
logger, idGen, fromError, replacer, }: {
static createRPCServer({ manifest, middlewareFactory, handlerTimeoutTime, // 1 minute
logger, idGen, fromError, filterSensitive, toError, }: {
manifest: ServerManifest;
middlewareFactory?: MiddlewareFactory<JSONRPCRequest, Uint8Array, Uint8Array, JSONRPCResponse>;
sensitive?: boolean;
handlerTimeoutTime?: number;
logger?: Logger;
idGen: IdGen;
fromError?: (error: Error) => JSONValue;
replacer?: (key: string, value: any) => any;
fromError?: (error: ErrorRPC<any>) => JSONValue;
filterSensitive?: (key: string, value: any) => any;
toError?: (errorResponse: any, metadata?: any) => ErrorRPCRemote<any>;
}): Promise<RPCServer>;

@@ -66,17 +65,16 @@ protected onTimeoutCallback?: () => void;

protected activeStreams: Set<PromiseCancellable<void>>;
protected sensitive: boolean;
protected fromError: (error: Error, sensitive?: boolean) => JSONValue;
protected replacer: (key: string, value: any) => any;
protected fromError: (error: ErrorRPC<any>) => JSONValue;
protected filterSensitive: (key: string, value: any) => any;
protected toError: (errorResponse: any, metadata?: any) => ErrorRPCRemote<any>;
protected middlewareFactory: MiddlewareFactory<JSONRPCRequest, Uint8Array, Uint8Array, JSONRPCResponseResult>;
registerOnTimeoutCallback(callback: () => void): void;
constructor({ manifest, middlewareFactory, sensitive, handlerTimeoutTime, // 1 minuet
logger, idGen, fromError, replacer, }: {
constructor({ manifest, middlewareFactory, handlerTimeoutTime, logger, idGen, fromError, filterSensitive, toError, }: {
manifest: ServerManifest;
middlewareFactory: MiddlewareFactory<JSONRPCRequest, Uint8Array, Uint8Array, JSONRPCResponseResult>;
handlerTimeoutTime?: number;
sensitive: boolean;
logger: Logger;
idGen: IdGen;
fromError?: (error: Error) => JSONValue;
replacer?: (key: string, value: any) => any;
fromError?: (error: ErrorRPC<any>) => JSONValue;
filterSensitive?: (key: string, value: any) => any;
toError?: (errorResponse: any, metadata?: any) => ErrorRPCRemote<any>;
});

@@ -83,0 +81,0 @@ destroy(force?: boolean): Promise<void>;

@@ -72,4 +72,2 @@ "use strict";

* path and `JSONRPCResponse` to `Uint8Array` on the reverse path.
* @param obj.sensitive - If true, sanitises any rpc error messages of any
* sensitive information.
* @param obj.streamKeepAliveTimeoutTime - Time before a connection is cleaned up due to no activity. This is the

@@ -84,4 +82,4 @@ * value used if the handler doesn't specify its own timeout time. This timeout is advisory and only results in a

*/
static async createRPCServer({ manifest, middlewareFactory = rpcUtilsMiddleware.defaultServerMiddlewareWrapper(), sensitive = false, handlerTimeoutTime = Infinity, // 1 minute
logger = new logger_1.default(this.name), idGen = () => Promise.resolve(null), fromError = rpcUtils.fromError, replacer = rpcUtils.replacer, }) {
static async createRPCServer({ manifest, middlewareFactory = rpcUtilsMiddleware.defaultServerMiddlewareWrapper(), handlerTimeoutTime = Infinity, // 1 minute
logger = new logger_1.default(this.name), idGen = () => Promise.resolve(null), fromError = rpcUtils.fromError, filterSensitive = rpcUtils.filterSensitive, toError = rpcUtils.toError, }) {
logger.info(`Creating ${this.name}`);

@@ -91,3 +89,2 @@ const rpcServer = new this({

middlewareFactory,
sensitive,
handlerTimeoutTime,

@@ -97,3 +94,4 @@ logger,

fromError,
replacer,
filterSensitive,
toError,
});

@@ -110,5 +108,5 @@ logger.info(`Created ${this.name}`);

activeStreams = new Set();
sensitive;
fromError;
replacer;
filterSensitive;
toError;
middlewareFactory;

@@ -119,4 +117,3 @@ // Function to register a callback for timeout

}
constructor({ manifest, middlewareFactory, sensitive, handlerTimeoutTime = Infinity, // 1 minuet
logger, idGen = () => Promise.resolve(null), fromError = rpcUtils.fromError, replacer = rpcUtils.replacer, }) {
constructor({ manifest, middlewareFactory, handlerTimeoutTime = Infinity, logger, idGen = () => Promise.resolve(null), fromError = rpcUtils.fromError, filterSensitive = rpcUtils.filterSensitive, toError = rpcUtils.toError, }) {
super();

@@ -152,7 +149,7 @@ for (const [key, manifestItem] of Object.entries(manifest)) {

this.middlewareFactory = middlewareFactory;
this.sensitive = sensitive;
this.handlerTimeoutTime = handlerTimeoutTime;
this.logger = logger;
this.fromError = fromError || rpcUtils.fromError;
this.replacer = replacer || rpcUtils.replacer;
this.filterSensitive = filterSensitive || rpcUtils.filterSensitive;
this.toError = toError || rpcUtils.toError;
}

@@ -270,3 +267,3 @@ async destroy(force = true) {

message: e.description ?? '',
data: JSON.stringify(this.fromError(e), this.replacer),
data: JSON.stringify(this.fromError(e), this.filterSensitive),
type: e.type,

@@ -457,3 +454,3 @@ };

message: e.description ?? '',
data: JSON.stringify(this.fromError(e), this.replacer),
data: JSON.stringify(this.fromError(e), this.filterSensitive),
type: e.type,

@@ -460,0 +457,0 @@ };

@@ -28,3 +28,3 @@ /// <reference types="node" />

*/
declare const replacer: (keyToRemove: any) => (key: any, value: any) => any;
declare const filterSensitive: (keyToRemove: any) => (key: any, value: any) => any;
/**

@@ -68,2 +68,2 @@ * Deserializes an error response object into an ErrorRPCRemote instance.

declare function parseHeadStream<T extends JSONRPCMessage>(messageParser: (message: unknown) => T, bufferByteLimit?: number): TransformStream<Uint8Array, T | Uint8Array>;
export { parseJSONRPCRequest, parseJSONRPCRequestMessage, parseJSONRPCRequestNotification, parseJSONRPCResponseResult, parseJSONRPCResponseError, parseJSONRPCResponse, parseJSONRPCMessage, replacer, fromError, toError, clientInputTransformStream, clientOutputTransformStream, getHandlerTypes, parseHeadStream, promise, isObject, sleep, };
export { parseJSONRPCRequest, parseJSONRPCRequestMessage, parseJSONRPCRequestNotification, parseJSONRPCResponseResult, parseJSONRPCResponseError, parseJSONRPCResponse, parseJSONRPCMessage, filterSensitive, fromError, toError, clientInputTransformStream, clientOutputTransformStream, getHandlerTypes, parseHeadStream, promise, isObject, sleep, };

@@ -26,3 +26,3 @@ "use strict";

Object.defineProperty(exports, "__esModule", { value: true });
exports.sleep = exports.isObject = exports.promise = exports.parseHeadStream = exports.getHandlerTypes = exports.clientOutputTransformStream = exports.clientInputTransformStream = exports.toError = exports.fromError = exports.replacer = exports.parseJSONRPCMessage = exports.parseJSONRPCResponse = exports.parseJSONRPCResponseError = exports.parseJSONRPCResponseResult = exports.parseJSONRPCRequestNotification = exports.parseJSONRPCRequestMessage = exports.parseJSONRPCRequest = void 0;
exports.sleep = exports.isObject = exports.promise = exports.parseHeadStream = exports.getHandlerTypes = exports.clientOutputTransformStream = exports.clientInputTransformStream = exports.toError = exports.fromError = exports.filterSensitive = exports.parseJSONRPCMessage = exports.parseJSONRPCResponse = exports.parseJSONRPCResponseError = exports.parseJSONRPCResponseResult = exports.parseJSONRPCRequestNotification = exports.parseJSONRPCRequestMessage = exports.parseJSONRPCRequest = void 0;
const web_1 = require("stream/web");

@@ -32,3 +32,2 @@ const json_1 = require("@streamparser/json");

const rpcErrors = __importStar(require("./errors"));
const errors = __importStar(require("./errors"));
const errors_2 = require("./errors");

@@ -280,68 +279,5 @@ const errors_3 = require("./errors");

*/
const replacer = createReplacer();
exports.replacer = replacer;
const filterSensitive = createReplacer();
exports.filterSensitive = filterSensitive;
/**
* Reviver function for deserializing errors sent over RPC.
* @param {string} key - The key in the JSON object.
* @param {any} value - The value corresponding to the key in the JSON object.
* @returns {any} The reconstructed error object or the original value.
*/
function reviver(key, value) {
// If the value is an error then reconstruct it
if (typeof value === 'object' &&
typeof value.type === 'string' &&
typeof value.data === 'object') {
try {
let eClass = errors[value.type];
if (eClass != null)
return eClass.fromJSON(value);
eClass = standardErrors[value.type];
if (eClass != null) {
let e;
switch (eClass) {
case errors_1.AbstractError:
return eClass.fromJSON();
case AggregateError:
if (!Array.isArray(value.data.errors) ||
typeof value.data.message !== 'string' ||
('stack' in value.data && typeof value.data.stack !== 'string')) {
throw new TypeError(`cannot decode JSON to ${value.type}`);
}
e = new eClass(value.data.errors, value.data.message);
e.stack = value.data.stack;
break;
default:
if (typeof value.data.message !== 'string' ||
('stack' in value.data && typeof value.data.stack !== 'string')) {
throw new TypeError(`Cannot decode JSON to ${value.type}`);
}
e = new eClass(value.data.message);
e.stack = value.data.stack;
break;
}
return e;
}
}
catch (e) {
// If `TypeError` which represents decoding failure
// then return value as-is
// Any other exception is a bug
if (!(e instanceof TypeError)) {
throw e;
}
}
// Other values are returned as-is
return value;
}
else if (key === '') {
// Root key will be ''
// Reaching here means the root JSON value is not a valid exception
// Therefore ErrorPolykeyUnknown is only ever returned at the top-level
return new rpcErrors.ErrorRPC('Unknown error JSON');
}
else {
return value;
}
}
/**
* Deserializes an error response object into an ErrorRPCRemote instance.

@@ -348,0 +284,0 @@ * @param {any} errorResponse - The error response object.

{
"name": "@matrixai/rpc",
"version": "0.1.6",
"version": "0.1.8",
"author": "Matrix AI",

@@ -5,0 +5,0 @@ "contributors": [

@@ -11,3 +11,444 @@ # js-rpc

```
## Usage Examples
Because decorators are experimental, you must enable: `"experimentalDecorators": true` in your `tsconfig.json` to use this library.
### Client Stream
In a Client Stream, the client can write multiple messages to a single stream,
while the server reads from that stream and then returns a single response.
This pattern is useful when the client needs to send a sequence of data to the server,
after which the server processes the data and replies with a single result.
This pattern is good for scenarios like file uploads.
This example shows how to create an RPC pair and handle streaming integers and summing them up.
```ts
import {RPCServer, ClientHandler, ContainerType, RPCClient, ClientCaller} from "./index";
import {AsyncIterable} from "ix/Ix";
const webSocketServer = awwait
class Sum extends ClientHandler<ContainerType, number, number> {
public handle = async (
// handle takes input an AsyncIterable, which is a generator,
// would produce numbers input in the Writeable stream
input: AsyncIterable<number>,
cancel: (reason?: any) => void,
meta: Record<string, JSONValue> | undefined,
ctx: ContextTimed,
): Promise<number> => {
let sum = 0;
for await (const num of input) {
sum += num;
}
return acc;
};
};
// Setting up an instance of RPC Client with Sum as the handler method
async function startServer() {
const rpcServer = await RPCServer.createRPCServer({
manifest: {
Sum: new Sum({}),
},
logger,
idGen,
handlerTimeoutTime: 60000,
});
// Simulating receiving a stream from a client.
// Provided by network layer
const simulatedStream = sendStreamHere;
rpcServer.handleStream(simulatedStream);
return rpcServer;
}
async function startClient() {
// Simulate client-server pair of streams.
// Simulating network stream
const clientPair =sendStreamHere /* your logic for creating or obtaining a client-side stream */;
const rpcClient = await RPCClient.createRPCClient({
manifest: {
Sum: new ClientCaller<number, number>(),
},
streamFactory,
middlewareFactory,
logger,
idGen
})
return rpcClient;
}
// Function to execute the Sum RPC call
async function executeSum(rpcClient: typeof RPCClient){
const { output, writable } = await rpcClient.methods.Sum();
const writer = writable.getWriter();
await writer.write(5);
await writer.write(10);
await writer.close();
const ans = await output;
console.log('Sum is: $(ans)');
}
// Main function to tie everything together
async function main(){
const rpcServer = await startServer();
const rpcClient = await startClient();
await executeSum(rpcClient);
await rpcServer.destroy();
await rpcClient.destroy();
}
main();
```
### Duplex Stream
A Duplex Stream enables both the client and the server to read
and write messages in their respective streams independently of each other.
Both parties can read and write multiple messages in any order.
It's useful in scenarios that require ongoing communication in both directions, like chat applications.
In this example, the client sends a sequence of numbers and the server responds with the squares of those numbers.
```ts
import { RPCServer, DuplexHandler, ContainerType } from "./index";
class SquareNumbersDuplex extends DuplexHandler<ContainerType, number, Array<number>> {
public handle = async function* (
input: AsyncIterableIterator<number>,
cancel: (reason?: any) => void,
meta: Record<string, JSONValue> | undefined,
ctx: ContextTimed,
): AsyncIterableIterator<Array<number>> {
for await (const num of input) {
const squares: Array<number> = [];
for (let i = 1; i <= num; i++) {
squares.push(i * i);
}
yield squares;
}
};
}
async function startServer() {
const rpcServer = await RPCServer.createRPCServer({
manifest: {
SquareNumbersDuplex: new SquareNumbersDuplex({}),
},
logger,
idGen,
});
const simulatedStream = sendStreamHere/* your logic for creating or obtaining a server-side stream */;
rpcServer.handleStream(simulatedStream);
return rpcServer;
}
// Run the server
startServer();
import { RPCClient, DuplexCaller } from "./index";
async function startClient() {
const rpcClient = await RPCClient.createRPCClient({
manifest: {
SquareNumbersDuplex: new DuplexCaller<number, Array<number>>(),
},
streamFactory,
middlewareFactory,
logger,
idGen,
});
const squareStream = await rpcClient.methods.SquareNumbersDuplex();
// Write to the server
const writer = squareStream.writable.getWriter();
writer.write(2);
writer.write(3);
writer.write(4);
// Read squared numbers from the server
for await (const squares of squareStream.readable) {
console.log(`Squares up to n are: ${squares.join(", ")}`);
}
writer.close();
return rpcClient;
}
// Run the client
startClient();
async function main(){
const rpcServer = await startServer();
const rpcClient = await startClient();
await rpcServer.destroy();
await rpcClient.destroy();
}
// Run the main function to kick off the example
main();
```
### Raw Stream
Raw Stream is designed for low-level handling of RPC calls, enabling granular control over data streaming.
Unlike other patterns, Raw Streams allow both the server and client to work directly with raw data,
providing a more flexible yet complex way to handle communications.
This is especially useful when the RPC protocol itself needs customization
or when handling different types of data streams within the same connection.
In this example, the client sends a sequence of numbers and the server responds with the factorial of each number.
```ts
import {RawHandler, JSONRPCRequest, ContextTimed, JSONValue} from '../types';
import {ContainerType} from "./types";
import RPCServer from "./RPCServer"; // Assuming these are imported correctly
class FactorialStream extends RawHandler<ContainerType> {
public async handle(
[request, inputStream]: [JSONRPCRequest, ReadableStream<Uint8Array>],
cancel: (reason?: any) => void,
meta: Record<string, JSONValue> | undefined,
ctx: ContextTimed,
): Promise<[JSONValue, ReadableStream<Uint8Array>]> {
const {readable, writable} = new TransformStream<Uint8Array, Uint8Array>();
(async () => {
const reader = inputStream.getReader();
const writer = writable.getWriter();
while (true) {
const {done, value} = await reader.read();
if (done) {
break;
}
const num = parseInt(new TextDecoder().decode(value), 10);
const factorial = factorialOf(num).toString();
const outputBuffer = new TextEncoder().encode(factorial);
writer.write(outputBuffer);
}
writer.close();
})();
return ["Starting factorial computation", readable];
}
}
function factorialOf(n: number): number {
return (n === 0) ? 1 : n * factorialOf(n - 1);
}
async function startSErver() {
const rpcServer = await RPCServer.createRPCServer({
manifest: {
FactorialStream: new FactorialStream({}),
},
logger,
idGen,
});
const simulatedStream = sendStreamHere/* your logic for creating or obtaining a server-side stream */;
rpcServer.handleStream(simulatedStream);
return rpcServer;
}
async function startClient() {
const rpcClient = await RPCClient.createRPCClient({
manifest: {
FactorialStream: new RawCaller(),
},
streamFactory,
middlewareFactory,
logger,
idGen,
});
const { readable, writable, meta } = await rpcClient.methods.FactorialRaw({});
//Printing 'Starting factorial computation'
console.log(meta);
const writer = writable.getWriter();
const numbers = [1, 2, 3, 4, 5];
for (const num of numbers){
writer.write(new TextEncoder().encode(num.toString()));
}
writer.close();
const reader = readable.getReader();
while (true) {
const {done, value} = await reader.read();
if (done) {
break;
}
console.log('The factorial is: $(new TextDecoder().decode(value))');
}
return rpcClient;
}
async function main(){
const rpcServer = await startServer();
const rpcClient = await startClient();
await rpcServer.destroy();
await rpcClient.destroy();
}
// Run the main function to kick off the example
main();
```
### Server Stream
In Server Stream calls,
the client sends a single request and receives multiple responses in a read-only stream from the server.
The server can keep pushing messages as long as it needs, allowing real-time updates from the server to the client.
This is useful for things like monitoring,
where the server needs to update the client in real-time based on events or data changes.
In this example, the client sends a number and the server responds with the squares of all numbers up to that number.
```ts
import ServerHandler from "./ServerHandler";
import {ContainerType} from "./types";
import {AsyncIterable} from "ix/Ix";
class SquaredNums extends ServerHandler<ContainerType, number, number> {
public handle = async function* (
input: number,
cancel: (reason?: any) => void,
meta: Record<string, JSONValue> | undefined,
ctx: ContextTimed,
): AsyncIterable<number>{
for (let i = 0; i<= input; i++){
yield i*i;
}
};
}
async function startServer() {
const rpcServer = await RPCServer.createRPCServer({
manifest: {
SquaredNums: new SquaredNums({}),
},
logger,
idGen,
handlerTimeoutTime: 60000,
});
// Simulating receiving a stream from a client.
// Provided by network layer
const simulatedStream =sendStreamHere;
rpcServer.handleStream(simulatedStream);
return rpcServer;
}
async function startClient() {
// Simulate client-server pair of streams.
// Simulating network stream
const clientPair = sendStreamHere/* your logic for creating or obtaining a client-side stream */;
const rpcClient = await RPCClient.createRPCClient({
manifest: {
SquaredNums: new ServerCaller
},
streamFactory,
middlewareFactory,
logger,
idGen
})
const squaredStream = await rpcClient.methods.SquaredNums(4);
// Read squared numbers from the server
const outputs: Array<number> = [];
for await(const num of squaredStream) {
outputs.push(num);
}
console.log('Squared numbers are: $(outputs.join(', ')}');
return
return rpcClient;
}
async function main(){
const rpcServer = await startServer();
const rpcClient = await startClient();
await rpcServer.destroy();
await rpcClient.destroy();
}
main();
```
### Unary Stream
In a Unary Stream, the client sends a single request to the server and gets a single response back,
just like HTTP/REST calls but over a connection.
It's the simplest form of RPC, suitable for short-lived operations that don't require streaming data.
It's the go-to choice for straightforward "request and response" interactions.
In this example, the client sends a number and the server responds with the square of that number.
```ts
class SquaredNumberUnary extends UnaryHandler<ContainerType, number, number> {
public handle = async (
input: number,
cancel: (reason?: any) => void,
meta: Record<string, JSONValue> | undefined,
ctx: ContextTimed,
): Promise<number> => {
return input * input;
};
}
async function startServer() {
const rpcServer = await RPCServer.createRPCServer({
manifest: {
SquaredNumberUnary: new SquaredNumberUnary({}),
},
logger,
idGen,
});
const simulatedStream = sendStreamHere/* your logic for creating or obtaining a server-side stream */;
rpcServer.handleStream(simulatedStream);
return rpcServer;
}
async function startClient() {
const rpcClient = await RPCClient.createRPCClient({
manifest: {
SquaredNumberUnary: new UnaryCaller<number, number>(),
},
streamFactory,
middlewareFactory,
logger,
idGen,
});
const squaredNumber = await rpcClient.methods.SquaredNumberUnary(4);
console.log('Squared number is: $(squaredNumber)');
return rpcClient;
}
async function main(){
const rpcServer = await startServer();
const rpcClient = await startClient();
await rpcServer.destroy();
await rpcClient.destroy();
}
main();
```
## Development

@@ -14,0 +455,0 @@

Sorry, the diff of this file is not supported yet

Sorry, the diff of this file is not supported yet

SocketSocket SOC 2 Logo

Product

  • Package Alerts
  • Integrations
  • Docs
  • Pricing
  • FAQ
  • Roadmap
  • Changelog

Packages

npm

Stay in touch

Get open source security insights delivered straight into your inbox.


  • Terms
  • Privacy
  • Security

Made with ⚡️ by Socket Inc