Huge News!Announcing our $40M Series B led by Abstract Ventures.Learn More
Socket
Sign inDemoInstall
Socket

zerorpc

Package Overview
Dependencies
Maintainers
1
Versions
30
Alerts
File Explorer

Advanced tools

Socket logo

Install Socket

Detect and block malicious and high-risk dependencies

Install

zerorpc - npm Package Compare versions

Comparing version 0.8.1 to 0.8.2

example.js

70

lib/channel.js

@@ -24,3 +24,4 @@ // Open Source Initiative OSI - The MIT License (MIT):Licensing

var util = require("util"),
var util = require("./util"),
nodeUtil = require("util"),
nodeEvents = require("events"),

@@ -39,4 +40,8 @@ events = require("./events"),

//Current verison of the protocol
var PROTOCOL_VERSION = 3;
//Heartbeat rate in milliseconds
var HEARTBEAT = 5000;
//Creates a new channel

@@ -58,4 +63,7 @@ //id : String

this._capacity = capacity;
this._fresh = true;
//Setup heartbeating on the channel
this._resetHeartbeat();
this._heartbeatRunner = this._runHeartbeat();
//Callbacks to call when the channel receives a message

@@ -66,6 +74,7 @@ this._callbacks = [];

this._inBuffer = new buffer.ChannelBuffer(capacity);
this._inBuffer.setCapacity(1);
this._outBuffer = new buffer.ChannelBuffer(1);
}
util.inherits(Channel, nodeEvents.EventEmitter);
nodeUtil.inherits(Channel, nodeEvents.EventEmitter);

@@ -90,7 +99,10 @@ //Registers a new callback to be run when this channel receives a message

if(event.args.length > 0 && typeof(event.args[0]) == "number") {
this._outBuffer.setCapacity(event.args[0]);
this.flush();
self._outBuffer.setCapacity(event.args[0]);
self.flush();
} else {
self.emit("error", "Invalid event: Bad buffer message");
self.emit("protocol-error", "Invalid event: Bad buffer message");
self.close();
}
} else if(event.name == "_zpc_hb") {
self._resetHeartbeat();
} else if(self._state == CHANNEL_OPEN) {

@@ -103,7 +115,4 @@ //Enqueue the message in the buffer

//buffer
if(self._fresh) {
self._fresh = false;
if(self._inBuffer.getCapacity() < self._inBuffer.length() / 2) {
self._resetCapacity();
} else if(self._inBuffer.getCapacity() < self._inBuffer.length() / 2) {
self._resetCapacity();
}

@@ -132,2 +141,3 @@

Channel.prototype.close = function() {
clearTimeout(this._heartbeatRunner);
this._state = CHANNEL_CLOSING;

@@ -172,7 +182,2 @@ this.emit("closing");

}
if(this._fresh) {
this._fresh = false;
this._resetCapacity();
}
};

@@ -185,2 +190,24 @@

//Runs the heartbeat on this channel
Channel.prototype._runHeartbeat = function() {
var self = this;
return setInterval(function() {
if(util.curTime() > this._heartbeatExpirationTime) {
//If we haven't received a response in 2 * heartbeat rate, send an
//error
self.emit("heartbeat-error", "Lost remote after " + (HEARTBEAT * 2) + "ms");
self.close();
} else {
//Heartbeat on the channel
self.send("_zpc_hb", [0]);
}
}, HEARTBEAT);
};
//Resets the heartbeat expiration time
Channel.prototype._resetHeartbeat = function() {
this._heartbeatExpirationTime = util.curTime() + HEARTBEAT * 2;
};
//Updates the capacity and sends a _zpc_more event

@@ -205,3 +232,3 @@ Channel.prototype._resetCapacity = function() {

util.inherits(ServerChannel, Channel);
nodeUtil.inherits(ServerChannel, Channel);

@@ -215,10 +242,15 @@ //Creates a new client-side buffer

Channel.call(this, events.fastUUID(), null, socket, capacity);
this._fresh = true;
}
util.inherits(ClientChannel, Channel);
nodeUtil.inherits(ClientChannel, Channel);
ClientChannel.prototype._createHeader = function(name, args) {
return this._fresh ? { v: PROTOCOL_VERSION, message_id: this.id }
: Channel.prototype._createHeader.call(this);
if(this._fresh) {
this._fresh = false;
return { v: PROTOCOL_VERSION, message_id: this.id };
} else {
return Channel.prototype._createHeader.call(this);
}
}

@@ -225,0 +257,0 @@

@@ -84,3 +84,2 @@ // Open Source Initiative OSI - The MIT License (MIT):Licensing

middleware.addTimeout(self._timeout * 1000, ch, callbackErrorWrapper);
middleware.addHeartbeat(ch, callbackErrorWrapper);

@@ -95,3 +94,3 @@ //Associated callbacks to execute for various events

var error = util.createErrorResponse(event.args[0], event.args[1], event.args[2]);
callback(error, undefined, false);
callbackErrorWrapper(error);
ch.close();

@@ -121,6 +120,22 @@ },

} else {
self.emit("error", "Invalid event: Unknown event name");
//Send an error if the server sent a bad event - this should
//never happen
var error = util.createErrorResponse("ProtocolError", "Invalid event: Unknown event name", "");
callbackErrorWrapper(error);
ch.close();
}
});
//Listen for protocol errors - this should never happen
ch.on("protocol-error", function(error) {
var error = util.createErrorResponse("ProtocolError", error, "");
callbackErrorWrapper(error);
});
//Listen for heartbeat errors
ch.on("heartbeat-error", function(error) {
var error = util.createErrorResponse("HeartbeatError", error, "");
callbackErrorWrapper(error);
});
ch.send(method, args);

@@ -127,0 +142,0 @@ };

@@ -0,1 +1,24 @@

// Open Source Initiative OSI - The MIT License (MIT):Licensing
//
// The MIT License (MIT)
// Copyright (c) 2012 DotCloud Inc (opensource@dotcloud.com)
//
// Permission is hereby granted, free of charge, to any person obtaining a
// copy of this software and associated documentation files (the "Software"),
// to deal in the Software without restriction, including without limitation
// the rights to use, copy, modify, merge, publish, distribute, sublicense,
// and/or sell copies of the Software, and to permit persons to whom the
// Software is furnished to do so, subject to the following conditions:
//
// The above copyright notice and this permission notice shall be included in
// all copies or substantial portions of the Software.
//
// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING
// FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER
// DEALINGS IN THE SOFTWARE.
var msgpack = require("msgpack2"),

@@ -2,0 +25,0 @@ uuid = require("node-uuid");

@@ -43,39 +43,2 @@ // Open Source Initiative OSI - The MIT License (MIT):Licensing

//Adds a heartbeat on a channel - if a heartbeat response isn't received in a
//reasonable amount of time, we execute the callback with a heartbeat error.
function addHeartbeat(channel, callback) {
var nextExpirationTime = function() {
return util.curTime() + HEARTBEAT * 2;
}
var expirationTime = nextExpirationTime();
var runner = setInterval(function() {
if(util.curTime() > expirationTime) {
//If we haven't received a response in 2 * heartbeat rate, send an
//error
var error = util.createErrorResponse("LostRemote", "Lost remote after " + HEARTBEAT + "ms heartbeat (waited twice as long)");
callback(error);
channel.close();
} else {
//Heartbeat on the channel
channel.send("_zpc_hb", [0]);
}
}, HEARTBEAT);
channel.register(function(event, next) {
if(event.name == "_zpc_hb") {
expirationTime = nextExpirationTime();
} else {
next();
}
});
///Clear the heartbeat when the channel is closed
channel.on("closing", function() {
clearTimeout(runner);
});
}
exports.addTimeout = addTimeout;
exports.addHeartbeat = addHeartbeat;
exports.addTimeout = addTimeout;

@@ -122,7 +122,2 @@ // Open Source Initiative OSI - The MIT License (MIT):Licensing

//Adds heartbeating
middleware.addHeartbeat(ch, function(error) {
if(error) self.emit("error", error);
});
//Sends an error

@@ -135,2 +130,12 @@ var sendError = function(error) {

//Listen for protocol errors - this should never happen
ch.on("protocol-error", function(error) {
self.emit("error", error);
});
//Listen for heartbeat errors
ch.on("heartbeat-error", function(error) {
self.emit("error", error);
});
//This is passed to RPC methods to call when they finish, or have a stream

@@ -137,0 +142,0 @@ //update

@@ -63,2 +63,4 @@ // Open Source Initiative OSI - The MIT License (MIT):Licensing

//console.log("RECV", event);
//Emit the event

@@ -75,2 +77,3 @@ self.emit("socket/receive", event);

Socket.prototype.send = function(event) {
//console.log("SEND", event);
var message = events.serialize(event);

@@ -77,0 +80,0 @@ this._zmqSocket.send.call(this._zmqSocket, message);

{
"name": "zerorpc",
"version": "0.8.1",
"version": "0.8.2",
"main": "./index.js",

@@ -5,0 +5,0 @@ "author": "dotCloud <opensource@dotcloud.com>",

SocketSocket SOC 2 Logo

Product

  • Package Alerts
  • Integrations
  • Docs
  • Pricing
  • FAQ
  • Roadmap
  • Changelog

Packages

npm

Stay in touch

Get open source security insights delivered straight into your inbox.


  • Terms
  • Privacy
  • Security

Made with ⚡️ by Socket Inc