Huge News!Announcing our $40M Series B led by Abstract Ventures.Learn More
Socket
Sign inDemoInstall
Socket

socket.io-redis

Package Overview
Dependencies
Maintainers
1
Versions
25
Alerts
File Explorer

Advanced tools

Socket logo

Install Socket

Detect and block malicious and high-risk dependencies

Install

socket.io-redis - npm Package Compare versions

Comparing version 0.1.4 to 0.2.0

14

History.md
0.2.0 / 2015-12-03
==================
* package: bump `debug`
* replace `detect_buffers` with `return_buffers`, update redis
* remove duplicated `#`
* remove redundancy and minor performance optimization
* better instrumentation
* fire `del` callback when unsubscribing
* improve error handling
* expose constructor properties in resulting adapter
* remove `socket` option, as we would need two sockets anyways
* listen for separate channels for namespaces and rooms
0.1.4 / 2014-11-25

@@ -3,0 +17,0 @@ ==================

155

index.js

@@ -12,2 +12,3 @@

var debug = require('debug')('socket.io-redis');
var async = require('async');

@@ -45,3 +46,2 @@ /**

// opts
var socket = opts.socket;
var host = opts.host || '127.0.0.1';

@@ -54,11 +54,7 @@ var port = Number(opts.port || 6379);

// init clients if needed
if (!pub) pub = socket ? redis(socket) : redis(port, host);
if (!sub) sub = socket
? redis(socket, { detect_buffers: true })
: redis(port, host, {detect_buffers: true});
if (!pub) pub = redis(port, host);
if (!sub) sub = redis(port, host, { return_buffers: true });
// this server's key
var uid = uid2(6);
var key = prefix + '#' + uid;

@@ -75,7 +71,12 @@ /**

this.uid = uid;
this.prefix = prefix;
this.pubClient = pub;
this.subClient = sub;
var self = this;
sub.psubscribe(prefix + '#*', function(err){
sub.subscribe(prefix + '#' + nsp.name + '#', function(err){
if (err) self.emit('error', err);
});
sub.on('pmessage', this.onmessage.bind(this));
sub.on('message', this.onmessage.bind(this));
}

@@ -95,12 +96,15 @@

Redis.prototype.onmessage = function(pattern, channel, msg){
var pieces = channel.split('#');
if (uid == pieces.pop()) return debug('ignore same uid');
Redis.prototype.onmessage = function(channel, msg){
var args = msgpack.decode(msg);
var packet;
if (args[0] && args[0].nsp === undefined) {
args[0].nsp = '/';
if (uid == args.shift()) return debug('ignore same uid');
packet = args[0];
if (packet && packet.nsp === undefined) {
packet.nsp = '/';
}
if (!args[0] || args[0].nsp != this.nsp.name) {
if (!packet || packet.nsp != this.nsp.name) {
return debug('ignore different namespace');

@@ -125,7 +129,126 @@ }

Adapter.prototype.broadcast.call(this, packet, opts);
if (!remote) pub.publish(key, msgpack.encode([packet, opts]));
if (!remote) {
var chn = prefix + '#' + packet.nsp + '#';
var msg = msgpack.encode([uid, packet, opts]);
if (opts.rooms) {
opts.rooms.forEach(function(room) {
var chnRoom = chn + room + '#';
pub.publish(chnRoom, msg);
});
} else {
pub.publish(chn, msg);
}
}
};
/**
* Subscribe client to room messages.
*
* @param {String} client id
* @param {String} room
* @param {Function} callback (optional)
* @api public
*/
Redis.prototype.add = function(id, room, fn){
debug('adding %s to %s ', id, room);
var self = this;
this.sids[id] = this.sids[id] || {};
this.sids[id][room] = true;
this.rooms[room] = this.rooms[room] || {};
this.rooms[room][id] = true;
var channel = prefix + '#' + this.nsp.name + '#' + room + '#';
sub.subscribe(channel, function(err){
if (err) {
self.emit('error', err);
if (fn) fn(err);
return;
}
if (fn) fn(null);
});
};
/**
* Unsubscribe client from room messages.
*
* @param {String} session id
* @param {String} room id
* @param {Function} callback (optional)
* @api public
*/
Redis.prototype.del = function(id, room, fn){
debug('removing %s from %s', id, room);
var self = this;
this.sids[id] = this.sids[id] || {};
this.rooms[room] = this.rooms[room] || {};
delete this.sids[id][room];
delete this.rooms[room][id];
if (this.rooms.hasOwnProperty(room) && !Object.keys(this.rooms[room]).length) {
delete this.rooms[room];
var channel = prefix + '#' + this.nsp.name + '#' + room + '#';
sub.unsubscribe(channel, function(err){
if (err) {
self.emit('error', err);
if (fn) fn(err);
return;
}
if (fn) fn(null);
});
} else {
if (fn) process.nextTick(fn.bind(null, null));
}
};
/**
* Unsubscribe client completely.
*
* @param {String} client id
* @param {Function} callback (optional)
* @api public
*/
Redis.prototype.delAll = function(id, fn){
debug('removing %s from all rooms', id);
var self = this;
var rooms = this.sids[id];
if (!rooms) return process.nextTick(fn.bind(null, null));
async.forEach(Object.keys(rooms), function(room, next){
if (rooms.hasOwnProperty(room)) {
delete self.rooms[room][id];
}
if (self.rooms.hasOwnProperty(room) && !Object.keys(self.rooms[room]).length) {
delete self.rooms[room];
var channel = prefix + '#' + self.nsp.name + '#' + room + '#';
return sub.unsubscribe(channel, function(err){
if (err) return self.emit('error', err);
next();
});
} else {
process.nextTick(next);
}
}, function(err){
if (err) {
self.emit('error', err);
if (fn) fn(err);
return;
}
delete self.sids[id];
if (fn) fn(null);
});
};
Redis.uid = uid;
Redis.pubClient = pub;
Redis.subClient = sub;
Redis.prefix = prefix;
return Redis;
}

14

package.json
{
"name": "socket.io-redis",
"version": "0.1.4",
"version": "0.2.0",
"description": "",

@@ -13,7 +13,8 @@ "repository": {

"dependencies": {
"debug": "0.7.4",
"uid2": "0.0.3",
"redis": "0.10.1",
"async": "0.9.0",
"debug": "2.2.0",
"msgpack-js": "0.3.0",
"socket.io-adapter": "0.3.1"
"redis": "2.3.0",
"socket.io-adapter": "automattic/socket.io-adapter#de5cba",
"uid2": "0.0.3"
},

@@ -24,5 +25,4 @@ "devDependencies": {

"mocha": "1.18.0",
"expect.js": "0.3.1",
"async": "0.2.10"
"expect.js": "0.3.1"
}
}
# socket.io-redis
[![Build Status](https://travis-ci.org/Automattic/socket.io-redis.svg?branch=master)](https://travis-ci.org/Automattic/socket.io-redis)
[![Build Status](https://travis-ci.org/socketio/socket.io-redis.svg?branch=master)](https://travis-ci.org/socketio/socket.io-redis)
[![NPM version](https://badge.fury.io/js/socket.io-redis.svg)](http://badge.fury.io/js/socket.io-redis)

@@ -19,3 +19,3 @@

If you need to emit events to socket.io instances from a non-socket.io
process, you should use [socket.io-emitter](http:///github.com/Automattic/socket.io-emitter).
process, you should use [socket.io-emitter](https:///github.com/Automattic/socket.io-emitter).

@@ -36,4 +36,2 @@ ## API

- `port`: port to connect to redis on (`6379`)
- `socket`: unix domain socket to connect to redis (`"/tmp/redis.sock"`). Will
be used instead of the host and port options if specified.
- `pubClient`: optional, the redis client to publish events on

@@ -49,18 +47,42 @@ - `subClient`: optional, the redis client to subscribe to events on

### RedisAdapter
##### Adapter with password
The redis adapter instances expose the following properties
that a regular `Adapter` does not
If you need to create a redisAdapter to a redis instance that has a password, use pub/sub options.
- `uid`
- `prefix`
- `pubClient`
- `subClient`
Example:
## Client error handling
Access the `pubClient` and `subClient` properties of the
Redis Adapter instance to subscribe to its `error` event:
```js
var redis = require('socket.io-redis');
var adapter = redis('localhost:6379');
adapter.pubClient.on('error', function(){});
adapter.subClient.on('error', function(){});
```
var pub = redis.createClient(port, host, {auth_pass:"PASSWORD"});
var sub = redis.createClient(port, host, {detect_buffers: true, auth_pass:"PASSWORD"} );
io.adapter( redisAdapter({pubClient: pub, subClient: sub}) );
## Custom client (eg: with authentication)
If you need to create a redisAdapter to a redis instance
that has a password, use pub/sub options instead of passing
a connection string.
```js
var redis = require('redis').createClient;
var adapter = require('socket.io-redis');
var pub = redis(port, host, { auth_pass: "pwd" });
var sub = redis(port, host, { return_buffers: true, auth_pass: "pwd" });
io.adapter(adapter({ pubClient: pub, subClient: sub }));
```
Make sure the `return_buffers` option is set to `true` for the sub client.
## License
MIT

@@ -0,1 +1,2 @@

var http = require('http').Server;

@@ -5,125 +6,79 @@ var io = require('socket.io');

var expect = require('expect.js');
var async = require('async');
var redis = require('redis');
var redisAdapter = require('../');
var redis = require('redis').createClient;
var adapter = require('../');
describe('socket.io-redis', function(){
function client(srv, nsp, opts){
if ('object' == typeof nsp) {
opts = nsp;
nsp = null;
}
var addr = srv.address();
if (!addr) {
addr = srv.listen().address();
}
var url = 'ws://' + addr.address + ':' + addr.port + (nsp || '');
return ioc(url, opts);
}
it('broadcasts', function(done){
create(function(server1, client1){
create(function(server2, client2){
client1.on('woot', function(a, b){
expect(a).to.eql([]);
expect(b).to.eql({ a: 'b' });
done();
});
server2.on('connection', function(c2){
c2.broadcast.emit('woot', [], { a: 'b' });
});
});
});
});
describe('socket.io-redis', function(){
describe('broadcast', function(){
beforeEach(function(done){
this.redisClients = [];
var self = this;
it('broadcasts to rooms', function(done){
create(function(server1, client1){
create(function(server2, client2){
create(function(server3, client3){
server1.on('connection', function(c1){
c1.join('woot');
});
async.times(2, function(n, next){
var pub = redis.createClient();
var sub = redis.createClient(null, null, {detect_buffers: true});
var srv = http();
var sio = io(srv, {adapter: redisAdapter({pubClient: pub, subClient: sub})});
self.redisClients.push(pub, sub);
server2.on('connection', function(c2){
// does not join, performs broadcast
c2.on('do broadcast', function(){
c2.broadcast.to('woot').emit('broadcast');
});
});
srv.listen(function(){
['/', '/nsp'].forEach(function(name){
sio.of(name).on('connection', function(socket){
socket.on('join', function(callback){
socket.join('room', callback);
});
server3.on('connection', function(c3){
// does not join, signals broadcast
client2.emit('do broadcast');
});
socket.on('socket broadcast', function(data){
socket.broadcast.to('room').emit('broadcast', data);
});
client1.on('broadcast', function(){
setTimeout(done, 100);
});
socket.on('namespace broadcast', function(data){
sio.of('/nsp').in('room').emit('broadcast', data);
});
});
client2.on('broadcast', function(){
throw new Error('Not in room');
});
async.parallel([
function(callback){
async.times(2, function(n, next){
var socket = client(srv, '/nsp', {forceNew: true});
socket.on('connect', function(){
socket.emit('join', function(){
next(null, socket);
});
});
}, callback);
},
function(callback){
// a socket of the same namespace but not joined in the room.
var socket = client(srv, '/nsp', {forceNew: true});
socket.on('connect', function(){
socket.on('broadcast', function(){
throw new Error('Called unexpectedly: different room');
});
callback();
});
},
function(callback){
// a socket joined in a room but for a different namespace.
var socket = client(srv, {forceNew: true});
socket.on('connect', function(){
socket.on('broadcast', function(){
throw new Error('Called unexpectedly: different namespace');
});
socket.emit('join', function(){
callback();
});
});
}
], function(err, results){
next(err, results[0]);
client3.on('broadcast', function(){
throw new Error('Not in room');
});
});
}, function(err, sockets){
self.sockets = sockets.reduce(function(a, b){ return a.concat(b); });
done(err);
});
});
});
afterEach(function(){
this.redisClients.forEach(function(client){
client.quit();
});
// create a pair of socket.io server+client
function create(nsp, fn){
var srv = http();
var sio = io(srv);
sio.adapter(adapter({
pubClient: redis(),
subClient: redis(null, null, { return_buffers: true })
}));
srv.listen(function(err){
if (err) throw err; // abort tests
if ('function' == typeof nsp) {
fn = nsp;
nsp = '';
}
nsp = nsp || '/';
var addr = srv.address();
var url = 'http://localhost:' + addr.port + nsp;
fn(sio.of(nsp), ioc(url));
});
}
it('should broadcast from a socket', function(done){
async.each(this.sockets.slice(1), function(socket, next){
socket.on('broadcast', function(message){
expect(message).to.equal('hi');
next();
});
}, done);
var socket = this.sockets[0];
socket.on('broadcast', function(){
throw new Error('Called unexpectedly: same socket');
});
socket.emit('socket broadcast', 'hi');
});
it('should broadcast from a namespace', function(done){
async.each(this.sockets, function(socket, next){
socket.on('broadcast', function(message){
expect(message).to.equal('hi');
next();
});
}, done);
this.sockets[0].emit('namespace broadcast', 'hi');
});
});
});
SocketSocket SOC 2 Logo

Product

  • Package Alerts
  • Integrations
  • Docs
  • Pricing
  • FAQ
  • Roadmap
  • Changelog

Packages

npm

Stay in touch

Get open source security insights delivered straight into your inbox.


  • Terms
  • Privacy
  • Security

Made with ⚡️ by Socket Inc