@matrixai/rpc
Advanced tools
Comparing version 0.2.5 to 0.2.6
@@ -114,3 +114,3 @@ /// <reference types="node" /> | ||
}>; | ||
} & Omit<T, 'metadata'>; | ||
} & T; | ||
type JSONRPCResult<T extends JSONObject = JSONObject> = { | ||
@@ -122,3 +122,3 @@ metadata?: { | ||
}>; | ||
} & Omit<T, 'metadata'>; | ||
} & T; | ||
/** | ||
@@ -125,0 +125,0 @@ * This is a JSON RPC error object, it encodes the error data for the JSONRPCResponseError object. |
{ | ||
"name": "@matrixai/rpc", | ||
"version": "0.2.5", | ||
"version": "0.2.6", | ||
"author": "Matrix AI", | ||
@@ -5,0 +5,0 @@ "contributors": [ |
1027
README.md
@@ -14,377 +14,296 @@ # js-rpc | ||
### Overview of different streams | ||
### Basic Usage | ||
#### Duplex Stream -> | ||
Because decorators are experimental, you must enable: | ||
`"experimentalDecorators": true` in your `tsconfig.json` to use this library. | ||
##### Client: | ||
The client initiates a duplex streaming RPC call using a method that returns both a readable and a writable stream. The client can read from the readable stream and write to the writable stream. | ||
First, setup an `RPCStream` to use: | ||
```ts | ||
import { JSONValue } from "./types"; | ||
import Caller from './Caller'; | ||
const rpcStream = { | ||
readable: new ReadableStream(), | ||
writable: new WritableStream(), | ||
cancel: () => {}; | ||
}; | ||
``` | ||
// Initialize the duplex call | ||
const { ReadableStream, WritableStream } = client.method(); | ||
#### Server | ||
// Get the reader and writer from the streams | ||
const reader = ReadableStream.getReader(); | ||
const writer = WritableStream.getWriter(); | ||
```ts | ||
import type { JSONRPCParams, JSONRPCResult, JSONValue } from "@matrixai/rpc"; | ||
import { RPCServer, UnaryHandler } from "@matrixai/rpc"; | ||
// Read output from the server | ||
const readResult = await reader.read(); | ||
if (!readResult.done) { | ||
const output: JSONValue = readResult.value; | ||
console.log("Received from server:", output); | ||
// Create a Handler | ||
class SquaredNumberUnary extends UnaryHandler<ContainerType, JSONRPCParams<{ value: number }>, JSONRPCResult<{ value: number }>> { | ||
public handle = async ( | ||
input: JSONRPCParams<{ value: number }>, | ||
cancel: (reason?: any) => void, | ||
meta: Record<string, JSONValue> | undefined, | ||
ctx: ContextTimed, | ||
): Promise<JSONRPCResult<{ value: number }>> => { | ||
return input.value**2; | ||
}; | ||
} | ||
// Write data to the server | ||
const inputData: JSONValue = { someKey: "someValue" }; | ||
await writer.write(inputData); | ||
const rpcServer = new RPCServer(); | ||
// Don't forget to close the writer when you're done | ||
await writer.close(); | ||
await rpcServer.start({ | ||
manifest: { | ||
SquaredDuplex: new SquaredDuplex(), | ||
// ... add more handlers here | ||
}, | ||
}); | ||
rpcServer.handleStream(rpcStream); | ||
``` | ||
##### Server : | ||
#### Client | ||
```ts | ||
import type { ContainerType, JSONValue } from '../types'; | ||
import type { ContextTimed } from '@matrixai/contexts'; | ||
import Handler from './Handler'; | ||
import type { HandlerTypes } from "@matrixai/rpc"; | ||
import { RPCClient, UnaryCaller } from "@matrixai/rpc"; | ||
// Define the handler as an async generator function | ||
const handle = async function* ( | ||
input: AsyncIterableIterator<JSONValue>, // This is a generator. | ||
cancel: (reason?: any) => void, | ||
meta: Record<string, JSONValue> | undefined, | ||
ctx: ContextTimed | ||
): AsyncIterableIterator<JSONValue> { | ||
// Get the CallerTypes of the handler | ||
type CallerTypes = HandlerTypes<SquaredNumberUnary>; | ||
const squaredNumber = new UnaryCaller< | ||
CallerTypes['input'], | ||
CallerTypes['output'] | ||
>(); | ||
// Loop through the incoming stream of messages | ||
for await (const incomingData of input) { | ||
console.log("Received from client:", incomingData); | ||
const rpcClient = new RPCClient({ | ||
manifest: { | ||
squaredNumber, | ||
// ... add more here | ||
}, | ||
streamFactory: async () => rpcStream, | ||
}); | ||
// Perform some operation on the incoming data | ||
const outputData: JSONValue = { responseKey: "responseValue" }; | ||
// Yield data back to the client | ||
yield outputData; | ||
// We yield since stream can contain multiple messages. | ||
} | ||
}; | ||
await rpcClient.methods.squaredNumber({ value: 2 }); | ||
// returns { value: 4 } | ||
``` | ||
#### Client Streaming | ||
##### Client | ||
The client initiates a client streaming RPC call using a method that returns a writable stream and a promise. The client writes to the writable stream and awaits the output promise to get the response. | ||
Any of the callers or handlers can be added from the below `Call Types` section. | ||
```ts | ||
{ output: Promise<JSONValue>, WriteableStream<JSONValue>} = client.method(); | ||
const writer = WritableStream.getWriter(); | ||
writer.write(); | ||
const Output = await output; | ||
``` | ||
##### Server | ||
On the server side, the handle function is defined as an asynchronous generator function. It takes an AsyncIterableIterator as input, which represents the stream of incoming messages from the client. It yields output back to the client as needed. | ||
```ts | ||
// Initialize the client streaming call | ||
const { output, writable } = client.method(); | ||
const writer = writable.getWriter(); | ||
### Call Types | ||
// Write multiple messages to the server | ||
const inputData1: JSONValue = { key1: "value1" }; | ||
const inputData2: JSONValue = { key2: "value2" }; | ||
await writer.write(inputData1); | ||
await writer.write(inputData2); | ||
#### Unary | ||
// Close the writer when done | ||
await writer.close(); | ||
In Unary calls, the client sends a single request to the server and receives a single response back, much like a regular async function call. | ||
// Wait for the server's response | ||
const serverOutput: JSONValue = await output; | ||
console.log("Received from server:", serverOutput); | ||
``` | ||
##### Handler | ||
###### Server | ||
On the server side, the handle function is an asynchronous function that takes an AsyncIterableIterator as input, representing the stream of incoming messages from the client. It returns a promise that resolves to the output that will be sent back to the client. | ||
```ts | ||
import { JSONValue } from "./types"; | ||
import type { ContextTimed } from '@matrixai/contexts'; | ||
const handle = async ( | ||
input: AsyncIterableIterator<JSONValue>, | ||
cancel: (reason?: any) => void, | ||
meta: Record<string, JSONValue> | undefined, | ||
ctx: ContextTimed | ||
): Promise<JSONValue> { | ||
// Aggregate or process the incoming messages from the client | ||
let aggregateData: any = {}; // Replace 'any' with your specific type | ||
for await (const incomingData of input) { | ||
// Perform some aggregation or processing on incomingData | ||
import type { JSONRPCParams, JSONRPCResult, JSONValue } from "@matrixai/rpc"; | ||
import { UnaryHandler } from "@matrixai/rpc"; | ||
class SquaredNumberUnary extends UnaryHandler<ContainerType, JSONRPCParams<{ value: number }>, JSONRPCResult<{ value: number }>> { | ||
public handle = async ( | ||
input: JSONRPCParams<{ value: number }>, | ||
cancel: (reason?: any) => void, | ||
meta: Record<string, JSONValue> | undefined, | ||
ctx: ContextTimed, | ||
): Promise<JSONRPCResult<{ value: number }>> => { | ||
return input.value**2; | ||
}; | ||
} | ||
// Generate the response to be sent back to the client | ||
const outputData: JSONValue = { responseKey: "responseValue" }; | ||
return outputData; | ||
}; | ||
``` | ||
#### Server Streaming | ||
##### Caller | ||
##### Client | ||
The client initiates a server streaming RPC call using a method that takes input parameters and returns a readable stream. The client writes a single message and then reads multiple messages from the readable stream. | ||
```ts | ||
// Initialize the server streaming call | ||
const readableStream: ReadableStream<JSONValue> = client.method(); | ||
import type { HandlerTypes } from "@matrixai/rpc"; | ||
import { UnaryCaller } from "@matrixai/rpc"; | ||
type CallerTypes = HandlerTypes<SquaredNumberUnary>; | ||
const squaredNumber = new UnaryCaller< | ||
CallerTypes['input'], | ||
CallerTypes['output'] | ||
>(); | ||
``` | ||
const reader = readableStream.getReader(); | ||
##### Call-Site | ||
// Read multiple messages from the server | ||
while (true) { | ||
const { value, done } = await reader.read(); | ||
if (done) break; | ||
The client initiates a unary RPC call by invoking a method that returns a promise. It passes the required input parameters as arguments to the method. The client then waits for the promise to resolve, receiving the output. | ||
console.log("Received from server:", value); | ||
} | ||
``` ts | ||
await rpcClient.methods.squaredNumber({ value: 3 }); | ||
// returns { value: 9 } | ||
``` | ||
##### Server | ||
On the server side, the handle function is an asynchronous generator function that takes a single input parameter from the client. It yields multiple messages that will be sent back to the client through the readable stream. | ||
```ts | ||
import { JSONValue } from "./types"; | ||
import type { ContextTimed } from '@matrixai/contexts'; | ||
public handle = async function* ( | ||
input: JSONValue, | ||
cancel: (reason?: any) => void, | ||
meta: Record<string, JSONValue> | undefined, | ||
ctx: ContextTimed | ||
): AsyncIterableIterator<JSONValue> { | ||
#### Client Streaming | ||
// Process the input and prepare the data to be sent back to the client | ||
const outputData1: JSONValue = { responseKey1: "responseValue1" }; | ||
const outputData2: JSONValue = { responseKey2: "responseValue2" }; | ||
In Client Streaming calls, the client can write multiple messages to a single stream, while the server reads from that stream and then returns a single response. This pattern is useful when the client needs to send a sequence of data to the server, after which the server processes the data and replies with a single result. This pattern is good for scenarios like file uploads. | ||
// Yield multiple messages to be sent to the client | ||
yield outputData1; | ||
}; | ||
``` | ||
##### Handler | ||
#### Unary Streaming | ||
In a unary RPC, the client sends a single request to the server and receives a single response back, much like a regular function call. | ||
##### Client | ||
The client initiates a unary RPC call by invoking a method that returns a promise. It passes the required input parameters as arguments to the method. The client then waits for the promise to resolve, receiving the output. | ||
``` ts | ||
import type { JSONValue } from '../types'; | ||
import Caller from './Caller'; | ||
On the server side, the handle function is an asynchronous function that takes an AsyncIterableIterator as input, representing the stream of incoming messages from the client. It returns a promise that resolves to the output that will be sent back to the client. | ||
// Initialize the unary RPC call with input parameters | ||
const promise: Promise<JSONValue> = client.unaryCaller("methodName", { someParam: "someValue" }); | ||
```ts | ||
import type { JSONRPCParams, JSONRPCResult, JSONValue } from "@matrixai/rpc"; | ||
import { ClientHandler } from "@matrixai/rpc"; | ||
class AccumulateClient extends ClientHandler<ContainerType, JSONRPCParams<{ value: number }>, JSONRPCResult<{ value: number }>> { | ||
public handle = async ( | ||
input: AsyncIterableIterator<JSONRPCParams<{ value: number }>>, | ||
cancel: (reason?: any) => void, | ||
meta: Record<string, JSONValue> | undefined, | ||
ctx: ContextTimed, | ||
): Promise<JSONRPCResult<{ value: number }>> => { | ||
let acc = 0; | ||
for await (const number of input) { | ||
acc += number.value; | ||
} | ||
return { value: acc }; | ||
}; | ||
} | ||
``` | ||
// Wait for the response | ||
const output: JSONValue = await promise; | ||
console.log("Received from server:", output); | ||
##### Caller | ||
```ts | ||
import type { HandlerTypes } from "@matrixai/rpc"; | ||
import { ClientCaller } from "@matrixai/rpc"; | ||
type CallerTypes = HandlerTypes<AccumulateClient>; | ||
const accumulate = new ClientCaller< | ||
CallerTypes['input'], | ||
CallerTypes['output'] | ||
>(); | ||
``` | ||
##### Server | ||
```ts | ||
import { JSONValue } from "./types"; | ||
import { ContextTimed } from "./contexts"; // Assuming ContextTimed is imported from a module named 'contexts' | ||
##### Call-Site | ||
public handle = async ( | ||
input: JSONValue, | ||
cancel: (reason?: any) => void, | ||
meta: Record<string, JSONValue> | undefined, | ||
ctx: ContextTimed, | ||
): Promise<JSONValue> { | ||
The client initiates a client streaming RPC call using a method that returns a writable stream and a promise. The client writes to the writable stream and awaits the output promise to get the response. | ||
// Process the input and prepare the data to be sent back to the client | ||
const outputData: JSONValue = { responseKey: "responseValue" }; | ||
return outputData; | ||
}; | ||
```ts | ||
const { output, writable } = await rpcClient.methods.accumulate(); | ||
const writer = writabe.getWriter(); | ||
await writer.write({ value: 1 }); | ||
await writer.write({ value: 2 }); | ||
await writer.write({ value: 3 }); | ||
await writer.write({ value: 4 }); | ||
await writer.close(); | ||
await output; | ||
// output resolves to { value: 10 } | ||
``` | ||
### Usage Examples | ||
#### Server Streaming | ||
Because decorators are experimental, you must enable: | ||
`"experimentalDecorators": true` in your `tsconfig.json` to use this library. | ||
In Server Streaming calls, | ||
the client sends a single request and receives multiple responses in a read-only stream from the server. | ||
The server can keep pushing messages as long as it needs, allowing real-time updates from the server to the client. | ||
This is useful for things like monitoring, | ||
where the server needs to update the client in real-time based on events or data changes. | ||
In this example, the client sends a number and the server responds with the squares of all numbers up to that number. | ||
#### Client Stream | ||
In a Client Stream, the client can write multiple messages to a single stream, | ||
while the server reads from that stream and then returns a single response. | ||
This pattern is useful when the client needs to send a sequence of data to the server, | ||
after which the server processes the data and replies with a single result. | ||
This pattern is good for scenarios like file uploads. | ||
##### Handler | ||
This example shows how to create an RPC pair and handle streaming integers and summing them up. | ||
On the server side, the handle function is an asynchronous generator function that takes a single input parameter from the client. It yields multiple messages that will be sent back to the client through the readable stream. | ||
```ts | ||
import { | ||
ContainerType, | ||
JSONValue, | ||
IdGen, | ||
StreamFactory, | ||
MiddlewareFactory, ClientManifest, | ||
} from "@matrixai/rpc/dist/types"; | ||
import WebSocket = require('ws'); | ||
import {ClientHandler} from "@matrixai/rpc/dist/handlers"; | ||
import type { ContextTimed } from '@matrixai/contexts'; | ||
import RPCServer from '@matrixai/rpc/dist/RPCServer' | ||
import RPCClient from '@matrixai/rpc/dist/RPCClient' | ||
import Logger, { LogLevel, StreamHandler } from '@matrixai/logger'; | ||
import { createDestroy } from '@matrixai/async-init'; | ||
import {ClientCaller} from "@matrixai/rpc/dist/callers"; | ||
import {ReadableStream, WritableStream} from "stream/web"; | ||
import {ReadableStreamDefaultController} from "stream/web"; | ||
import type { JSONRPCParams, JSONRPCResult, JSONValue } from "@matrixai/rpc"; | ||
import { ServerHandler } from "@matrixai/rpc"; | ||
class CountServer extends ServerHandler<ContainerType, JSONRPCParams<{ value: number }>, JSONRPCResult<{ value: number }>> { | ||
public handle = async function* ( | ||
input: JSONRPCParams<{ value: number }>, | ||
cancel: (reason?: any) => void, | ||
meta: Record<string, JSONValue> | undefined, | ||
ctx: ContextTimed, | ||
): AsyncIterableIterator<JSONRPCResult<{ value: number }>> { | ||
for (let i = input.number; i < input.number + 5; i++) { | ||
yield { value: i }; | ||
} | ||
}; | ||
} | ||
``` | ||
const logger = new Logger(`RPC Test`, LogLevel.WARN, [new StreamHandler()]); | ||
##### Caller | ||
let streamFactory: StreamFactory; | ||
let middlewareFactory: MiddlewareFactory<any, any, any, any>; | ||
let idGen: IdGen; | ||
```ts | ||
import type { HandlerTypes } from "@matrixai/rpc"; | ||
import { ServerCaller } from "@matrixai/rpc"; | ||
type CallerTypes = HandlerTypes<CountServer>; | ||
const count = new ServerCaller< | ||
CallerTypes['input'], | ||
CallerTypes['output'] | ||
>(); | ||
``` | ||
class Sum extends ClientHandler<ContainerType, number, number> { | ||
public handle = async ( | ||
input: AsyncIterable<number>, | ||
): Promise<number> => { | ||
let sum = 0; | ||
console.log("Entering handle method on server."); | ||
for await (const num of input) { | ||
console.log(`Received number: ${num}`); | ||
sum += num; | ||
} | ||
console.log(`Returning sum: ${sum}`); | ||
return sum; | ||
}; | ||
##### Call-Site | ||
The client initiates a server streaming RPC call using a method that takes input parameters and returns a readable stream. The client writes a single message and then reads multiple messages from the readable stream. | ||
```ts | ||
const callerInterface = await rpcClient.methods.count({ value: 5 }); | ||
const numbers = []; | ||
while (true) { | ||
const { value, done } = await reader.read(); | ||
numbers.push(value.value); | ||
if (done) break; | ||
} | ||
// numbers is [5, 6, 7, 8, 9] | ||
``` | ||
// Server-side WebSocket setup | ||
// Server-side setup | ||
async function startServer() { | ||
const rpcServer = new RPCServer({ | ||
logger: new Logger('rpc-server'), | ||
timeoutTime: 60000, | ||
idGen, | ||
}); | ||
#### Duplex Stream | ||
await rpcServer.start({ | ||
manifest: { | ||
Sum: new Sum({}), | ||
}, | ||
}); | ||
A Duplex Stream enables both the client and the server to read and write messages in their respective streams independently of each other. Both parties can read and write multiple messages in any order. It's useful in scenarios that require ongoing communication in both directions, like chat applications. | ||
// Create synthetic streams here | ||
const { readable, writable } = createSyntheticStreams(); | ||
rpcServer.handleStream({ readable, writable, cancel: () => {} }); | ||
##### Handler | ||
return { rpcServer }; | ||
```ts | ||
import type { JSONRPCParams, JSONRPCResult, JSONValue } from "@matrixai/rpc"; | ||
import { DuplexHandler } from "@matrixai/rpc"; | ||
class EchoDuplex extends DuplexHandler<ContainerType, JSONRPCParams, JSONRPCResult> { | ||
public handle = async function* ( | ||
input: AsyncIterableIterator<JSONRPCParams<{ value: number }>>, // This is a generator. | ||
cancel: (reason?: any) => void, | ||
meta: Record<string, JSONValue> | undefined, | ||
ctx: ContextTimed | ||
): AsyncIterableIterator<JSONRPCResult<{ value: number }>> { | ||
for await (const incomingData of input) { | ||
yield incomingData; | ||
} | ||
}; | ||
} | ||
``` | ||
// Create synthetic streams | ||
function createSyntheticStreams() { | ||
const readable = new ReadableStream(); | ||
const writable = new WritableStream(); | ||
##### Caller | ||
return { readable, writable }; | ||
} | ||
```ts | ||
import type { HandlerTypes } from "@matrixai/rpc"; | ||
import { ServerCaller } from "@matrixai/rpc"; | ||
type CallerTypes = HandlerTypes<EchoDuplex>; | ||
const echo = new ServerCaller< | ||
CallerTypes['input'], | ||
CallerTypes['output'] | ||
>(); | ||
``` | ||
type Manifest = { | ||
Sum: ClientCaller<number, number>; | ||
}; | ||
// Client-side WebSocket setup | ||
// Client-side setup | ||
async function startClient() { | ||
const { readable, writable } = createSyntheticStreams(); | ||
const rpcClient = new RPCClient({ | ||
manifest: { | ||
Sum: new ClientCaller<number, number>(), | ||
}, | ||
streamFactory: async () => ({ readable, writable, cancel: () => {} }), | ||
middlewareFactory, | ||
logger, | ||
idGen, | ||
}); | ||
##### Call-Site | ||
return rpcClient; | ||
} | ||
The client initiates a duplex streaming RPC call using a method that returns both a readable and a writable stream. The client can read from the readable stream and write to the writable stream. | ||
// Function to execute the Sum RPC call | ||
async function executeSum(rpcClient: RPCClient<Manifest>) { | ||
try { | ||
const { output, writable } = await rpcClient.methods.Sum(); | ||
const writer = writable.getWriter(); | ||
await writer.write(5); | ||
await writer.write(10); | ||
await writer.close(); | ||
```ts | ||
// Initialize the duplex call | ||
const { readable, writable } = await rpcClient.methods.SquaredDuplex(); | ||
const ans = await output; | ||
console.log(`Received output: ${ans}`); | ||
console.log(`Sum is: ${ans}`); | ||
} catch (error) { | ||
console.error("Error in executeSum:", error); | ||
} | ||
} | ||
// Get the reader and writer from the streams | ||
const reader = readable.getReader(); | ||
const writer = writable.getWriter(); | ||
// Main function to tie everything together | ||
async function main() { | ||
try { | ||
const serverObject = await startServer(); | ||
const rpcClient = await startClient(); | ||
// Write data to the server | ||
const inputData: JSONObject = { someKey: "someValue" }; | ||
await writer.write(inputData); | ||
await executeSum(rpcClient); | ||
await serverObject.rpcServer.destroy(); | ||
} catch (err) { | ||
console.error("An error occurred:", err); | ||
} | ||
} | ||
const readResult = await reader.read(); | ||
main(); | ||
// readResult is { someKey: "someValue" } | ||
``` | ||
![img.png](images/clientTest.png) | ||
#### Raw Stream | ||
Raw Stream is designed for low-level handling of RPC calls, enabling granular control over data streaming. | ||
Unlike other patterns, Raw Streams allow both the server and client to work directly with raw data, | ||
providing a more flexible yet complex way to handle communications. | ||
This is especially useful when the RPC protocol itself needs customization | ||
or when handling different types of data streams within the same connection. | ||
#### Raw Streams | ||
In this example, the client sends a sequence of numbers and the server responds with the factorial of each number. | ||
```ts | ||
import { | ||
ContainerType, | ||
JSONValue, | ||
IdGen, | ||
StreamFactory, | ||
MiddlewareFactory, ClientManifest, JSONRPCRequest, | ||
} from "@matrixai/rpc/dist/types"; | ||
import WebSocket = require('ws'); | ||
import { RawHandler} from "@matrixai/rpc/dist/handlers"; | ||
import type { ContextTimed } from '@matrixai/contexts'; | ||
import RPCServer from '@matrixai/rpc/dist/RPCServer' | ||
import RPCClient from '@matrixai/rpc/dist/RPCClient' | ||
import Logger, { LogLevel, StreamHandler } from '@matrixai/logger'; | ||
import { createDestroy } from '@matrixai/async-init'; | ||
import {RawCaller} from "@matrixai/rpc/dist/callers"; | ||
import {ReadableStream, WritableStream} from "stream/web"; | ||
import {ReadableStreamDefaultController} from "stream/web"; | ||
Raw Streams are designed for low-level handling of RPC calls, enabling granular control over data streaming. Unlike other patterns, Raw Streams allow both the server and client to work directly with raw data, providing a more flexible yet complex way to handle communications. This is especially useful when the RPC protocol itself needs customization or when handling different types of data streams within the same connection. | ||
##### Handler | ||
const logger = new Logger('RPC Test', LogLevel.WARN, [new StreamHandler()]); | ||
let streamFactory: StreamFactory; | ||
let middlewareFactory: MiddlewareFactory<any, any, any, any>; | ||
let idGen: IdGen; | ||
type Manifest = { | ||
FactorialStream: RawCaller; | ||
}; | ||
class FactorialStream extends RawHandler<ContainerType> { | ||
```ts | ||
import type { JSONRPCRequest, JSONValue } from "@matrixai/rpc"; | ||
import { RawHandler } from "@matrixai/rpc"; | ||
class FactorialRaw extends RawHandler<ContainerType> { | ||
public handle = async ( | ||
@@ -398,2 +317,6 @@ [request, inputStream]: [JSONRPCRequest, ReadableStream<Uint8Array>], | ||
(async () => { | ||
function factorialOf(n: number): number { | ||
return n === 0 ? 1 : n * factorialOf(n - 1); | ||
} | ||
const reader = inputStream.getReader(); | ||
@@ -419,492 +342,58 @@ const writer = writable.getWriter(); | ||
} | ||
``` | ||
function factorialOf(n: number): number { | ||
return n === 0 ? 1 : n * factorialOf(n - 1); | ||
} | ||
##### Caller | ||
async function startServer() { | ||
const rpcServer = new RPCServer({ | ||
timeoutTime: 200, | ||
logger, | ||
idGen, | ||
}); | ||
await rpcServer.start({ | ||
manifest: { | ||
FactorialStream: new FactorialStream({}), | ||
}, | ||
}); | ||
// Create synthetic streams here | ||
const { readable, writable } = createSyntheticStreams(); | ||
rpcServer.handleStream({ readable, writable, cancel: () => {} }); | ||
return { rpcServer }; | ||
} | ||
// Create synthetic streams | ||
function createSyntheticStreams() { | ||
const readable = new ReadableStream(); | ||
const writable = new WritableStream(); | ||
return { readable, writable }; | ||
} | ||
async function startClient() { | ||
const { readable, writable } = createSyntheticStreams(); | ||
const rpcClient = new RPCClient({ | ||
manifest: { | ||
FactorialStream: new RawCaller(), | ||
}, | ||
streamFactory: async () => ({ readable, writable, cancel: () => {} }), | ||
middlewareFactory, | ||
logger, | ||
idGen, | ||
}); | ||
return rpcClient; | ||
} | ||
async function execute(rpcClient: RPCClient<Manifest>){ | ||
try{ | ||
// Initialize the FactorialStream RPC method | ||
const { readable, writable, meta } = await rpcClient.methods.FactorialStream({timer: 200}); | ||
console.log('Meta:', meta); // Output meta information, should be 'Starting factorial computation' | ||
// Create a writer for the writable stream | ||
const writer = writable.getWriter(); | ||
// Send numbers 4, 5, 6, 8 to the server for factorial computation | ||
for (const num of [4, 5, 6, 8]) { | ||
const buffer = new TextEncoder().encode(num.toString()); | ||
await writer.write(buffer); | ||
} | ||
await writer.close(); | ||
// Create a reader for the readable stream | ||
const reader = readable.getReader(); | ||
// Read the computed factorials from the server | ||
while (true) { | ||
const { done, value } = await reader.read(); | ||
if (done) { | ||
console.log('Done reading from stream.'); | ||
process.exit(0); | ||
break; | ||
} | ||
const factorialResult = new TextDecoder().decode(value).trim(); // Added trim() to remove any extra whitespace | ||
console.log(`The factorial is: ${factorialResult}`); | ||
} | ||
}catch (error){ | ||
console.error("Error is :", error); | ||
} | ||
} | ||
async function main() { | ||
try { | ||
const serverObject = await startServer(); | ||
const rpcClient = await startClient(); | ||
await execute(rpcClient); | ||
await serverObject.rpcServer.destroy(); | ||
} catch (err) { | ||
console.error("An error occurred:", err); | ||
} | ||
} | ||
main(); | ||
```ts | ||
import { RawCaller } from "@matrixai/rpc"; | ||
const factorial = new RawCaller(); | ||
``` | ||
![img.png](images/rawTest.png) | ||
#### Server Stream | ||
In Server Stream calls, | ||
the client sends a single request and receives multiple responses in a read-only stream from the server. | ||
The server can keep pushing messages as long as it needs, allowing real-time updates from the server to the client. | ||
This is useful for things like monitoring, | ||
where the server needs to update the client in real-time based on events or data changes. | ||
In this example, the client sends a number and the server responds with the squares of all numbers up to that number. | ||
```ts | ||
import Logger, {LogLevel, StreamHandler} from "@matrixai/logger"; | ||
import {ContainerType, IdGen, JSONValue, MiddlewareFactory, StreamFactory} from "@matrixai/rpc/dist/types"; | ||
import {RawCaller, ServerCaller} from "@matrixai/rpc/dist/callers"; | ||
import {ServerHandler} from "@matrixai/rpc/dist/handlers"; | ||
import {ContextTimed} from "@matrixai/contexts"; | ||
import RPCServer from "@matrixai/rpc/dist/RPCServer"; | ||
import WebSocket = require('ws'); | ||
import {ReadableStream, ReadableStreamDefaultController, WritableStream} from "stream/web"; | ||
import RPCClient from "@matrixai/rpc/dist/RPCClient"; | ||
##### Call-Site | ||
const logger = new Logger('Server Test', LogLevel.WARN, [new StreamHandler()]); | ||
let streamFactory: StreamFactory; | ||
let middlewareFactory: MiddlewareFactory<any, any, any, any>; | ||
let idGen: IdGen; | ||
class SquaredNumbers extends ServerHandler<ContainerType, number, number>{ | ||
public handle = async function* ( | ||
input: number, | ||
):AsyncGenerator<number>{ | ||
for (let i = 0; i<= input; i++){ | ||
yield i*i; | ||
} | ||
}; | ||
} | ||
type Manifest = { | ||
SquaredNumbers: ServerCaller<number,number>; | ||
} | ||
// Create synthetic streams | ||
function createSyntheticStreams() { | ||
const readable = new ReadableStream(); | ||
const writable = new WritableStream(); | ||
return { readable, writable }; | ||
} | ||
async function startServer() { | ||
const rpcServer = new RPCServer({ | ||
logger, | ||
idGen, | ||
}); | ||
await rpcServer.start({ | ||
manifest: { | ||
SquaredNumbers: new SquaredNumbers({}), | ||
}, | ||
}); | ||
const { readable, writable } = createSyntheticStreams(); | ||
rpcServer.handleStream({ readable, writable, cancel: () => {} }); | ||
return { rpcServer }; | ||
} | ||
async function startClient() { | ||
return new Promise<RPCClient<Manifest>>((resolve, reject) => { | ||
const { readable, writable } = createSyntheticStreams(); | ||
const rpcClient = new RPCClient<Manifest>({ | ||
manifest: { | ||
SquaredNumbers: new ServerCaller<number, number>(), | ||
}, | ||
streamFactory: async () => ({ readable, writable, cancel: () => {} }), | ||
middlewareFactory, | ||
logger, | ||
idGen, | ||
}); | ||
resolve(rpcClient); | ||
}); | ||
} | ||
async function execute(rpcClient: RPCClient<Manifest>) { | ||
try { | ||
const squaredStream = await rpcClient.methods.SquaredNumbers(235); | ||
const outputs: Array<number> = []; | ||
for await(const num of squaredStream) { | ||
outputs.push(num); | ||
} | ||
console.log(`Squared numbers are: ${outputs.join(', ')}`); | ||
} catch (error) { | ||
console.error("Error in execute:", error); | ||
} | ||
} | ||
async function main() { | ||
try { | ||
const serverObject = await startServer(); | ||
const rpcClient = await startClient(); | ||
await execute(rpcClient); | ||
await serverObject.rpcServer.destroy(); | ||
} catch (err) { | ||
console.log('An Error occurred: ', err) | ||
} | ||
} | ||
main(); | ||
``` | ||
#### Duplex Stream | ||
A Duplex Stream enables both the client and the server to read | ||
and write messages in their respective streams independently of each other. | ||
Both parties can read and write multiple messages in any order. | ||
It's useful in scenarios that require ongoing communication in both directions, like chat applications. | ||
In this example, the client sends a sequence of numbers and the server responds with the squares of those numbers. | ||
```ts | ||
import Logger, {LogLevel, StreamHandler} from "@matrixai/logger"; | ||
import {ContainerType, IdGen, JSONValue, MiddlewareFactory, StreamFactory} from "@matrixai/rpc/dist/types"; | ||
import {defaultMiddleware} from "@matrixai/rpc/dist/middleware"; | ||
import {ClientCaller, DuplexCaller} from "@matrixai/rpc/dist/callers"; | ||
import {DuplexHandler} from "@matrixai/rpc/dist/handlers"; | ||
import {ContextTimed} from "@matrixai/contexts"; | ||
import RPCServer from "@matrixai/rpc/dist/RPCServer"; | ||
import WebSocket = require('ws'); | ||
import {takeUntil} from "ix/Ix.dom.asynciterable.operators"; | ||
import RPCClient from "@matrixai/rpc/dist/RPCClient"; | ||
import {ReadableStream, ReadableStreamDefaultController, WritableStream} from "stream/web"; | ||
const { readable, writable, meta } = await rpcClient.methods.factorial(); | ||
console.log('Meta:', meta); // Output meta information, should be 'Starting factorial computation' | ||
const logger = new Logger('Duplex Test', LogLevel.WARN, [new StreamHandler()]); | ||
// Create a writer for the writable stream | ||
const writer = writable.getWriter(); | ||
let streamFactory: StreamFactory; | ||
let middlewareFactory: MiddlewareFactory<any, any, any, any>; | ||
let idGen: IdGen; | ||
class SquaredDuplex extends DuplexHandler<ContainerType, number, Array<number>>{ | ||
public handle = async function*( | ||
input: AsyncIterableIterator<number>, | ||
cancel: (reason?: any) => void, | ||
meta: Record<string, JSONValue> | undefined, | ||
ctx: ContextTimed, | ||
): AsyncIterableIterator<Array<number>>{ | ||
for await (const num of input){ | ||
const squares: Array<number> = [] | ||
for(let i =1; i<=num; i++){ | ||
squares.push(i*i); | ||
} | ||
yield squares; | ||
} | ||
}; | ||
// Send numbers 4, 5, 6, 8 to the server for factorial computation | ||
for (const num of [4, 5, 6, 8]) { | ||
const buffer = new TextEncoder().encode(num.toString()); | ||
await writer.write(buffer); | ||
} | ||
await writer.close(); | ||
type Manifest = { | ||
SquaredDuplex: DuplexCaller<number, Array<number>>; | ||
}; | ||
// Create a reader for the readable stream | ||
const reader = readable.getReader(); | ||
async function startServer() { | ||
const wss = new WebSocket.Server({ port: 8080 }); | ||
const rpcServer = new RPCServer({ | ||
logger: new Logger('rpc-server'), | ||
timeoutTime: 1000, | ||
idGen, | ||
}); | ||
rpcServer.start({ | ||
manifest: { | ||
SquaredDuplex: new SquaredDuplex({}), | ||
}, | ||
}); | ||
wss.on('connection', (ws) => { | ||
const { readable, writable } = wsToStream(ws); | ||
rpcServer.handleStream({ readable, writable, cancel: () => {} }); | ||
}); | ||
return { rpcServer }; | ||
} | ||
async function startClient() { | ||
return new Promise<RPCClient<Manifest>>( (resolve, reject) => { | ||
const ws = new WebSocket('ws://localhost:8080'); | ||
ws.addEventListener('open', async () => { | ||
const { readable, writable } = wsToStream(ws); | ||
const rpcClient = new RPCClient({ | ||
manifest: { | ||
SquaredDuplex: new DuplexCaller<number, Array<number>>(), | ||
}, | ||
streamFactory: async () => ({ readable, writable, cancel: () => {} }), | ||
middlewareFactory, | ||
logger, | ||
idGen, | ||
}); | ||
resolve(rpcClient); | ||
}); | ||
ws.addEventListener('error', (err) => { | ||
reject(err); | ||
}); | ||
}); | ||
} | ||
function wsToStream(ws: WebSocket): { readable: ReadableStream, writable: WritableStream } { | ||
let readableController: ReadableStreamDefaultController<any> | null = null; | ||
const readable = new ReadableStream({ | ||
start(controller) { | ||
readableController = controller; | ||
}, | ||
cancel() { | ||
ws.close(); | ||
}, | ||
}); | ||
ws.on('message', (chunk: any) => { | ||
readableController?.enqueue(chunk); | ||
}); | ||
ws.on('close', () => { | ||
readableController?.close(); | ||
}); | ||
const writable = new WritableStream({ | ||
write(chunk) { | ||
ws.send(chunk); | ||
}, | ||
close() { | ||
ws.close(); | ||
}, | ||
abort() { | ||
ws.close(); | ||
}, | ||
}); | ||
return { readable, writable }; | ||
} | ||
// Client-side duplex caller | ||
async function executeSquareNumbersDuplex(rpcClient: RPCClient<Manifest>) { | ||
try{const { readable, writable } = await rpcClient.methods.SquaredDuplex(); | ||
const writer = writable.getWriter(); | ||
await writer.write(2); | ||
await writer.write(3); | ||
await writer.write(4); | ||
// Read squared numbers from the server | ||
for await (const squares of readable) { | ||
console.log(`Squares up to n are: ${squares.join(", ")}`); | ||
} | ||
await writer.close(); | ||
}catch (e){ | ||
console.log(e) | ||
// Read the computed factorials from the server | ||
while (true) { | ||
const { done, value } = await reader.read(); | ||
if (done) { | ||
console.log('Done reading from stream.'); | ||
process.exit(0); | ||
break; | ||
} | ||
const factorialResult = new TextDecoder().decode(value).trim(); // Added trim() to remove any extra whitespace | ||
console.log(`The factorial is: ${factorialResult}`); | ||
} | ||
// Main function to tie everything together | ||
async function main() { | ||
try { | ||
const serverObject = await startServer(); | ||
const rpcClient = await startClient(); | ||
await executeSquareNumbersDuplex(rpcClient); // Add this line to run the duplex caller | ||
await serverObject.rpcServer.destroy(); | ||
} catch (err) { | ||
console.error("An error occurred:", err); | ||
} | ||
} | ||
main(); | ||
``` | ||
![img.png](images/duplexTest.png) | ||
#### Unary Stream | ||
In a Unary Stream, the client sends a single request to the server and gets a single response back, | ||
just like HTTP/REST calls but over a connection. | ||
It's the simplest form of RPC, suitable for short-lived operations that don't require streaming data. | ||
It's the go-to choice for straightforward "request and response" interactions. | ||
## Specifications | ||
In this example, the client sends a number and the server responds with the square of that number. | ||
```ts | ||
import { | ||
ContainerType, | ||
JSONValue, | ||
IdGen, | ||
StreamFactory, | ||
MiddlewareFactory, ClientManifest, | ||
} from "@matrixai/rpc/dist/types"; | ||
import WebSocket = require('ws'); | ||
import {ClientHandler, UnaryHandler} from "@matrixai/rpc/dist/handlers"; | ||
import type { ContextTimed } from '@matrixai/contexts'; | ||
import RPCServer from '@matrixai/rpc/dist/RPCServer' | ||
import RPCClient from '@matrixai/rpc/dist/RPCClient' | ||
import Logger, { LogLevel, StreamHandler } from '@matrixai/logger'; | ||
import { createDestroy } from '@matrixai/async-init'; | ||
import {ClientCaller, UnaryCaller} from "@matrixai/rpc/dist/callers"; | ||
import {ReadableStream, WritableStream} from "stream/web"; | ||
import {ReadableStreamDefaultController} from "stream/web"; | ||
### Timeouts | ||
const logger = new Logger(`RPC Test`, LogLevel.WARN, [new StreamHandler()]); | ||
Whenever the time between the initial message and the following subsequent message of an RPC call exceeds a defined timeout time, the RPC call will have timed out. | ||
let streamFactory: StreamFactory; | ||
let middlewareFactory: MiddlewareFactory<any, any, any, any>; | ||
let idGen: IdGen; | ||
For Unary calls, this is similar to the timeout of a response after sending a request. | ||
class SquaredNumberUnary extends UnaryHandler<ContainerType, number, number> { | ||
public handle = async ( | ||
input: number, | ||
cancel: (reason?: any) => void, | ||
meta: Record<string, JSONValue> | undefined, | ||
ctx: ContextTimed, | ||
): Promise<number> => { | ||
return input * input; | ||
}; | ||
} | ||
// Create synthetic streams | ||
function createSyntheticStreams() { | ||
const readable = new ReadableStream(); | ||
const writable = new WritableStream(); | ||
return { readable, writable }; | ||
} | ||
// Server-side setup | ||
async function startServer() { | ||
const rpcServer = new RPCServer({ | ||
logger: new Logger('rpc-server'), | ||
timeoutTime: 1000, | ||
idGen, | ||
}); | ||
If the client were to time out, the stream is forcibly closed and `ErrorRPCTimedOut` is thrown from the call. | ||
await rpcServer.start({ | ||
manifest: { | ||
SquaredDuplex: new SquaredDuplex({}), | ||
}, | ||
}); | ||
If the server were to time out, is is advisory. Meaning that the server may choose to optionally eagerly throw `ErrorRPCTimedOut`, or continue processing as normal. | ||
// Replace WebSocket with synthetic stream | ||
const { readable, writable } = createSyntheticStreams(); | ||
rpcServer.handleStream({ readable, writable, cancel: () => {} }); | ||
#### Throwing Timeouts Server-Side | ||
return { rpcServer }; | ||
} | ||
// Client-side setup | ||
async function startClient() { | ||
return new Promise<RPCClient<Manifest>>((resolve, reject) => { | ||
const { readable, writable } = createSyntheticStreams(); | ||
const rpcClient = new RPCClient({ | ||
manifest: { | ||
SquaredDuplex: new DuplexCaller<number, Array<number>>(), | ||
}, | ||
streamFactory: async () => ({ readable, writable, cancel: () => {} }), | ||
middlewareFactory, | ||
logger, | ||
idGen, | ||
}); | ||
resolve(rpcClient); | ||
}); | ||
} | ||
// Function to execute the Sum RPC call | ||
async function executeSquare(rpcClient: RPCClient<Manifest>) { | ||
try { | ||
// Sending a number (e.g., 4) to be squared | ||
const squaredNumber = await rpcClient.methods.SquaredNumberUnary(4); | ||
// Log the squared number | ||
console.log(`Squared number is: ${squaredNumber}`); | ||
} catch (error) { | ||
// Handle any errors | ||
console.error(`An error occurred while executing SquaredNumberUnary: ${error}`); | ||
} | ||
} | ||
// Main function to tie everything together | ||
async function main() { | ||
try { | ||
const serverObject = await startServer(); | ||
const rpcClient = await startClient(); | ||
await executeSquare(rpcClient); | ||
await serverObject.rpcServer.destroy(); | ||
} catch (err) { | ||
console.error("An error occurred:", err); | ||
} | ||
} | ||
main(); | ||
``` | ||
![img.png](images/unaryTest.png) | ||
## Specifications | ||
### Throwing Timeouts | ||
By default, a timeout will not cause an RPC call to automatically throw, this must be manually done by the handler when it receives the abort signal from `ctx.signal`. An example of this is like so: | ||
@@ -930,3 +419,3 @@ | ||
### Timeout Priority | ||
#### Priority of Timeout Options | ||
@@ -958,5 +447,5 @@ A `timeoutTime` can be passed both to the constructors of `RPCServer` and `RPCClient`. This is the default `timeoutTime` for all callers/handlers. | ||
It's important to note that any of these timeouts will ultimately be overridden by the shortest timeout of the server and client combined using the timeout middleware below. | ||
However, it's important to note that any of these timeouts may ultimately be overridden by the shortest timeout of the server and client combined using the timeout middleware below. | ||
### Timeout Middleware | ||
#### Timeout Middleware | ||
@@ -963,0 +452,0 @@ The `timeoutMiddleware` sets an RPCServer's timeout based on the lowest timeout between the Client and the Server. This is so that handlers can eagerly time out and stop processing as soon as it is known that the client has timed out. |
License Policy Violation
LicenseThis package is not allowed per your license policy. Review the package's license to ensure compliance.
Found 1 instance in 1 package
License Policy Violation
LicenseThis package is not allowed per your license policy. Review the package's license to ensure compliance.
Found 1 instance in 1 package
344773
519