Huge News!Announcing our $40M Series B led by Abstract Ventures.Learn More
Socket
Sign inDemoInstall
Socket

@matrixai/rpc

Package Overview
Dependencies
Maintainers
3
Versions
28
Alerts
File Explorer

Advanced tools

Socket logo

Install Socket

Detect and block malicious and high-risk dependencies

Install

@matrixai/rpc - npm Package Compare versions

Comparing version 0.1.8 to 0.1.9

images/clientTest.png

2

dist/RPCClient.d.ts

@@ -46,3 +46,3 @@ /// <reference types="node" />

logger?: Logger;
idGen: IdGen;
idGen?: IdGen;
}): Promise<RPCClient<M>>;

@@ -49,0 +49,0 @@ protected onTimeoutCallback?: () => void;

import type { ClientHandlerImplementation, DuplexHandlerImplementation, JSONRPCRequest, JSONRPCResponse, JSONRPCResponseResult, ServerManifest, RawHandlerImplementation, ServerHandlerImplementation, UnaryHandlerImplementation, RPCStream, MiddlewareFactory } from './types';
import type { JSONValue } from './types';
import type { IdGen } from './types';
import type { ErrorRPC, ErrorRPCRemote } from './errors';
import type { ErrorRPC } from './errors';
import Logger from '@matrixai/logger';

@@ -46,3 +46,3 @@ import { PromiseCancellable } from '@matrixai/async-cancellable';

static createRPCServer({ manifest, middlewareFactory, handlerTimeoutTime, // 1 minute
logger, idGen, fromError, filterSensitive, toError, }: {
logger, idGen, fromError, filterSensitive, }: {
manifest: ServerManifest;

@@ -52,6 +52,5 @@ middlewareFactory?: MiddlewareFactory<JSONRPCRequest, Uint8Array, Uint8Array, JSONRPCResponse>;

logger?: Logger;
idGen: IdGen;
idGen?: IdGen;
fromError?: (error: ErrorRPC<any>) => JSONValue;
filterSensitive?: (key: string, value: any) => any;
toError?: (errorResponse: any, metadata?: any) => ErrorRPCRemote<any>;
}): Promise<RPCServer>;

@@ -67,6 +66,5 @@ protected onTimeoutCallback?: () => void;

protected filterSensitive: (key: string, value: any) => any;
protected toError: (errorResponse: any, metadata?: any) => ErrorRPCRemote<any>;
protected middlewareFactory: MiddlewareFactory<JSONRPCRequest, Uint8Array, Uint8Array, JSONRPCResponseResult>;
registerOnTimeoutCallback(callback: () => void): void;
constructor({ manifest, middlewareFactory, handlerTimeoutTime, logger, idGen, fromError, filterSensitive, toError, }: {
constructor({ manifest, middlewareFactory, handlerTimeoutTime, logger, idGen, fromError, filterSensitive, }: {
manifest: ServerManifest;

@@ -79,3 +77,2 @@ middlewareFactory: MiddlewareFactory<JSONRPCRequest, Uint8Array, Uint8Array, JSONRPCResponseResult>;

filterSensitive?: (key: string, value: any) => any;
toError?: (errorResponse: any, metadata?: any) => ErrorRPCRemote<any>;
});

@@ -82,0 +79,0 @@ destroy(force?: boolean): Promise<void>;

@@ -82,3 +82,3 @@ "use strict";

static async createRPCServer({ manifest, middlewareFactory = rpcUtilsMiddleware.defaultServerMiddlewareWrapper(), handlerTimeoutTime = Infinity, // 1 minute
logger = new logger_1.default(this.name), idGen = () => Promise.resolve(null), fromError = rpcUtils.fromError, filterSensitive = rpcUtils.filterSensitive, toError = rpcUtils.toError, }) {
logger = new logger_1.default(this.name), idGen = () => Promise.resolve(null), fromError = rpcUtils.fromError, filterSensitive = rpcUtils.filterSensitive, }) {
logger.info(`Creating ${this.name}`);

@@ -93,3 +93,2 @@ const rpcServer = new this({

filterSensitive,
toError,
});

@@ -108,3 +107,2 @@ logger.info(`Created ${this.name}`);

filterSensitive;
toError;
middlewareFactory;

@@ -115,3 +113,3 @@ // Function to register a callback for timeout

}
constructor({ manifest, middlewareFactory, handlerTimeoutTime = Infinity, logger, idGen = () => Promise.resolve(null), fromError = rpcUtils.fromError, filterSensitive = rpcUtils.filterSensitive, toError = rpcUtils.toError, }) {
constructor({ manifest, middlewareFactory, handlerTimeoutTime = Infinity, logger, idGen = () => Promise.resolve(null), fromError = rpcUtils.fromError, filterSensitive = rpcUtils.filterSensitive, }) {
super();

@@ -151,3 +149,2 @@ for (const [key, manifestItem] of Object.entries(manifest)) {

this.filterSensitive = filterSensitive || rpcUtils.filterSensitive;
this.toError = toError || rpcUtils.toError;
}

@@ -154,0 +151,0 @@ async destroy(force = true) {

{
"name": "@matrixai/rpc",
"version": "0.1.8",
"version": "0.1.9",
"author": "Matrix AI",

@@ -5,0 +5,0 @@ "contributors": [

@@ -11,187 +11,372 @@ # js-rpc

```
## Usage Examples
Because decorators are experimental, you must enable: `"experimentalDecorators": true` in your `tsconfig.json` to use this library.
## Usage
### Client Stream
In a Client Stream, the client can write multiple messages to a single stream,
while the server reads from that stream and then returns a single response.
This pattern is useful when the client needs to send a sequence of data to the server,
after which the server processes the data and replies with a single result.
This pattern is good for scenarios like file uploads.
### Overview of different streams
This example shows how to create an RPC pair and handle streaming integers and summing them up.
#### Duplex Stream ->
##### Client:
The client initiates a duplex streaming RPC call using a method that returns both a readable and a writable stream. The client can read from the readable stream and write to the writable stream.
```ts
import {RPCServer, ClientHandler, ContainerType, RPCClient, ClientCaller} from "./index";
import {AsyncIterable} from "ix/Ix";
import { JSONValue } from "./types";
import Caller from './Caller';
const webSocketServer = awwait
// Initialize the duplex call
const { ReadableStream, WritableStream } = client.method();
class Sum extends ClientHandler<ContainerType, number, number> {
public handle = async (
// handle takes input an AsyncIterable, which is a generator,
// would produce numbers input in the Writeable stream
input: AsyncIterable<number>,
cancel: (reason?: any) => void,
meta: Record<string, JSONValue> | undefined,
ctx: ContextTimed,
): Promise<number> => {
let sum = 0;
for await (const num of input) {
sum += num;
}
return acc;
};
// Get the reader and writer from the streams
const reader = ReadableStream.getReader();
const writer = WritableStream.getWriter();
// Read output from the server
const readResult = await reader.read();
if (!readResult.done) {
const output: JSONValue = readResult.value;
console.log("Received from server:", output);
}
// Write data to the server
const inputData: JSONValue = { someKey: "someValue" };
await writer.write(inputData);
// Don't forget to close the writer when you're done
await writer.close();
```
##### Server :
```ts
import type { ContainerType, JSONValue } from '../types';
import type { ContextTimed } from '@matrixai/contexts';
import Handler from './Handler';
// Define the handler as an async generator function
const handle = async function* (
input: AsyncIterableIterator<JSONValue>, // This is a generator.
cancel: (reason?: any) => void,
meta: Record<string, JSONValue> | undefined,
ctx: ContextTimed
): AsyncIterableIterator<JSONValue> {
// Loop through the incoming stream of messages
for await (const incomingData of input) {
console.log("Received from client:", incomingData);
// Perform some operation on the incoming data
const outputData: JSONValue = { responseKey: "responseValue" };
// Yield data back to the client
yield outputData;
// We yield since stream can contain multiple messages.
}
};
// Setting up an instance of RPC Client with Sum as the handler method
async function startServer() {
```
#### Client Streaming
const rpcServer = await RPCServer.createRPCServer({
manifest: {
Sum: new Sum({}),
},
logger,
idGen,
handlerTimeoutTime: 60000,
});
// Simulating receiving a stream from a client.
// Provided by network layer
const simulatedStream = sendStreamHere;
##### Client
The client initiates a client streaming RPC call using a method that returns a writable stream and a promise. The client writes to the writable stream and awaits the output promise to get the response.
rpcServer.handleStream(simulatedStream);
return rpcServer;
}
```ts
{ output: Promise<JSONValue>, WriteableStream<JSONValue>} = client.method();
const writer = WritableStream.getWriter();
writer.write();
const Output = await output;
```
##### Server
On the server side, the handle function is defined as an asynchronous generator function. It takes an AsyncIterableIterator as input, which represents the stream of incoming messages from the client. It yields output back to the client as needed.
```ts
// Initialize the client streaming call
const { output, writable } = client.method();
const writer = writable.getWriter();
async function startClient() {
// Simulate client-server pair of streams.
// Simulating network stream
const clientPair =sendStreamHere /* your logic for creating or obtaining a client-side stream */;
const rpcClient = await RPCClient.createRPCClient({
manifest: {
Sum: new ClientCaller<number, number>(),
},
streamFactory,
middlewareFactory,
logger,
idGen
})
// Write multiple messages to the server
const inputData1: JSONValue = { key1: "value1" };
const inputData2: JSONValue = { key2: "value2" };
await writer.write(inputData1);
await writer.write(inputData2);
return rpcClient;
}
// Function to execute the Sum RPC call
async function executeSum(rpcClient: typeof RPCClient){
const { output, writable } = await rpcClient.methods.Sum();
const writer = writable.getWriter();
await writer.write(5);
await writer.write(10);
await writer.close();
// Close the writer when done
await writer.close();
const ans = await output;
// Wait for the server's response
const serverOutput: JSONValue = await output;
console.log("Received from server:", serverOutput);
```
console.log('Sum is: $(ans)');
###### Server
On the server side, the handle function is an asynchronous function that takes an AsyncIterableIterator as input, representing the stream of incoming messages from the client. It returns a promise that resolves to the output that will be sent back to the client.
```ts
import { JSONValue } from "./types";
import type { ContextTimed } from '@matrixai/contexts';
const handle = async (
input: AsyncIterableIterator<JSONValue>,
cancel: (reason?: any) => void,
meta: Record<string, JSONValue> | undefined,
ctx: ContextTimed
): Promise<JSONValue> {
// Aggregate or process the incoming messages from the client
let aggregateData: any = {}; // Replace 'any' with your specific type
for await (const incomingData of input) {
// Perform some aggregation or processing on incomingData
}
// Main function to tie everything together
async function main(){
const rpcServer = await startServer();
const rpcClient = await startClient();
await executeSum(rpcClient);
// Generate the response to be sent back to the client
const outputData: JSONValue = { responseKey: "responseValue" };
await rpcServer.destroy();
await rpcClient.destroy();
return outputData;
};
```
#### Server Streaming
##### Client
The client initiates a server streaming RPC call using a method that takes input parameters and returns a readable stream. The client writes a single message and then reads multiple messages from the readable stream.
```ts
// Initialize the server streaming call
const readableStream: ReadableStream<JSONValue> = client.method();
const reader = readableStream.getReader();
// Read multiple messages from the server
while (true) {
const { value, done } = await reader.read();
if (done) break;
console.log("Received from server:", value);
}
```
##### Server
On the server side, the handle function is an asynchronous generator function that takes a single input parameter from the client. It yields multiple messages that will be sent back to the client through the readable stream.
```ts
import { JSONValue } from "./types";
import type { ContextTimed } from '@matrixai/contexts';
main();
public handle = async function* (
input: JSONValue,
cancel: (reason?: any) => void,
meta: Record<string, JSONValue> | undefined,
ctx: ContextTimed
): AsyncIterableIterator<JSONValue> {
// Process the input and prepare the data to be sent back to the client
const outputData1: JSONValue = { responseKey1: "responseValue1" };
const outputData2: JSONValue = { responseKey2: "responseValue2" };
// Yield multiple messages to be sent to the client
yield outputData1;
};
```
### Duplex Stream
A Duplex Stream enables both the client and the server to read
and write messages in their respective streams independently of each other.
Both parties can read and write multiple messages in any order.
It's useful in scenarios that require ongoing communication in both directions, like chat applications.
In this example, the client sends a sequence of numbers and the server responds with the squares of those numbers.
#### Unary Streaming
In a unary RPC, the client sends a single request to the server and receives a single response back, much like a regular function call.
##### Client
The client initiates a unary RPC call by invoking a method that returns a promise. It passes the required input parameters as arguments to the method. The client then waits for the promise to resolve, receiving the output.
``` ts
import type { JSONValue } from '../types';
import Caller from './Caller';
// Initialize the unary RPC call with input parameters
const promise: Promise<JSONValue> = client.unaryCaller("methodName", { someParam: "someValue" });
// Wait for the response
const output: JSONValue = await promise;
console.log("Received from server:", output);
```
##### Server
```ts
import { RPCServer, DuplexHandler, ContainerType } from "./index";
import { JSONValue } from "./types";
import { ContextTimed } from "./contexts"; // Assuming ContextTimed is imported from a module named 'contexts'
class SquareNumbersDuplex extends DuplexHandler<ContainerType, number, Array<number>> {
public handle = async function* (
input: AsyncIterableIterator<number>,
cancel: (reason?: any) => void,
meta: Record<string, JSONValue> | undefined,
ctx: ContextTimed,
): AsyncIterableIterator<Array<number>> {
for await (const num of input) {
const squares: Array<number> = [];
for (let i = 1; i <= num; i++) {
squares.push(i * i);
}
yield squares;
}
};
public handle = async (
input: JSONValue,
cancel: (reason?: any) => void,
meta: Record<string, JSONValue> | undefined,
ctx: ContextTimed,
): Promise<JSONValue> {
// Process the input and prepare the data to be sent back to the client
const outputData: JSONValue = { responseKey: "responseValue" };
return outputData;
};
```
### Usage Examples
Because decorators are experimental, you must enable:
`"experimentalDecorators": true` in your `tsconfig.json` to use this library.
#### Client Stream
In a Client Stream, the client can write multiple messages to a single stream,
while the server reads from that stream and then returns a single response.
This pattern is useful when the client needs to send a sequence of data to the server,
after which the server processes the data and replies with a single result.
This pattern is good for scenarios like file uploads.
This example shows how to create an RPC pair and handle streaming integers and summing them up.
```ts
import {
ContainerType,
JSONValue,
IdGen,
StreamFactory,
MiddlewareFactory, ClientManifest,
} from "@matrixai/rpc/dist/types";
import WebSocket = require('ws');
import {ClientHandler} from "@matrixai/rpc/dist/handlers";
import type { ContextTimed } from '@matrixai/contexts';
import RPCServer from '@matrixai/rpc/dist/RPCServer'
import RPCClient from '@matrixai/rpc/dist/RPCClient'
import Logger, { LogLevel, StreamHandler } from '@matrixai/logger';
import { createDestroy } from '@matrixai/async-init';
import {ClientCaller} from "@matrixai/rpc/dist/callers";
import {ReadableStream, WritableStream} from "stream/web";
import {ReadableStreamDefaultController} from "stream/web";
const logger = new Logger(`RPC Test`, LogLevel.WARN, [new StreamHandler()]);
let streamFactory: StreamFactory;
let middlewareFactory: MiddlewareFactory<any, any, any, any>;
let idGen: IdGen;
class Sum extends ClientHandler<ContainerType, number, number> {
public handle = async (
input: AsyncIterable<number>,
): Promise<number> => {
let sum = 0;
console.log("Entering handle method on server.");
for await (const num of input) {
console.log(`Received number: ${num}`);
sum += num;
}
console.log(`Returning sum: ${sum}`);
return sum;
};
}
// Server-side WebSocket setup
async function startServer() {
const rpcServer = await RPCServer.createRPCServer({
manifest: {
SquareNumbersDuplex: new SquareNumbersDuplex({}),
},
logger,
idGen,
});
const simulatedStream = sendStreamHere/* your logic for creating or obtaining a server-side stream */;
rpcServer.handleStream(simulatedStream);
const wss = new WebSocket.Server({ port: 8080 });
const rpcServer = await RPCServer.createRPCServer({
manifest: {
Sum: new Sum({}),
},
logger: new Logger('rpc-server'),
handlerTimeoutTime: 60000,
idGen,
});
return rpcServer;
wss.on('connection', (ws) => {
const { readable, writable } = wsToStream(ws);
rpcServer.handleStream({ readable, writable, cancel: () => {} });
});
return { rpcServer };
}
type Manifest = {
Sum: ClientCaller<number, number>;
};
// Client-side WebSocket setup
async function startClient() {
return new Promise<RPCClient<Manifest>>( (resolve, reject) => {
const ws = new WebSocket('ws://localhost:8080');
// Run the server
startServer();
import { RPCClient, DuplexCaller } from "./index";
ws.addEventListener('open', async () => {
const { readable, writable } = wsToStream(ws);
const rpcClient = await RPCClient.createRPCClient({
manifest: {
Sum: new ClientCaller<number, number>(),
},
streamFactory: async () => ({ readable, writable, cancel: () => {} }), middlewareFactory,
logger,
idGen,
});
resolve(rpcClient);
});
async function startClient() {
const rpcClient = await RPCClient.createRPCClient({
manifest: {
SquareNumbersDuplex: new DuplexCaller<number, Array<number>>(),
},
streamFactory,
middlewareFactory,
logger,
idGen,
});
ws.addEventListener('error', (err) => {
reject(err);
});
});
}
const squareStream = await rpcClient.methods.SquareNumbersDuplex();
function wsToStream(ws: WebSocket): { readable: ReadableStream, writable: WritableStream } {
let readableController: ReadableStreamDefaultController<any> | null = null;
// Write to the server
const writer = squareStream.writable.getWriter();
writer.write(2);
writer.write(3);
writer.write(4);
const readable = new ReadableStream({
start(controller) {
readableController = controller;
},
cancel() {
ws.close();
},
});
// Read squared numbers from the server
for await (const squares of squareStream.readable) {
console.log(`Squares up to n are: ${squares.join(", ")}`);
}
ws.on('message', (chunk: any) => {
readableController?.enqueue(chunk);
});
writer.close();
ws.on('close', () => {
readableController?.close();
});
return rpcClient;
const writable = new WritableStream({
write(chunk) {
ws.send(chunk);
},
close() {
ws.close();
},
abort() {
ws.close();
},
});
return { readable, writable };
}
// Function to execute the Sum RPC call
// Function to execute the Sum RPC call
async function executeSum(rpcClient: RPCClient<Manifest>) {
try {
const { output, writable } = await rpcClient.methods.Sum();
const writer = writable.getWriter();
await writer.write(5);
await writer.write(10);
await writer.close();
// Run the client
startClient();
async function main(){
const rpcServer = await startServer();
const rpcClient = await startClient();
const ans = await output;
console.log(`Received output: ${ans}`);
console.log(`Sum is: ${ans}`);
} catch (error) {
console.error("Error in executeSum:", error);
}
}
await rpcServer.destroy();
await rpcClient.destroy();
// Main function to tie everything together
async function main() {
try {
const serverObject = await startServer();
const rpcClient = await startClient();
await executeSum(rpcClient);
await rpcClient.destroy();
await serverObject.rpcServer.destroy();
} catch (err) {
console.error("An error occurred:", err);
}
}
// Run the main function to kick off the example
main();
```
### Raw Stream
![img.png](images/clientTest.png)
#### Raw Stream

@@ -205,109 +390,198 @@ Raw Stream is designed for low-level handling of RPC calls, enabling granular control over data streaming.

In this example, the client sends a sequence of numbers and the server responds with the factorial of each number.
```ts
import {RawHandler, JSONRPCRequest, ContextTimed, JSONValue} from '../types';
import {ContainerType} from "./types";
import RPCServer from "./RPCServer"; // Assuming these are imported correctly
import {
ContainerType,
JSONValue,
IdGen,
StreamFactory,
MiddlewareFactory, ClientManifest, JSONRPCRequest,
} from "@matrixai/rpc/dist/types";
import WebSocket = require('ws');
import { RawHandler} from "@matrixai/rpc/dist/handlers";
import type { ContextTimed } from '@matrixai/contexts';
import RPCServer from '@matrixai/rpc/dist/RPCServer'
import RPCClient from '@matrixai/rpc/dist/RPCClient'
import Logger, { LogLevel, StreamHandler } from '@matrixai/logger';
import { createDestroy } from '@matrixai/async-init';
import {RawCaller} from "@matrixai/rpc/dist/callers";
import {ReadableStream, WritableStream} from "stream/web";
import {ReadableStreamDefaultController} from "stream/web";
const logger = new Logger('RPC Test', LogLevel.WARN, [new StreamHandler()]);
let streamFactory: StreamFactory;
let middlewareFactory: MiddlewareFactory<any, any, any, any>;
let idGen: IdGen;
type Manifest = {
FactorialStream: RawCaller;
};
class FactorialStream extends RawHandler<ContainerType> {
public async handle(
[request, inputStream]: [JSONRPCRequest, ReadableStream<Uint8Array>],
cancel: (reason?: any) => void,
meta: Record<string, JSONValue> | undefined,
ctx: ContextTimed,
): Promise<[JSONValue, ReadableStream<Uint8Array>]> {
public handle = async (
[request, inputStream]: [JSONRPCRequest, ReadableStream<Uint8Array>],
cancel: (reason?: any) => void,
meta: Record<string, JSONValue> | undefined,
ctx: ContextTimed
): Promise<[JSONValue, ReadableStream<Uint8Array>]> => {
const { readable, writable } = new TransformStream<Uint8Array, Uint8Array>();
const {readable, writable} = new TransformStream<Uint8Array, Uint8Array>();
(async () => {
const reader = inputStream.getReader();
const writer = writable.getWriter();
while (true) {
const { done, value } = await reader.read();
if (done) {
break;
}
(async () => {
const reader = inputStream.getReader();
const writer = writable.getWriter();
while (true) {
const {done, value} = await reader.read();
if (done) {
break;
}
const num = parseInt(new TextDecoder().decode(value), 10);
const factorial = factorialOf(num).toString();
const outputBuffer = new TextEncoder().encode(factorial);
const num = parseInt(new TextDecoder().decode(value), 10);
const factorial = factorialOf(num).toString();
const outputBuffer = new TextEncoder().encode(factorial);
writer.write(outputBuffer);
}
writer.close();
})();
writer.write(outputBuffer);
}
writer.close();
})();
return ["Starting factorial computation", readable];
}
return ['Starting factorial computation', readable as ReadableStream<Uint8Array>]; }
}
function factorialOf(n: number): number {
return (n === 0) ? 1 : n * factorialOf(n - 1);
return n === 0 ? 1 : n * factorialOf(n - 1);
}
async function startSErver() {
const rpcServer = await RPCServer.createRPCServer({
manifest: {
FactorialStream: new FactorialStream({}),
},
logger,
idGen,
});
async function startServer() {
const wss = new WebSocket.Server({ port: 1221 });
const rpcServer = await RPCServer.createRPCServer({
manifest: {
FactorialStream: new FactorialStream({}),
},
handlerTimeoutTime: 200,
logger,
idGen,
});
const simulatedStream = sendStreamHere/* your logic for creating or obtaining a server-side stream */;
rpcServer.handleStream(simulatedStream);
return rpcServer;
wss.on('connection', (ws) => {
const { readable, writable } = wsToStream(ws);
rpcServer.handleStream({ readable, writable, cancel: () => {} });
});
return { rpcServer };
}
async function startClient() {
const rpcClient = await RPCClient.createRPCClient({
manifest: {
FactorialStream: new RawCaller(),
},
streamFactory,
middlewareFactory,
logger,
idGen,
});
return new Promise<RPCClient<Manifest>>((resolve, reject) => {
const ws = new WebSocket('ws://localhost:1221');
const { readable, writable, meta } = await rpcClient.methods.FactorialRaw({});
ws.addEventListener('open', async () => {
const { readable, writable } = wsToStream(ws);
const rpcClient = await RPCClient.createRPCClient({
manifest: {
FactorialStream: new RawCaller(),
},
streamFactory: async () => ({ readable, writable, cancel: () => {} }),
middlewareFactory,
logger,
idGen,
});
resolve(rpcClient);
});
ws.addEventListener('error', (err) => {
reject(err);
});
});
}
//Printing 'Starting factorial computation'
console.log(meta);
function wsToStream(ws: WebSocket): { readable: ReadableStream, writable: WritableStream } {
let readableController: ReadableStreamDefaultController<any> | null = null;
const readable = new ReadableStream({
start(controller) {
readableController = controller;
},
cancel() {
ws.close();
},
});
const writer = writable.getWriter();
ws.on('message', (chunk: any) => {
readableController?.enqueue(chunk);
});
const numbers = [1, 2, 3, 4, 5];
ws.on('close', () => {
readableController?.close();
});
for (const num of numbers){
writer.write(new TextEncoder().encode(num.toString()));
}
writer.close();
const writable = new WritableStream({
write(chunk) {
ws.send(chunk);
},
close() {
ws.close();
},
abort() {
ws.close();
},
});
return { readable, writable };
}
const reader = readable.getReader();
while (true) {
const {done, value} = await reader.read();
if (done) {
break;
}
console.log('The factorial is: $(new TextDecoder().decode(value))');
}
return rpcClient;
async function execute(rpcClient: RPCClient<Manifest>){
try{
// Initialize the FactorialStream RPC method
const { readable, writable, meta } = await rpcClient.methods.FactorialStream({timer: 200});
console.log('Meta:', meta); // Output meta information, should be 'Starting factorial computation'
// Create a writer for the writable stream
const writer = writable.getWriter();
// Send numbers 4, 5, 6, 8 to the server for factorial computation
for (const num of [4, 5, 6, 8]) {
const buffer = new TextEncoder().encode(num.toString());
await writer.write(buffer);
}
await writer.close();
// Create a reader for the readable stream
const reader = readable.getReader();
// Read the computed factorials from the server
while (true) {
const { done, value } = await reader.read();
if (done) {
console.log('Done reading from stream.');
process.exit(0);
break;
}
const factorialResult = new TextDecoder().decode(value).trim(); // Added trim() to remove any extra whitespace
console.log(`The factorial is: ${factorialResult}`);
}
}catch (error){
console.error("Error is :", error);
}
}
async function main(){
const rpcServer = await startServer();
const rpcClient = await startClient();
async function main() {
try {
const serverObject = await startServer();
const rpcClient = await startClient();
await rpcServer.destroy();
await rpcClient.destroy();
await execute(rpcClient);
await rpcClient.destroy();
await serverObject.rpcServer.destroy();
} catch (err) {
console.error("An error occurred:", err);
}
}
// Run the main function to kick off the example
main();
```
### Server Stream
![img.png](images/rawTest.png)
#### Server Stream
In Server Stream calls,

@@ -318,79 +592,312 @@ the client sends a single request and receives multiple responses in a read-only stream from the server.

where the server needs to update the client in real-time based on events or data changes.
In this example, the client sends a number and the server responds with the squares of all numbers up to that number.
```ts
import Logger, {LogLevel, StreamHandler} from "@matrixai/logger";
import {ContainerType, IdGen, JSONValue, MiddlewareFactory, StreamFactory} from "@matrixai/rpc/dist/types";
import {RawCaller, ServerCaller} from "@matrixai/rpc/dist/callers";
import {ServerHandler} from "@matrixai/rpc/dist/handlers";
import {ContextTimed} from "@matrixai/contexts";
import RPCServer from "@matrixai/rpc/dist/RPCServer";
import WebSocket = require('ws');
import {ReadableStream, ReadableStreamDefaultController, WritableStream} from "stream/web";
import RPCClient from "@matrixai/rpc/dist/RPCClient";
In this example, the client sends a number and the server responds with the squares of all numbers up to that number.
const logger = new Logger('Server Test', LogLevel.WARN, [new StreamHandler()]);
let streamFactory: StreamFactory;
let middlewareFactory: MiddlewareFactory<any, any, any, any>;
let idGen: IdGen;
class SquaredNumbers extends ServerHandler<ContainerType, number, number>{
public handle = async function* (
input: number,
):AsyncGenerator<number>{
for (let i = 0; i<= input; i++){
yield i*i;
}
};
}
type Manifest = {
SquaredNumbers: ServerCaller<number,number>;
}
async function startServer() {
const wss =
new WebSocket.Server({ port: 1221 });
const rpcServer =
await RPCServer.createRPCServer({
manifest: {
SquaredNumbers: new SquaredNumbers({}),
},
logger,
idGen,
});
wss.on('connection', (ws) => {
const { readable, writable } =
wsToStream(ws);
rpcServer.handleStream({ readable, writable, cancel: () => {} });
});
return { rpcServer };
}
async function startClient() {
return new Promise<RPCClient<Manifest>>((resolve, reject) => {
const ws = new WebSocket('ws://localhost:1221');
ws.addEventListener('open', async () => {
const { readable, writable } =
wsToStream(ws);
const rpcClient =
await RPCClient.createRPCClient<Manifest>({
manifest: {
SquaredNumbers: new ServerCaller<number, number>(),
},
streamFactory: async () => ({ readable, writable,
cancel: () => {} }),
middlewareFactory,
logger,
idGen,
});
resolve(rpcClient);
});
ws.addEventListener('error', (err) => {
reject(err);
});
});
}
function wsToStream(ws: WebSocket): { readable: ReadableStream, writable: WritableStream } {
let readableController: ReadableStreamDefaultController<any> | null = null;
const readable = new ReadableStream({
start(controller) {
readableController = controller;
},
cancel() {
ws.close();
},
});
ws.on('message', (chunk: any) => {
readableController?.enqueue(chunk);
});
ws.on('close', () => {
readableController?.close();
});
const writable = new WritableStream({
write(chunk) {
ws.send(chunk);
},
close() {
ws.close();
},
abort() {
ws.close();
},
});
return { readable, writable };
}
async function execute(rpcClient: RPCClient<Manifest>) {
try {
const squaredStream = await rpcClient.methods.SquaredNumbers(235);
const outputs: Array<number> = [];
for await(const num of squaredStream) {
outputs.push(num);
}
console.log(`Squared numbers are: ${outputs.join(', ')}`);
} catch (error) {
console.error("Error in execute:", error);
}
}
async function main() {
try {
const serverObject = await startServer();
const rpcClient = await startClient();
await execute(rpcClient);
await rpcClient.destroy();
await serverObject.rpcServer.destroy();
} catch (err) {
console.log('An Error occurred: ', err)
}
}
main();
```
#### Duplex Stream
A Duplex Stream enables both the client and the server to read
and write messages in their respective streams independently of each other.
Both parties can read and write multiple messages in any order.
It's useful in scenarios that require ongoing communication in both directions, like chat applications.
In this example, the client sends a sequence of numbers and the server responds with the squares of those numbers.
```ts
import ServerHandler from "./ServerHandler";
import {ContainerType} from "./types";
import {AsyncIterable} from "ix/Ix";
import Logger, {LogLevel, StreamHandler} from "@matrixai/logger";
import {ContainerType, IdGen, JSONValue, MiddlewareFactory, StreamFactory} from "@matrixai/rpc/dist/types";
import {defaultMiddleware} from "@matrixai/rpc/dist/middleware";
import {ClientCaller, DuplexCaller} from "@matrixai/rpc/dist/callers";
import {DuplexHandler} from "@matrixai/rpc/dist/handlers";
import {ContextTimed} from "@matrixai/contexts";
import RPCServer from "@matrixai/rpc/dist/RPCServer";
import WebSocket = require('ws');
import {takeUntil} from "ix/Ix.dom.asynciterable.operators";
import RPCClient from "@matrixai/rpc/dist/RPCClient";
import {ReadableStream, ReadableStreamDefaultController, WritableStream} from "stream/web";
class SquaredNums extends ServerHandler<ContainerType, number, number> {
public handle = async function* (
input: number,
const logger = new Logger('Duplex Test', LogLevel.WARN, [new StreamHandler()]);
let streamFactory: StreamFactory;
let middlewareFactory: MiddlewareFactory<any, any, any, any>;
let idGen: IdGen;
class SquaredDuplex extends DuplexHandler<ContainerType, number, Array<number>>{
public handle = async function*(
input: AsyncIterableIterator<number>,
cancel: (reason?: any) => void,
meta: Record<string, JSONValue> | undefined,
ctx: ContextTimed,
): AsyncIterable<number>{
for (let i = 0; i<= input; i++){
yield i*i;
}
): AsyncIterableIterator<Array<number>>{
for await (const num of input){
const squares: Array<number> = []
for(let i =1; i<=num; i++){
squares.push(i*i);
}
yield squares;
}
};
}
type Manifest = {
SquaredDuplex: DuplexCaller<number, Array<number>>;
};
async function startServer() {
const wss = new WebSocket.Server({ port: 8080 });
const rpcServer = await RPCServer.createRPCServer({
manifest: {
SquaredNums: new SquaredNums({}),
SquaredDuplex: new SquaredDuplex({}),
},
logger,
logger: new Logger('rpc-server'),
handlerTimeoutTime: 1000,
idGen,
handlerTimeoutTime: 60000,
});
// Simulating receiving a stream from a client.
// Provided by network layer
const simulatedStream =sendStreamHere;
rpcServer.handleStream(simulatedStream);
return rpcServer;
wss.on('connection', (ws) => {
const { readable, writable } = wsToStream(ws);
rpcServer.handleStream({ readable, writable, cancel: () => {} });
});
return { rpcServer };
}
async function startClient() {
// Simulate client-server pair of streams.
// Simulating network stream
const clientPair = sendStreamHere/* your logic for creating or obtaining a client-side stream */;
const rpcClient = await RPCClient.createRPCClient({
manifest: {
SquaredNums: new ServerCaller
return new Promise<RPCClient<Manifest>>( (resolve, reject) => {
const ws = new WebSocket('ws://localhost:8080');
ws.addEventListener('open', async () => {
const { readable, writable } = wsToStream(ws);
const rpcClient = await RPCClient.createRPCClient({
manifest: {
SquaredDuplex: new DuplexCaller<number, Array<number>>(),
},
streamFactory: async () => ({ readable, writable, cancel: () => {} }),
middlewareFactory,
logger,
idGen,
});
resolve(rpcClient);
});
ws.addEventListener('error', (err) => {
reject(err);
});
});
}
function wsToStream(ws: WebSocket): { readable: ReadableStream, writable: WritableStream } {
let readableController: ReadableStreamDefaultController<any> | null = null;
const readable = new ReadableStream({
start(controller) {
readableController = controller;
},
streamFactory,
middlewareFactory,
logger,
idGen
})
cancel() {
ws.close();
},
});
const squaredStream = await rpcClient.methods.SquaredNums(4);
// Read squared numbers from the server
const outputs: Array<number> = [];
for await(const num of squaredStream) {
outputs.push(num);
}
ws.on('message', (chunk: any) => {
readableController?.enqueue(chunk);
});
console.log('Squared numbers are: $(outputs.join(', ')}');
ws.on('close', () => {
readableController?.close();
});
return
const writable = new WritableStream({
write(chunk) {
ws.send(chunk);
},
close() {
ws.close();
},
abort() {
ws.close();
},
});
return rpcClient;
return { readable, writable };
}
async function main(){
const rpcServer = await startServer();
// Client-side duplex caller
async function executeSquareNumbersDuplex(rpcClient: RPCClient<Manifest>) {
try{const { readable, writable } = await rpcClient.methods.SquaredDuplex();
const writer = writable.getWriter();
await writer.write(2);
await writer.write(3);
await writer.write(4);
// Read squared numbers from the server
for await (const squares of readable) {
console.log(`Squares up to n are: ${squares.join(", ")}`);
}
await writer.close();
}catch (e){
console.log(e)
}
}
// Main function to tie everything together
async function main() {
try {
const serverObject = await startServer();
const rpcClient = await startClient();
await rpcServer.destroy();
await executeSquareNumbersDuplex(rpcClient); // Add this line to run the duplex caller
await serverObject.rpcServer.destroy();
await rpcClient.destroy();
} catch (err) {
console.error("An error occurred:", err);
}
}
main();
```
### Unary Stream
![img.png](images/duplexTest.png)
#### Unary Stream

@@ -404,2 +911,26 @@ In a Unary Stream, the client sends a single request to the server and gets a single response back,

```ts
import {
ContainerType,
JSONValue,
IdGen,
StreamFactory,
MiddlewareFactory, ClientManifest,
} from "@matrixai/rpc/dist/types";
import WebSocket = require('ws');
import {ClientHandler, UnaryHandler} from "@matrixai/rpc/dist/handlers";
import type { ContextTimed } from '@matrixai/contexts';
import RPCServer from '@matrixai/rpc/dist/RPCServer'
import RPCClient from '@matrixai/rpc/dist/RPCClient'
import Logger, { LogLevel, StreamHandler } from '@matrixai/logger';
import { createDestroy } from '@matrixai/async-init';
import {ClientCaller, UnaryCaller} from "@matrixai/rpc/dist/callers";
import {ReadableStream, WritableStream} from "stream/web";
import {ReadableStreamDefaultController} from "stream/web";
const logger = new Logger(`RPC Test`, LogLevel.WARN, [new StreamHandler()]);
let streamFactory: StreamFactory;
let middlewareFactory: MiddlewareFactory<any, any, any, any>;
let idGen: IdGen;
class SquaredNumberUnary extends UnaryHandler<ContainerType, number, number> {

@@ -416,3 +947,5 @@ public handle = async (

// Server-side WebSocket setup
async function startServer() {
const wss = new WebSocket.Server({ port: 8080 });
const rpcServer = await RPCServer.createRPCServer({

@@ -422,38 +955,108 @@ manifest: {

},
logger,
logger: new Logger('rpc-server'),
handlerTimeoutTime: 1000,
idGen,
});
const simulatedStream = sendStreamHere/* your logic for creating or obtaining a server-side stream */;
rpcServer.handleStream(simulatedStream);
return rpcServer;
wss.on('connection', (ws) => {
const { readable, writable } = wsToStream(ws);
rpcServer.handleStream({ readable, writable, cancel: () => {} });
});
return { rpcServer };
}
type Manifest = {
SquaredNumberUnary: UnaryCaller<number, number>;
};
// Client-side WebSocket setup
async function startClient() {
return new Promise<RPCClient<Manifest>>( (resolve, reject) => {
const ws = new WebSocket('ws://localhost:8080');
async function startClient() {
const rpcClient = await RPCClient.createRPCClient({
manifest: {
SquaredNumberUnary: new UnaryCaller<number, number>(),
ws.addEventListener('open', async () => {
const { readable, writable } = wsToStream(ws);
const rpcClient = await RPCClient.createRPCClient({
manifest: {
SquaredNumberUnary: new UnaryCaller<number, number>(),
},
streamFactory: async () => ({ readable, writable, cancel: () => {} }),
middlewareFactory,
logger,
idGen,
});
resolve(rpcClient);
});
ws.addEventListener('error', (err) => {
reject(err);
});
});
}
function wsToStream(ws: WebSocket): { readable: ReadableStream, writable: WritableStream } {
let readableController: ReadableStreamDefaultController<any> | null = null;
const readable = new ReadableStream({
start(controller) {
readableController = controller;
},
streamFactory,
middlewareFactory,
logger,
idGen,
cancel() {
ws.close();
},
});
const squaredNumber = await rpcClient.methods.SquaredNumberUnary(4);
console.log('Squared number is: $(squaredNumber)');
ws.on('message', (chunk: any) => {
readableController?.enqueue(chunk);
});
return rpcClient;
ws.on('close', () => {
readableController?.close();
});
const writable = new WritableStream({
write(chunk) {
ws.send(chunk);
},
close() {
ws.close();
},
abort() {
ws.close();
},
});
return { readable, writable };
}
// Function to execute the Sum RPC call
// Function to execute the Sum RPC call
async function executeSquare(rpcClient: RPCClient<Manifest>) {
try {
// Sending a number (e.g., 4) to be squared
const squaredNumber = await rpcClient.methods.SquaredNumberUnary(4);
async function main(){
const rpcServer = await startServer();
const rpcClient = await startClient();
// Log the squared number
console.log(`Squared number is: ${squaredNumber}`);
} catch (error) {
// Handle any errors
console.error(`An error occurred while executing SquaredNumberUnary: ${error}`);
}
}
await rpcServer.destroy();
await rpcClient.destroy();
// Main function to tie everything together
async function main() {
try {
const serverObject = await startServer();
const rpcClient = await startClient();
await executeSquare(rpcClient);
await rpcClient.destroy();
await serverObject.rpcServer.destroy();
} catch (err) {
console.error("An error occurred:", err);
}
}
main();
```
![img.png](images/unaryTest.png)
## Development

@@ -460,0 +1063,0 @@

Sorry, the diff of this file is not supported yet

SocketSocket SOC 2 Logo

Product

  • Package Alerts
  • Integrations
  • Docs
  • Pricing
  • FAQ
  • Roadmap
  • Changelog

Packages

npm

Stay in touch

Get open source security insights delivered straight into your inbox.


  • Terms
  • Privacy
  • Security

Made with ⚡️ by Socket Inc