socket.io-redis
Advanced tools
Comparing version 1.1.1 to 2.0.0
2.0.0 / 2016-11-28 | ||
=================== | ||
* [chore] Bump socket.io-adapter to version 0.5.0 (#145) | ||
* [chore] Bump debug to version 2.3.3 (#147) | ||
* [chore] Bump redis to version 2.6.3 (#148) | ||
* [chore] Bump async library to 2.1.4 (#62) | ||
* [feature] Add a `local` flag (#119) | ||
* [feature] Refactor requests between nodes and add `clientRooms` method (#146) | ||
* [feature] Add an option to disable channel multiplexing (#140) | ||
* [fix] receive a message only once per-emit (not per-joined rooms) (#151) | ||
* [chore] Bump mocha to 3.2.0 (#152) | ||
1.1.1 / 2016-09-26 | ||
@@ -3,0 +16,0 @@ ================== |
273
index.js
@@ -20,2 +20,11 @@ | ||
/** | ||
* Request types, for messages between nodes | ||
*/ | ||
var requestTypes = { | ||
clients: 0, | ||
clientRooms: 1, | ||
}; | ||
/** | ||
* Returns a redis Adapter class. | ||
@@ -43,3 +52,4 @@ * | ||
var subEvent = opts.subEvent || 'message'; | ||
var clientsTimeout = opts.clientsTimeout || 1000; | ||
var requestsTimeout = opts.requestsTimeout || 1000; | ||
var withChannelMultiplexing = false !== opts.withChannelMultiplexing; | ||
@@ -55,7 +65,5 @@ // init clients if needed | ||
} | ||
if (!pub) pub = createClient(); | ||
if (!sub) sub = createClient({ return_buffers: true }); | ||
var subJson = sub.duplicate({ return_buffers: false }); | ||
@@ -77,6 +85,9 @@ // this server's key | ||
this.prefix = prefix; | ||
this.clientsTimeout = clientsTimeout; | ||
this.requestsTimeout = requestsTimeout; | ||
this.withChannelMultiplexing = withChannelMultiplexing; | ||
this.channel = prefix + '#' + nsp.name + '#'; | ||
this.syncChannel = prefix + '-sync#request#' + this.nsp.name + '#'; | ||
this.requestChannel = prefix + '-request#' + this.nsp.name + '#'; | ||
this.responseChannel = prefix + '-response#' + this.nsp.name + '#'; | ||
this.requests = {}; | ||
@@ -97,12 +108,7 @@ if (String.prototype.startsWith) { | ||
sub.subscribe(this.channel, function(err){ | ||
sub.subscribe([this.channel, this.requestChannel, this.responseChannel], function(err){ | ||
if (err) self.emit('error', err); | ||
}); | ||
subJson.subscribe(this.syncChannel, function(err){ | ||
if (err) self.emit('error', err); | ||
}); | ||
sub.on(subEvent, this.onmessage.bind(this)); | ||
subJson.on(subEvent, this.onclients.bind(this)); | ||
} | ||
@@ -123,5 +129,12 @@ | ||
Redis.prototype.onmessage = function(channel, msg){ | ||
if (!this.channelMatches(channel.toString(), this.channel)) { | ||
channel = channel.toString(); | ||
if (this.channelMatches(channel, this.requestChannel)) { | ||
return this.onrequest(channel, msg); | ||
} else if (this.channelMatches(channel, this.responseChannel)) { | ||
return this.onresponse(channel, msg); | ||
} else if (!this.channelMatches(channel, this.channel)) { | ||
return debug('ignore different channel'); | ||
} | ||
var args = msgpack.decode(msg); | ||
@@ -148,3 +161,3 @@ var packet; | ||
/** | ||
* Called with a subscription message on sync | ||
* Called on request from another node | ||
* | ||
@@ -154,12 +167,68 @@ * @api private | ||
Redis.prototype.onclients = function(channel, msg){ | ||
Redis.prototype.onrequest = function(channel, msg){ | ||
var self = this; | ||
var request; | ||
if (!self.channelMatches(channel.toString(), self.syncChannel)) { | ||
return debug('ignore different channel'); | ||
try { | ||
request = JSON.parse(msg); | ||
} catch(err){ | ||
self.emit('error', err); | ||
return; | ||
} | ||
debug('received request %j', request); | ||
switch (request.type) { | ||
case requestTypes.clients: | ||
Adapter.prototype.clients.call(self, request.rooms, function(err, clients){ | ||
if(err){ | ||
self.emit('error', err); | ||
return; | ||
} | ||
var response = JSON.stringify({ | ||
requestid: request.requestid, | ||
clients: clients | ||
}); | ||
pub.publish(self.responseChannel, response); | ||
}); | ||
break; | ||
case requestTypes.clientRooms: | ||
Adapter.prototype.clientRooms.call(self, request.sid, function(err, rooms){ | ||
if(err){ | ||
self.emit('error', err); | ||
return; | ||
} | ||
if (!rooms) { return; } | ||
var response = JSON.stringify({ | ||
requestid: request.requestid, | ||
rooms: rooms | ||
}); | ||
pub.publish(self.responseChannel, response); | ||
}); | ||
break; | ||
default: | ||
debug('ignoring unknown request type: %s', request.type); | ||
} | ||
}; | ||
/** | ||
* Called on response from another node | ||
* | ||
* @api private | ||
*/ | ||
Redis.prototype.onresponse = function(channel, msg){ | ||
var self = this; | ||
var response; | ||
try { | ||
var decoded = JSON.parse(msg); | ||
response = JSON.parse(msg); | ||
} catch(err){ | ||
@@ -170,16 +239,39 @@ self.emit('error', err); | ||
Adapter.prototype.clients.call(self, decoded.rooms, function(err, clients){ | ||
if(err){ | ||
self.emit('error', err); | ||
return; | ||
} | ||
if (!response.requestid || !self.requests[response.requestid]) { | ||
debug('ignoring unknown request'); | ||
return; | ||
} | ||
var responseChn = prefix + '-sync#response#' + decoded.transaction; | ||
var response = JSON.stringify({ | ||
clients : clients | ||
}); | ||
debug('received response %j', response); | ||
pub.publish(responseChn, response); | ||
}); | ||
var request = self.requests[response.requestid]; | ||
switch (request.type) { | ||
case requestTypes.clients: | ||
request.msgCount++; | ||
// ignore if response does not contain 'clients' key | ||
if(!response.clients || !Array.isArray(response.clients)) return; | ||
for(var i = 0; i < response.clients.length; i++){ | ||
request.clients[response.clients[i]] = true; | ||
} | ||
if (request.msgCount === request.numsub) { | ||
clearTimeout(request.timeout); | ||
if (request.callback) process.nextTick(request.callback.bind(null, null, Object.keys(request.clients))); | ||
delete self.requests[request.requestid]; | ||
} | ||
break; | ||
case requestTypes.clientRooms: | ||
clearTimeout(request.timeout); | ||
if (request.callback) process.nextTick(request.callback.bind(null, null, response.rooms)); | ||
delete self.requests[request.requestid]; | ||
break; | ||
default: | ||
debug('ignoring unknown request type: %s', request.type); | ||
} | ||
}; | ||
@@ -198,10 +290,7 @@ | ||
packet.nsp = this.nsp.name; | ||
if (!remote) { | ||
if (!(remote || (opts && opts.flags && opts.flags.local))) { | ||
var self = this; | ||
var msg = msgpack.encode([uid, packet, opts]); | ||
if (opts.rooms) { | ||
opts.rooms.forEach(function(room) { | ||
var chnRoom = self.channel + room + '#'; | ||
pub.publish(chnRoom, msg); | ||
}); | ||
if (self.withChannelMultiplexing && opts.rooms && opts.rooms.length === 1) { | ||
pub.publish(self.channel + opts.rooms[0] + '#', msg); | ||
} else { | ||
@@ -227,2 +316,7 @@ pub.publish(self.channel, msg); | ||
Adapter.prototype.add.call(this, id, room); | ||
if (!this.withChannelMultiplexing) { | ||
if (fn) fn(null); | ||
return; | ||
} | ||
var channel = this.channel + room + '#'; | ||
@@ -255,3 +349,3 @@ sub.subscribe(channel, function(err){ | ||
if (hasRoom && !this.rooms[room]) { | ||
if (this.withChannelMultiplexing && hasRoom && !this.rooms[room]) { | ||
var channel = this.channel + room + '#'; | ||
@@ -290,3 +384,3 @@ sub.unsubscribe(channel, function(err){ | ||
async.forEach(Object.keys(rooms), function(room, next){ | ||
async.each(Object.keys(rooms), function(room, next){ | ||
self.del(id, room, next); | ||
@@ -308,2 +402,3 @@ }, function(err){ | ||
* @param {Array} explicit set of rooms to check. | ||
* @param {Function} callback | ||
* @api public | ||
@@ -321,7 +416,5 @@ */ | ||
var self = this; | ||
var requestid = uid2(6); | ||
var transaction = uid2(6); | ||
var responseChn = prefix + '-sync#response#' + transaction; | ||
pub.send_command('pubsub', ['numsub', self.syncChannel], function(err, numsub){ | ||
pub.send_command('pubsub', ['numsub', self.requestChannel], function(err, numsub){ | ||
if (err) { | ||
@@ -335,53 +428,67 @@ self.emit('error', err); | ||
var msg_count = 0; | ||
var clients = {}; | ||
var request = JSON.stringify({ | ||
requestid : requestid, | ||
type: requestTypes.clients, | ||
rooms : rooms | ||
}); | ||
subJson.subscribe(responseChn, function(err) { | ||
if (err) { | ||
self.emit('error', err); | ||
if (fn) fn(err); | ||
return; | ||
} | ||
// if there is no response for x second, return result | ||
var timeout = setTimeout(function() { | ||
var request = self.requests[requestid]; | ||
if (fn) process.nextTick(fn.bind(null, new Error('timeout reached while waiting for clients response'), Object.keys(request.clients))); | ||
delete self.requests[requestid]; | ||
}, self.requestsTimeout); | ||
var request = JSON.stringify({ | ||
transaction : transaction, | ||
rooms : rooms | ||
}); | ||
self.requests[requestid] = { | ||
type: requestTypes.clients, | ||
numsub: numsub, | ||
msgCount: 0, | ||
clients: {}, | ||
callback: fn, | ||
timeout: timeout | ||
}; | ||
/*If there is no response for 1 second, return result;*/ | ||
var timeout = setTimeout(function() { | ||
if (fn) process.nextTick(fn.bind(null, null, Object.keys(clients))); | ||
}, self.clientsTimeout); | ||
pub.publish(self.requestChannel, request); | ||
}); | ||
}; | ||
subJson.on(subEvent, function onEvent(channel, msg) { | ||
/** | ||
* Gets the list of rooms a given client has joined. | ||
* | ||
* @param {String} client id | ||
* @param {Function} callback | ||
* @api public | ||
*/ | ||
if (!self.channelMatches(channel.toString(), responseChn)) { | ||
return debug('ignore different channel'); | ||
} | ||
Redis.prototype.clientRooms = function(id, fn){ | ||
var response = JSON.parse(msg); | ||
var self = this; | ||
var requestid = uid2(6); | ||
//Ignore if response does not contain 'clients' key | ||
if(!response.clients || !Array.isArray(response.clients)) return; | ||
for(var i = 0; i < response.clients.length; i++){ | ||
clients[response.clients[i]] = true; | ||
} | ||
var rooms = this.sids[id]; | ||
msg_count++; | ||
if(msg_count == numsub){ | ||
clearTimeout(timeout); | ||
subJson.unsubscribe(responseChn); | ||
subJson.removeListener(subEvent, onEvent); | ||
if (rooms) { | ||
if (fn) process.nextTick(fn.bind(null, null, Object.keys(rooms))); | ||
return; | ||
} | ||
if (fn) process.nextTick(fn.bind(null, null, Object.keys(clients))); | ||
} | ||
}); | ||
var request = JSON.stringify({ | ||
requestid : requestid, | ||
type: requestTypes.clientRooms, | ||
sid : id | ||
}); | ||
pub.publish(self.syncChannel, request); | ||
// if there is no response for x second, return result | ||
var timeout = setTimeout(function() { | ||
if (fn) process.nextTick(fn.bind(null, new Error('timeout reached while waiting for rooms response'))); | ||
delete self.requests[requestid]; | ||
}, self.requestsTimeout); | ||
}); | ||
self.requests[requestid] = { | ||
type: requestTypes.clientRooms, | ||
callback: fn, | ||
timeout: timeout | ||
}; | ||
}); | ||
pub.publish(self.requestChannel, request); | ||
}; | ||
@@ -393,3 +500,3 @@ | ||
Redis.prefix = prefix; | ||
Redis.clientsTimeout = clientsTimeout; | ||
Redis.requestsTimeout = requestsTimeout; | ||
@@ -396,0 +503,0 @@ return Redis; |
{ | ||
"name": "socket.io-redis", | ||
"version": "1.1.1", | ||
"version": "2.0.0", | ||
"description": "", | ||
@@ -17,7 +17,7 @@ "license": "MIT", | ||
"dependencies": { | ||
"async": "0.9.0", | ||
"debug": "2.2.0", | ||
"async": "2.1.4", | ||
"debug": "2.3.3", | ||
"msgpack-js": "0.3.0", | ||
"redis": "2.4.2", | ||
"socket.io-adapter": "0.4.0", | ||
"redis": "2.6.3", | ||
"socket.io-adapter": "0.5.0", | ||
"uid2": "0.0.3" | ||
@@ -28,3 +28,3 @@ }, | ||
"ioredis": "2.4.0", | ||
"mocha": "1.18.0", | ||
"mocha": "3.2.0", | ||
"socket.io": "socketio/socket.io", | ||
@@ -31,0 +31,0 @@ "socket.io-client": "socketio/socket.io-client" |
@@ -38,3 +38,4 @@ # socket.io-redis | ||
- `subClient`: optional, the redis client to subscribe to events on | ||
- `clientsTimeout`: optional, after this timeout the adapter will stop waiting from responses to `clients` request (`1000ms`) | ||
- `requestsTimeout`: optional, after this timeout the adapter will stop waiting from responses to request (`1000ms`) | ||
- `withChannelMultiplexing`: optional, whether channel multiplexing is enabled (a new subscription will be trigggered for each room) (`true`) | ||
@@ -60,3 +61,3 @@ If you decide to supply `pubClient` and `subClient`, make sure you use | ||
- `subClient` | ||
- `clientsTimeout` | ||
- `requestsTimeout` | ||
@@ -67,2 +68,6 @@ ### RedisAdapter#clients(rooms:Array, fn:Function) | ||
### RedisAdapter#clientRooms(id:String, fn:Function) | ||
Returns the list of rooms the client with the given ID has joined (even on another node). | ||
## Client error handling | ||
@@ -69,0 +74,0 @@ |
License Policy Violation
LicenseThis package is not allowed per your license policy. Review the package's license to ensure compliance.
Found 1 instance in 1 package
License Policy Violation
LicenseThis package is not allowed per your license policy. Review the package's license to ensure compliance.
Found 1 instance in 1 package
19923
392
124
+ Addedasync@2.1.4(transitive)
+ Addeddebug@2.3.3(transitive)
+ Addedjson3@3.3.2(transitive)
+ Addedlodash@4.17.21(transitive)
+ Addedms@0.7.2(transitive)
+ Addedredis@2.6.3(transitive)
+ Addedredis-parser@2.6.0(transitive)
+ Addedsocket.io-adapter@0.5.0(transitive)
+ Addedsocket.io-parser@2.3.1(transitive)
- Removedasync@0.9.0(transitive)
- Removedbenchmark@1.0.0(transitive)
- Removeddebug@0.7.4(transitive)
- Removedjson3@3.2.6(transitive)
- Removedredis@2.4.2(transitive)
- Removedsocket.io-adapter@0.4.0(transitive)
- Removedsocket.io-parser@2.2.2(transitive)
Updatedasync@2.1.4
Updateddebug@2.3.3
Updatedredis@2.6.3
Updatedsocket.io-adapter@0.5.0