@matrixai/rpc
Advanced tools
Comparing version 0.2.4 to 0.2.5
@@ -1,3 +0,3 @@ | ||
import type { HandlerType, JSONValue } from '../types'; | ||
declare abstract class Caller<Input extends JSONValue = JSONValue, Output extends JSONValue = JSONValue> { | ||
import type { HandlerType, JSONObject, JSONRPCParams, JSONRPCResult } from '../types'; | ||
declare abstract class Caller<Input extends JSONObject = JSONRPCParams, Output extends JSONObject = JSONRPCResult> { | ||
protected _inputType: Input; | ||
@@ -4,0 +4,0 @@ protected _outputType: Output; |
@@ -1,6 +0,6 @@ | ||
import type { JSONValue } from '../types'; | ||
import type { JSONObject, JSONRPCParams, JSONRPCResult } from '../types'; | ||
import Caller from './Caller'; | ||
declare class ClientCaller<Input extends JSONValue = JSONValue, Output extends JSONValue = JSONValue> extends Caller<Input, Output> { | ||
declare class ClientCaller<Input extends JSONObject = JSONRPCParams, Output extends JSONObject = JSONRPCResult> extends Caller<Input, Output> { | ||
type: 'CLIENT'; | ||
} | ||
export default ClientCaller; |
@@ -1,6 +0,6 @@ | ||
import type { JSONValue } from '../types'; | ||
import type { JSONObject, JSONRPCParams, JSONRPCResult } from '../types'; | ||
import Caller from './Caller'; | ||
declare class DuplexCaller<Input extends JSONValue = JSONValue, Output extends JSONValue = JSONValue> extends Caller<Input, Output> { | ||
declare class DuplexCaller<Input extends JSONObject = JSONRPCParams, Output extends JSONObject = JSONRPCResult> extends Caller<Input, Output> { | ||
type: 'DUPLEX'; | ||
} | ||
export default DuplexCaller; |
@@ -1,6 +0,6 @@ | ||
import type { JSONValue } from '../types'; | ||
import type { JSONObject, JSONRPCParams, JSONRPCResult } from '../types'; | ||
import Caller from './Caller'; | ||
declare class ServerCaller<Input extends JSONValue = JSONValue, Output extends JSONValue = JSONValue> extends Caller<Input, Output> { | ||
declare class ServerCaller<Input extends JSONObject = JSONRPCParams, Output extends JSONObject = JSONRPCResult> extends Caller<Input, Output> { | ||
type: 'SERVER'; | ||
} | ||
export default ServerCaller; |
@@ -1,6 +0,6 @@ | ||
import type { JSONValue } from '../types'; | ||
import type { JSONObject, JSONRPCParams, JSONRPCResult } from '../types'; | ||
import Caller from './Caller'; | ||
declare class UnaryCaller<Input extends JSONValue = JSONValue, Output extends JSONValue = JSONValue> extends Caller<Input, Output> { | ||
declare class UnaryCaller<Input extends JSONObject = JSONRPCParams, Output extends JSONObject = JSONRPCResult> extends Caller<Input, Output> { | ||
type: 'UNARY'; | ||
} | ||
export default UnaryCaller; |
@@ -22,2 +22,8 @@ import type { Class } from '@matrixai/errors'; | ||
} | ||
declare class ErrorRPCInvalidTimeout<T> extends ErrorRPC<T> { | ||
static description: string; | ||
} | ||
declare class ErrorRPCInvalidHandlerTimeout<T> extends ErrorRPC<T> { | ||
static description: string; | ||
} | ||
declare abstract class ErrorRPCProtocol<T> extends ErrorRPC<T> { | ||
@@ -87,2 +93,3 @@ static error: string; | ||
code: JSONRPCErrorCode; | ||
toJSON(): JSONRPCError; | ||
} | ||
@@ -158,2 +165,2 @@ declare class ErrorUtilsUndefinedBehaviour<T> extends ErrorRPCProtocol<T> { | ||
}; | ||
export { ErrorRPC, ErrorRPCServer, ErrorRPCServerNotRunning, ErrorRPCProtocol, ErrorRPCStopping, ErrorRPCParse, ErrorRPCInvalidParams, ErrorRPCHandlerFailed, ErrorRPCMessageLength, ErrorRPCMissingResponse, ErrorRPCOutputStreamError, ErrorRPCRemote, ErrorRPCStreamEnded, ErrorRPCTimedOut, ErrorUtilsUndefinedBehaviour, ErrorRPCMethodNotImplemented, ErrorRPCConnectionLocal, ErrorRPCConnectionPeer, ErrorRPCConnectionKeepAliveTimeOut, ErrorRPCConnectionInternal, ErrorMissingHeader, ErrorHandlerAborted, ErrorRPCCallerFailed, ErrorMissingCaller, ErrorRPCUnknown, JSONRPCErrorCode, rpcProtocolErrors, }; | ||
export { ErrorRPC, ErrorRPCServer, ErrorRPCServerNotRunning, ErrorRPCProtocol, ErrorRPCStopping, ErrorRPCParse, ErrorRPCInvalidParams, ErrorRPCHandlerFailed, ErrorRPCMessageLength, ErrorRPCMissingResponse, ErrorRPCOutputStreamError, ErrorRPCRemote, ErrorRPCStreamEnded, ErrorRPCTimedOut, ErrorUtilsUndefinedBehaviour, ErrorRPCMethodNotImplemented, ErrorRPCConnectionLocal, ErrorRPCConnectionPeer, ErrorRPCConnectionKeepAliveTimeOut, ErrorRPCInvalidTimeout, ErrorRPCInvalidHandlerTimeout, ErrorRPCConnectionInternal, ErrorMissingHeader, ErrorHandlerAborted, ErrorRPCCallerFailed, ErrorMissingCaller, ErrorRPCUnknown, JSONRPCErrorCode, rpcProtocolErrors, }; |
"use strict"; | ||
Object.defineProperty(exports, "__esModule", { value: true }); | ||
exports.rpcProtocolErrors = exports.ErrorRPCUnknown = exports.ErrorMissingCaller = exports.ErrorRPCCallerFailed = exports.ErrorHandlerAborted = exports.ErrorMissingHeader = exports.ErrorRPCConnectionInternal = exports.ErrorRPCConnectionKeepAliveTimeOut = exports.ErrorRPCConnectionPeer = exports.ErrorRPCConnectionLocal = exports.ErrorRPCMethodNotImplemented = exports.ErrorUtilsUndefinedBehaviour = exports.ErrorRPCTimedOut = exports.ErrorRPCStreamEnded = exports.ErrorRPCRemote = exports.ErrorRPCOutputStreamError = exports.ErrorRPCMissingResponse = exports.ErrorRPCMessageLength = exports.ErrorRPCHandlerFailed = exports.ErrorRPCInvalidParams = exports.ErrorRPCParse = exports.ErrorRPCStopping = exports.ErrorRPCProtocol = exports.ErrorRPCServerNotRunning = exports.ErrorRPCServer = exports.ErrorRPC = void 0; | ||
exports.rpcProtocolErrors = exports.ErrorRPCUnknown = exports.ErrorMissingCaller = exports.ErrorRPCCallerFailed = exports.ErrorHandlerAborted = exports.ErrorMissingHeader = exports.ErrorRPCConnectionInternal = exports.ErrorRPCInvalidHandlerTimeout = exports.ErrorRPCInvalidTimeout = exports.ErrorRPCConnectionKeepAliveTimeOut = exports.ErrorRPCConnectionPeer = exports.ErrorRPCConnectionLocal = exports.ErrorRPCMethodNotImplemented = exports.ErrorUtilsUndefinedBehaviour = exports.ErrorRPCTimedOut = exports.ErrorRPCStreamEnded = exports.ErrorRPCRemote = exports.ErrorRPCOutputStreamError = exports.ErrorRPCMissingResponse = exports.ErrorRPCMessageLength = exports.ErrorRPCHandlerFailed = exports.ErrorRPCInvalidParams = exports.ErrorRPCParse = exports.ErrorRPCStopping = exports.ErrorRPCProtocol = exports.ErrorRPCServerNotRunning = exports.ErrorRPCServer = exports.ErrorRPC = void 0; | ||
const errors_1 = require("@matrixai/errors"); | ||
@@ -29,2 +29,10 @@ class ErrorRPC extends errors_1.AbstractError { | ||
exports.ErrorRPCCallerFailed = ErrorRPCCallerFailed; | ||
class ErrorRPCInvalidTimeout extends ErrorRPC { | ||
static description = 'Invalid timeout provided'; | ||
} | ||
exports.ErrorRPCInvalidTimeout = ErrorRPCInvalidTimeout; | ||
class ErrorRPCInvalidHandlerTimeout extends ErrorRPC { | ||
static description = 'Invalid handler timeout provided'; | ||
} | ||
exports.ErrorRPCInvalidHandlerTimeout = ErrorRPCInvalidHandlerTimeout; | ||
class ErrorRPCProtocol extends ErrorRPC { | ||
@@ -131,2 +139,9 @@ static error = 'RPC Protocol Error'; | ||
code = -32008 /* JSONRPCErrorCode.RPCTimedOut */; | ||
toJSON() { | ||
const json = super.toJSON(); | ||
if (typeof json === 'object' && !Array.isArray(json)) { | ||
json.type = this.constructor.name; | ||
} | ||
return json; | ||
} | ||
} | ||
@@ -133,0 +148,0 @@ exports.ErrorRPCTimedOut = ErrorRPCTimedOut; |
@@ -1,7 +0,7 @@ | ||
import type { ContainerType, JSONValue } from '../types'; | ||
import type { ContainerType, JSONValue, JSONRPCParams, JSONRPCResult } from '../types'; | ||
import type { ContextTimed } from '@matrixai/contexts'; | ||
import Handler from './Handler'; | ||
declare abstract class ClientHandler<Container extends ContainerType = ContainerType, Input extends JSONValue = JSONValue, Output extends JSONValue = JSONValue> extends Handler<Container, Input, Output> { | ||
declare abstract class ClientHandler<Container extends ContainerType = ContainerType, Input extends JSONRPCParams = JSONRPCParams, Output extends JSONRPCResult = JSONRPCResult> extends Handler<Container, Input, Output> { | ||
handle(input: AsyncIterableIterator<Input>, cancel: (reason?: any) => void, meta: Record<string, JSONValue> | undefined, ctx: ContextTimed): Promise<Output>; | ||
} | ||
export default ClientHandler; |
@@ -1,5 +0,5 @@ | ||
import type { ContainerType, JSONValue } from '../types'; | ||
import type { ContainerType, JSONValue, JSONRPCParams, JSONRPCResult } from '../types'; | ||
import type { ContextTimed } from '@matrixai/contexts'; | ||
import Handler from './Handler'; | ||
declare abstract class DuplexHandler<Container extends ContainerType = ContainerType, Input extends JSONValue = JSONValue, Output extends JSONValue = JSONValue> extends Handler<Container, Input, Output> { | ||
declare abstract class DuplexHandler<Container extends ContainerType = ContainerType, Input extends JSONRPCParams = JSONRPCParams, Output extends JSONRPCResult = JSONRPCResult> extends Handler<Container, Input, Output> { | ||
/** | ||
@@ -6,0 +6,0 @@ * Note that if the output has an error, the handler will not see this as an |
@@ -1,3 +0,3 @@ | ||
import type { ContainerType, JSONValue } from '../types'; | ||
declare abstract class Handler<Container extends ContainerType = ContainerType, Input extends JSONValue = JSONValue, Output extends JSONValue = JSONValue> { | ||
import type { ContainerType, JSONRPCParams, JSONRPCResult } from '../types'; | ||
declare abstract class Handler<Container extends ContainerType = ContainerType, Input extends JSONRPCParams = JSONRPCParams, Output extends JSONRPCResult = JSONRPCResult> { | ||
protected container: Container; | ||
@@ -4,0 +4,0 @@ protected _inputType: Input; |
/// <reference types="node" /> | ||
import type { ContextTimed } from '@matrixai/contexts'; | ||
import type { ReadableStream } from 'stream/web'; | ||
import type { ContainerType, JSONRPCRequest, JSONValue } from '../types'; | ||
import type { ContainerType, JSONRPCRequest, JSONRPCResult, JSONValue } from '../types'; | ||
import Handler from './Handler'; | ||
declare abstract class RawHandler<Container extends ContainerType = ContainerType> extends Handler<Container> { | ||
handle(input: [JSONRPCRequest, ReadableStream<Uint8Array>], cancel: (reason?: any) => void, meta: Record<string, JSONValue> | undefined, ctx: ContextTimed): Promise<[JSONValue, ReadableStream<Uint8Array>]>; | ||
handle(input: [JSONRPCRequest, ReadableStream<Uint8Array>], cancel: (reason?: any) => void, meta: Record<string, JSONValue> | undefined, ctx: ContextTimed): Promise<[JSONRPCResult, ReadableStream<Uint8Array>]>; | ||
} | ||
export default RawHandler; |
import type { ContextTimed } from '@matrixai/contexts'; | ||
import type { ContainerType, JSONValue } from '../types'; | ||
import type { ContainerType, JSONValue, JSONRPCParams, JSONRPCResult } from '../types'; | ||
import Handler from './Handler'; | ||
declare abstract class ServerHandler<Container extends ContainerType = ContainerType, Input extends JSONValue = JSONValue, Output extends JSONValue = JSONValue> extends Handler<Container, Input, Output> { | ||
declare abstract class ServerHandler<Container extends ContainerType = ContainerType, Input extends JSONRPCParams = JSONRPCParams, Output extends JSONRPCResult = JSONRPCResult> extends Handler<Container, Input, Output> { | ||
handle(input: Input, cancel: (reason?: any) => void, meta: Record<string, JSONValue> | undefined, ctx: ContextTimed): AsyncIterableIterator<Output>; | ||
} | ||
export default ServerHandler; |
import type { ContextTimed } from '@matrixai/contexts'; | ||
import type { ContainerType, JSONValue } from '../types'; | ||
import type { ContainerType, JSONValue, JSONRPCParams, JSONRPCResult } from '../types'; | ||
import Handler from './Handler'; | ||
declare abstract class UnaryHandler<Container extends ContainerType = ContainerType, Input extends JSONValue = JSONValue, Output extends JSONValue = JSONValue> extends Handler<Container, Input, Output> { | ||
declare abstract class UnaryHandler<Container extends ContainerType = ContainerType, Input extends JSONRPCParams = JSONRPCParams, Output extends JSONRPCResult = JSONRPCResult> extends Handler<Container, Input, Output> { | ||
handle(input: Input, cancel: (reason?: any) => void, meta: Record<string, JSONValue> | undefined, ctx: ContextTimed): Promise<Output>; | ||
} | ||
export default UnaryHandler; |
/// <reference types="node" /> | ||
import type { JSONRPCMessage, JSONRPCRequest, JSONRPCResponse, MiddlewareFactory } from './types'; | ||
import type { JSONRPCMessage, JSONRPCRequest, JSONRPCResponse, MiddlewareFactory, JSONValue } from './types'; | ||
import type { ContextTimed } from '@matrixai/contexts'; | ||
import { TransformStream } from 'stream/web'; | ||
@@ -21,3 +22,18 @@ /** | ||
declare function jsonMessageToBinaryStream(): TransformStream<JSONRPCMessage, Uint8Array>; | ||
declare function timeoutMiddlewareServer(ctx: ContextTimed, _cancel: (reason?: any) => void, _meta: Record<string, JSONValue> | undefined): { | ||
forward: TransformStream<JSONRPCRequest<JSONRPCRequest<import("./types").JSONObject>>, JSONRPCRequest<JSONRPCRequest<import("./types").JSONObject>>>; | ||
reverse: TransformStream<JSONRPCResponse<JSONRPCResponse<import("./types").JSONObject>>, JSONRPCResponse<JSONRPCResponse<import("./types").JSONObject>>>; | ||
}; | ||
/** | ||
* This adds its own timeout to the forward metadata and updates it's timeout | ||
* based on the reverse metadata. | ||
* @param ctx | ||
* @param _cancel | ||
* @param _meta | ||
*/ | ||
declare function timeoutMiddlewareClient(ctx: ContextTimed, _cancel: (reason?: any) => void, _meta: Record<string, JSONValue> | undefined): { | ||
forward: TransformStream<JSONRPCRequest<import("./types").JSONObject>, JSONRPCRequest<import("./types").JSONObject>>; | ||
reverse: TransformStream<JSONRPCResponse<JSONRPCResponse<import("./types").JSONObject>>, JSONRPCResponse<JSONRPCResponse<import("./types").JSONObject>>>; | ||
}; | ||
/** | ||
* This function is a factory for creating a pass-through streamPair. It is used | ||
@@ -27,4 +43,4 @@ * as the default middleware for the middleware wrappers. | ||
declare function defaultMiddleware(): { | ||
forward: TransformStream<JSONRPCRequest<import("./types").JSONValue>, JSONRPCRequest<import("./types").JSONValue>>; | ||
reverse: TransformStream<JSONRPCResponse<import("./types").JSONValue>, JSONRPCResponse<import("./types").JSONValue>>; | ||
forward: TransformStream<JSONRPCRequest<import("./types").JSONObject>, JSONRPCRequest<import("./types").JSONObject>>; | ||
reverse: TransformStream<JSONRPCResponse<import("./types").JSONObject>, JSONRPCResponse<import("./types").JSONObject>>; | ||
}; | ||
@@ -54,2 +70,2 @@ /** | ||
declare const defaultClientMiddlewareWrapper: (middleware?: MiddlewareFactory<JSONRPCRequest, JSONRPCRequest, JSONRPCResponse, JSONRPCResponse>, parserBufferByteLimit?: number) => MiddlewareFactory<Uint8Array, JSONRPCRequest, JSONRPCResponse, Uint8Array>; | ||
export { binaryToJsonMessageStream, jsonMessageToBinaryStream, defaultMiddleware, defaultServerMiddlewareWrapper, defaultClientMiddlewareWrapper, }; | ||
export { binaryToJsonMessageStream, jsonMessageToBinaryStream, timeoutMiddlewareClient, timeoutMiddlewareServer, defaultMiddleware, defaultServerMiddlewareWrapper, defaultClientMiddlewareWrapper, }; |
@@ -26,3 +26,3 @@ "use strict"; | ||
Object.defineProperty(exports, "__esModule", { value: true }); | ||
exports.defaultClientMiddlewareWrapper = exports.defaultServerMiddlewareWrapper = exports.defaultMiddleware = exports.jsonMessageToBinaryStream = exports.binaryToJsonMessageStream = void 0; | ||
exports.defaultClientMiddlewareWrapper = exports.defaultServerMiddlewareWrapper = exports.defaultMiddleware = exports.timeoutMiddlewareServer = exports.timeoutMiddlewareClient = exports.jsonMessageToBinaryStream = exports.binaryToJsonMessageStream = void 0; | ||
const web_1 = require("stream/web"); | ||
@@ -91,3 +91,67 @@ const json_1 = require("@streamparser/json"); | ||
exports.jsonMessageToBinaryStream = jsonMessageToBinaryStream; | ||
function timeoutMiddlewareServer(ctx, _cancel, _meta) { | ||
const currentTimeout = ctx.timer.delay; | ||
// Flags for tracking if the first message has been processed | ||
let forwardFirst = true; | ||
return { | ||
forward: new web_1.TransformStream({ | ||
transform: (chunk, controller) => { | ||
controller.enqueue(chunk); | ||
if (forwardFirst) { | ||
forwardFirst = false; | ||
let clientTimeout = chunk.params?.metadata?.timeout; | ||
if (clientTimeout === undefined) | ||
return; | ||
if (clientTimeout === null) | ||
clientTimeout = Infinity; | ||
if (clientTimeout < currentTimeout) | ||
ctx.timer.reset(clientTimeout); | ||
} | ||
}, | ||
}), | ||
reverse: new web_1.TransformStream({ | ||
transform: (chunk, controller) => { | ||
// Passthrough chunk, no need for server to send ctx.timeout | ||
controller.enqueue(chunk); | ||
}, | ||
}), | ||
}; | ||
} | ||
exports.timeoutMiddlewareServer = timeoutMiddlewareServer; | ||
/** | ||
* This adds its own timeout to the forward metadata and updates it's timeout | ||
* based on the reverse metadata. | ||
* @param ctx | ||
* @param _cancel | ||
* @param _meta | ||
*/ | ||
function timeoutMiddlewareClient(ctx, _cancel, _meta) { | ||
const currentTimeout = ctx.timer.delay; | ||
// Flags for tracking if the first message has been processed | ||
let forwardFirst = true; | ||
return { | ||
forward: new web_1.TransformStream({ | ||
transform: (chunk, controller) => { | ||
if (forwardFirst) { | ||
forwardFirst = false; | ||
if (chunk == null) | ||
chunk = { jsonrpc: '2.0', params: {}, method: '' }; | ||
if (chunk.params == null) | ||
chunk.params = {}; | ||
if (chunk.params.metadata == null) | ||
chunk.params.metadata = {}; | ||
chunk.params.metadata.timeout = currentTimeout; | ||
} | ||
controller.enqueue(chunk); | ||
}, | ||
}), | ||
reverse: new web_1.TransformStream({ | ||
transform: (chunk, controller) => { | ||
controller.enqueue(chunk); // Passthrough chunk, no need for client to set ctx.timeout | ||
}, | ||
}), | ||
}; | ||
} | ||
exports.timeoutMiddlewareClient = timeoutMiddlewareClient; | ||
/** | ||
* This function is a factory for creating a pass-through streamPair. It is used | ||
@@ -118,5 +182,9 @@ * as the default middleware for the middleware wrappers. | ||
const middleMiddleware = middlewareFactory(ctx, cancel, meta); | ||
const forwardReadable = inputTransformStream.readable.pipeThrough(middleMiddleware.forward); // Usual middleware here | ||
const timeoutMiddleware = timeoutMiddlewareServer(ctx, cancel, meta); | ||
const forwardReadable = inputTransformStream.readable | ||
.pipeThrough(timeoutMiddleware.forward) // Timeout middleware here | ||
.pipeThrough(middleMiddleware.forward); // Usual middleware here | ||
const reverseReadable = outputTransformStream.readable | ||
.pipeThrough(middleMiddleware.reverse) // Usual middleware here | ||
.pipeThrough(timeoutMiddleware.reverse) // Timeout middleware here | ||
.pipeThrough(jsonMessageToBinaryStream()); | ||
@@ -151,7 +219,11 @@ return { | ||
const inputTransformStream = new web_1.TransformStream(); | ||
const timeoutMiddleware = timeoutMiddlewareClient(ctx, cancel, meta); | ||
const middleMiddleware = middleware(ctx, cancel, meta); | ||
const forwardReadable = inputTransformStream.readable | ||
.pipeThrough(timeoutMiddleware.forward) | ||
.pipeThrough(middleMiddleware.forward) // Usual middleware here | ||
.pipeThrough(jsonMessageToBinaryStream()); | ||
const reverseReadable = outputTransformStream.readable.pipeThrough(middleMiddleware.reverse); // Usual middleware here | ||
const reverseReadable = outputTransformStream.readable | ||
.pipeThrough(middleMiddleware.reverse) | ||
.pipeThrough(timeoutMiddleware.reverse); // Usual middleware here | ||
return { | ||
@@ -158,0 +230,0 @@ forward: { |
/// <reference types="node" /> | ||
import type { ReadableStream, WritableStream } from 'stream/web'; | ||
import type { ContextTimedInput } from '@matrixai/contexts'; | ||
import type { ClientManifest, HandlerType, IdGen, JSONRPCRequest, JSONRPCResponse, JSONValue, MapCallers, MiddlewareFactory, RPCStream, StreamFactory, ToError } from './types'; | ||
import type { ClientManifest, HandlerType, IdGen, JSONObject, JSONRPCRequest, JSONRPCResponse, JSONValue, MapCallers, MiddlewareFactory, RPCStream, StreamFactory, ToError } from './types'; | ||
import Logger from '@matrixai/logger'; | ||
@@ -15,3 +15,3 @@ declare class RPCClient<M extends ClientManifest> { | ||
registerOnTimeoutCallback(callback: () => void): void; | ||
readonly streamKeepAliveTimeoutTime: number; | ||
readonly timeoutTime: number; | ||
readonly methodsProxy: {}; | ||
@@ -28,3 +28,3 @@ /** | ||
* path and `Uint8Array` to `JSONRPCResponse` on the reverse path. | ||
* @param obj.streamKeepAliveTimeoutTime - Timeout time used if no timeout timer was provided when making a call. | ||
* @param obj.timeoutTime - Timeout time used if no timeout timer was provided when making a call. | ||
* Defaults to 60,000 milliseconds. | ||
@@ -34,7 +34,7 @@ * for a client call. | ||
*/ | ||
constructor({ manifest, streamFactory, middlewareFactory, streamKeepAliveTimeoutTime, logger, toError, idGen, }: { | ||
constructor({ manifest, streamFactory, middlewareFactory, timeoutTime, logger, toError, idGen, }: { | ||
manifest: M; | ||
streamFactory: StreamFactory; | ||
middlewareFactory?: MiddlewareFactory<Uint8Array, JSONRPCRequest, JSONRPCResponse, Uint8Array>; | ||
streamKeepAliveTimeoutTime?: number; | ||
timeoutTime?: number; | ||
logger?: Logger; | ||
@@ -54,3 +54,3 @@ idGen?: IdGen; | ||
*/ | ||
unaryCaller<I extends JSONValue, O extends JSONValue>(method: string, parameters: I, ctx?: Partial<ContextTimedInput>): Promise<O>; | ||
unaryCaller<I extends JSONObject, O extends JSONObject>(method: string, parameters: I, ctx?: Partial<ContextTimedInput>): Promise<O>; | ||
/** | ||
@@ -66,3 +66,3 @@ * Generic caller for server streaming RPC calls. | ||
*/ | ||
serverStreamCaller<I extends JSONValue, O extends JSONValue>(method: string, parameters: I, ctx?: Partial<ContextTimedInput>): Promise<ReadableStream<O>>; | ||
serverStreamCaller<I extends JSONObject, O extends JSONObject>(method: string, parameters: I, ctx?: Partial<ContextTimedInput>): Promise<ReadableStream<O>>; | ||
/** | ||
@@ -78,3 +78,3 @@ * Generic caller for Client streaming RPC calls. | ||
*/ | ||
clientStreamCaller<I extends JSONValue, O extends JSONValue>(method: string, ctx?: Partial<ContextTimedInput>): Promise<{ | ||
clientStreamCaller<I extends JSONObject, O extends JSONObject>(method: string, ctx?: Partial<ContextTimedInput>): Promise<{ | ||
output: Promise<O>; | ||
@@ -94,3 +94,3 @@ writable: WritableStream<I>; | ||
*/ | ||
duplexStreamCaller<I extends JSONValue, O extends JSONValue>(method: string, ctx?: Partial<ContextTimedInput>): Promise<RPCStream<O, I>>; | ||
duplexStreamCaller<I extends JSONObject, O extends JSONObject>(method: string, ctx?: Partial<ContextTimedInput>): Promise<RPCStream<O, I>>; | ||
/** | ||
@@ -111,3 +111,3 @@ * Generic caller for raw RPC calls. | ||
*/ | ||
rawStreamCaller(method: string, headerParams: JSONValue, ctx?: Partial<ContextTimedInput>): Promise<RPCStream<Uint8Array, Uint8Array, Record<string, JSONValue> & { | ||
rawStreamCaller(method: string, headerParams: JSONObject, ctx?: Partial<ContextTimedInput>): Promise<RPCStream<Uint8Array, Uint8Array, Record<string, JSONValue> & { | ||
result: JSONValue; | ||
@@ -114,0 +114,0 @@ command: string; |
@@ -47,3 +47,3 @@ "use strict"; | ||
// Method proxies | ||
streamKeepAliveTimeoutTime; | ||
timeoutTime; | ||
methodsProxy = new Proxy({}, { | ||
@@ -79,3 +79,3 @@ get: (_, method) => { | ||
* path and `Uint8Array` to `JSONRPCResponse` on the reverse path. | ||
* @param obj.streamKeepAliveTimeoutTime - Timeout time used if no timeout timer was provided when making a call. | ||
* @param obj.timeoutTime - Timeout time used if no timeout timer was provided when making a call. | ||
* Defaults to 60,000 milliseconds. | ||
@@ -85,3 +85,6 @@ * for a client call. | ||
*/ | ||
constructor({ manifest, streamFactory, middlewareFactory = middleware.defaultClientMiddlewareWrapper(), streamKeepAliveTimeoutTime = Infinity, logger, toError = utils.toError, idGen = () => null, }) { | ||
constructor({ manifest, streamFactory, middlewareFactory = middleware.defaultClientMiddlewareWrapper(), timeoutTime = Infinity, logger, toError = utils.toError, idGen = () => null, }) { | ||
if (timeoutTime < 0) { | ||
throw new errors.ErrorRPCInvalidTimeout(); | ||
} | ||
this.idGen = idGen; | ||
@@ -91,3 +94,3 @@ this.callerTypes = utils.getHandlerTypes(manifest); | ||
this.middlewareFactory = middlewareFactory; | ||
this.streamKeepAliveTimeoutTime = streamKeepAliveTimeoutTime; | ||
this.timeoutTime = timeoutTime; | ||
this.logger = logger ?? new logger_1.default(this.constructor.name); | ||
@@ -219,3 +222,3 @@ this.toError = toError; | ||
timer = new timer_1.Timer({ | ||
delay: ctx.timer ?? this.streamKeepAliveTimeoutTime, | ||
delay: ctx.timer ?? this.timeoutTime, | ||
}); | ||
@@ -334,3 +337,3 @@ } | ||
timer = new timer_1.Timer({ | ||
delay: ctx.timer ?? this.streamKeepAliveTimeoutTime, | ||
delay: ctx.timer ?? this.timeoutTime, | ||
}); | ||
@@ -337,0 +340,0 @@ } |
@@ -1,2 +0,2 @@ | ||
import type { IdGen, ClientHandlerImplementation, DuplexHandlerImplementation, JSONValue, JSONRPCRequest, JSONRPCResponseResult, ServerManifest, RawHandlerImplementation, ServerHandlerImplementation, UnaryHandlerImplementation, RPCStream, MiddlewareFactory, FromError } from './types'; | ||
import type { IdGen, ClientHandlerImplementation, DuplexHandlerImplementation, JSONRPCRequest, JSONRPCResponseResult, ServerManifest, RawHandlerImplementation, ServerHandlerImplementation, UnaryHandlerImplementation, RPCStream, MiddlewareFactory, FromError, JSONObject } from './types'; | ||
import Logger from '@matrixai/logger'; | ||
@@ -20,3 +20,3 @@ import { PromiseCancellable } from '@matrixai/async-cancellable'; | ||
protected defaultTimeoutMap: Map<string, number | undefined>; | ||
protected handlerTimeoutTime: number; | ||
protected timeoutTime: number; | ||
protected activeStreams: Set<PromiseCancellable<void>>; | ||
@@ -34,14 +34,11 @@ protected fromError: FromError; | ||
* path and `JSONRPCResponse` to `Uint8Array` on the reverse path. | ||
* @param obj.streamKeepAliveTimeoutTime - Time before a connection is cleaned up due to no activity. This is the | ||
* @param obj.timeoutTime - Time before a stream is cleaned up due to no activity. This is the | ||
* value used if the handler doesn't specify its own timeout time. This timeout is advisory and only results in a | ||
* signal sent to the handler. Stream is forced to end after the timeoutForceCloseTime. Defaults to 60,000 | ||
* milliseconds. | ||
* @param obj.timeoutForceCloseTime - Time before the stream is forced to end after the initial timeout time. | ||
* The stream will be forced to close after this amount of time after the initial timeout. This is a grace period for | ||
* the handler to handle timeout before it is forced to end. Defaults to 2,000 milliseconds. | ||
* @param obj.logger | ||
*/ | ||
constructor({ middlewareFactory, handlerTimeoutTime, logger, idGen, fromError, replacer, }: { | ||
constructor({ middlewareFactory, timeoutTime, logger, idGen, fromError, replacer, }: { | ||
middlewareFactory?: MiddlewareFactory<JSONRPCRequest, Uint8Array, Uint8Array, JSONRPCResponseResult>; | ||
handlerTimeoutTime?: number; | ||
timeoutTime?: number; | ||
logger?: Logger; | ||
@@ -87,6 +84,6 @@ idGen?: IdGen; | ||
*/ | ||
protected registerDuplexStreamHandler<I extends JSONValue, O extends JSONValue>(method: string, handler: DuplexHandlerImplementation<I, O>, timeout: number | undefined): void; | ||
protected registerUnaryHandler<I extends JSONValue, O extends JSONValue>(method: string, handler: UnaryHandlerImplementation<I, O>, timeout: number | undefined): void; | ||
protected registerServerStreamHandler<I extends JSONValue, O extends JSONValue>(method: string, handler: ServerHandlerImplementation<I, O>, timeout: number | undefined): void; | ||
protected registerClientStreamHandler<I extends JSONValue, O extends JSONValue>(method: string, handler: ClientHandlerImplementation<I, O>, timeout: number | undefined): void; | ||
protected registerDuplexStreamHandler<I extends JSONObject, O extends JSONObject>(method: string, handler: DuplexHandlerImplementation<I, O>, timeout: number | undefined): void; | ||
protected registerUnaryHandler<I extends JSONObject, O extends JSONObject>(method: string, handler: UnaryHandlerImplementation<I, O>, timeout: number | undefined): void; | ||
protected registerServerStreamHandler<I extends JSONObject, O extends JSONObject>(method: string, handler: ServerHandlerImplementation<I, O>, timeout: number | undefined): void; | ||
protected registerClientStreamHandler<I extends JSONObject, O extends JSONObject>(method: string, handler: ClientHandlerImplementation<I, O>, timeout: number | undefined): void; | ||
/** | ||
@@ -93,0 +90,0 @@ * ID is associated with the stream, not individual messages. |
@@ -54,3 +54,3 @@ "use strict"; | ||
defaultTimeoutMap = new Map(); | ||
handlerTimeoutTime; | ||
timeoutTime; | ||
activeStreams = new Set(); | ||
@@ -71,15 +71,15 @@ fromError; | ||
* path and `JSONRPCResponse` to `Uint8Array` on the reverse path. | ||
* @param obj.streamKeepAliveTimeoutTime - Time before a connection is cleaned up due to no activity. This is the | ||
* @param obj.timeoutTime - Time before a stream is cleaned up due to no activity. This is the | ||
* value used if the handler doesn't specify its own timeout time. This timeout is advisory and only results in a | ||
* signal sent to the handler. Stream is forced to end after the timeoutForceCloseTime. Defaults to 60,000 | ||
* milliseconds. | ||
* @param obj.timeoutForceCloseTime - Time before the stream is forced to end after the initial timeout time. | ||
* The stream will be forced to close after this amount of time after the initial timeout. This is a grace period for | ||
* the handler to handle timeout before it is forced to end. Defaults to 2,000 milliseconds. | ||
* @param obj.logger | ||
*/ | ||
constructor({ middlewareFactory = middleware.defaultServerMiddlewareWrapper(), handlerTimeoutTime = Infinity, logger, idGen = () => null, fromError = utils.fromError, replacer, }) { | ||
constructor({ middlewareFactory = middleware.defaultServerMiddlewareWrapper(), timeoutTime = Infinity, logger, idGen = () => null, fromError = utils.fromError, replacer, }) { | ||
if (timeoutTime < 0) { | ||
throw new errors.ErrorRPCInvalidTimeout(); | ||
} | ||
this.idGen = idGen; | ||
this.middlewareFactory = middlewareFactory; | ||
this.handlerTimeoutTime = handlerTimeoutTime; | ||
this.timeoutTime = timeoutTime; | ||
this.fromError = fromError; | ||
@@ -97,33 +97,44 @@ this.replacer = replacer; | ||
this.logger.info(`Start ${this.constructor.name}`); | ||
for (const [key, manifestItem] of Object.entries(manifest)) { | ||
if (manifestItem instanceof handlers_1.RawHandler) { | ||
this.registerRawStreamHandler(key, manifestItem.handle, manifestItem.timeout); | ||
continue; | ||
try { | ||
for (const [key, manifestItem] of Object.entries(manifest)) { | ||
if (manifestItem.timeout != null && manifestItem.timeout < 0) { | ||
throw new errors.ErrorRPCInvalidHandlerTimeout(); | ||
} | ||
if (manifestItem instanceof handlers_1.RawHandler) { | ||
this.registerRawStreamHandler(key, manifestItem.handle, manifestItem.timeout); | ||
continue; | ||
} | ||
if (manifestItem instanceof handlers_2.DuplexHandler) { | ||
this.registerDuplexStreamHandler(key, | ||
// Bind the `this` to the generator handler to make the container available | ||
manifestItem.handle.bind(manifestItem), manifestItem.timeout); | ||
continue; | ||
} | ||
if (manifestItem instanceof handlers_2.ServerHandler) { | ||
this.registerServerStreamHandler(key, | ||
// Bind the `this` to the generator handler to make the container available | ||
manifestItem.handle.bind(manifestItem), manifestItem.timeout); | ||
continue; | ||
} | ||
if (manifestItem instanceof handlers_2.ClientHandler) { | ||
this.registerClientStreamHandler(key, manifestItem.handle, manifestItem.timeout); | ||
continue; | ||
} | ||
if (manifestItem instanceof handlers_2.ClientHandler) { | ||
this.registerClientStreamHandler(key, manifestItem.handle, manifestItem.timeout); | ||
continue; | ||
} | ||
if (manifestItem instanceof handlers_2.UnaryHandler) { | ||
this.registerUnaryHandler(key, manifestItem.handle, manifestItem.timeout); | ||
continue; | ||
} | ||
utils.never(); | ||
} | ||
if (manifestItem instanceof handlers_2.DuplexHandler) { | ||
this.registerDuplexStreamHandler(key, | ||
// Bind the `this` to the generator handler to make the container available | ||
manifestItem.handle.bind(manifestItem), manifestItem.timeout); | ||
continue; | ||
} | ||
if (manifestItem instanceof handlers_2.ServerHandler) { | ||
this.registerServerStreamHandler(key, | ||
// Bind the `this` to the generator handler to make the container available | ||
manifestItem.handle.bind(manifestItem), manifestItem.timeout); | ||
continue; | ||
} | ||
if (manifestItem instanceof handlers_2.ClientHandler) { | ||
this.registerClientStreamHandler(key, manifestItem.handle, manifestItem.timeout); | ||
continue; | ||
} | ||
if (manifestItem instanceof handlers_2.ClientHandler) { | ||
this.registerClientStreamHandler(key, manifestItem.handle, manifestItem.timeout); | ||
continue; | ||
} | ||
if (manifestItem instanceof handlers_2.UnaryHandler) { | ||
this.registerUnaryHandler(key, manifestItem.handle, manifestItem.timeout); | ||
continue; | ||
} | ||
utils.never(); | ||
} | ||
catch (e) { | ||
// No need to clean up streams, as streams can only be handled after RPCServer has been started. | ||
this.handlerMap.clear(); | ||
this.defaultTimeoutMap.clear(); | ||
throw e; | ||
} | ||
this.logger.info(`Started ${this.constructor.name}`); | ||
@@ -209,3 +220,5 @@ } | ||
for await (const data of forwardStream) { | ||
ctx.timer.refresh(); | ||
if (ctx.timer.status !== 'settled') { | ||
ctx.timer.refresh(); | ||
} | ||
yield data.params; | ||
@@ -219,3 +232,5 @@ } | ||
for await (const response of handlerG) { | ||
ctx.timer.refresh(); | ||
if (ctx.timer.status !== 'settled') { | ||
ctx.timer.refresh(); | ||
} | ||
const responseMessage = { | ||
@@ -328,3 +343,3 @@ jsonrpc: '2.0', | ||
const timer = new timer_1.Timer({ | ||
delay: this.handlerTimeoutTime, | ||
delay: this.timeoutTime, | ||
handler: () => { | ||
@@ -419,10 +434,12 @@ abortController.abort(new errors.ErrorRPCTimedOut()); | ||
const timeout = this.defaultTimeoutMap.get(method); | ||
if (timeout != null && timeout < this.handlerTimeoutTime) { | ||
// Reset timeout with new delay if it is less than the default | ||
timer.reset(timeout); | ||
if (timer.status !== 'settled') { | ||
if (timeout != null) { | ||
// Reset timeout with new delay if it is less than the default | ||
timer.reset(timeout); | ||
} | ||
else { | ||
// Otherwise refresh | ||
timer.refresh(); | ||
} | ||
} | ||
else { | ||
// Otherwise refresh | ||
timer.refresh(); | ||
} | ||
this.logger.info(`Handling stream with method (${method})`); | ||
@@ -429,0 +446,0 @@ let handlerResult; |
@@ -18,3 +18,3 @@ /// <reference types="node" /> | ||
*/ | ||
type JSONRPCRequestMessage<T extends JSONValue = JSONValue> = { | ||
type JSONRPCRequestMessage<T extends JSONObject = JSONObject> = { | ||
/** | ||
@@ -34,3 +34,3 @@ * A String specifying the version of the JSON-RPC protocol. MUST be exactly "2.0" | ||
*/ | ||
params?: T; | ||
params?: JSONRPCParams<T>; | ||
/** | ||
@@ -47,3 +47,3 @@ * An identifier established by the Client that MUST contain a String, Number, or NULL value if included. | ||
*/ | ||
type JSONRPCRequestNotification<T extends JSONValue = JSONValue> = { | ||
type JSONRPCRequestNotification<T extends JSONObject = JSONObject> = { | ||
/** | ||
@@ -63,3 +63,3 @@ * A String specifying the version of the JSON-RPC protocol. MUST be exactly "2.0" | ||
*/ | ||
params?: T; | ||
params: JSONRPCParams<T>; | ||
}; | ||
@@ -70,3 +70,3 @@ /** | ||
*/ | ||
type JSONRPCResponseResult<T extends JSONValue = JSONValue> = { | ||
type JSONRPCResponseResult<T extends JSONObject = JSONObject> = { | ||
/** | ||
@@ -81,3 +81,3 @@ * A String specifying the version of the JSON-RPC protocol. MUST be exactly "2.0". | ||
*/ | ||
result: T; | ||
result: JSONRPCResult<T>; | ||
/** | ||
@@ -114,2 +114,16 @@ * This member is REQUIRED. | ||
}; | ||
type JSONRPCParams<T extends JSONObject = JSONObject> = { | ||
metadata?: { | ||
[Key: string]: JSONValue; | ||
} & Partial<{ | ||
timeout: number | null; | ||
}>; | ||
} & Omit<T, 'metadata'>; | ||
type JSONRPCResult<T extends JSONObject = JSONObject> = { | ||
metadata?: { | ||
[Key: string]: JSONValue; | ||
} & Partial<{ | ||
timeout: number | null; | ||
}>; | ||
} & Omit<T, 'metadata'>; | ||
/** | ||
@@ -140,7 +154,7 @@ * This is a JSON RPC error object, it encodes the error data for the JSONRPCResponseError object. | ||
*/ | ||
type JSONRPCRequest<T extends JSONValue = JSONValue> = JSONRPCRequestMessage<T> | JSONRPCRequestNotification<T>; | ||
type JSONRPCRequest<T extends JSONObject = JSONObject> = JSONRPCRequestMessage<T> | JSONRPCRequestNotification<T>; | ||
/** | ||
* This is a JSON RPC response object. It can be a response result or error. | ||
*/ | ||
type JSONRPCResponse<T extends JSONValue = JSONValue> = JSONRPCResponseResult<T> | JSONRPCResponseError; | ||
type JSONRPCResponse<T extends JSONObject = JSONObject> = JSONRPCResponseResult<T> | JSONRPCResponseError; | ||
/** | ||
@@ -150,3 +164,3 @@ * This is a JSON RPC Message object. This is top level and can be any kind of | ||
*/ | ||
type JSONRPCMessage<T extends JSONValue = JSONValue> = JSONRPCRequest<T> | JSONRPCResponse<T>; | ||
type JSONRPCMessage<T extends JSONObject = JSONObject> = JSONRPCRequest<T> | JSONRPCResponse<T>; | ||
type HandlerImplementation<I, O> = (input: I, cancel: (reason?: any) => void, meta: Record<string, JSONValue> | undefined, ctx: ContextTimed) => O; | ||
@@ -156,7 +170,7 @@ type RawHandlerImplementation = HandlerImplementation<[ | ||
ReadableStream<Uint8Array> | ||
], Promise<[JSONValue | undefined, ReadableStream<Uint8Array>]>>; | ||
type DuplexHandlerImplementation<I extends JSONValue = JSONValue, O extends JSONValue = JSONValue> = HandlerImplementation<AsyncIterable<I>, AsyncIterable<O>>; | ||
type ServerHandlerImplementation<I extends JSONValue = JSONValue, O extends JSONValue = JSONValue> = HandlerImplementation<I, AsyncIterable<O>>; | ||
type ClientHandlerImplementation<I extends JSONValue = JSONValue, O extends JSONValue = JSONValue> = HandlerImplementation<AsyncIterable<I>, Promise<O>>; | ||
type UnaryHandlerImplementation<I extends JSONValue = JSONValue, O extends JSONValue = JSONValue> = HandlerImplementation<I, Promise<O>>; | ||
], Promise<[JSONObject | undefined, ReadableStream<Uint8Array>]>>; | ||
type DuplexHandlerImplementation<I extends JSONObject = JSONObject, O extends JSONObject = JSONObject> = HandlerImplementation<AsyncIterable<I>, AsyncIterable<O>>; | ||
type ServerHandlerImplementation<I extends JSONObject = JSONObject, O extends JSONObject = JSONObject> = HandlerImplementation<I, AsyncIterable<O>>; | ||
type ClientHandlerImplementation<I extends JSONObject = JSONObject, O extends JSONObject = JSONObject> = HandlerImplementation<AsyncIterable<I>, Promise<O>>; | ||
type UnaryHandlerImplementation<I extends JSONObject = JSONObject, O extends JSONObject = JSONObject> = HandlerImplementation<I, Promise<O>>; | ||
type ContainerType = Record<string, any>; | ||
@@ -195,9 +209,9 @@ /** | ||
}; | ||
type UnaryCallerImplementation<I extends JSONValue = JSONValue, O extends JSONValue = JSONValue> = (parameters: I, ctx?: Partial<ContextTimedInput>) => Promise<O>; | ||
type ServerCallerImplementation<I extends JSONValue = JSONValue, O extends JSONValue = JSONValue> = (parameters: I, ctx?: Partial<ContextTimedInput>) => Promise<ReadableStream<O>>; | ||
type ClientCallerImplementation<I extends JSONValue = JSONValue, O extends JSONValue = JSONValue> = (ctx?: Partial<ContextTimedInput>) => Promise<{ | ||
type UnaryCallerImplementation<I extends JSONObject = JSONObject, O extends JSONObject = JSONObject> = (parameters: I, ctx?: Partial<ContextTimedInput>) => Promise<O>; | ||
type ServerCallerImplementation<I extends JSONObject = JSONObject, O extends JSONObject = JSONObject> = (parameters: I, ctx?: Partial<ContextTimedInput>) => Promise<ReadableStream<O>>; | ||
type ClientCallerImplementation<I extends JSONObject = JSONObject, O extends JSONObject = JSONObject> = (ctx?: Partial<ContextTimedInput>) => Promise<{ | ||
output: Promise<O>; | ||
writable: WritableStream<I>; | ||
}>; | ||
type DuplexCallerImplementation<I extends JSONValue = JSONValue, O extends JSONValue = JSONValue> = (ctx?: Partial<ContextTimedInput>) => Promise<RPCStream<O, I>>; | ||
type DuplexCallerImplementation<I extends JSONObject = JSONObject, O extends JSONObject = JSONObject> = (ctx?: Partial<ContextTimedInput>) => Promise<RPCStream<O, I>>; | ||
type RawCallerImplementation = (headerParams: JSONValue, ctx?: Partial<ContextTimedInput>) => Promise<RPCStream<Uint8Array, Uint8Array, Record<string, JSONValue> & { | ||
@@ -228,5 +242,6 @@ result: JSONValue; | ||
}; | ||
type JSONValue = { | ||
[key: string]: JSONValue | undefined; | ||
} | Array<JSONValue> | string | number | boolean | null | undefined; | ||
type JSONObject = { | ||
[key: string]: JSONValue; | ||
}; | ||
type JSONValue = JSONObject | Array<JSONValue> | string | number | boolean | null | undefined; | ||
type POJO = { | ||
@@ -247,2 +262,2 @@ [key: string]: any; | ||
type ToError = (errorData: JSONValue, clientMetadata: JSONValue) => any; | ||
export type { IdGen, JSONRPCRequestMessage, JSONRPCRequestNotification, JSONRPCResponseResult, JSONRPCResponseError, JSONRPCError, JSONRPCRequest, JSONRPCResponse, JSONRPCMessage, HandlerImplementation, RawHandlerImplementation, DuplexHandlerImplementation, ServerHandlerImplementation, ClientHandlerImplementation, UnaryHandlerImplementation, ContainerType, RPCStream, StreamFactory, MiddlewareFactory, ServerManifest, ClientManifest, HandlerType, MapCallers, Opaque, JSONValue, POJO, PromiseDeconstructed, HandlerTypes, FromError, ToError, }; | ||
export type { IdGen, JSONRPCRequestMessage, JSONRPCRequestNotification, JSONRPCResponseResult, JSONRPCResponseError, JSONRPCParams, JSONRPCResult, JSONRPCError, JSONRPCRequest, JSONRPCResponse, JSONRPCMessage, HandlerImplementation, RawHandlerImplementation, DuplexHandlerImplementation, ServerHandlerImplementation, ClientHandlerImplementation, UnaryHandlerImplementation, ContainerType, RPCStream, StreamFactory, MiddlewareFactory, ServerManifest, ClientManifest, HandlerType, MapCallers, Opaque, JSONObject, JSONValue, POJO, PromiseDeconstructed, HandlerTypes, FromError, ToError, }; |
/// <reference types="node" /> | ||
import type { Timer } from '@matrixai/timer'; | ||
import type { ClientManifest, HandlerType, JSONRPCMessage, JSONRPCRequest, JSONRPCRequestMessage, JSONRPCRequestNotification, JSONRPCResponse, JSONRPCResponseError, JSONRPCResponseResult, JSONValue, PromiseDeconstructed, ToError } from './types'; | ||
import type { ClientManifest, HandlerType, JSONObject, JSONRPCMessage, JSONRPCRequest, JSONRPCRequestMessage, JSONRPCRequestNotification, JSONRPCResponse, JSONRPCResponseError, JSONRPCResponseResult, JSONValue, PromiseDeconstructed, ToError } from './types'; | ||
import { TransformStream } from 'stream/web'; | ||
@@ -8,9 +8,9 @@ declare function isObject(o: unknown): o is object; | ||
declare function sleep(ms: number): Promise<void>; | ||
declare function parseJSONRPCRequest<T extends JSONValue>(message: unknown): JSONRPCRequest<T>; | ||
declare function parseJSONRPCRequestMessage<T extends JSONValue>(message: unknown): JSONRPCRequestMessage<T>; | ||
declare function parseJSONRPCRequestNotification<T extends JSONValue>(message: unknown): JSONRPCRequestNotification<T>; | ||
declare function parseJSONRPCResponseResult<T extends JSONValue>(message: unknown): JSONRPCResponseResult<T>; | ||
declare function parseJSONRPCRequest<T extends JSONObject>(message: unknown): JSONRPCRequest<T>; | ||
declare function parseJSONRPCRequestMessage<T extends JSONObject>(message: unknown): JSONRPCRequestMessage<T>; | ||
declare function parseJSONRPCRequestNotification<T extends JSONObject>(message: unknown): JSONRPCRequestNotification<T>; | ||
declare function parseJSONRPCResponseResult<T extends JSONObject>(message: unknown): JSONRPCResponseResult<T>; | ||
declare function parseJSONRPCResponseError(message: unknown): JSONRPCResponseError; | ||
declare function parseJSONRPCResponse<T extends JSONValue>(message: unknown): JSONRPCResponse<T>; | ||
declare function parseJSONRPCMessage<T extends JSONValue>(message: unknown): JSONRPCMessage<T>; | ||
declare function parseJSONRPCResponse<T extends JSONObject>(message: unknown): JSONRPCResponse<T>; | ||
declare function parseJSONRPCMessage<T extends JSONObject>(message: unknown): JSONRPCMessage<T>; | ||
/** | ||
@@ -43,3 +43,3 @@ * Serializes an Error instance into a JSONValue object suitable for RPC. | ||
*/ | ||
declare function clientInputTransformStream<I extends JSONValue>(method: string, timer?: Timer): TransformStream<I, JSONRPCRequest>; | ||
declare function clientInputTransformStream<I extends JSONObject>(method: string, timer?: Timer): TransformStream<I, JSONRPCRequest>; | ||
/** | ||
@@ -54,3 +54,3 @@ * This constructs a transformation stream that converts any error messages | ||
*/ | ||
declare function clientOutputTransformStream<O extends JSONValue>(clientMetadata: JSONValue, toError: ToError, timer?: Timer): TransformStream<JSONRPCResponse<O>, O>; | ||
declare function clientOutputTransformStream<O extends JSONObject>(clientMetadata: JSONValue, toError: ToError, timer?: Timer): TransformStream<JSONRPCResponse<O>, O>; | ||
declare function getHandlerTypes(manifest: ClientManifest): Record<string, HandlerType>; | ||
@@ -57,0 +57,0 @@ /** |
@@ -263,2 +263,3 @@ "use strict"; | ||
AbstractError: errors_1.AbstractError, | ||
ErrorRPCTimedOut: errors.ErrorRPCTimedOut, | ||
}; | ||
@@ -318,2 +319,3 @@ /** | ||
case errors_1.AbstractError: | ||
case errors.ErrorRPCTimedOut: | ||
e = eClass.fromJSON(errorData); | ||
@@ -413,2 +415,3 @@ break; | ||
else { | ||
chunk.result; | ||
controller.enqueue(chunk.result); | ||
@@ -415,0 +418,0 @@ } |
{ | ||
"name": "@matrixai/rpc", | ||
"version": "0.2.4", | ||
"version": "0.2.5", | ||
"author": "Matrix AI", | ||
@@ -5,0 +5,0 @@ "contributors": [ |
@@ -271,3 +271,3 @@ # js-rpc | ||
logger: new Logger('rpc-server'), | ||
handlerTimeoutTime: 60000, | ||
timeoutTime: 60000, | ||
idGen, | ||
@@ -425,3 +425,3 @@ }); | ||
const rpcServer = new RPCServer({ | ||
handlerTimeoutTime: 200, | ||
timeoutTime: 200, | ||
logger, | ||
@@ -676,3 +676,3 @@ idGen, | ||
logger: new Logger('rpc-server'), | ||
handlerTimeoutTime: 1000, | ||
timeoutTime: 1000, | ||
idGen, | ||
@@ -841,3 +841,3 @@ }); | ||
logger: new Logger('rpc-server'), | ||
handlerTimeoutTime: 1000, | ||
timeoutTime: 1000, | ||
idGen, | ||
@@ -906,2 +906,70 @@ }); | ||
![img.png](images/unaryTest.png) | ||
## Specifications | ||
### Throwing Timeouts | ||
By default, a timeout will not cause an RPC call to automatically throw, this must be manually done by the handler when it receives the abort signal from `ctx.signal`. An example of this is like so: | ||
```ts | ||
class TestMethod extends UnaryHandler { | ||
public handle = async ( | ||
input: JSONValue, | ||
cancel: (reason?: any) => void, | ||
meta: Record<string, JSONValue> | undefined, | ||
ctx: ContextTimed, | ||
): Promise<JSONValue> => { | ||
const abortProm = utils.promise<never>(); | ||
ctx.signal.addEventListener('abort', () => { | ||
resolveCtxP(ctx); | ||
abortProm.resolveP(ctx.signal.reason); | ||
}); | ||
throw await abortProm.p; | ||
}; | ||
} | ||
``` | ||
### Timeout Priority | ||
A `timeoutTime` can be passed both to the constructors of `RPCServer` and `RPCClient`. This is the default `timeoutTime` for all callers/handlers. | ||
In the case of `RPCServer`, a `timeout` can be specified when extending any `Handler` class. This will override the default `timeoutTime` set on `RPCServer` for that handler only. | ||
```ts | ||
class TestMethodArbitraryTimeout extends UnaryHandler { | ||
public timeout = 100; | ||
public handle = async ( | ||
input: JSONValue, | ||
_cancel, | ||
_meta, | ||
ctx_, | ||
): Promise<JSONValue> => { | ||
return input; | ||
}; | ||
} | ||
``` | ||
In the case of `RPCClient`, a `ctx` with the property `timer` can be supplied with a `Timer` instance or `number` when making making an RPC call. This will override the default `timeoutTime` set on `RPCClient` for that call only. | ||
```ts | ||
await rpcClient.methods.testMethod({}, { timer: 100 }); | ||
await rpcClient.methods.testMethod({}, { timer: new Timer(undefined, 100) }); | ||
``` | ||
It's important to note that any of these timeouts will ultimately be overridden by the shortest timeout of the server and client combined using the timeout middleware below. | ||
### Timeout Middleware | ||
The `timeoutMiddleware` sets an RPCServer's timeout based on the lowest timeout between the Client and the Server. This is so that handlers can eagerly time out and stop processing as soon as it is known that the client has timed out. | ||
This case can be seen in the first diagram, where the server is able to stop the processing of the handler, and close the associated stream of the RPC call based on the shorter timeout sent by the client: | ||
![RPCServer sets timeout based on RPCClient](images/timeoutMiddlewareClientTimeout.svg) | ||
Where the `RPCClient` sends a timeout that is longer than that set on the `RPCServer`, it will be rejected. This is as the timeout of the client should never be expected to exceed that of the server, so that the server's timeout is an absolute limit. | ||
![RPCServer rejects longer timeout sent by RPCClient](images/timeoutMiddlewareServerTimeout.svg) | ||
The `timeoutMiddleware` is enabled by default, and uses the `.metadata.timeout` property on a JSON-RPC request object for the client to send it's timeout. | ||
## Development | ||
@@ -908,0 +976,0 @@ |
Sorry, the diff of this file is not supported yet
Sorry, the diff of this file is not supported yet
Sorry, the diff of this file is not supported yet
Sorry, the diff of this file is not supported yet
Sorry, the diff of this file is not supported yet
Sorry, the diff of this file is not supported yet
Sorry, the diff of this file is not supported yet
Sorry, the diff of this file is not supported yet
Sorry, the diff of this file is not supported yet
Sorry, the diff of this file is not supported yet
Sorry, the diff of this file is not supported yet
License Policy Violation
LicenseThis package is not allowed per your license policy. Review the package's license to ensure compliance.
Found 1 instance in 1 package
License Policy Violation
LicenseThis package is not allowed per your license policy. Review the package's license to ensure compliance.
Found 1 instance in 1 package
359720
76
3041
1030