@matrixai/rpc
Advanced tools
Comparing version 0.1.6 to 0.1.8
@@ -28,3 +28,3 @@ import type { Class } from '@matrixai/errors'; | ||
interface RPCError extends Error { | ||
code?: number; | ||
code: number; | ||
} | ||
@@ -31,0 +31,0 @@ declare class ErrorRPC<T> extends AbstractError<T> implements RPCError { |
import type { ClientHandlerImplementation, DuplexHandlerImplementation, JSONRPCRequest, JSONRPCResponse, JSONRPCResponseResult, ServerManifest, RawHandlerImplementation, ServerHandlerImplementation, UnaryHandlerImplementation, RPCStream, MiddlewareFactory } from './types'; | ||
import type { JSONValue } from './types'; | ||
import type { IdGen } from './types'; | ||
import type { ErrorRPC, ErrorRPCRemote } from './errors'; | ||
import Logger from '@matrixai/logger'; | ||
@@ -35,4 +36,2 @@ import { PromiseCancellable } from '@matrixai/async-cancellable'; | ||
* path and `JSONRPCResponse` to `Uint8Array` on the reverse path. | ||
* @param obj.sensitive - If true, sanitises any rpc error messages of any | ||
* sensitive information. | ||
* @param obj.streamKeepAliveTimeoutTime - Time before a connection is cleaned up due to no activity. This is the | ||
@@ -47,12 +46,12 @@ * value used if the handler doesn't specify its own timeout time. This timeout is advisory and only results in a | ||
*/ | ||
static createRPCServer({ manifest, middlewareFactory, sensitive, handlerTimeoutTime, // 1 minute | ||
logger, idGen, fromError, replacer, }: { | ||
static createRPCServer({ manifest, middlewareFactory, handlerTimeoutTime, // 1 minute | ||
logger, idGen, fromError, filterSensitive, toError, }: { | ||
manifest: ServerManifest; | ||
middlewareFactory?: MiddlewareFactory<JSONRPCRequest, Uint8Array, Uint8Array, JSONRPCResponse>; | ||
sensitive?: boolean; | ||
handlerTimeoutTime?: number; | ||
logger?: Logger; | ||
idGen: IdGen; | ||
fromError?: (error: Error) => JSONValue; | ||
replacer?: (key: string, value: any) => any; | ||
fromError?: (error: ErrorRPC<any>) => JSONValue; | ||
filterSensitive?: (key: string, value: any) => any; | ||
toError?: (errorResponse: any, metadata?: any) => ErrorRPCRemote<any>; | ||
}): Promise<RPCServer>; | ||
@@ -66,17 +65,16 @@ protected onTimeoutCallback?: () => void; | ||
protected activeStreams: Set<PromiseCancellable<void>>; | ||
protected sensitive: boolean; | ||
protected fromError: (error: Error, sensitive?: boolean) => JSONValue; | ||
protected replacer: (key: string, value: any) => any; | ||
protected fromError: (error: ErrorRPC<any>) => JSONValue; | ||
protected filterSensitive: (key: string, value: any) => any; | ||
protected toError: (errorResponse: any, metadata?: any) => ErrorRPCRemote<any>; | ||
protected middlewareFactory: MiddlewareFactory<JSONRPCRequest, Uint8Array, Uint8Array, JSONRPCResponseResult>; | ||
registerOnTimeoutCallback(callback: () => void): void; | ||
constructor({ manifest, middlewareFactory, sensitive, handlerTimeoutTime, // 1 minuet | ||
logger, idGen, fromError, replacer, }: { | ||
constructor({ manifest, middlewareFactory, handlerTimeoutTime, logger, idGen, fromError, filterSensitive, toError, }: { | ||
manifest: ServerManifest; | ||
middlewareFactory: MiddlewareFactory<JSONRPCRequest, Uint8Array, Uint8Array, JSONRPCResponseResult>; | ||
handlerTimeoutTime?: number; | ||
sensitive: boolean; | ||
logger: Logger; | ||
idGen: IdGen; | ||
fromError?: (error: Error) => JSONValue; | ||
replacer?: (key: string, value: any) => any; | ||
fromError?: (error: ErrorRPC<any>) => JSONValue; | ||
filterSensitive?: (key: string, value: any) => any; | ||
toError?: (errorResponse: any, metadata?: any) => ErrorRPCRemote<any>; | ||
}); | ||
@@ -83,0 +81,0 @@ destroy(force?: boolean): Promise<void>; |
@@ -72,4 +72,2 @@ "use strict"; | ||
* path and `JSONRPCResponse` to `Uint8Array` on the reverse path. | ||
* @param obj.sensitive - If true, sanitises any rpc error messages of any | ||
* sensitive information. | ||
* @param obj.streamKeepAliveTimeoutTime - Time before a connection is cleaned up due to no activity. This is the | ||
@@ -84,4 +82,4 @@ * value used if the handler doesn't specify its own timeout time. This timeout is advisory and only results in a | ||
*/ | ||
static async createRPCServer({ manifest, middlewareFactory = rpcUtilsMiddleware.defaultServerMiddlewareWrapper(), sensitive = false, handlerTimeoutTime = Infinity, // 1 minute | ||
logger = new logger_1.default(this.name), idGen = () => Promise.resolve(null), fromError = rpcUtils.fromError, replacer = rpcUtils.replacer, }) { | ||
static async createRPCServer({ manifest, middlewareFactory = rpcUtilsMiddleware.defaultServerMiddlewareWrapper(), handlerTimeoutTime = Infinity, // 1 minute | ||
logger = new logger_1.default(this.name), idGen = () => Promise.resolve(null), fromError = rpcUtils.fromError, filterSensitive = rpcUtils.filterSensitive, toError = rpcUtils.toError, }) { | ||
logger.info(`Creating ${this.name}`); | ||
@@ -91,3 +89,2 @@ const rpcServer = new this({ | ||
middlewareFactory, | ||
sensitive, | ||
handlerTimeoutTime, | ||
@@ -97,3 +94,4 @@ logger, | ||
fromError, | ||
replacer, | ||
filterSensitive, | ||
toError, | ||
}); | ||
@@ -110,5 +108,5 @@ logger.info(`Created ${this.name}`); | ||
activeStreams = new Set(); | ||
sensitive; | ||
fromError; | ||
replacer; | ||
filterSensitive; | ||
toError; | ||
middlewareFactory; | ||
@@ -119,4 +117,3 @@ // Function to register a callback for timeout | ||
} | ||
constructor({ manifest, middlewareFactory, sensitive, handlerTimeoutTime = Infinity, // 1 minuet | ||
logger, idGen = () => Promise.resolve(null), fromError = rpcUtils.fromError, replacer = rpcUtils.replacer, }) { | ||
constructor({ manifest, middlewareFactory, handlerTimeoutTime = Infinity, logger, idGen = () => Promise.resolve(null), fromError = rpcUtils.fromError, filterSensitive = rpcUtils.filterSensitive, toError = rpcUtils.toError, }) { | ||
super(); | ||
@@ -152,7 +149,7 @@ for (const [key, manifestItem] of Object.entries(manifest)) { | ||
this.middlewareFactory = middlewareFactory; | ||
this.sensitive = sensitive; | ||
this.handlerTimeoutTime = handlerTimeoutTime; | ||
this.logger = logger; | ||
this.fromError = fromError || rpcUtils.fromError; | ||
this.replacer = replacer || rpcUtils.replacer; | ||
this.filterSensitive = filterSensitive || rpcUtils.filterSensitive; | ||
this.toError = toError || rpcUtils.toError; | ||
} | ||
@@ -270,3 +267,3 @@ async destroy(force = true) { | ||
message: e.description ?? '', | ||
data: JSON.stringify(this.fromError(e), this.replacer), | ||
data: JSON.stringify(this.fromError(e), this.filterSensitive), | ||
type: e.type, | ||
@@ -457,3 +454,3 @@ }; | ||
message: e.description ?? '', | ||
data: JSON.stringify(this.fromError(e), this.replacer), | ||
data: JSON.stringify(this.fromError(e), this.filterSensitive), | ||
type: e.type, | ||
@@ -460,0 +457,0 @@ }; |
@@ -28,3 +28,3 @@ /// <reference types="node" /> | ||
*/ | ||
declare const replacer: (keyToRemove: any) => (key: any, value: any) => any; | ||
declare const filterSensitive: (keyToRemove: any) => (key: any, value: any) => any; | ||
/** | ||
@@ -68,2 +68,2 @@ * Deserializes an error response object into an ErrorRPCRemote instance. | ||
declare function parseHeadStream<T extends JSONRPCMessage>(messageParser: (message: unknown) => T, bufferByteLimit?: number): TransformStream<Uint8Array, T | Uint8Array>; | ||
export { parseJSONRPCRequest, parseJSONRPCRequestMessage, parseJSONRPCRequestNotification, parseJSONRPCResponseResult, parseJSONRPCResponseError, parseJSONRPCResponse, parseJSONRPCMessage, replacer, fromError, toError, clientInputTransformStream, clientOutputTransformStream, getHandlerTypes, parseHeadStream, promise, isObject, sleep, }; | ||
export { parseJSONRPCRequest, parseJSONRPCRequestMessage, parseJSONRPCRequestNotification, parseJSONRPCResponseResult, parseJSONRPCResponseError, parseJSONRPCResponse, parseJSONRPCMessage, filterSensitive, fromError, toError, clientInputTransformStream, clientOutputTransformStream, getHandlerTypes, parseHeadStream, promise, isObject, sleep, }; |
@@ -26,3 +26,3 @@ "use strict"; | ||
Object.defineProperty(exports, "__esModule", { value: true }); | ||
exports.sleep = exports.isObject = exports.promise = exports.parseHeadStream = exports.getHandlerTypes = exports.clientOutputTransformStream = exports.clientInputTransformStream = exports.toError = exports.fromError = exports.replacer = exports.parseJSONRPCMessage = exports.parseJSONRPCResponse = exports.parseJSONRPCResponseError = exports.parseJSONRPCResponseResult = exports.parseJSONRPCRequestNotification = exports.parseJSONRPCRequestMessage = exports.parseJSONRPCRequest = void 0; | ||
exports.sleep = exports.isObject = exports.promise = exports.parseHeadStream = exports.getHandlerTypes = exports.clientOutputTransformStream = exports.clientInputTransformStream = exports.toError = exports.fromError = exports.filterSensitive = exports.parseJSONRPCMessage = exports.parseJSONRPCResponse = exports.parseJSONRPCResponseError = exports.parseJSONRPCResponseResult = exports.parseJSONRPCRequestNotification = exports.parseJSONRPCRequestMessage = exports.parseJSONRPCRequest = void 0; | ||
const web_1 = require("stream/web"); | ||
@@ -32,3 +32,2 @@ const json_1 = require("@streamparser/json"); | ||
const rpcErrors = __importStar(require("./errors")); | ||
const errors = __importStar(require("./errors")); | ||
const errors_2 = require("./errors"); | ||
@@ -280,68 +279,5 @@ const errors_3 = require("./errors"); | ||
*/ | ||
const replacer = createReplacer(); | ||
exports.replacer = replacer; | ||
const filterSensitive = createReplacer(); | ||
exports.filterSensitive = filterSensitive; | ||
/** | ||
* Reviver function for deserializing errors sent over RPC. | ||
* @param {string} key - The key in the JSON object. | ||
* @param {any} value - The value corresponding to the key in the JSON object. | ||
* @returns {any} The reconstructed error object or the original value. | ||
*/ | ||
function reviver(key, value) { | ||
// If the value is an error then reconstruct it | ||
if (typeof value === 'object' && | ||
typeof value.type === 'string' && | ||
typeof value.data === 'object') { | ||
try { | ||
let eClass = errors[value.type]; | ||
if (eClass != null) | ||
return eClass.fromJSON(value); | ||
eClass = standardErrors[value.type]; | ||
if (eClass != null) { | ||
let e; | ||
switch (eClass) { | ||
case errors_1.AbstractError: | ||
return eClass.fromJSON(); | ||
case AggregateError: | ||
if (!Array.isArray(value.data.errors) || | ||
typeof value.data.message !== 'string' || | ||
('stack' in value.data && typeof value.data.stack !== 'string')) { | ||
throw new TypeError(`cannot decode JSON to ${value.type}`); | ||
} | ||
e = new eClass(value.data.errors, value.data.message); | ||
e.stack = value.data.stack; | ||
break; | ||
default: | ||
if (typeof value.data.message !== 'string' || | ||
('stack' in value.data && typeof value.data.stack !== 'string')) { | ||
throw new TypeError(`Cannot decode JSON to ${value.type}`); | ||
} | ||
e = new eClass(value.data.message); | ||
e.stack = value.data.stack; | ||
break; | ||
} | ||
return e; | ||
} | ||
} | ||
catch (e) { | ||
// If `TypeError` which represents decoding failure | ||
// then return value as-is | ||
// Any other exception is a bug | ||
if (!(e instanceof TypeError)) { | ||
throw e; | ||
} | ||
} | ||
// Other values are returned as-is | ||
return value; | ||
} | ||
else if (key === '') { | ||
// Root key will be '' | ||
// Reaching here means the root JSON value is not a valid exception | ||
// Therefore ErrorPolykeyUnknown is only ever returned at the top-level | ||
return new rpcErrors.ErrorRPC('Unknown error JSON'); | ||
} | ||
else { | ||
return value; | ||
} | ||
} | ||
/** | ||
* Deserializes an error response object into an ErrorRPCRemote instance. | ||
@@ -348,0 +284,0 @@ * @param {any} errorResponse - The error response object. |
{ | ||
"name": "@matrixai/rpc", | ||
"version": "0.1.6", | ||
"version": "0.1.8", | ||
"author": "Matrix AI", | ||
@@ -5,0 +5,0 @@ "contributors": [ |
441
README.md
@@ -11,3 +11,444 @@ # js-rpc | ||
``` | ||
## Usage Examples | ||
Because decorators are experimental, you must enable: `"experimentalDecorators": true` in your `tsconfig.json` to use this library. | ||
### Client Stream | ||
In a Client Stream, the client can write multiple messages to a single stream, | ||
while the server reads from that stream and then returns a single response. | ||
This pattern is useful when the client needs to send a sequence of data to the server, | ||
after which the server processes the data and replies with a single result. | ||
This pattern is good for scenarios like file uploads. | ||
This example shows how to create an RPC pair and handle streaming integers and summing them up. | ||
```ts | ||
import {RPCServer, ClientHandler, ContainerType, RPCClient, ClientCaller} from "./index"; | ||
import {AsyncIterable} from "ix/Ix"; | ||
const webSocketServer = awwait | ||
class Sum extends ClientHandler<ContainerType, number, number> { | ||
public handle = async ( | ||
// handle takes input an AsyncIterable, which is a generator, | ||
// would produce numbers input in the Writeable stream | ||
input: AsyncIterable<number>, | ||
cancel: (reason?: any) => void, | ||
meta: Record<string, JSONValue> | undefined, | ||
ctx: ContextTimed, | ||
): Promise<number> => { | ||
let sum = 0; | ||
for await (const num of input) { | ||
sum += num; | ||
} | ||
return acc; | ||
}; | ||
}; | ||
// Setting up an instance of RPC Client with Sum as the handler method | ||
async function startServer() { | ||
const rpcServer = await RPCServer.createRPCServer({ | ||
manifest: { | ||
Sum: new Sum({}), | ||
}, | ||
logger, | ||
idGen, | ||
handlerTimeoutTime: 60000, | ||
}); | ||
// Simulating receiving a stream from a client. | ||
// Provided by network layer | ||
const simulatedStream = sendStreamHere; | ||
rpcServer.handleStream(simulatedStream); | ||
return rpcServer; | ||
} | ||
async function startClient() { | ||
// Simulate client-server pair of streams. | ||
// Simulating network stream | ||
const clientPair =sendStreamHere /* your logic for creating or obtaining a client-side stream */; | ||
const rpcClient = await RPCClient.createRPCClient({ | ||
manifest: { | ||
Sum: new ClientCaller<number, number>(), | ||
}, | ||
streamFactory, | ||
middlewareFactory, | ||
logger, | ||
idGen | ||
}) | ||
return rpcClient; | ||
} | ||
// Function to execute the Sum RPC call | ||
async function executeSum(rpcClient: typeof RPCClient){ | ||
const { output, writable } = await rpcClient.methods.Sum(); | ||
const writer = writable.getWriter(); | ||
await writer.write(5); | ||
await writer.write(10); | ||
await writer.close(); | ||
const ans = await output; | ||
console.log('Sum is: $(ans)'); | ||
} | ||
// Main function to tie everything together | ||
async function main(){ | ||
const rpcServer = await startServer(); | ||
const rpcClient = await startClient(); | ||
await executeSum(rpcClient); | ||
await rpcServer.destroy(); | ||
await rpcClient.destroy(); | ||
} | ||
main(); | ||
``` | ||
### Duplex Stream | ||
A Duplex Stream enables both the client and the server to read | ||
and write messages in their respective streams independently of each other. | ||
Both parties can read and write multiple messages in any order. | ||
It's useful in scenarios that require ongoing communication in both directions, like chat applications. | ||
In this example, the client sends a sequence of numbers and the server responds with the squares of those numbers. | ||
```ts | ||
import { RPCServer, DuplexHandler, ContainerType } from "./index"; | ||
class SquareNumbersDuplex extends DuplexHandler<ContainerType, number, Array<number>> { | ||
public handle = async function* ( | ||
input: AsyncIterableIterator<number>, | ||
cancel: (reason?: any) => void, | ||
meta: Record<string, JSONValue> | undefined, | ||
ctx: ContextTimed, | ||
): AsyncIterableIterator<Array<number>> { | ||
for await (const num of input) { | ||
const squares: Array<number> = []; | ||
for (let i = 1; i <= num; i++) { | ||
squares.push(i * i); | ||
} | ||
yield squares; | ||
} | ||
}; | ||
} | ||
async function startServer() { | ||
const rpcServer = await RPCServer.createRPCServer({ | ||
manifest: { | ||
SquareNumbersDuplex: new SquareNumbersDuplex({}), | ||
}, | ||
logger, | ||
idGen, | ||
}); | ||
const simulatedStream = sendStreamHere/* your logic for creating or obtaining a server-side stream */; | ||
rpcServer.handleStream(simulatedStream); | ||
return rpcServer; | ||
} | ||
// Run the server | ||
startServer(); | ||
import { RPCClient, DuplexCaller } from "./index"; | ||
async function startClient() { | ||
const rpcClient = await RPCClient.createRPCClient({ | ||
manifest: { | ||
SquareNumbersDuplex: new DuplexCaller<number, Array<number>>(), | ||
}, | ||
streamFactory, | ||
middlewareFactory, | ||
logger, | ||
idGen, | ||
}); | ||
const squareStream = await rpcClient.methods.SquareNumbersDuplex(); | ||
// Write to the server | ||
const writer = squareStream.writable.getWriter(); | ||
writer.write(2); | ||
writer.write(3); | ||
writer.write(4); | ||
// Read squared numbers from the server | ||
for await (const squares of squareStream.readable) { | ||
console.log(`Squares up to n are: ${squares.join(", ")}`); | ||
} | ||
writer.close(); | ||
return rpcClient; | ||
} | ||
// Run the client | ||
startClient(); | ||
async function main(){ | ||
const rpcServer = await startServer(); | ||
const rpcClient = await startClient(); | ||
await rpcServer.destroy(); | ||
await rpcClient.destroy(); | ||
} | ||
// Run the main function to kick off the example | ||
main(); | ||
``` | ||
### Raw Stream | ||
Raw Stream is designed for low-level handling of RPC calls, enabling granular control over data streaming. | ||
Unlike other patterns, Raw Streams allow both the server and client to work directly with raw data, | ||
providing a more flexible yet complex way to handle communications. | ||
This is especially useful when the RPC protocol itself needs customization | ||
or when handling different types of data streams within the same connection. | ||
In this example, the client sends a sequence of numbers and the server responds with the factorial of each number. | ||
```ts | ||
import {RawHandler, JSONRPCRequest, ContextTimed, JSONValue} from '../types'; | ||
import {ContainerType} from "./types"; | ||
import RPCServer from "./RPCServer"; // Assuming these are imported correctly | ||
class FactorialStream extends RawHandler<ContainerType> { | ||
public async handle( | ||
[request, inputStream]: [JSONRPCRequest, ReadableStream<Uint8Array>], | ||
cancel: (reason?: any) => void, | ||
meta: Record<string, JSONValue> | undefined, | ||
ctx: ContextTimed, | ||
): Promise<[JSONValue, ReadableStream<Uint8Array>]> { | ||
const {readable, writable} = new TransformStream<Uint8Array, Uint8Array>(); | ||
(async () => { | ||
const reader = inputStream.getReader(); | ||
const writer = writable.getWriter(); | ||
while (true) { | ||
const {done, value} = await reader.read(); | ||
if (done) { | ||
break; | ||
} | ||
const num = parseInt(new TextDecoder().decode(value), 10); | ||
const factorial = factorialOf(num).toString(); | ||
const outputBuffer = new TextEncoder().encode(factorial); | ||
writer.write(outputBuffer); | ||
} | ||
writer.close(); | ||
})(); | ||
return ["Starting factorial computation", readable]; | ||
} | ||
} | ||
function factorialOf(n: number): number { | ||
return (n === 0) ? 1 : n * factorialOf(n - 1); | ||
} | ||
async function startSErver() { | ||
const rpcServer = await RPCServer.createRPCServer({ | ||
manifest: { | ||
FactorialStream: new FactorialStream({}), | ||
}, | ||
logger, | ||
idGen, | ||
}); | ||
const simulatedStream = sendStreamHere/* your logic for creating or obtaining a server-side stream */; | ||
rpcServer.handleStream(simulatedStream); | ||
return rpcServer; | ||
} | ||
async function startClient() { | ||
const rpcClient = await RPCClient.createRPCClient({ | ||
manifest: { | ||
FactorialStream: new RawCaller(), | ||
}, | ||
streamFactory, | ||
middlewareFactory, | ||
logger, | ||
idGen, | ||
}); | ||
const { readable, writable, meta } = await rpcClient.methods.FactorialRaw({}); | ||
//Printing 'Starting factorial computation' | ||
console.log(meta); | ||
const writer = writable.getWriter(); | ||
const numbers = [1, 2, 3, 4, 5]; | ||
for (const num of numbers){ | ||
writer.write(new TextEncoder().encode(num.toString())); | ||
} | ||
writer.close(); | ||
const reader = readable.getReader(); | ||
while (true) { | ||
const {done, value} = await reader.read(); | ||
if (done) { | ||
break; | ||
} | ||
console.log('The factorial is: $(new TextDecoder().decode(value))'); | ||
} | ||
return rpcClient; | ||
} | ||
async function main(){ | ||
const rpcServer = await startServer(); | ||
const rpcClient = await startClient(); | ||
await rpcServer.destroy(); | ||
await rpcClient.destroy(); | ||
} | ||
// Run the main function to kick off the example | ||
main(); | ||
``` | ||
### Server Stream | ||
In Server Stream calls, | ||
the client sends a single request and receives multiple responses in a read-only stream from the server. | ||
The server can keep pushing messages as long as it needs, allowing real-time updates from the server to the client. | ||
This is useful for things like monitoring, | ||
where the server needs to update the client in real-time based on events or data changes. | ||
In this example, the client sends a number and the server responds with the squares of all numbers up to that number. | ||
```ts | ||
import ServerHandler from "./ServerHandler"; | ||
import {ContainerType} from "./types"; | ||
import {AsyncIterable} from "ix/Ix"; | ||
class SquaredNums extends ServerHandler<ContainerType, number, number> { | ||
public handle = async function* ( | ||
input: number, | ||
cancel: (reason?: any) => void, | ||
meta: Record<string, JSONValue> | undefined, | ||
ctx: ContextTimed, | ||
): AsyncIterable<number>{ | ||
for (let i = 0; i<= input; i++){ | ||
yield i*i; | ||
} | ||
}; | ||
} | ||
async function startServer() { | ||
const rpcServer = await RPCServer.createRPCServer({ | ||
manifest: { | ||
SquaredNums: new SquaredNums({}), | ||
}, | ||
logger, | ||
idGen, | ||
handlerTimeoutTime: 60000, | ||
}); | ||
// Simulating receiving a stream from a client. | ||
// Provided by network layer | ||
const simulatedStream =sendStreamHere; | ||
rpcServer.handleStream(simulatedStream); | ||
return rpcServer; | ||
} | ||
async function startClient() { | ||
// Simulate client-server pair of streams. | ||
// Simulating network stream | ||
const clientPair = sendStreamHere/* your logic for creating or obtaining a client-side stream */; | ||
const rpcClient = await RPCClient.createRPCClient({ | ||
manifest: { | ||
SquaredNums: new ServerCaller | ||
}, | ||
streamFactory, | ||
middlewareFactory, | ||
logger, | ||
idGen | ||
}) | ||
const squaredStream = await rpcClient.methods.SquaredNums(4); | ||
// Read squared numbers from the server | ||
const outputs: Array<number> = []; | ||
for await(const num of squaredStream) { | ||
outputs.push(num); | ||
} | ||
console.log('Squared numbers are: $(outputs.join(', ')}'); | ||
return | ||
return rpcClient; | ||
} | ||
async function main(){ | ||
const rpcServer = await startServer(); | ||
const rpcClient = await startClient(); | ||
await rpcServer.destroy(); | ||
await rpcClient.destroy(); | ||
} | ||
main(); | ||
``` | ||
### Unary Stream | ||
In a Unary Stream, the client sends a single request to the server and gets a single response back, | ||
just like HTTP/REST calls but over a connection. | ||
It's the simplest form of RPC, suitable for short-lived operations that don't require streaming data. | ||
It's the go-to choice for straightforward "request and response" interactions. | ||
In this example, the client sends a number and the server responds with the square of that number. | ||
```ts | ||
class SquaredNumberUnary extends UnaryHandler<ContainerType, number, number> { | ||
public handle = async ( | ||
input: number, | ||
cancel: (reason?: any) => void, | ||
meta: Record<string, JSONValue> | undefined, | ||
ctx: ContextTimed, | ||
): Promise<number> => { | ||
return input * input; | ||
}; | ||
} | ||
async function startServer() { | ||
const rpcServer = await RPCServer.createRPCServer({ | ||
manifest: { | ||
SquaredNumberUnary: new SquaredNumberUnary({}), | ||
}, | ||
logger, | ||
idGen, | ||
}); | ||
const simulatedStream = sendStreamHere/* your logic for creating or obtaining a server-side stream */; | ||
rpcServer.handleStream(simulatedStream); | ||
return rpcServer; | ||
} | ||
async function startClient() { | ||
const rpcClient = await RPCClient.createRPCClient({ | ||
manifest: { | ||
SquaredNumberUnary: new UnaryCaller<number, number>(), | ||
}, | ||
streamFactory, | ||
middlewareFactory, | ||
logger, | ||
idGen, | ||
}); | ||
const squaredNumber = await rpcClient.methods.SquaredNumberUnary(4); | ||
console.log('Squared number is: $(squaredNumber)'); | ||
return rpcClient; | ||
} | ||
async function main(){ | ||
const rpcServer = await startServer(); | ||
const rpcClient = await startClient(); | ||
await rpcServer.destroy(); | ||
await rpcClient.destroy(); | ||
} | ||
main(); | ||
``` | ||
## Development | ||
@@ -14,0 +455,0 @@ |
Sorry, the diff of this file is not supported yet
Sorry, the diff of this file is not supported yet
241839
513
2986