Socket
Socket
Sign inDemoInstall

grain-rpc

Package Overview
Dependencies
2
Maintainers
2
Versions
7
Alerts
File Explorer

Advanced tools

Install Socket

Detect and block malicious and high-risk dependencies

Install

Comparing version 0.1.5 to 0.1.6

8

dist/lib/message.d.ts

@@ -12,3 +12,4 @@ /**

RpcRespErr = 3,
Custom = 4
Custom = 4,
Ready = 5
}

@@ -39,3 +40,6 @@ export interface IMsgRpcCall {

}
export interface IMsgReady {
mtype: MsgType.Ready;
}
export declare type IMsgRpc = IMsgRpcCall | IMsgRpcRespData | IMsgRpcRespErr;
export declare type IMessage = IMsgRpc | IMsgCustom;
export declare type IMessage = IMsgRpc | IMsgCustom | IMsgReady;

@@ -17,2 +17,3 @@ "use strict";

MsgType[MsgType["Custom"] = 4] = "Custom";
MsgType[MsgType["Ready"] = 5] = "Ready";
})(MsgType = exports.MsgType || (exports.MsgType = {}));

@@ -79,3 +79,3 @@ /// <reference types="node" />

import { IMessage, IMsgCustom, IMsgRpcCall } from "./message";
export declare type SendMessageCB = (msg: IMessage) => PromiseLike<void> | void;
export declare type SendMessageCB = (msg: IMessage) => Promise<void> | void;
export interface IForwarderDest {

@@ -90,2 +90,3 @@ forwardCall: (c: IMsgRpcCall) => Promise<any>;

private _inactiveSendQueue;
private _waitForReadyMessage;
private _logger;

@@ -98,5 +99,5 @@ private _callWrapper;

/**
* To use Rpc, you must call start() with a function that sends a message to the other side. If
* you pass in such a function to the constructor, it's the same as calling start() right away.
* You must also call receiveMessage() for every message received from the other side.
* To use Rpc, you must provide a sendMessage function that sends a message to the other side;
* it may be given in the constructor, or later with setSendMessage. You must also call
* receiveMessage() for every message received from the other side.
*/

@@ -111,15 +112,32 @@ constructor(options?: {

*/
receiveMessage(msg: any): void;
receiveMessage(msg: IMessage): void;
/**
* Until start() is called, received and sent messages are queued. This gives you an opportunity
* to register implementations and add "message" listeners without the risk of missing messages,
* even if receiveMessage() has already started being called.
* If you've set up calls to receiveMessage(), but need time to call registerImpl() before
* processing new messages, you may use queueIncoming(), make the registerImpl() calls,
* and then call processIncoming() to handle queued messages and resume normal processing.
*/
start(sendMessage: SendMessageCB): void;
queueIncoming(): void;
/**
* Calling stop() resume the same state as before start was called: received and sent messages are
* queued.
* Process received messages queued since queueIncoming, and resume normal processing of
* received messages.
*/
stop(): void;
processIncoming(): void;
/**
* Set the callback to send messages. If set to null, sent messages will be queued. If you
* disconnect and want for sent messages to throw, set a callback that throws.
*/
setSendMessage(sendMessage: SendMessageCB | null): void;
/**
* If your peer may not be listening yet to your messages, you may call this to queue outgoing
* messages until receiving a "ready" message from the peer. I.e. one peer may call
* queueOutgoingUntilReadyMessage() while the other calls sendReadyMessage().
*/
queueOutgoingUntilReadyMessage(): void;
/**
* If your peer is using queueOutgoingUntilReadyMessage(), you should let it know that you are
* ready using sendReadyMessage() as soon as you've set up the processing of received messages.
* Note that at most one peer may use queueOutgoingUntilReadyMessage(), or they will deadlock.
*/
sendReadyMessage(): void | Promise<void>;
/**
* Messaging interface: send data to the other side, to be emitted there as a "message" event.

@@ -168,3 +186,7 @@ */

forwardMessage(msg: IMsgCustom): Promise<any>;
private _queueOutgoing;
private _processOutgoing;
private _sendMessage;
private _sendMessageOrReject;
private _sendReject;
private _makeCallRaw;

@@ -171,0 +193,0 @@ private _makeCall;

@@ -91,10 +91,13 @@ "use strict";

/**
* To use Rpc, you must call start() with a function that sends a message to the other side. If
* you pass in such a function to the constructor, it's the same as calling start() right away.
* You must also call receiveMessage() for every message received from the other side.
* To use Rpc, you must provide a sendMessage function that sends a message to the other side;
* it may be given in the constructor, or later with setSendMessage. You must also call
* receiveMessage() for every message received from the other side.
*/
constructor(options = {}) {
super();
this._inactiveRecvQueue = []; // queue of received message
this._inactiveSendQueue = []; // queue of messages to be sent
// Note the invariant: _inactiveSendQueue == null iff (_sendMessageCB != null && !_waitForReadyMessage)
this._sendMessageCB = null;
this._inactiveRecvQueue = null; // queue of received message
this._inactiveSendQueue = null; // queue of messages to be sent
this._waitForReadyMessage = false;
this._implMap = new Map();

@@ -106,4 +109,4 @@ this._forwarders = new Map();

this._logger = logger;
this._sendMessageCB = sendMessage;
this._callWrapper = callWrapper;
this.setSendMessage(sendMessage);
}

@@ -114,3 +117,3 @@ /**

receiveMessage(msg) {
if (!this._sendMessageCB) {
if (this._inactiveRecvQueue) {
this._inactiveRecvQueue.push(msg);

@@ -123,20 +126,52 @@ }

/**
* Until start() is called, received and sent messages are queued. This gives you an opportunity
* to register implementations and add "message" listeners without the risk of missing messages,
* even if receiveMessage() has already started being called.
* If you've set up calls to receiveMessage(), but need time to call registerImpl() before
* processing new messages, you may use queueIncoming(), make the registerImpl() calls,
* and then call processIncoming() to handle queued messages and resume normal processing.
*/
start(sendMessage) {
// Message sent by `_dispatch(...)` are appended to the send queue
processQueue(this._inactiveRecvQueue, this._dispatch.bind(this));
processQueue(this._inactiveSendQueue, sendMessage);
queueIncoming() {
if (!this._inactiveRecvQueue) {
this._inactiveRecvQueue = [];
}
}
/**
* Process received messages queued since queueIncoming, and resume normal processing of
* received messages.
*/
processIncoming() {
if (this._inactiveRecvQueue) {
processQueue(this._inactiveRecvQueue, this._dispatch.bind(this));
this._inactiveRecvQueue = null;
}
}
/**
* Set the callback to send messages. If set to null, sent messages will be queued. If you
* disconnect and want for sent messages to throw, set a callback that throws.
*/
setSendMessage(sendMessage) {
this._sendMessageCB = sendMessage;
if (this._sendMessageCB) {
this._processOutgoing();
}
else {
this._queueOutgoing();
}
}
/**
* Calling stop() resume the same state as before start was called: received and sent messages are
* queued.
* If your peer may not be listening yet to your messages, you may call this to queue outgoing
* messages until receiving a "ready" message from the peer. I.e. one peer may call
* queueOutgoingUntilReadyMessage() while the other calls sendReadyMessage().
*/
stop() {
this._sendMessageCB = null;
queueOutgoingUntilReadyMessage() {
this._waitForReadyMessage = true;
this._queueOutgoing();
}
/**
* If your peer is using queueOutgoingUntilReadyMessage(), you should let it know that you are
* ready using sendReadyMessage() as soon as you've set up the processing of received messages.
* Note that at most one peer may use queueOutgoingUntilReadyMessage(), or they will deadlock.
*/
sendReadyMessage() {
return this._sendMessage({ mtype: message_1.MsgType.Ready });
}
/**
* Messaging interface: send data to the other side, to be emitted there as a "message" event.

@@ -257,10 +292,46 @@ */

}
// Mark outgoing messages for queueing.
_queueOutgoing() {
if (!this._inactiveSendQueue) {
this._inactiveSendQueue = [];
}
}
// If sendMessageCB is set and we are no longer waiting for a ready message, send out any
// queued outgoing messages and resume normal sending.
_processOutgoing() {
if (this._inactiveSendQueue && this._sendMessageCB && !this._waitForReadyMessage) {
processQueue(this._inactiveSendQueue, this._sendMessageOrReject.bind(this, this._sendMessageCB));
this._inactiveSendQueue = null;
}
}
_sendMessage(msg) {
if (!this._sendMessageCB) {
if (this._inactiveSendQueue) {
this._inactiveSendQueue.push(msg);
}
else {
return this._sendMessageCB(msg);
return this._sendMessageOrReject(this._sendMessageCB, msg);
}
}
// This helper calls calls sendMessage(msg), and if that call fails, rejects the call
// represented by msg (when it's of type RpcCall).
_sendMessageOrReject(sendMessage, msg) {
if (this._logger.info) {
const desc = (msg.mtype === message_1.MsgType.RpcCall) ? ": " + this._callDesc(msg) : "";
this._logger.info(`Rpc sending ${message_1.MsgType[msg.mtype]}${desc}`);
}
return catchMaybePromise(() => sendMessage(msg), (err) => this._sendReject(msg, err));
}
// Rejects a RpcCall due to the given send error; this helper always re-throws.
_sendReject(msg, err) {
const newErr = new ErrorWithCode("RPC_SEND_FAILED", `Send failed: ${err.message}`);
if (msg.mtype === message_1.MsgType.RpcCall && msg.reqId !== undefined) {
const callObj = this._pendingCalls.get(msg.reqId);
if (callObj) {
this._pendingCalls.delete(msg.reqId);
callObj.reject(newErr);
}
}
this.emit("error", newErr);
throw newErr;
}
_makeCallRaw(iface, meth, args, resultChecker, fwdDest) {

@@ -278,6 +349,4 @@ return new Promise((resolve, reject) => {

}
Promise.resolve().then(() => this._sendMessage(msg)).catch((err) => {
this._pendingCalls.delete(reqId);
reject(err);
});
// If _sendMessage fails, reject, allowing it to throw synchronously or not.
catchMaybePromise(() => this._sendMessage(msg), reject);
});

@@ -303,2 +372,10 @@ }

}
case message_1.MsgType.Ready: {
this._waitForReadyMessage = false;
try {
this._processOutgoing();
}
catch (e) { /* swallowing error, an event 'error' was already emitted */ }
return;
}
}

@@ -459,4 +536,6 @@ }

try {
for (; i < queue.length; ++i) {
processFunc(queue[i]);
while (i < queue.length) {
// i gets read and then incremented before the call, so that if processFunc throws, the
// message still gets removed from the queue (to avoid processing it twice).
processFunc(queue[i++]);
}

@@ -468,1 +547,16 @@ }

}
/**
* Calls callback(), handling the exception both synchronously and not. If callback and handler
* both return or throw synchronously, then so does this method.
*/
function catchMaybePromise(callback, handler) {
try {
const p = callback();
if (p) {
return p.catch(handler);
}
}
catch (err) {
return handler(err);
}
}
{
"name": "grain-rpc",
"version": "0.1.5",
"version": "0.1.6",
"description": "Typed RPC interface on top of an arbitrary communication channel",

@@ -5,0 +5,0 @@ "main": "dist/lib/index",

SocketSocket SOC 2 Logo

Product

  • Package Alerts
  • Integrations
  • Docs
  • Pricing
  • FAQ
  • Roadmap

Stay in touch

Get open source security insights delivered straight into your inbox.


  • Terms
  • Privacy
  • Security

Made with ⚡️ by Socket Inc