Huge News!Announcing our $40M Series B led by Abstract Ventures.Learn More
Socket
Sign inDemoInstall
Socket

@matrixai/rpc

Package Overview
Dependencies
Maintainers
3
Versions
28
Alerts
File Explorer

Advanced tools

Socket logo

Install Socket

Detect and block malicious and high-risk dependencies

Install

@matrixai/rpc - npm Package Compare versions

Comparing version 0.2.5 to 0.2.6

4

dist/types.d.ts

@@ -114,3 +114,3 @@ /// <reference types="node" />

}>;
} & Omit<T, 'metadata'>;
} & T;
type JSONRPCResult<T extends JSONObject = JSONObject> = {

@@ -122,3 +122,3 @@ metadata?: {

}>;
} & Omit<T, 'metadata'>;
} & T;
/**

@@ -125,0 +125,0 @@ * This is a JSON RPC error object, it encodes the error data for the JSONRPCResponseError object.

{
"name": "@matrixai/rpc",
"version": "0.2.5",
"version": "0.2.6",
"author": "Matrix AI",

@@ -5,0 +5,0 @@ "contributors": [

@@ -14,377 +14,296 @@ # js-rpc

### Overview of different streams
### Basic Usage
#### Duplex Stream ->
Because decorators are experimental, you must enable:
`"experimentalDecorators": true` in your `tsconfig.json` to use this library.
##### Client:
The client initiates a duplex streaming RPC call using a method that returns both a readable and a writable stream. The client can read from the readable stream and write to the writable stream.
First, setup an `RPCStream` to use:
```ts
import { JSONValue } from "./types";
import Caller from './Caller';
const rpcStream = {
readable: new ReadableStream(),
writable: new WritableStream(),
cancel: () => {};
};
```
// Initialize the duplex call
const { ReadableStream, WritableStream } = client.method();
#### Server
// Get the reader and writer from the streams
const reader = ReadableStream.getReader();
const writer = WritableStream.getWriter();
```ts
import type { JSONRPCParams, JSONRPCResult, JSONValue } from "@matrixai/rpc";
import { RPCServer, UnaryHandler } from "@matrixai/rpc";
// Read output from the server
const readResult = await reader.read();
if (!readResult.done) {
const output: JSONValue = readResult.value;
console.log("Received from server:", output);
// Create a Handler
class SquaredNumberUnary extends UnaryHandler<ContainerType, JSONRPCParams<{ value: number }>, JSONRPCResult<{ value: number }>> {
public handle = async (
input: JSONRPCParams<{ value: number }>,
cancel: (reason?: any) => void,
meta: Record<string, JSONValue> | undefined,
ctx: ContextTimed,
): Promise<JSONRPCResult<{ value: number }>> => {
return input.value**2;
};
}
// Write data to the server
const inputData: JSONValue = { someKey: "someValue" };
await writer.write(inputData);
const rpcServer = new RPCServer();
// Don't forget to close the writer when you're done
await writer.close();
await rpcServer.start({
manifest: {
SquaredDuplex: new SquaredDuplex(),
// ... add more handlers here
},
});
rpcServer.handleStream(rpcStream);
```
##### Server :
#### Client
```ts
import type { ContainerType, JSONValue } from '../types';
import type { ContextTimed } from '@matrixai/contexts';
import Handler from './Handler';
import type { HandlerTypes } from "@matrixai/rpc";
import { RPCClient, UnaryCaller } from "@matrixai/rpc";
// Define the handler as an async generator function
const handle = async function* (
input: AsyncIterableIterator<JSONValue>, // This is a generator.
cancel: (reason?: any) => void,
meta: Record<string, JSONValue> | undefined,
ctx: ContextTimed
): AsyncIterableIterator<JSONValue> {
// Get the CallerTypes of the handler
type CallerTypes = HandlerTypes<SquaredNumberUnary>;
const squaredNumber = new UnaryCaller<
CallerTypes['input'],
CallerTypes['output']
>();
// Loop through the incoming stream of messages
for await (const incomingData of input) {
console.log("Received from client:", incomingData);
const rpcClient = new RPCClient({
manifest: {
squaredNumber,
// ... add more here
},
streamFactory: async () => rpcStream,
});
// Perform some operation on the incoming data
const outputData: JSONValue = { responseKey: "responseValue" };
// Yield data back to the client
yield outputData;
// We yield since stream can contain multiple messages.
}
};
await rpcClient.methods.squaredNumber({ value: 2 });
// returns { value: 4 }
```
#### Client Streaming
##### Client
The client initiates a client streaming RPC call using a method that returns a writable stream and a promise. The client writes to the writable stream and awaits the output promise to get the response.
Any of the callers or handlers can be added from the below `Call Types` section.
```ts
{ output: Promise<JSONValue>, WriteableStream<JSONValue>} = client.method();
const writer = WritableStream.getWriter();
writer.write();
const Output = await output;
```
##### Server
On the server side, the handle function is defined as an asynchronous generator function. It takes an AsyncIterableIterator as input, which represents the stream of incoming messages from the client. It yields output back to the client as needed.
```ts
// Initialize the client streaming call
const { output, writable } = client.method();
const writer = writable.getWriter();
### Call Types
// Write multiple messages to the server
const inputData1: JSONValue = { key1: "value1" };
const inputData2: JSONValue = { key2: "value2" };
await writer.write(inputData1);
await writer.write(inputData2);
#### Unary
// Close the writer when done
await writer.close();
In Unary calls, the client sends a single request to the server and receives a single response back, much like a regular async function call.
// Wait for the server's response
const serverOutput: JSONValue = await output;
console.log("Received from server:", serverOutput);
```
##### Handler
###### Server
On the server side, the handle function is an asynchronous function that takes an AsyncIterableIterator as input, representing the stream of incoming messages from the client. It returns a promise that resolves to the output that will be sent back to the client.
```ts
import { JSONValue } from "./types";
import type { ContextTimed } from '@matrixai/contexts';
const handle = async (
input: AsyncIterableIterator<JSONValue>,
cancel: (reason?: any) => void,
meta: Record<string, JSONValue> | undefined,
ctx: ContextTimed
): Promise<JSONValue> {
// Aggregate or process the incoming messages from the client
let aggregateData: any = {}; // Replace 'any' with your specific type
for await (const incomingData of input) {
// Perform some aggregation or processing on incomingData
import type { JSONRPCParams, JSONRPCResult, JSONValue } from "@matrixai/rpc";
import { UnaryHandler } from "@matrixai/rpc";
class SquaredNumberUnary extends UnaryHandler<ContainerType, JSONRPCParams<{ value: number }>, JSONRPCResult<{ value: number }>> {
public handle = async (
input: JSONRPCParams<{ value: number }>,
cancel: (reason?: any) => void,
meta: Record<string, JSONValue> | undefined,
ctx: ContextTimed,
): Promise<JSONRPCResult<{ value: number }>> => {
return input.value**2;
};
}
// Generate the response to be sent back to the client
const outputData: JSONValue = { responseKey: "responseValue" };
return outputData;
};
```
#### Server Streaming
##### Caller
##### Client
The client initiates a server streaming RPC call using a method that takes input parameters and returns a readable stream. The client writes a single message and then reads multiple messages from the readable stream.
```ts
// Initialize the server streaming call
const readableStream: ReadableStream<JSONValue> = client.method();
import type { HandlerTypes } from "@matrixai/rpc";
import { UnaryCaller } from "@matrixai/rpc";
type CallerTypes = HandlerTypes<SquaredNumberUnary>;
const squaredNumber = new UnaryCaller<
CallerTypes['input'],
CallerTypes['output']
>();
```
const reader = readableStream.getReader();
##### Call-Site
// Read multiple messages from the server
while (true) {
const { value, done } = await reader.read();
if (done) break;
The client initiates a unary RPC call by invoking a method that returns a promise. It passes the required input parameters as arguments to the method. The client then waits for the promise to resolve, receiving the output.
console.log("Received from server:", value);
}
``` ts
await rpcClient.methods.squaredNumber({ value: 3 });
// returns { value: 9 }
```
##### Server
On the server side, the handle function is an asynchronous generator function that takes a single input parameter from the client. It yields multiple messages that will be sent back to the client through the readable stream.
```ts
import { JSONValue } from "./types";
import type { ContextTimed } from '@matrixai/contexts';
public handle = async function* (
input: JSONValue,
cancel: (reason?: any) => void,
meta: Record<string, JSONValue> | undefined,
ctx: ContextTimed
): AsyncIterableIterator<JSONValue> {
#### Client Streaming
// Process the input and prepare the data to be sent back to the client
const outputData1: JSONValue = { responseKey1: "responseValue1" };
const outputData2: JSONValue = { responseKey2: "responseValue2" };
In Client Streaming calls, the client can write multiple messages to a single stream, while the server reads from that stream and then returns a single response. This pattern is useful when the client needs to send a sequence of data to the server, after which the server processes the data and replies with a single result. This pattern is good for scenarios like file uploads.
// Yield multiple messages to be sent to the client
yield outputData1;
};
```
##### Handler
#### Unary Streaming
In a unary RPC, the client sends a single request to the server and receives a single response back, much like a regular function call.
##### Client
The client initiates a unary RPC call by invoking a method that returns a promise. It passes the required input parameters as arguments to the method. The client then waits for the promise to resolve, receiving the output.
``` ts
import type { JSONValue } from '../types';
import Caller from './Caller';
On the server side, the handle function is an asynchronous function that takes an AsyncIterableIterator as input, representing the stream of incoming messages from the client. It returns a promise that resolves to the output that will be sent back to the client.
// Initialize the unary RPC call with input parameters
const promise: Promise<JSONValue> = client.unaryCaller("methodName", { someParam: "someValue" });
```ts
import type { JSONRPCParams, JSONRPCResult, JSONValue } from "@matrixai/rpc";
import { ClientHandler } from "@matrixai/rpc";
class AccumulateClient extends ClientHandler<ContainerType, JSONRPCParams<{ value: number }>, JSONRPCResult<{ value: number }>> {
public handle = async (
input: AsyncIterableIterator<JSONRPCParams<{ value: number }>>,
cancel: (reason?: any) => void,
meta: Record<string, JSONValue> | undefined,
ctx: ContextTimed,
): Promise<JSONRPCResult<{ value: number }>> => {
let acc = 0;
for await (const number of input) {
acc += number.value;
}
return { value: acc };
};
}
```
// Wait for the response
const output: JSONValue = await promise;
console.log("Received from server:", output);
##### Caller
```ts
import type { HandlerTypes } from "@matrixai/rpc";
import { ClientCaller } from "@matrixai/rpc";
type CallerTypes = HandlerTypes<AccumulateClient>;
const accumulate = new ClientCaller<
CallerTypes['input'],
CallerTypes['output']
>();
```
##### Server
```ts
import { JSONValue } from "./types";
import { ContextTimed } from "./contexts"; // Assuming ContextTimed is imported from a module named 'contexts'
##### Call-Site
public handle = async (
input: JSONValue,
cancel: (reason?: any) => void,
meta: Record<string, JSONValue> | undefined,
ctx: ContextTimed,
): Promise<JSONValue> {
The client initiates a client streaming RPC call using a method that returns a writable stream and a promise. The client writes to the writable stream and awaits the output promise to get the response.
// Process the input and prepare the data to be sent back to the client
const outputData: JSONValue = { responseKey: "responseValue" };
return outputData;
};
```ts
const { output, writable } = await rpcClient.methods.accumulate();
const writer = writabe.getWriter();
await writer.write({ value: 1 });
await writer.write({ value: 2 });
await writer.write({ value: 3 });
await writer.write({ value: 4 });
await writer.close();
await output;
// output resolves to { value: 10 }
```
### Usage Examples
#### Server Streaming
Because decorators are experimental, you must enable:
`"experimentalDecorators": true` in your `tsconfig.json` to use this library.
In Server Streaming calls,
the client sends a single request and receives multiple responses in a read-only stream from the server.
The server can keep pushing messages as long as it needs, allowing real-time updates from the server to the client.
This is useful for things like monitoring,
where the server needs to update the client in real-time based on events or data changes.
In this example, the client sends a number and the server responds with the squares of all numbers up to that number.
#### Client Stream
In a Client Stream, the client can write multiple messages to a single stream,
while the server reads from that stream and then returns a single response.
This pattern is useful when the client needs to send a sequence of data to the server,
after which the server processes the data and replies with a single result.
This pattern is good for scenarios like file uploads.
##### Handler
This example shows how to create an RPC pair and handle streaming integers and summing them up.
On the server side, the handle function is an asynchronous generator function that takes a single input parameter from the client. It yields multiple messages that will be sent back to the client through the readable stream.
```ts
import {
ContainerType,
JSONValue,
IdGen,
StreamFactory,
MiddlewareFactory, ClientManifest,
} from "@matrixai/rpc/dist/types";
import WebSocket = require('ws');
import {ClientHandler} from "@matrixai/rpc/dist/handlers";
import type { ContextTimed } from '@matrixai/contexts';
import RPCServer from '@matrixai/rpc/dist/RPCServer'
import RPCClient from '@matrixai/rpc/dist/RPCClient'
import Logger, { LogLevel, StreamHandler } from '@matrixai/logger';
import { createDestroy } from '@matrixai/async-init';
import {ClientCaller} from "@matrixai/rpc/dist/callers";
import {ReadableStream, WritableStream} from "stream/web";
import {ReadableStreamDefaultController} from "stream/web";
import type { JSONRPCParams, JSONRPCResult, JSONValue } from "@matrixai/rpc";
import { ServerHandler } from "@matrixai/rpc";
class CountServer extends ServerHandler<ContainerType, JSONRPCParams<{ value: number }>, JSONRPCResult<{ value: number }>> {
public handle = async function* (
input: JSONRPCParams<{ value: number }>,
cancel: (reason?: any) => void,
meta: Record<string, JSONValue> | undefined,
ctx: ContextTimed,
): AsyncIterableIterator<JSONRPCResult<{ value: number }>> {
for (let i = input.number; i < input.number + 5; i++) {
yield { value: i };
}
};
}
```
const logger = new Logger(`RPC Test`, LogLevel.WARN, [new StreamHandler()]);
##### Caller
let streamFactory: StreamFactory;
let middlewareFactory: MiddlewareFactory<any, any, any, any>;
let idGen: IdGen;
```ts
import type { HandlerTypes } from "@matrixai/rpc";
import { ServerCaller } from "@matrixai/rpc";
type CallerTypes = HandlerTypes<CountServer>;
const count = new ServerCaller<
CallerTypes['input'],
CallerTypes['output']
>();
```
class Sum extends ClientHandler<ContainerType, number, number> {
public handle = async (
input: AsyncIterable<number>,
): Promise<number> => {
let sum = 0;
console.log("Entering handle method on server.");
for await (const num of input) {
console.log(`Received number: ${num}`);
sum += num;
}
console.log(`Returning sum: ${sum}`);
return sum;
};
##### Call-Site
The client initiates a server streaming RPC call using a method that takes input parameters and returns a readable stream. The client writes a single message and then reads multiple messages from the readable stream.
```ts
const callerInterface = await rpcClient.methods.count({ value: 5 });
const numbers = [];
while (true) {
const { value, done } = await reader.read();
numbers.push(value.value);
if (done) break;
}
// numbers is [5, 6, 7, 8, 9]
```
// Server-side WebSocket setup
// Server-side setup
async function startServer() {
const rpcServer = new RPCServer({
logger: new Logger('rpc-server'),
timeoutTime: 60000,
idGen,
});
#### Duplex Stream
await rpcServer.start({
manifest: {
Sum: new Sum({}),
},
});
A Duplex Stream enables both the client and the server to read and write messages in their respective streams independently of each other. Both parties can read and write multiple messages in any order. It's useful in scenarios that require ongoing communication in both directions, like chat applications.
// Create synthetic streams here
const { readable, writable } = createSyntheticStreams();
rpcServer.handleStream({ readable, writable, cancel: () => {} });
##### Handler
return { rpcServer };
```ts
import type { JSONRPCParams, JSONRPCResult, JSONValue } from "@matrixai/rpc";
import { DuplexHandler } from "@matrixai/rpc";
class EchoDuplex extends DuplexHandler<ContainerType, JSONRPCParams, JSONRPCResult> {
public handle = async function* (
input: AsyncIterableIterator<JSONRPCParams<{ value: number }>>, // This is a generator.
cancel: (reason?: any) => void,
meta: Record<string, JSONValue> | undefined,
ctx: ContextTimed
): AsyncIterableIterator<JSONRPCResult<{ value: number }>> {
for await (const incomingData of input) {
yield incomingData;
}
};
}
```
// Create synthetic streams
function createSyntheticStreams() {
const readable = new ReadableStream();
const writable = new WritableStream();
##### Caller
return { readable, writable };
}
```ts
import type { HandlerTypes } from "@matrixai/rpc";
import { ServerCaller } from "@matrixai/rpc";
type CallerTypes = HandlerTypes<EchoDuplex>;
const echo = new ServerCaller<
CallerTypes['input'],
CallerTypes['output']
>();
```
type Manifest = {
Sum: ClientCaller<number, number>;
};
// Client-side WebSocket setup
// Client-side setup
async function startClient() {
const { readable, writable } = createSyntheticStreams();
const rpcClient = new RPCClient({
manifest: {
Sum: new ClientCaller<number, number>(),
},
streamFactory: async () => ({ readable, writable, cancel: () => {} }),
middlewareFactory,
logger,
idGen,
});
##### Call-Site
return rpcClient;
}
The client initiates a duplex streaming RPC call using a method that returns both a readable and a writable stream. The client can read from the readable stream and write to the writable stream.
// Function to execute the Sum RPC call
async function executeSum(rpcClient: RPCClient<Manifest>) {
try {
const { output, writable } = await rpcClient.methods.Sum();
const writer = writable.getWriter();
await writer.write(5);
await writer.write(10);
await writer.close();
```ts
// Initialize the duplex call
const { readable, writable } = await rpcClient.methods.SquaredDuplex();
const ans = await output;
console.log(`Received output: ${ans}`);
console.log(`Sum is: ${ans}`);
} catch (error) {
console.error("Error in executeSum:", error);
}
}
// Get the reader and writer from the streams
const reader = readable.getReader();
const writer = writable.getWriter();
// Main function to tie everything together
async function main() {
try {
const serverObject = await startServer();
const rpcClient = await startClient();
// Write data to the server
const inputData: JSONObject = { someKey: "someValue" };
await writer.write(inputData);
await executeSum(rpcClient);
await serverObject.rpcServer.destroy();
} catch (err) {
console.error("An error occurred:", err);
}
}
const readResult = await reader.read();
main();
// readResult is { someKey: "someValue" }
```
![img.png](images/clientTest.png)
#### Raw Stream
Raw Stream is designed for low-level handling of RPC calls, enabling granular control over data streaming.
Unlike other patterns, Raw Streams allow both the server and client to work directly with raw data,
providing a more flexible yet complex way to handle communications.
This is especially useful when the RPC protocol itself needs customization
or when handling different types of data streams within the same connection.
#### Raw Streams
In this example, the client sends a sequence of numbers and the server responds with the factorial of each number.
```ts
import {
ContainerType,
JSONValue,
IdGen,
StreamFactory,
MiddlewareFactory, ClientManifest, JSONRPCRequest,
} from "@matrixai/rpc/dist/types";
import WebSocket = require('ws');
import { RawHandler} from "@matrixai/rpc/dist/handlers";
import type { ContextTimed } from '@matrixai/contexts';
import RPCServer from '@matrixai/rpc/dist/RPCServer'
import RPCClient from '@matrixai/rpc/dist/RPCClient'
import Logger, { LogLevel, StreamHandler } from '@matrixai/logger';
import { createDestroy } from '@matrixai/async-init';
import {RawCaller} from "@matrixai/rpc/dist/callers";
import {ReadableStream, WritableStream} from "stream/web";
import {ReadableStreamDefaultController} from "stream/web";
Raw Streams are designed for low-level handling of RPC calls, enabling granular control over data streaming. Unlike other patterns, Raw Streams allow both the server and client to work directly with raw data, providing a more flexible yet complex way to handle communications. This is especially useful when the RPC protocol itself needs customization or when handling different types of data streams within the same connection.
##### Handler
const logger = new Logger('RPC Test', LogLevel.WARN, [new StreamHandler()]);
let streamFactory: StreamFactory;
let middlewareFactory: MiddlewareFactory<any, any, any, any>;
let idGen: IdGen;
type Manifest = {
FactorialStream: RawCaller;
};
class FactorialStream extends RawHandler<ContainerType> {
```ts
import type { JSONRPCRequest, JSONValue } from "@matrixai/rpc";
import { RawHandler } from "@matrixai/rpc";
class FactorialRaw extends RawHandler<ContainerType> {
public handle = async (

@@ -398,2 +317,6 @@ [request, inputStream]: [JSONRPCRequest, ReadableStream<Uint8Array>],

(async () => {
function factorialOf(n: number): number {
return n === 0 ? 1 : n * factorialOf(n - 1);
}
const reader = inputStream.getReader();

@@ -419,492 +342,58 @@ const writer = writable.getWriter();

}
```
function factorialOf(n: number): number {
return n === 0 ? 1 : n * factorialOf(n - 1);
}
##### Caller
async function startServer() {
const rpcServer = new RPCServer({
timeoutTime: 200,
logger,
idGen,
});
await rpcServer.start({
manifest: {
FactorialStream: new FactorialStream({}),
},
});
// Create synthetic streams here
const { readable, writable } = createSyntheticStreams();
rpcServer.handleStream({ readable, writable, cancel: () => {} });
return { rpcServer };
}
// Create synthetic streams
function createSyntheticStreams() {
const readable = new ReadableStream();
const writable = new WritableStream();
return { readable, writable };
}
async function startClient() {
const { readable, writable } = createSyntheticStreams();
const rpcClient = new RPCClient({
manifest: {
FactorialStream: new RawCaller(),
},
streamFactory: async () => ({ readable, writable, cancel: () => {} }),
middlewareFactory,
logger,
idGen,
});
return rpcClient;
}
async function execute(rpcClient: RPCClient<Manifest>){
try{
// Initialize the FactorialStream RPC method
const { readable, writable, meta } = await rpcClient.methods.FactorialStream({timer: 200});
console.log('Meta:', meta); // Output meta information, should be 'Starting factorial computation'
// Create a writer for the writable stream
const writer = writable.getWriter();
// Send numbers 4, 5, 6, 8 to the server for factorial computation
for (const num of [4, 5, 6, 8]) {
const buffer = new TextEncoder().encode(num.toString());
await writer.write(buffer);
}
await writer.close();
// Create a reader for the readable stream
const reader = readable.getReader();
// Read the computed factorials from the server
while (true) {
const { done, value } = await reader.read();
if (done) {
console.log('Done reading from stream.');
process.exit(0);
break;
}
const factorialResult = new TextDecoder().decode(value).trim(); // Added trim() to remove any extra whitespace
console.log(`The factorial is: ${factorialResult}`);
}
}catch (error){
console.error("Error is :", error);
}
}
async function main() {
try {
const serverObject = await startServer();
const rpcClient = await startClient();
await execute(rpcClient);
await serverObject.rpcServer.destroy();
} catch (err) {
console.error("An error occurred:", err);
}
}
main();
```ts
import { RawCaller } from "@matrixai/rpc";
const factorial = new RawCaller();
```
![img.png](images/rawTest.png)
#### Server Stream
In Server Stream calls,
the client sends a single request and receives multiple responses in a read-only stream from the server.
The server can keep pushing messages as long as it needs, allowing real-time updates from the server to the client.
This is useful for things like monitoring,
where the server needs to update the client in real-time based on events or data changes.
In this example, the client sends a number and the server responds with the squares of all numbers up to that number.
```ts
import Logger, {LogLevel, StreamHandler} from "@matrixai/logger";
import {ContainerType, IdGen, JSONValue, MiddlewareFactory, StreamFactory} from "@matrixai/rpc/dist/types";
import {RawCaller, ServerCaller} from "@matrixai/rpc/dist/callers";
import {ServerHandler} from "@matrixai/rpc/dist/handlers";
import {ContextTimed} from "@matrixai/contexts";
import RPCServer from "@matrixai/rpc/dist/RPCServer";
import WebSocket = require('ws');
import {ReadableStream, ReadableStreamDefaultController, WritableStream} from "stream/web";
import RPCClient from "@matrixai/rpc/dist/RPCClient";
##### Call-Site
const logger = new Logger('Server Test', LogLevel.WARN, [new StreamHandler()]);
let streamFactory: StreamFactory;
let middlewareFactory: MiddlewareFactory<any, any, any, any>;
let idGen: IdGen;
class SquaredNumbers extends ServerHandler<ContainerType, number, number>{
public handle = async function* (
input: number,
):AsyncGenerator<number>{
for (let i = 0; i<= input; i++){
yield i*i;
}
};
}
type Manifest = {
SquaredNumbers: ServerCaller<number,number>;
}
// Create synthetic streams
function createSyntheticStreams() {
const readable = new ReadableStream();
const writable = new WritableStream();
return { readable, writable };
}
async function startServer() {
const rpcServer = new RPCServer({
logger,
idGen,
});
await rpcServer.start({
manifest: {
SquaredNumbers: new SquaredNumbers({}),
},
});
const { readable, writable } = createSyntheticStreams();
rpcServer.handleStream({ readable, writable, cancel: () => {} });
return { rpcServer };
}
async function startClient() {
return new Promise<RPCClient<Manifest>>((resolve, reject) => {
const { readable, writable } = createSyntheticStreams();
const rpcClient = new RPCClient<Manifest>({
manifest: {
SquaredNumbers: new ServerCaller<number, number>(),
},
streamFactory: async () => ({ readable, writable, cancel: () => {} }),
middlewareFactory,
logger,
idGen,
});
resolve(rpcClient);
});
}
async function execute(rpcClient: RPCClient<Manifest>) {
try {
const squaredStream = await rpcClient.methods.SquaredNumbers(235);
const outputs: Array<number> = [];
for await(const num of squaredStream) {
outputs.push(num);
}
console.log(`Squared numbers are: ${outputs.join(', ')}`);
} catch (error) {
console.error("Error in execute:", error);
}
}
async function main() {
try {
const serverObject = await startServer();
const rpcClient = await startClient();
await execute(rpcClient);
await serverObject.rpcServer.destroy();
} catch (err) {
console.log('An Error occurred: ', err)
}
}
main();
```
#### Duplex Stream
A Duplex Stream enables both the client and the server to read
and write messages in their respective streams independently of each other.
Both parties can read and write multiple messages in any order.
It's useful in scenarios that require ongoing communication in both directions, like chat applications.
In this example, the client sends a sequence of numbers and the server responds with the squares of those numbers.
```ts
import Logger, {LogLevel, StreamHandler} from "@matrixai/logger";
import {ContainerType, IdGen, JSONValue, MiddlewareFactory, StreamFactory} from "@matrixai/rpc/dist/types";
import {defaultMiddleware} from "@matrixai/rpc/dist/middleware";
import {ClientCaller, DuplexCaller} from "@matrixai/rpc/dist/callers";
import {DuplexHandler} from "@matrixai/rpc/dist/handlers";
import {ContextTimed} from "@matrixai/contexts";
import RPCServer from "@matrixai/rpc/dist/RPCServer";
import WebSocket = require('ws');
import {takeUntil} from "ix/Ix.dom.asynciterable.operators";
import RPCClient from "@matrixai/rpc/dist/RPCClient";
import {ReadableStream, ReadableStreamDefaultController, WritableStream} from "stream/web";
const { readable, writable, meta } = await rpcClient.methods.factorial();
console.log('Meta:', meta); // Output meta information, should be 'Starting factorial computation'
const logger = new Logger('Duplex Test', LogLevel.WARN, [new StreamHandler()]);
// Create a writer for the writable stream
const writer = writable.getWriter();
let streamFactory: StreamFactory;
let middlewareFactory: MiddlewareFactory<any, any, any, any>;
let idGen: IdGen;
class SquaredDuplex extends DuplexHandler<ContainerType, number, Array<number>>{
public handle = async function*(
input: AsyncIterableIterator<number>,
cancel: (reason?: any) => void,
meta: Record<string, JSONValue> | undefined,
ctx: ContextTimed,
): AsyncIterableIterator<Array<number>>{
for await (const num of input){
const squares: Array<number> = []
for(let i =1; i<=num; i++){
squares.push(i*i);
}
yield squares;
}
};
// Send numbers 4, 5, 6, 8 to the server for factorial computation
for (const num of [4, 5, 6, 8]) {
const buffer = new TextEncoder().encode(num.toString());
await writer.write(buffer);
}
await writer.close();
type Manifest = {
SquaredDuplex: DuplexCaller<number, Array<number>>;
};
// Create a reader for the readable stream
const reader = readable.getReader();
async function startServer() {
const wss = new WebSocket.Server({ port: 8080 });
const rpcServer = new RPCServer({
logger: new Logger('rpc-server'),
timeoutTime: 1000,
idGen,
});
rpcServer.start({
manifest: {
SquaredDuplex: new SquaredDuplex({}),
},
});
wss.on('connection', (ws) => {
const { readable, writable } = wsToStream(ws);
rpcServer.handleStream({ readable, writable, cancel: () => {} });
});
return { rpcServer };
}
async function startClient() {
return new Promise<RPCClient<Manifest>>( (resolve, reject) => {
const ws = new WebSocket('ws://localhost:8080');
ws.addEventListener('open', async () => {
const { readable, writable } = wsToStream(ws);
const rpcClient = new RPCClient({
manifest: {
SquaredDuplex: new DuplexCaller<number, Array<number>>(),
},
streamFactory: async () => ({ readable, writable, cancel: () => {} }),
middlewareFactory,
logger,
idGen,
});
resolve(rpcClient);
});
ws.addEventListener('error', (err) => {
reject(err);
});
});
}
function wsToStream(ws: WebSocket): { readable: ReadableStream, writable: WritableStream } {
let readableController: ReadableStreamDefaultController<any> | null = null;
const readable = new ReadableStream({
start(controller) {
readableController = controller;
},
cancel() {
ws.close();
},
});
ws.on('message', (chunk: any) => {
readableController?.enqueue(chunk);
});
ws.on('close', () => {
readableController?.close();
});
const writable = new WritableStream({
write(chunk) {
ws.send(chunk);
},
close() {
ws.close();
},
abort() {
ws.close();
},
});
return { readable, writable };
}
// Client-side duplex caller
async function executeSquareNumbersDuplex(rpcClient: RPCClient<Manifest>) {
try{const { readable, writable } = await rpcClient.methods.SquaredDuplex();
const writer = writable.getWriter();
await writer.write(2);
await writer.write(3);
await writer.write(4);
// Read squared numbers from the server
for await (const squares of readable) {
console.log(`Squares up to n are: ${squares.join(", ")}`);
}
await writer.close();
}catch (e){
console.log(e)
// Read the computed factorials from the server
while (true) {
const { done, value } = await reader.read();
if (done) {
console.log('Done reading from stream.');
process.exit(0);
break;
}
const factorialResult = new TextDecoder().decode(value).trim(); // Added trim() to remove any extra whitespace
console.log(`The factorial is: ${factorialResult}`);
}
// Main function to tie everything together
async function main() {
try {
const serverObject = await startServer();
const rpcClient = await startClient();
await executeSquareNumbersDuplex(rpcClient); // Add this line to run the duplex caller
await serverObject.rpcServer.destroy();
} catch (err) {
console.error("An error occurred:", err);
}
}
main();
```
![img.png](images/duplexTest.png)
#### Unary Stream
In a Unary Stream, the client sends a single request to the server and gets a single response back,
just like HTTP/REST calls but over a connection.
It's the simplest form of RPC, suitable for short-lived operations that don't require streaming data.
It's the go-to choice for straightforward "request and response" interactions.
## Specifications
In this example, the client sends a number and the server responds with the square of that number.
```ts
import {
ContainerType,
JSONValue,
IdGen,
StreamFactory,
MiddlewareFactory, ClientManifest,
} from "@matrixai/rpc/dist/types";
import WebSocket = require('ws');
import {ClientHandler, UnaryHandler} from "@matrixai/rpc/dist/handlers";
import type { ContextTimed } from '@matrixai/contexts';
import RPCServer from '@matrixai/rpc/dist/RPCServer'
import RPCClient from '@matrixai/rpc/dist/RPCClient'
import Logger, { LogLevel, StreamHandler } from '@matrixai/logger';
import { createDestroy } from '@matrixai/async-init';
import {ClientCaller, UnaryCaller} from "@matrixai/rpc/dist/callers";
import {ReadableStream, WritableStream} from "stream/web";
import {ReadableStreamDefaultController} from "stream/web";
### Timeouts
const logger = new Logger(`RPC Test`, LogLevel.WARN, [new StreamHandler()]);
Whenever the time between the initial message and the following subsequent message of an RPC call exceeds a defined timeout time, the RPC call will have timed out.
let streamFactory: StreamFactory;
let middlewareFactory: MiddlewareFactory<any, any, any, any>;
let idGen: IdGen;
For Unary calls, this is similar to the timeout of a response after sending a request.
class SquaredNumberUnary extends UnaryHandler<ContainerType, number, number> {
public handle = async (
input: number,
cancel: (reason?: any) => void,
meta: Record<string, JSONValue> | undefined,
ctx: ContextTimed,
): Promise<number> => {
return input * input;
};
}
// Create synthetic streams
function createSyntheticStreams() {
const readable = new ReadableStream();
const writable = new WritableStream();
return { readable, writable };
}
// Server-side setup
async function startServer() {
const rpcServer = new RPCServer({
logger: new Logger('rpc-server'),
timeoutTime: 1000,
idGen,
});
If the client were to time out, the stream is forcibly closed and `ErrorRPCTimedOut` is thrown from the call.
await rpcServer.start({
manifest: {
SquaredDuplex: new SquaredDuplex({}),
},
});
If the server were to time out, is is advisory. Meaning that the server may choose to optionally eagerly throw `ErrorRPCTimedOut`, or continue processing as normal.
// Replace WebSocket with synthetic stream
const { readable, writable } = createSyntheticStreams();
rpcServer.handleStream({ readable, writable, cancel: () => {} });
#### Throwing Timeouts Server-Side
return { rpcServer };
}
// Client-side setup
async function startClient() {
return new Promise<RPCClient<Manifest>>((resolve, reject) => {
const { readable, writable } = createSyntheticStreams();
const rpcClient = new RPCClient({
manifest: {
SquaredDuplex: new DuplexCaller<number, Array<number>>(),
},
streamFactory: async () => ({ readable, writable, cancel: () => {} }),
middlewareFactory,
logger,
idGen,
});
resolve(rpcClient);
});
}
// Function to execute the Sum RPC call
async function executeSquare(rpcClient: RPCClient<Manifest>) {
try {
// Sending a number (e.g., 4) to be squared
const squaredNumber = await rpcClient.methods.SquaredNumberUnary(4);
// Log the squared number
console.log(`Squared number is: ${squaredNumber}`);
} catch (error) {
// Handle any errors
console.error(`An error occurred while executing SquaredNumberUnary: ${error}`);
}
}
// Main function to tie everything together
async function main() {
try {
const serverObject = await startServer();
const rpcClient = await startClient();
await executeSquare(rpcClient);
await serverObject.rpcServer.destroy();
} catch (err) {
console.error("An error occurred:", err);
}
}
main();
```
![img.png](images/unaryTest.png)
## Specifications
### Throwing Timeouts
By default, a timeout will not cause an RPC call to automatically throw, this must be manually done by the handler when it receives the abort signal from `ctx.signal`. An example of this is like so:

@@ -930,3 +419,3 @@

### Timeout Priority
#### Priority of Timeout Options

@@ -958,5 +447,5 @@ A `timeoutTime` can be passed both to the constructors of `RPCServer` and `RPCClient`. This is the default `timeoutTime` for all callers/handlers.

It's important to note that any of these timeouts will ultimately be overridden by the shortest timeout of the server and client combined using the timeout middleware below.
However, it's important to note that any of these timeouts may ultimately be overridden by the shortest timeout of the server and client combined using the timeout middleware below.
### Timeout Middleware
#### Timeout Middleware

@@ -963,0 +452,0 @@ The `timeoutMiddleware` sets an RPCServer's timeout based on the lowest timeout between the Client and the Server. This is so that handlers can eagerly time out and stop processing as soon as it is known that the client has timed out.

SocketSocket SOC 2 Logo

Product

  • Package Alerts
  • Integrations
  • Docs
  • Pricing
  • FAQ
  • Roadmap
  • Changelog

Packages

npm

Stay in touch

Get open source security insights delivered straight into your inbox.


  • Terms
  • Privacy
  • Security

Made with ⚡️ by Socket Inc