Huge News!Announcing our $40M Series B led by Abstract Ventures.Learn More
Socket
Sign inDemoInstall
Socket

socket.io-redis

Package Overview
Dependencies
Maintainers
2
Versions
25
Alerts
File Explorer

Advanced tools

Socket logo

Install Socket

Detect and block malicious and high-risk dependencies

Install

socket.io-redis - npm Package Compare versions

Comparing version 1.1.1 to 2.0.0

13

History.md
2.0.0 / 2016-11-28
===================
* [chore] Bump socket.io-adapter to version 0.5.0 (#145)
* [chore] Bump debug to version 2.3.3 (#147)
* [chore] Bump redis to version 2.6.3 (#148)
* [chore] Bump async library to 2.1.4 (#62)
* [feature] Add a `local` flag (#119)
* [feature] Refactor requests between nodes and add `clientRooms` method (#146)
* [feature] Add an option to disable channel multiplexing (#140)
* [fix] receive a message only once per-emit (not per-joined rooms) (#151)
* [chore] Bump mocha to 3.2.0 (#152)
1.1.1 / 2016-09-26

@@ -3,0 +16,0 @@ ==================

273

index.js

@@ -20,2 +20,11 @@

/**
* Request types, for messages between nodes
*/
var requestTypes = {
clients: 0,
clientRooms: 1,
};
/**
* Returns a redis Adapter class.

@@ -43,3 +52,4 @@ *

var subEvent = opts.subEvent || 'message';
var clientsTimeout = opts.clientsTimeout || 1000;
var requestsTimeout = opts.requestsTimeout || 1000;
var withChannelMultiplexing = false !== opts.withChannelMultiplexing;

@@ -55,7 +65,5 @@ // init clients if needed

}
if (!pub) pub = createClient();
if (!sub) sub = createClient({ return_buffers: true });
var subJson = sub.duplicate({ return_buffers: false });

@@ -77,6 +85,9 @@ // this server's key

this.prefix = prefix;
this.clientsTimeout = clientsTimeout;
this.requestsTimeout = requestsTimeout;
this.withChannelMultiplexing = withChannelMultiplexing;
this.channel = prefix + '#' + nsp.name + '#';
this.syncChannel = prefix + '-sync#request#' + this.nsp.name + '#';
this.requestChannel = prefix + '-request#' + this.nsp.name + '#';
this.responseChannel = prefix + '-response#' + this.nsp.name + '#';
this.requests = {};

@@ -97,12 +108,7 @@ if (String.prototype.startsWith) {

sub.subscribe(this.channel, function(err){
sub.subscribe([this.channel, this.requestChannel, this.responseChannel], function(err){
if (err) self.emit('error', err);
});
subJson.subscribe(this.syncChannel, function(err){
if (err) self.emit('error', err);
});
sub.on(subEvent, this.onmessage.bind(this));
subJson.on(subEvent, this.onclients.bind(this));
}

@@ -123,5 +129,12 @@

Redis.prototype.onmessage = function(channel, msg){
if (!this.channelMatches(channel.toString(), this.channel)) {
channel = channel.toString();
if (this.channelMatches(channel, this.requestChannel)) {
return this.onrequest(channel, msg);
} else if (this.channelMatches(channel, this.responseChannel)) {
return this.onresponse(channel, msg);
} else if (!this.channelMatches(channel, this.channel)) {
return debug('ignore different channel');
}
var args = msgpack.decode(msg);

@@ -148,3 +161,3 @@ var packet;

/**
* Called with a subscription message on sync
* Called on request from another node
*

@@ -154,12 +167,68 @@ * @api private

Redis.prototype.onclients = function(channel, msg){
Redis.prototype.onrequest = function(channel, msg){
var self = this;
var request;
if (!self.channelMatches(channel.toString(), self.syncChannel)) {
return debug('ignore different channel');
try {
request = JSON.parse(msg);
} catch(err){
self.emit('error', err);
return;
}
debug('received request %j', request);
switch (request.type) {
case requestTypes.clients:
Adapter.prototype.clients.call(self, request.rooms, function(err, clients){
if(err){
self.emit('error', err);
return;
}
var response = JSON.stringify({
requestid: request.requestid,
clients: clients
});
pub.publish(self.responseChannel, response);
});
break;
case requestTypes.clientRooms:
Adapter.prototype.clientRooms.call(self, request.sid, function(err, rooms){
if(err){
self.emit('error', err);
return;
}
if (!rooms) { return; }
var response = JSON.stringify({
requestid: request.requestid,
rooms: rooms
});
pub.publish(self.responseChannel, response);
});
break;
default:
debug('ignoring unknown request type: %s', request.type);
}
};
/**
* Called on response from another node
*
* @api private
*/
Redis.prototype.onresponse = function(channel, msg){
var self = this;
var response;
try {
var decoded = JSON.parse(msg);
response = JSON.parse(msg);
} catch(err){

@@ -170,16 +239,39 @@ self.emit('error', err);

Adapter.prototype.clients.call(self, decoded.rooms, function(err, clients){
if(err){
self.emit('error', err);
return;
}
if (!response.requestid || !self.requests[response.requestid]) {
debug('ignoring unknown request');
return;
}
var responseChn = prefix + '-sync#response#' + decoded.transaction;
var response = JSON.stringify({
clients : clients
});
debug('received response %j', response);
pub.publish(responseChn, response);
});
var request = self.requests[response.requestid];
switch (request.type) {
case requestTypes.clients:
request.msgCount++;
// ignore if response does not contain 'clients' key
if(!response.clients || !Array.isArray(response.clients)) return;
for(var i = 0; i < response.clients.length; i++){
request.clients[response.clients[i]] = true;
}
if (request.msgCount === request.numsub) {
clearTimeout(request.timeout);
if (request.callback) process.nextTick(request.callback.bind(null, null, Object.keys(request.clients)));
delete self.requests[request.requestid];
}
break;
case requestTypes.clientRooms:
clearTimeout(request.timeout);
if (request.callback) process.nextTick(request.callback.bind(null, null, response.rooms));
delete self.requests[request.requestid];
break;
default:
debug('ignoring unknown request type: %s', request.type);
}
};

@@ -198,10 +290,7 @@

packet.nsp = this.nsp.name;
if (!remote) {
if (!(remote || (opts && opts.flags && opts.flags.local))) {
var self = this;
var msg = msgpack.encode([uid, packet, opts]);
if (opts.rooms) {
opts.rooms.forEach(function(room) {
var chnRoom = self.channel + room + '#';
pub.publish(chnRoom, msg);
});
if (self.withChannelMultiplexing && opts.rooms && opts.rooms.length === 1) {
pub.publish(self.channel + opts.rooms[0] + '#', msg);
} else {

@@ -227,2 +316,7 @@ pub.publish(self.channel, msg);

Adapter.prototype.add.call(this, id, room);
if (!this.withChannelMultiplexing) {
if (fn) fn(null);
return;
}
var channel = this.channel + room + '#';

@@ -255,3 +349,3 @@ sub.subscribe(channel, function(err){

if (hasRoom && !this.rooms[room]) {
if (this.withChannelMultiplexing && hasRoom && !this.rooms[room]) {
var channel = this.channel + room + '#';

@@ -290,3 +384,3 @@ sub.unsubscribe(channel, function(err){

async.forEach(Object.keys(rooms), function(room, next){
async.each(Object.keys(rooms), function(room, next){
self.del(id, room, next);

@@ -308,2 +402,3 @@ }, function(err){

* @param {Array} explicit set of rooms to check.
* @param {Function} callback
* @api public

@@ -321,7 +416,5 @@ */

var self = this;
var requestid = uid2(6);
var transaction = uid2(6);
var responseChn = prefix + '-sync#response#' + transaction;
pub.send_command('pubsub', ['numsub', self.syncChannel], function(err, numsub){
pub.send_command('pubsub', ['numsub', self.requestChannel], function(err, numsub){
if (err) {

@@ -335,53 +428,67 @@ self.emit('error', err);

var msg_count = 0;
var clients = {};
var request = JSON.stringify({
requestid : requestid,
type: requestTypes.clients,
rooms : rooms
});
subJson.subscribe(responseChn, function(err) {
if (err) {
self.emit('error', err);
if (fn) fn(err);
return;
}
// if there is no response for x second, return result
var timeout = setTimeout(function() {
var request = self.requests[requestid];
if (fn) process.nextTick(fn.bind(null, new Error('timeout reached while waiting for clients response'), Object.keys(request.clients)));
delete self.requests[requestid];
}, self.requestsTimeout);
var request = JSON.stringify({
transaction : transaction,
rooms : rooms
});
self.requests[requestid] = {
type: requestTypes.clients,
numsub: numsub,
msgCount: 0,
clients: {},
callback: fn,
timeout: timeout
};
/*If there is no response for 1 second, return result;*/
var timeout = setTimeout(function() {
if (fn) process.nextTick(fn.bind(null, null, Object.keys(clients)));
}, self.clientsTimeout);
pub.publish(self.requestChannel, request);
});
};
subJson.on(subEvent, function onEvent(channel, msg) {
/**
* Gets the list of rooms a given client has joined.
*
* @param {String} client id
* @param {Function} callback
* @api public
*/
if (!self.channelMatches(channel.toString(), responseChn)) {
return debug('ignore different channel');
}
Redis.prototype.clientRooms = function(id, fn){
var response = JSON.parse(msg);
var self = this;
var requestid = uid2(6);
//Ignore if response does not contain 'clients' key
if(!response.clients || !Array.isArray(response.clients)) return;
for(var i = 0; i < response.clients.length; i++){
clients[response.clients[i]] = true;
}
var rooms = this.sids[id];
msg_count++;
if(msg_count == numsub){
clearTimeout(timeout);
subJson.unsubscribe(responseChn);
subJson.removeListener(subEvent, onEvent);
if (rooms) {
if (fn) process.nextTick(fn.bind(null, null, Object.keys(rooms)));
return;
}
if (fn) process.nextTick(fn.bind(null, null, Object.keys(clients)));
}
});
var request = JSON.stringify({
requestid : requestid,
type: requestTypes.clientRooms,
sid : id
});
pub.publish(self.syncChannel, request);
// if there is no response for x second, return result
var timeout = setTimeout(function() {
if (fn) process.nextTick(fn.bind(null, new Error('timeout reached while waiting for rooms response')));
delete self.requests[requestid];
}, self.requestsTimeout);
});
self.requests[requestid] = {
type: requestTypes.clientRooms,
callback: fn,
timeout: timeout
};
});
pub.publish(self.requestChannel, request);
};

@@ -393,3 +500,3 @@

Redis.prefix = prefix;
Redis.clientsTimeout = clientsTimeout;
Redis.requestsTimeout = requestsTimeout;

@@ -396,0 +503,0 @@ return Redis;

{
"name": "socket.io-redis",
"version": "1.1.1",
"version": "2.0.0",
"description": "",

@@ -17,7 +17,7 @@ "license": "MIT",

"dependencies": {
"async": "0.9.0",
"debug": "2.2.0",
"async": "2.1.4",
"debug": "2.3.3",
"msgpack-js": "0.3.0",
"redis": "2.4.2",
"socket.io-adapter": "0.4.0",
"redis": "2.6.3",
"socket.io-adapter": "0.5.0",
"uid2": "0.0.3"

@@ -28,3 +28,3 @@ },

"ioredis": "2.4.0",
"mocha": "1.18.0",
"mocha": "3.2.0",
"socket.io": "socketio/socket.io",

@@ -31,0 +31,0 @@ "socket.io-client": "socketio/socket.io-client"

@@ -38,3 +38,4 @@ # socket.io-redis

- `subClient`: optional, the redis client to subscribe to events on
- `clientsTimeout`: optional, after this timeout the adapter will stop waiting from responses to `clients` request (`1000ms`)
- `requestsTimeout`: optional, after this timeout the adapter will stop waiting from responses to request (`1000ms`)
- `withChannelMultiplexing`: optional, whether channel multiplexing is enabled (a new subscription will be trigggered for each room) (`true`)

@@ -60,3 +61,3 @@ If you decide to supply `pubClient` and `subClient`, make sure you use

- `subClient`
- `clientsTimeout`
- `requestsTimeout`

@@ -67,2 +68,6 @@ ### RedisAdapter#clients(rooms:Array, fn:Function)

### RedisAdapter#clientRooms(id:String, fn:Function)
Returns the list of rooms the client with the given ID has joined (even on another node).
## Client error handling

@@ -69,0 +74,0 @@

SocketSocket SOC 2 Logo

Product

  • Package Alerts
  • Integrations
  • Docs
  • Pricing
  • FAQ
  • Roadmap
  • Changelog

Packages

npm

Stay in touch

Get open source security insights delivered straight into your inbox.


  • Terms
  • Privacy
  • Security

Made with ⚡️ by Socket Inc