primus-cluster
Advanced tools
Comparing version 1.0.2 to 1.1.0
@@ -5,4 +5,3 @@ /** | ||
var socketKey = require('./keys/socket'); | ||
var roomKey = require('./keys/room'); | ||
var _ = require('lodash'); | ||
@@ -20,5 +19,10 @@ /** | ||
function Adapter(options){ | ||
this.ttl = options.ttl; | ||
this.ttl = options.ttl || 86400; | ||
this.publish = options.publish; | ||
this.client = options.client; | ||
// numberOfSets # of sets you want to seperate your keys | ||
this.numberOfSets = options.numberOfSets || 10; | ||
var ttlMs = this.ttl * 1000; | ||
this.interval = ttlMs / this.numberOfSets; | ||
} | ||
@@ -41,9 +45,9 @@ | ||
multi | ||
.sadd(socketKey.format(id), room) | ||
.sadd(roomKey.format(room), id); | ||
.sadd(this._socketKey(id), room) | ||
.sadd(this._roomKey(room), id); | ||
if (this.ttl) { | ||
multi | ||
.expire(socketKey.format(id), this.ttl) | ||
.expire(roomKey.format(room), this.ttl); | ||
.expire(this._socketKey(id), this.ttl) | ||
.expire(this._roomKey(room), this.ttl); | ||
} | ||
@@ -66,11 +70,20 @@ | ||
if (id) return this.client.smembers(socketKey.format(id), cb); | ||
if (id) { | ||
var socketKeys = this._getSocketKeys(id); | ||
this.client.sunion(socketKeys, cb); | ||
return ; | ||
} | ||
this.client.keys('room:*', function (err, rooms) { | ||
if (err) return cb(err); | ||
rooms = rooms.map(function (room) { | ||
return room.slice(5).replace(/\:(\d*)$/g, ''); | ||
}); | ||
cb(undefined, rooms.map(function (room) { | ||
return room.slice(5); | ||
})); | ||
rooms = _.uniq(rooms); | ||
cb(undefined, rooms); | ||
}); | ||
}; | ||
@@ -93,6 +106,11 @@ | ||
this.client.multi() | ||
.srem(socketKey.format(id), room) | ||
.srem(roomKey.format(room), id) | ||
.exec(cb); | ||
var multi = this.client.multi(); | ||
var i = 0; | ||
for( ; i < this.numberOfSets ; i++) { | ||
multi.srem(this._roomKey(room, i), id); | ||
multi.srem(this._socketKey(id, i), room) | ||
} | ||
multi.exec(cb); | ||
}; | ||
@@ -122,2 +140,3 @@ | ||
var ids = {}; | ||
var self = this; | ||
@@ -129,4 +148,6 @@ this.publish(data, 'room', opts); | ||
rooms.forEach(function (room) { | ||
multi.smembers(roomKey.format(room)); | ||
var roomKeys = self._getRoomKeys(room); | ||
multi.sunion(roomKeys); | ||
}); | ||
multi.exec(function (err, replies) { | ||
@@ -147,6 +168,10 @@ if (err) return cb(err); | ||
} else { | ||
// TODO replace this `KEYS` should be only used for debugging | ||
this.client.keys('socket:*', function (err, sockets) { | ||
if (err) return cb(err); | ||
var ids = sockets.map(socketKey.parse); | ||
var ids = sockets.map(function(key) { | ||
return key.replace(/^socket\:/, '').replace(/\:(\d+)$/, ''); | ||
}); | ||
ids = _.uniq(ids); | ||
ids.forEach(function (id) { | ||
@@ -172,3 +197,4 @@ if (except.indexOf(id) !== -1) return; | ||
Adapter.prototype.clients = function clients(room, cb) { | ||
this.client.smembers(roomKey.format(room), cb); | ||
var roomKeys = this._getRoomKeys(room) | ||
this.client.sunion(roomKeys, cb); | ||
}; | ||
@@ -185,3 +211,8 @@ | ||
Adapter.prototype.empty = function empty(room, cb) { | ||
this.client.del(roomKey.format(room), cb); | ||
var multi = this.client.multi(); | ||
var roomKeys = this._getRoomKeys(room); | ||
roomKeys.forEach(function(key) { | ||
multi.del(key); | ||
}); | ||
multi.exec(cb); | ||
}; | ||
@@ -196,6 +227,6 @@ | ||
*/ | ||
Adapter.prototype.isEmpty = function isEmpty(room, cb) { | ||
this.client.exists(roomKey.format(room), function (err, exists) { | ||
cb(err, ! exists); | ||
var roomKeys = this._getRoomKeys(room); | ||
this.client.sunion(roomKeys, function (err, clients) { | ||
cb(err, clients.length === 0); | ||
}); | ||
@@ -213,3 +244,3 @@ }; | ||
cb = cb || noop; | ||
var self = this; | ||
// Get rooms. | ||
@@ -223,7 +254,14 @@ this.get(id, function (err, rooms) { | ||
rooms.forEach(function (room) { | ||
multi.srem(roomKey.format(room), id); | ||
var roomKeys = self._getRoomKeys(room); | ||
roomKeys.forEach(function(key) { | ||
multi.srem(key, id); | ||
}); | ||
}); | ||
var socketKeys = self._getSocketKeys(id); | ||
// Remove id key. | ||
multi.del(socketKey.format(id)); | ||
socketKeys.forEach(function(key) { | ||
multi.del(key); | ||
}); | ||
@@ -234,2 +272,38 @@ multi.exec(cb); | ||
Adapter.prototype._getRoomKeys = function(room) { | ||
var i = 0; | ||
var roomKeys = []; | ||
for ( ; i< this.numberOfSets; i++) { | ||
roomKeys.push(this._roomKey(room, i)); | ||
} | ||
return roomKeys; | ||
}; | ||
Adapter.prototype._roomKey = function(room, offset) { | ||
var time = this._getTimeFraction(offset); | ||
return ['room', room, time].join(':'); | ||
}; | ||
Adapter.prototype._getSocketKeys = function(room) { | ||
var i = 0; | ||
var roomKeys = []; | ||
for ( ; i< this.numberOfSets; i++) { | ||
roomKeys.push(this._socketKey(room, i)); | ||
} | ||
return roomKeys; | ||
}; | ||
Adapter.prototype._socketKey = function(id, offset) { | ||
var time = this._getTimeFraction(offset); | ||
return ['socket', id, time].join(':'); | ||
}; | ||
Adapter.prototype._getTimeFraction = function(offset) { | ||
var now = Date.now(); | ||
offset = offset || 0; | ||
var interval = this.interval; | ||
return Math.floor(now / interval) - offset; | ||
}; | ||
/** | ||
@@ -239,2 +313,2 @@ * Noop function. | ||
function noop() {} | ||
function noop() {} |
{ | ||
"name": "primus-cluster", | ||
"version": "1.0.2", | ||
"version": "1.1.0", | ||
"description": "Scale Primus across multiple servers.", | ||
@@ -5,0 +5,0 @@ "dependencies": { |
@@ -29,9 +29,6 @@ var redis = require('redis'); | ||
if (err) return done(err); | ||
client.multi() | ||
.sismember('socket:12', 'my:room:name') | ||
.sismember('room:my:room:name', '12') | ||
.exec(function (err, replies) { | ||
adapter.get('12', function(err, rooms){ | ||
if (err) return done(err); | ||
expect(replies[0]).to.equal(1); | ||
expect(replies[1]).to.equal(1); | ||
expect(rooms).to.contain('my:room:name'); | ||
done(); | ||
@@ -42,17 +39,34 @@ }); | ||
it('should add a socket into a room with correct expire', function (done) { | ||
adapter.ttl = 30; | ||
it('should expire after given ttl', function (done) { | ||
adapter.ttl = 1; | ||
adapter.add('12', 'my:room:name', function (err) { | ||
if (err) return done(err); | ||
client.multi() | ||
.ttl('socket:12') | ||
.ttl('room:my:room:name') | ||
.exec(function (err, replies) { | ||
if (err) return done(err); | ||
expect(replies[0]).to.be.most(30); | ||
expect(replies[1]).to.be.most(30); | ||
done(); | ||
}); | ||
setTimeout(function(){ | ||
adapter.get('12', function(err, rooms) { | ||
if (err) return done(err); | ||
expect(rooms).to.not.contain('my:room:name'); | ||
done(); | ||
}); | ||
}, 1100); | ||
}); | ||
}); | ||
it('should not expire', function (done) { | ||
adapter.ttl = 100; | ||
adapter.add('12', 'my:room:name', function (err) { | ||
if (err) return done(err); | ||
setTimeout(function(){ | ||
adapter.get('12', function(err, rooms) { | ||
if (err) return done(err); | ||
expect(rooms).to.contain('my:room:name'); | ||
done(); | ||
}); | ||
}, 1100); | ||
}); | ||
}); | ||
}); | ||
@@ -93,16 +107,9 @@ | ||
it('should remove client from a room', function (done) { | ||
it('should remove room from a client', function (done) { | ||
adapter.del('12', 'my:room:name', function (err) { | ||
if (err) return done(err); | ||
client.multi() | ||
.sismember('socket:12', 'my:room:name') | ||
.sismember('room:my:room:name', '12') | ||
.sismember('socket:12', 'my:second:room:name') | ||
.sismember('room:my:second:room:name', '12') | ||
.exec(function (err, replies) { | ||
adapter.get('12', function(err, rooms) { | ||
if (err) return done(err); | ||
expect(replies[0]).to.equal(0); | ||
expect(replies[1]).to.equal(0); | ||
expect(replies[2]).to.equal(1); | ||
expect(replies[3]).to.equal(1); | ||
expect(rooms).to.not.contain('my:room:name'); | ||
done(); | ||
@@ -113,16 +120,20 @@ }); | ||
it('should remove client from all room if called without room', function (done) { | ||
it('should remove client from a room', function(done){ | ||
adapter.del('12', 'my:room:name', function (err) { | ||
if (err) return done(err); | ||
adapter.clients('my:room:name', function(err, client) { | ||
if (err) return done(err); | ||
expect(client).to.not.contain('12'); | ||
done(); | ||
}); | ||
}); | ||
}) | ||
it('should remove all rooms from the client if called without room', function (done) { | ||
adapter.del('12', null, function (err) { | ||
if (err) return done(err); | ||
client.multi() | ||
.sismember('socket:12', 'my:room:name') | ||
.sismember('room:my:room:name', '12') | ||
.sismember('socket:12', 'my:second:room:name') | ||
.sismember('room:my:second:room:name', '12') | ||
.exec(function (err, replies) { | ||
adapter.get('12', function(err, rooms) { | ||
if (err) return done(err); | ||
expect(replies[0]).to.equal(0); | ||
expect(replies[1]).to.equal(0); | ||
expect(replies[2]).to.equal(0); | ||
expect(replies[3]).to.equal(0); | ||
expect(rooms).to.be.eql([]); | ||
done(); | ||
@@ -132,2 +143,20 @@ }); | ||
}); | ||
it('should remove client from all rooms if called without room', function (done) { | ||
adapter.del('12', null, function (err) { | ||
if (err) return done(err); | ||
async.series([ | ||
adapter.clients.bind(adapter, 'my:room:name'), | ||
adapter.clients.bind(adapter, 'my:second:room:name') | ||
], function(err, results) { | ||
results.forEach(function(result){ | ||
expect(result).to.not.contain('12'); | ||
}); | ||
done(); | ||
}); | ||
}); | ||
}); | ||
}); | ||
@@ -269,7 +298,7 @@ | ||
if (err) return done(err); | ||
client.exists('room:my:room:name', function (err, exists) { | ||
adapter.clients('my:room:name', function(err, clients){ | ||
if (err) return done(err); | ||
expect(exists).to.equal(0); | ||
expect(clients).to.have.length(0); | ||
done(); | ||
}); | ||
}) | ||
}); | ||
@@ -303,2 +332,34 @@ }); | ||
}); | ||
describe("#_getTimeFraction", function() { | ||
beforeEach(function() { | ||
this.clock = sinon.useFakeTimers(); | ||
}); | ||
afterEach(function() { | ||
this.clock.restore(); | ||
}); | ||
it('should substract from the current time interval when offset provided', function() { | ||
var current = adapter._getTimeFraction(); | ||
var oneBefore = adapter._getTimeFraction(1); | ||
expect(current - 1).to.be.equal(oneBefore); | ||
}); | ||
it('should change fraction every 10 seconds if ttl is 100 seconds', function() { | ||
adapter = new Adapter({ttl: 100}); | ||
var current = adapter._getTimeFraction(); | ||
this.clock.tick(10 * 1000); | ||
var next = adapter._getTimeFraction(); | ||
expect(current + 1).to.be.equal(next); | ||
}); | ||
it('should change time every 2h and 24min by default', function() { | ||
var current = adapter._getTimeFraction(); | ||
this.clock.tick( (2 * 60 + 24) * 60 * 1000 ); | ||
var next = adapter._getTimeFraction(); | ||
expect(current + 1).to.be.equal(next); | ||
}); | ||
}); | ||
}); |
30028
859
9