socket.io-redis
Advanced tools
Comparing version 0.1.4 to 0.2.0
0.2.0 / 2015-12-03 | ||
================== | ||
* package: bump `debug` | ||
* replace `detect_buffers` with `return_buffers`, update redis | ||
* remove duplicated `#` | ||
* remove redundancy and minor performance optimization | ||
* better instrumentation | ||
* fire `del` callback when unsubscribing | ||
* improve error handling | ||
* expose constructor properties in resulting adapter | ||
* remove `socket` option, as we would need two sockets anyways | ||
* listen for separate channels for namespaces and rooms | ||
0.1.4 / 2014-11-25 | ||
@@ -3,0 +17,0 @@ ================== |
155
index.js
@@ -12,2 +12,3 @@ | ||
var debug = require('debug')('socket.io-redis'); | ||
var async = require('async'); | ||
@@ -45,3 +46,2 @@ /** | ||
// opts | ||
var socket = opts.socket; | ||
var host = opts.host || '127.0.0.1'; | ||
@@ -54,11 +54,7 @@ var port = Number(opts.port || 6379); | ||
// init clients if needed | ||
if (!pub) pub = socket ? redis(socket) : redis(port, host); | ||
if (!sub) sub = socket | ||
? redis(socket, { detect_buffers: true }) | ||
: redis(port, host, {detect_buffers: true}); | ||
if (!pub) pub = redis(port, host); | ||
if (!sub) sub = redis(port, host, { return_buffers: true }); | ||
// this server's key | ||
var uid = uid2(6); | ||
var key = prefix + '#' + uid; | ||
@@ -75,7 +71,12 @@ /** | ||
this.uid = uid; | ||
this.prefix = prefix; | ||
this.pubClient = pub; | ||
this.subClient = sub; | ||
var self = this; | ||
sub.psubscribe(prefix + '#*', function(err){ | ||
sub.subscribe(prefix + '#' + nsp.name + '#', function(err){ | ||
if (err) self.emit('error', err); | ||
}); | ||
sub.on('pmessage', this.onmessage.bind(this)); | ||
sub.on('message', this.onmessage.bind(this)); | ||
} | ||
@@ -95,12 +96,15 @@ | ||
Redis.prototype.onmessage = function(pattern, channel, msg){ | ||
var pieces = channel.split('#'); | ||
if (uid == pieces.pop()) return debug('ignore same uid'); | ||
Redis.prototype.onmessage = function(channel, msg){ | ||
var args = msgpack.decode(msg); | ||
var packet; | ||
if (args[0] && args[0].nsp === undefined) { | ||
args[0].nsp = '/'; | ||
if (uid == args.shift()) return debug('ignore same uid'); | ||
packet = args[0]; | ||
if (packet && packet.nsp === undefined) { | ||
packet.nsp = '/'; | ||
} | ||
if (!args[0] || args[0].nsp != this.nsp.name) { | ||
if (!packet || packet.nsp != this.nsp.name) { | ||
return debug('ignore different namespace'); | ||
@@ -125,7 +129,126 @@ } | ||
Adapter.prototype.broadcast.call(this, packet, opts); | ||
if (!remote) pub.publish(key, msgpack.encode([packet, opts])); | ||
if (!remote) { | ||
var chn = prefix + '#' + packet.nsp + '#'; | ||
var msg = msgpack.encode([uid, packet, opts]); | ||
if (opts.rooms) { | ||
opts.rooms.forEach(function(room) { | ||
var chnRoom = chn + room + '#'; | ||
pub.publish(chnRoom, msg); | ||
}); | ||
} else { | ||
pub.publish(chn, msg); | ||
} | ||
} | ||
}; | ||
/** | ||
* Subscribe client to room messages. | ||
* | ||
* @param {String} client id | ||
* @param {String} room | ||
* @param {Function} callback (optional) | ||
* @api public | ||
*/ | ||
Redis.prototype.add = function(id, room, fn){ | ||
debug('adding %s to %s ', id, room); | ||
var self = this; | ||
this.sids[id] = this.sids[id] || {}; | ||
this.sids[id][room] = true; | ||
this.rooms[room] = this.rooms[room] || {}; | ||
this.rooms[room][id] = true; | ||
var channel = prefix + '#' + this.nsp.name + '#' + room + '#'; | ||
sub.subscribe(channel, function(err){ | ||
if (err) { | ||
self.emit('error', err); | ||
if (fn) fn(err); | ||
return; | ||
} | ||
if (fn) fn(null); | ||
}); | ||
}; | ||
/** | ||
* Unsubscribe client from room messages. | ||
* | ||
* @param {String} session id | ||
* @param {String} room id | ||
* @param {Function} callback (optional) | ||
* @api public | ||
*/ | ||
Redis.prototype.del = function(id, room, fn){ | ||
debug('removing %s from %s', id, room); | ||
var self = this; | ||
this.sids[id] = this.sids[id] || {}; | ||
this.rooms[room] = this.rooms[room] || {}; | ||
delete this.sids[id][room]; | ||
delete this.rooms[room][id]; | ||
if (this.rooms.hasOwnProperty(room) && !Object.keys(this.rooms[room]).length) { | ||
delete this.rooms[room]; | ||
var channel = prefix + '#' + this.nsp.name + '#' + room + '#'; | ||
sub.unsubscribe(channel, function(err){ | ||
if (err) { | ||
self.emit('error', err); | ||
if (fn) fn(err); | ||
return; | ||
} | ||
if (fn) fn(null); | ||
}); | ||
} else { | ||
if (fn) process.nextTick(fn.bind(null, null)); | ||
} | ||
}; | ||
/** | ||
* Unsubscribe client completely. | ||
* | ||
* @param {String} client id | ||
* @param {Function} callback (optional) | ||
* @api public | ||
*/ | ||
Redis.prototype.delAll = function(id, fn){ | ||
debug('removing %s from all rooms', id); | ||
var self = this; | ||
var rooms = this.sids[id]; | ||
if (!rooms) return process.nextTick(fn.bind(null, null)); | ||
async.forEach(Object.keys(rooms), function(room, next){ | ||
if (rooms.hasOwnProperty(room)) { | ||
delete self.rooms[room][id]; | ||
} | ||
if (self.rooms.hasOwnProperty(room) && !Object.keys(self.rooms[room]).length) { | ||
delete self.rooms[room]; | ||
var channel = prefix + '#' + self.nsp.name + '#' + room + '#'; | ||
return sub.unsubscribe(channel, function(err){ | ||
if (err) return self.emit('error', err); | ||
next(); | ||
}); | ||
} else { | ||
process.nextTick(next); | ||
} | ||
}, function(err){ | ||
if (err) { | ||
self.emit('error', err); | ||
if (fn) fn(err); | ||
return; | ||
} | ||
delete self.sids[id]; | ||
if (fn) fn(null); | ||
}); | ||
}; | ||
Redis.uid = uid; | ||
Redis.pubClient = pub; | ||
Redis.subClient = sub; | ||
Redis.prefix = prefix; | ||
return Redis; | ||
} |
{ | ||
"name": "socket.io-redis", | ||
"version": "0.1.4", | ||
"version": "0.2.0", | ||
"description": "", | ||
@@ -13,7 +13,8 @@ "repository": { | ||
"dependencies": { | ||
"debug": "0.7.4", | ||
"uid2": "0.0.3", | ||
"redis": "0.10.1", | ||
"async": "0.9.0", | ||
"debug": "2.2.0", | ||
"msgpack-js": "0.3.0", | ||
"socket.io-adapter": "0.3.1" | ||
"redis": "2.3.0", | ||
"socket.io-adapter": "automattic/socket.io-adapter#de5cba", | ||
"uid2": "0.0.3" | ||
}, | ||
@@ -24,5 +25,4 @@ "devDependencies": { | ||
"mocha": "1.18.0", | ||
"expect.js": "0.3.1", | ||
"async": "0.2.10" | ||
"expect.js": "0.3.1" | ||
} | ||
} |
# socket.io-redis | ||
[![Build Status](https://travis-ci.org/Automattic/socket.io-redis.svg?branch=master)](https://travis-ci.org/Automattic/socket.io-redis) | ||
[![Build Status](https://travis-ci.org/socketio/socket.io-redis.svg?branch=master)](https://travis-ci.org/socketio/socket.io-redis) | ||
[![NPM version](https://badge.fury.io/js/socket.io-redis.svg)](http://badge.fury.io/js/socket.io-redis) | ||
@@ -19,3 +19,3 @@ | ||
If you need to emit events to socket.io instances from a non-socket.io | ||
process, you should use [socket.io-emitter](http:///github.com/Automattic/socket.io-emitter). | ||
process, you should use [socket.io-emitter](https:///github.com/Automattic/socket.io-emitter). | ||
@@ -36,4 +36,2 @@ ## API | ||
- `port`: port to connect to redis on (`6379`) | ||
- `socket`: unix domain socket to connect to redis (`"/tmp/redis.sock"`). Will | ||
be used instead of the host and port options if specified. | ||
- `pubClient`: optional, the redis client to publish events on | ||
@@ -49,18 +47,42 @@ - `subClient`: optional, the redis client to subscribe to events on | ||
### RedisAdapter | ||
##### Adapter with password | ||
The redis adapter instances expose the following properties | ||
that a regular `Adapter` does not | ||
If you need to create a redisAdapter to a redis instance that has a password, use pub/sub options. | ||
- `uid` | ||
- `prefix` | ||
- `pubClient` | ||
- `subClient` | ||
Example: | ||
## Client error handling | ||
Access the `pubClient` and `subClient` properties of the | ||
Redis Adapter instance to subscribe to its `error` event: | ||
```js | ||
var redis = require('socket.io-redis'); | ||
var adapter = redis('localhost:6379'); | ||
adapter.pubClient.on('error', function(){}); | ||
adapter.subClient.on('error', function(){}); | ||
``` | ||
var pub = redis.createClient(port, host, {auth_pass:"PASSWORD"}); | ||
var sub = redis.createClient(port, host, {detect_buffers: true, auth_pass:"PASSWORD"} ); | ||
io.adapter( redisAdapter({pubClient: pub, subClient: sub}) ); | ||
## Custom client (eg: with authentication) | ||
If you need to create a redisAdapter to a redis instance | ||
that has a password, use pub/sub options instead of passing | ||
a connection string. | ||
```js | ||
var redis = require('redis').createClient; | ||
var adapter = require('socket.io-redis'); | ||
var pub = redis(port, host, { auth_pass: "pwd" }); | ||
var sub = redis(port, host, { return_buffers: true, auth_pass: "pwd" }); | ||
io.adapter(adapter({ pubClient: pub, subClient: sub })); | ||
``` | ||
Make sure the `return_buffers` option is set to `true` for the sub client. | ||
## License | ||
MIT |
@@ -0,1 +1,2 @@ | ||
var http = require('http').Server; | ||
@@ -5,125 +6,79 @@ var io = require('socket.io'); | ||
var expect = require('expect.js'); | ||
var async = require('async'); | ||
var redis = require('redis'); | ||
var redisAdapter = require('../'); | ||
var redis = require('redis').createClient; | ||
var adapter = require('../'); | ||
describe('socket.io-redis', function(){ | ||
function client(srv, nsp, opts){ | ||
if ('object' == typeof nsp) { | ||
opts = nsp; | ||
nsp = null; | ||
} | ||
var addr = srv.address(); | ||
if (!addr) { | ||
addr = srv.listen().address(); | ||
} | ||
var url = 'ws://' + addr.address + ':' + addr.port + (nsp || ''); | ||
return ioc(url, opts); | ||
} | ||
it('broadcasts', function(done){ | ||
create(function(server1, client1){ | ||
create(function(server2, client2){ | ||
client1.on('woot', function(a, b){ | ||
expect(a).to.eql([]); | ||
expect(b).to.eql({ a: 'b' }); | ||
done(); | ||
}); | ||
server2.on('connection', function(c2){ | ||
c2.broadcast.emit('woot', [], { a: 'b' }); | ||
}); | ||
}); | ||
}); | ||
}); | ||
describe('socket.io-redis', function(){ | ||
describe('broadcast', function(){ | ||
beforeEach(function(done){ | ||
this.redisClients = []; | ||
var self = this; | ||
it('broadcasts to rooms', function(done){ | ||
create(function(server1, client1){ | ||
create(function(server2, client2){ | ||
create(function(server3, client3){ | ||
server1.on('connection', function(c1){ | ||
c1.join('woot'); | ||
}); | ||
async.times(2, function(n, next){ | ||
var pub = redis.createClient(); | ||
var sub = redis.createClient(null, null, {detect_buffers: true}); | ||
var srv = http(); | ||
var sio = io(srv, {adapter: redisAdapter({pubClient: pub, subClient: sub})}); | ||
self.redisClients.push(pub, sub); | ||
server2.on('connection', function(c2){ | ||
// does not join, performs broadcast | ||
c2.on('do broadcast', function(){ | ||
c2.broadcast.to('woot').emit('broadcast'); | ||
}); | ||
}); | ||
srv.listen(function(){ | ||
['/', '/nsp'].forEach(function(name){ | ||
sio.of(name).on('connection', function(socket){ | ||
socket.on('join', function(callback){ | ||
socket.join('room', callback); | ||
}); | ||
server3.on('connection', function(c3){ | ||
// does not join, signals broadcast | ||
client2.emit('do broadcast'); | ||
}); | ||
socket.on('socket broadcast', function(data){ | ||
socket.broadcast.to('room').emit('broadcast', data); | ||
}); | ||
client1.on('broadcast', function(){ | ||
setTimeout(done, 100); | ||
}); | ||
socket.on('namespace broadcast', function(data){ | ||
sio.of('/nsp').in('room').emit('broadcast', data); | ||
}); | ||
}); | ||
client2.on('broadcast', function(){ | ||
throw new Error('Not in room'); | ||
}); | ||
async.parallel([ | ||
function(callback){ | ||
async.times(2, function(n, next){ | ||
var socket = client(srv, '/nsp', {forceNew: true}); | ||
socket.on('connect', function(){ | ||
socket.emit('join', function(){ | ||
next(null, socket); | ||
}); | ||
}); | ||
}, callback); | ||
}, | ||
function(callback){ | ||
// a socket of the same namespace but not joined in the room. | ||
var socket = client(srv, '/nsp', {forceNew: true}); | ||
socket.on('connect', function(){ | ||
socket.on('broadcast', function(){ | ||
throw new Error('Called unexpectedly: different room'); | ||
}); | ||
callback(); | ||
}); | ||
}, | ||
function(callback){ | ||
// a socket joined in a room but for a different namespace. | ||
var socket = client(srv, {forceNew: true}); | ||
socket.on('connect', function(){ | ||
socket.on('broadcast', function(){ | ||
throw new Error('Called unexpectedly: different namespace'); | ||
}); | ||
socket.emit('join', function(){ | ||
callback(); | ||
}); | ||
}); | ||
} | ||
], function(err, results){ | ||
next(err, results[0]); | ||
client3.on('broadcast', function(){ | ||
throw new Error('Not in room'); | ||
}); | ||
}); | ||
}, function(err, sockets){ | ||
self.sockets = sockets.reduce(function(a, b){ return a.concat(b); }); | ||
done(err); | ||
}); | ||
}); | ||
}); | ||
afterEach(function(){ | ||
this.redisClients.forEach(function(client){ | ||
client.quit(); | ||
}); | ||
// create a pair of socket.io server+client | ||
function create(nsp, fn){ | ||
var srv = http(); | ||
var sio = io(srv); | ||
sio.adapter(adapter({ | ||
pubClient: redis(), | ||
subClient: redis(null, null, { return_buffers: true }) | ||
})); | ||
srv.listen(function(err){ | ||
if (err) throw err; // abort tests | ||
if ('function' == typeof nsp) { | ||
fn = nsp; | ||
nsp = ''; | ||
} | ||
nsp = nsp || '/'; | ||
var addr = srv.address(); | ||
var url = 'http://localhost:' + addr.port + nsp; | ||
fn(sio.of(nsp), ioc(url)); | ||
}); | ||
} | ||
it('should broadcast from a socket', function(done){ | ||
async.each(this.sockets.slice(1), function(socket, next){ | ||
socket.on('broadcast', function(message){ | ||
expect(message).to.equal('hi'); | ||
next(); | ||
}); | ||
}, done); | ||
var socket = this.sockets[0]; | ||
socket.on('broadcast', function(){ | ||
throw new Error('Called unexpectedly: same socket'); | ||
}); | ||
socket.emit('socket broadcast', 'hi'); | ||
}); | ||
it('should broadcast from a namespace', function(done){ | ||
async.each(this.sockets, function(socket, next){ | ||
socket.on('broadcast', function(message){ | ||
expect(message).to.equal('hi'); | ||
next(); | ||
}); | ||
}, done); | ||
this.sockets[0].emit('namespace broadcast', 'hi'); | ||
}); | ||
}); | ||
}); |
GitHub dependency
Supply chain riskContains a dependency which resolves to a GitHub URL. Dependencies fetched from GitHub specifiers are not immutable can be used to inject untrusted code or reduce the likelihood of a reproducible install.
Found 1 instance in 1 package
Major refactor
Supply chain riskPackage has recently undergone a major refactor. It may be unstable or indicate significant internal changes. Use caution when updating to versions that include significant changes.
Found 1 instance in 1 package
Manifest confusion
Supply chain riskThis package has inconsistent metadata. This could be malicious or caused by an error when publishing the package.
Found 1 instance in 1 package
12199
4
278
86
6
1
3
+ Addedasync@0.9.0
+ Addedasync@0.9.0(transitive)
+ Addeddebug@2.2.0(transitive)
+ Addeddouble-ended-queue@2.1.0-0(transitive)
+ Addedms@0.7.1(transitive)
+ Addedredis@2.3.0(transitive)
- Removedbenchmark@1.0.0(transitive)
- Removedcomponent-emitter@1.1.2(transitive)
- Removeddebug@0.7.41.0.2(transitive)
- Removedisarray@0.0.1(transitive)
- Removedjson3@3.2.6(transitive)
- Removedms@0.6.2(transitive)
- Removedobject-keys@1.0.1(transitive)
- Removedredis@0.10.1(transitive)
- Removedsocket.io-adapter@0.3.1(transitive)
- Removedsocket.io-parser@2.2.2(transitive)
Updateddebug@2.2.0
Updatedredis@2.3.0
Updatedsocket.io-adapter@automattic/socket.io-adapter#de5cba