grain-rpc
Advanced tools
Comparing version 0.1.5 to 0.1.6
@@ -12,3 +12,4 @@ /** | ||
RpcRespErr = 3, | ||
Custom = 4 | ||
Custom = 4, | ||
Ready = 5 | ||
} | ||
@@ -39,3 +40,6 @@ export interface IMsgRpcCall { | ||
} | ||
export interface IMsgReady { | ||
mtype: MsgType.Ready; | ||
} | ||
export declare type IMsgRpc = IMsgRpcCall | IMsgRpcRespData | IMsgRpcRespErr; | ||
export declare type IMessage = IMsgRpc | IMsgCustom; | ||
export declare type IMessage = IMsgRpc | IMsgCustom | IMsgReady; |
@@ -17,2 +17,3 @@ "use strict"; | ||
MsgType[MsgType["Custom"] = 4] = "Custom"; | ||
MsgType[MsgType["Ready"] = 5] = "Ready"; | ||
})(MsgType = exports.MsgType || (exports.MsgType = {})); |
@@ -79,3 +79,3 @@ /// <reference types="node" /> | ||
import { IMessage, IMsgCustom, IMsgRpcCall } from "./message"; | ||
export declare type SendMessageCB = (msg: IMessage) => PromiseLike<void> | void; | ||
export declare type SendMessageCB = (msg: IMessage) => Promise<void> | void; | ||
export interface IForwarderDest { | ||
@@ -90,2 +90,3 @@ forwardCall: (c: IMsgRpcCall) => Promise<any>; | ||
private _inactiveSendQueue; | ||
private _waitForReadyMessage; | ||
private _logger; | ||
@@ -98,5 +99,5 @@ private _callWrapper; | ||
/** | ||
* To use Rpc, you must call start() with a function that sends a message to the other side. If | ||
* you pass in such a function to the constructor, it's the same as calling start() right away. | ||
* You must also call receiveMessage() for every message received from the other side. | ||
* To use Rpc, you must provide a sendMessage function that sends a message to the other side; | ||
* it may be given in the constructor, or later with setSendMessage. You must also call | ||
* receiveMessage() for every message received from the other side. | ||
*/ | ||
@@ -111,15 +112,32 @@ constructor(options?: { | ||
*/ | ||
receiveMessage(msg: any): void; | ||
receiveMessage(msg: IMessage): void; | ||
/** | ||
* Until start() is called, received and sent messages are queued. This gives you an opportunity | ||
* to register implementations and add "message" listeners without the risk of missing messages, | ||
* even if receiveMessage() has already started being called. | ||
* If you've set up calls to receiveMessage(), but need time to call registerImpl() before | ||
* processing new messages, you may use queueIncoming(), make the registerImpl() calls, | ||
* and then call processIncoming() to handle queued messages and resume normal processing. | ||
*/ | ||
start(sendMessage: SendMessageCB): void; | ||
queueIncoming(): void; | ||
/** | ||
* Calling stop() resume the same state as before start was called: received and sent messages are | ||
* queued. | ||
* Process received messages queued since queueIncoming, and resume normal processing of | ||
* received messages. | ||
*/ | ||
stop(): void; | ||
processIncoming(): void; | ||
/** | ||
* Set the callback to send messages. If set to null, sent messages will be queued. If you | ||
* disconnect and want for sent messages to throw, set a callback that throws. | ||
*/ | ||
setSendMessage(sendMessage: SendMessageCB | null): void; | ||
/** | ||
* If your peer may not be listening yet to your messages, you may call this to queue outgoing | ||
* messages until receiving a "ready" message from the peer. I.e. one peer may call | ||
* queueOutgoingUntilReadyMessage() while the other calls sendReadyMessage(). | ||
*/ | ||
queueOutgoingUntilReadyMessage(): void; | ||
/** | ||
* If your peer is using queueOutgoingUntilReadyMessage(), you should let it know that you are | ||
* ready using sendReadyMessage() as soon as you've set up the processing of received messages. | ||
* Note that at most one peer may use queueOutgoingUntilReadyMessage(), or they will deadlock. | ||
*/ | ||
sendReadyMessage(): void | Promise<void>; | ||
/** | ||
* Messaging interface: send data to the other side, to be emitted there as a "message" event. | ||
@@ -168,3 +186,7 @@ */ | ||
forwardMessage(msg: IMsgCustom): Promise<any>; | ||
private _queueOutgoing; | ||
private _processOutgoing; | ||
private _sendMessage; | ||
private _sendMessageOrReject; | ||
private _sendReject; | ||
private _makeCallRaw; | ||
@@ -171,0 +193,0 @@ private _makeCall; |
@@ -91,10 +91,13 @@ "use strict"; | ||
/** | ||
* To use Rpc, you must call start() with a function that sends a message to the other side. If | ||
* you pass in such a function to the constructor, it's the same as calling start() right away. | ||
* You must also call receiveMessage() for every message received from the other side. | ||
* To use Rpc, you must provide a sendMessage function that sends a message to the other side; | ||
* it may be given in the constructor, or later with setSendMessage. You must also call | ||
* receiveMessage() for every message received from the other side. | ||
*/ | ||
constructor(options = {}) { | ||
super(); | ||
this._inactiveRecvQueue = []; // queue of received message | ||
this._inactiveSendQueue = []; // queue of messages to be sent | ||
// Note the invariant: _inactiveSendQueue == null iff (_sendMessageCB != null && !_waitForReadyMessage) | ||
this._sendMessageCB = null; | ||
this._inactiveRecvQueue = null; // queue of received message | ||
this._inactiveSendQueue = null; // queue of messages to be sent | ||
this._waitForReadyMessage = false; | ||
this._implMap = new Map(); | ||
@@ -106,4 +109,4 @@ this._forwarders = new Map(); | ||
this._logger = logger; | ||
this._sendMessageCB = sendMessage; | ||
this._callWrapper = callWrapper; | ||
this.setSendMessage(sendMessage); | ||
} | ||
@@ -114,3 +117,3 @@ /** | ||
receiveMessage(msg) { | ||
if (!this._sendMessageCB) { | ||
if (this._inactiveRecvQueue) { | ||
this._inactiveRecvQueue.push(msg); | ||
@@ -123,20 +126,52 @@ } | ||
/** | ||
* Until start() is called, received and sent messages are queued. This gives you an opportunity | ||
* to register implementations and add "message" listeners without the risk of missing messages, | ||
* even if receiveMessage() has already started being called. | ||
* If you've set up calls to receiveMessage(), but need time to call registerImpl() before | ||
* processing new messages, you may use queueIncoming(), make the registerImpl() calls, | ||
* and then call processIncoming() to handle queued messages and resume normal processing. | ||
*/ | ||
start(sendMessage) { | ||
// Message sent by `_dispatch(...)` are appended to the send queue | ||
processQueue(this._inactiveRecvQueue, this._dispatch.bind(this)); | ||
processQueue(this._inactiveSendQueue, sendMessage); | ||
queueIncoming() { | ||
if (!this._inactiveRecvQueue) { | ||
this._inactiveRecvQueue = []; | ||
} | ||
} | ||
/** | ||
* Process received messages queued since queueIncoming, and resume normal processing of | ||
* received messages. | ||
*/ | ||
processIncoming() { | ||
if (this._inactiveRecvQueue) { | ||
processQueue(this._inactiveRecvQueue, this._dispatch.bind(this)); | ||
this._inactiveRecvQueue = null; | ||
} | ||
} | ||
/** | ||
* Set the callback to send messages. If set to null, sent messages will be queued. If you | ||
* disconnect and want for sent messages to throw, set a callback that throws. | ||
*/ | ||
setSendMessage(sendMessage) { | ||
this._sendMessageCB = sendMessage; | ||
if (this._sendMessageCB) { | ||
this._processOutgoing(); | ||
} | ||
else { | ||
this._queueOutgoing(); | ||
} | ||
} | ||
/** | ||
* Calling stop() resume the same state as before start was called: received and sent messages are | ||
* queued. | ||
* If your peer may not be listening yet to your messages, you may call this to queue outgoing | ||
* messages until receiving a "ready" message from the peer. I.e. one peer may call | ||
* queueOutgoingUntilReadyMessage() while the other calls sendReadyMessage(). | ||
*/ | ||
stop() { | ||
this._sendMessageCB = null; | ||
queueOutgoingUntilReadyMessage() { | ||
this._waitForReadyMessage = true; | ||
this._queueOutgoing(); | ||
} | ||
/** | ||
* If your peer is using queueOutgoingUntilReadyMessage(), you should let it know that you are | ||
* ready using sendReadyMessage() as soon as you've set up the processing of received messages. | ||
* Note that at most one peer may use queueOutgoingUntilReadyMessage(), or they will deadlock. | ||
*/ | ||
sendReadyMessage() { | ||
return this._sendMessage({ mtype: message_1.MsgType.Ready }); | ||
} | ||
/** | ||
* Messaging interface: send data to the other side, to be emitted there as a "message" event. | ||
@@ -257,10 +292,46 @@ */ | ||
} | ||
// Mark outgoing messages for queueing. | ||
_queueOutgoing() { | ||
if (!this._inactiveSendQueue) { | ||
this._inactiveSendQueue = []; | ||
} | ||
} | ||
// If sendMessageCB is set and we are no longer waiting for a ready message, send out any | ||
// queued outgoing messages and resume normal sending. | ||
_processOutgoing() { | ||
if (this._inactiveSendQueue && this._sendMessageCB && !this._waitForReadyMessage) { | ||
processQueue(this._inactiveSendQueue, this._sendMessageOrReject.bind(this, this._sendMessageCB)); | ||
this._inactiveSendQueue = null; | ||
} | ||
} | ||
_sendMessage(msg) { | ||
if (!this._sendMessageCB) { | ||
if (this._inactiveSendQueue) { | ||
this._inactiveSendQueue.push(msg); | ||
} | ||
else { | ||
return this._sendMessageCB(msg); | ||
return this._sendMessageOrReject(this._sendMessageCB, msg); | ||
} | ||
} | ||
// This helper calls calls sendMessage(msg), and if that call fails, rejects the call | ||
// represented by msg (when it's of type RpcCall). | ||
_sendMessageOrReject(sendMessage, msg) { | ||
if (this._logger.info) { | ||
const desc = (msg.mtype === message_1.MsgType.RpcCall) ? ": " + this._callDesc(msg) : ""; | ||
this._logger.info(`Rpc sending ${message_1.MsgType[msg.mtype]}${desc}`); | ||
} | ||
return catchMaybePromise(() => sendMessage(msg), (err) => this._sendReject(msg, err)); | ||
} | ||
// Rejects a RpcCall due to the given send error; this helper always re-throws. | ||
_sendReject(msg, err) { | ||
const newErr = new ErrorWithCode("RPC_SEND_FAILED", `Send failed: ${err.message}`); | ||
if (msg.mtype === message_1.MsgType.RpcCall && msg.reqId !== undefined) { | ||
const callObj = this._pendingCalls.get(msg.reqId); | ||
if (callObj) { | ||
this._pendingCalls.delete(msg.reqId); | ||
callObj.reject(newErr); | ||
} | ||
} | ||
this.emit("error", newErr); | ||
throw newErr; | ||
} | ||
_makeCallRaw(iface, meth, args, resultChecker, fwdDest) { | ||
@@ -278,6 +349,4 @@ return new Promise((resolve, reject) => { | ||
} | ||
Promise.resolve().then(() => this._sendMessage(msg)).catch((err) => { | ||
this._pendingCalls.delete(reqId); | ||
reject(err); | ||
}); | ||
// If _sendMessage fails, reject, allowing it to throw synchronously or not. | ||
catchMaybePromise(() => this._sendMessage(msg), reject); | ||
}); | ||
@@ -303,2 +372,10 @@ } | ||
} | ||
case message_1.MsgType.Ready: { | ||
this._waitForReadyMessage = false; | ||
try { | ||
this._processOutgoing(); | ||
} | ||
catch (e) { /* swallowing error, an event 'error' was already emitted */ } | ||
return; | ||
} | ||
} | ||
@@ -459,4 +536,6 @@ } | ||
try { | ||
for (; i < queue.length; ++i) { | ||
processFunc(queue[i]); | ||
while (i < queue.length) { | ||
// i gets read and then incremented before the call, so that if processFunc throws, the | ||
// message still gets removed from the queue (to avoid processing it twice). | ||
processFunc(queue[i++]); | ||
} | ||
@@ -468,1 +547,16 @@ } | ||
} | ||
/** | ||
* Calls callback(), handling the exception both synchronously and not. If callback and handler | ||
* both return or throw synchronously, then so does this method. | ||
*/ | ||
function catchMaybePromise(callback, handler) { | ||
try { | ||
const p = callback(); | ||
if (p) { | ||
return p.catch(handler); | ||
} | ||
} | ||
catch (err) { | ||
return handler(err); | ||
} | ||
} |
{ | ||
"name": "grain-rpc", | ||
"version": "0.1.5", | ||
"version": "0.1.6", | ||
"description": "Typed RPC interface on top of an arbitrary communication channel", | ||
@@ -5,0 +5,0 @@ "main": "dist/lib/index", |
New author
Supply chain riskA new npm collaborator published a version of the package for the first time. New collaborators are usually benign additions to a project, but do indicate a change to the security surface area of a package.
Found 1 instance in 1 package
39167
839
0