Socket
Socket
Sign inDemoInstall

primus-cluster

Package Overview
Dependencies
31
Maintainers
1
Versions
5
Alerts
File Explorer

Advanced tools

Install Socket

Detect and block malicious and high-risk dependencies

Install

Comparing version 1.0.2 to 1.1.0

126

lib/adapter.js

@@ -5,4 +5,3 @@ /**

var socketKey = require('./keys/socket');
var roomKey = require('./keys/room');
var _ = require('lodash');

@@ -20,5 +19,10 @@ /**

function Adapter(options){
this.ttl = options.ttl;
this.ttl = options.ttl || 86400;
this.publish = options.publish;
this.client = options.client;
// numberOfSets # of sets you want to seperate your keys
this.numberOfSets = options.numberOfSets || 10;
var ttlMs = this.ttl * 1000;
this.interval = ttlMs / this.numberOfSets;
}

@@ -41,9 +45,9 @@

multi
.sadd(socketKey.format(id), room)
.sadd(roomKey.format(room), id);
.sadd(this._socketKey(id), room)
.sadd(this._roomKey(room), id);
if (this.ttl) {
multi
.expire(socketKey.format(id), this.ttl)
.expire(roomKey.format(room), this.ttl);
.expire(this._socketKey(id), this.ttl)
.expire(this._roomKey(room), this.ttl);
}

@@ -66,11 +70,20 @@

if (id) return this.client.smembers(socketKey.format(id), cb);
if (id) {
var socketKeys = this._getSocketKeys(id);
this.client.sunion(socketKeys, cb);
return ;
}
this.client.keys('room:*', function (err, rooms) {
if (err) return cb(err);
rooms = rooms.map(function (room) {
return room.slice(5).replace(/\:(\d*)$/g, '');
});
cb(undefined, rooms.map(function (room) {
return room.slice(5);
}));
rooms = _.uniq(rooms);
cb(undefined, rooms);
});
};

@@ -93,6 +106,11 @@

this.client.multi()
.srem(socketKey.format(id), room)
.srem(roomKey.format(room), id)
.exec(cb);
var multi = this.client.multi();
var i = 0;
for( ; i < this.numberOfSets ; i++) {
multi.srem(this._roomKey(room, i), id);
multi.srem(this._socketKey(id, i), room)
}
multi.exec(cb);
};

@@ -122,2 +140,3 @@

var ids = {};
var self = this;

@@ -129,4 +148,6 @@ this.publish(data, 'room', opts);

rooms.forEach(function (room) {
multi.smembers(roomKey.format(room));
var roomKeys = self._getRoomKeys(room);
multi.sunion(roomKeys);
});
multi.exec(function (err, replies) {

@@ -147,6 +168,10 @@ if (err) return cb(err);

} else {
// TODO replace this `KEYS` should be only used for debugging
this.client.keys('socket:*', function (err, sockets) {
if (err) return cb(err);
var ids = sockets.map(socketKey.parse);
var ids = sockets.map(function(key) {
return key.replace(/^socket\:/, '').replace(/\:(\d+)$/, '');
});
ids = _.uniq(ids);
ids.forEach(function (id) {

@@ -172,3 +197,4 @@ if (except.indexOf(id) !== -1) return;

Adapter.prototype.clients = function clients(room, cb) {
this.client.smembers(roomKey.format(room), cb);
var roomKeys = this._getRoomKeys(room)
this.client.sunion(roomKeys, cb);
};

@@ -185,3 +211,8 @@

Adapter.prototype.empty = function empty(room, cb) {
this.client.del(roomKey.format(room), cb);
var multi = this.client.multi();
var roomKeys = this._getRoomKeys(room);
roomKeys.forEach(function(key) {
multi.del(key);
});
multi.exec(cb);
};

@@ -196,6 +227,6 @@

*/
Adapter.prototype.isEmpty = function isEmpty(room, cb) {
this.client.exists(roomKey.format(room), function (err, exists) {
cb(err, ! exists);
var roomKeys = this._getRoomKeys(room);
this.client.sunion(roomKeys, function (err, clients) {
cb(err, clients.length === 0);
});

@@ -213,3 +244,3 @@ };

cb = cb || noop;
var self = this;
// Get rooms.

@@ -223,7 +254,14 @@ this.get(id, function (err, rooms) {

rooms.forEach(function (room) {
multi.srem(roomKey.format(room), id);
var roomKeys = self._getRoomKeys(room);
roomKeys.forEach(function(key) {
multi.srem(key, id);
});
});
var socketKeys = self._getSocketKeys(id);
// Remove id key.
multi.del(socketKey.format(id));
socketKeys.forEach(function(key) {
multi.del(key);
});

@@ -234,2 +272,38 @@ multi.exec(cb);

Adapter.prototype._getRoomKeys = function(room) {
var i = 0;
var roomKeys = [];
for ( ; i< this.numberOfSets; i++) {
roomKeys.push(this._roomKey(room, i));
}
return roomKeys;
};
Adapter.prototype._roomKey = function(room, offset) {
var time = this._getTimeFraction(offset);
return ['room', room, time].join(':');
};
Adapter.prototype._getSocketKeys = function(room) {
var i = 0;
var roomKeys = [];
for ( ; i< this.numberOfSets; i++) {
roomKeys.push(this._socketKey(room, i));
}
return roomKeys;
};
Adapter.prototype._socketKey = function(id, offset) {
var time = this._getTimeFraction(offset);
return ['socket', id, time].join(':');
};
Adapter.prototype._getTimeFraction = function(offset) {
var now = Date.now();
offset = offset || 0;
var interval = this.interval;
return Math.floor(now / interval) - offset;
};
/**

@@ -239,2 +313,2 @@ * Noop function.

function noop() {}
function noop() {}
{
"name": "primus-cluster",
"version": "1.0.2",
"version": "1.1.0",
"description": "Scale Primus across multiple servers.",

@@ -5,0 +5,0 @@ "dependencies": {

@@ -29,9 +29,6 @@ var redis = require('redis');

if (err) return done(err);
client.multi()
.sismember('socket:12', 'my:room:name')
.sismember('room:my:room:name', '12')
.exec(function (err, replies) {
adapter.get('12', function(err, rooms){
if (err) return done(err);
expect(replies[0]).to.equal(1);
expect(replies[1]).to.equal(1);
expect(rooms).to.contain('my:room:name');
done();

@@ -42,17 +39,34 @@ });

it('should add a socket into a room with correct expire', function (done) {
adapter.ttl = 30;
it('should expire after given ttl', function (done) {
adapter.ttl = 1;
adapter.add('12', 'my:room:name', function (err) {
if (err) return done(err);
client.multi()
.ttl('socket:12')
.ttl('room:my:room:name')
.exec(function (err, replies) {
if (err) return done(err);
expect(replies[0]).to.be.most(30);
expect(replies[1]).to.be.most(30);
done();
});
setTimeout(function(){
adapter.get('12', function(err, rooms) {
if (err) return done(err);
expect(rooms).to.not.contain('my:room:name');
done();
});
}, 1100);
});
});
it('should not expire', function (done) {
adapter.ttl = 100;
adapter.add('12', 'my:room:name', function (err) {
if (err) return done(err);
setTimeout(function(){
adapter.get('12', function(err, rooms) {
if (err) return done(err);
expect(rooms).to.contain('my:room:name');
done();
});
}, 1100);
});
});
});

@@ -93,16 +107,9 @@

it('should remove client from a room', function (done) {
it('should remove room from a client', function (done) {
adapter.del('12', 'my:room:name', function (err) {
if (err) return done(err);
client.multi()
.sismember('socket:12', 'my:room:name')
.sismember('room:my:room:name', '12')
.sismember('socket:12', 'my:second:room:name')
.sismember('room:my:second:room:name', '12')
.exec(function (err, replies) {
adapter.get('12', function(err, rooms) {
if (err) return done(err);
expect(replies[0]).to.equal(0);
expect(replies[1]).to.equal(0);
expect(replies[2]).to.equal(1);
expect(replies[3]).to.equal(1);
expect(rooms).to.not.contain('my:room:name');
done();

@@ -113,16 +120,20 @@ });

it('should remove client from all room if called without room', function (done) {
it('should remove client from a room', function(done){
adapter.del('12', 'my:room:name', function (err) {
if (err) return done(err);
adapter.clients('my:room:name', function(err, client) {
if (err) return done(err);
expect(client).to.not.contain('12');
done();
});
});
})
it('should remove all rooms from the client if called without room', function (done) {
adapter.del('12', null, function (err) {
if (err) return done(err);
client.multi()
.sismember('socket:12', 'my:room:name')
.sismember('room:my:room:name', '12')
.sismember('socket:12', 'my:second:room:name')
.sismember('room:my:second:room:name', '12')
.exec(function (err, replies) {
adapter.get('12', function(err, rooms) {
if (err) return done(err);
expect(replies[0]).to.equal(0);
expect(replies[1]).to.equal(0);
expect(replies[2]).to.equal(0);
expect(replies[3]).to.equal(0);
expect(rooms).to.be.eql([]);
done();

@@ -132,2 +143,20 @@ });

});
it('should remove client from all rooms if called without room', function (done) {
adapter.del('12', null, function (err) {
if (err) return done(err);
async.series([
adapter.clients.bind(adapter, 'my:room:name'),
adapter.clients.bind(adapter, 'my:second:room:name')
], function(err, results) {
results.forEach(function(result){
expect(result).to.not.contain('12');
});
done();
});
});
});
});

@@ -269,7 +298,7 @@

if (err) return done(err);
client.exists('room:my:room:name', function (err, exists) {
adapter.clients('my:room:name', function(err, clients){
if (err) return done(err);
expect(exists).to.equal(0);
expect(clients).to.have.length(0);
done();
});
})
});

@@ -303,2 +332,34 @@ });

});
describe("#_getTimeFraction", function() {
beforeEach(function() {
this.clock = sinon.useFakeTimers();
});
afterEach(function() {
this.clock.restore();
});
it('should substract from the current time interval when offset provided', function() {
var current = adapter._getTimeFraction();
var oneBefore = adapter._getTimeFraction(1);
expect(current - 1).to.be.equal(oneBefore);
});
it('should change fraction every 10 seconds if ttl is 100 seconds', function() {
adapter = new Adapter({ttl: 100});
var current = adapter._getTimeFraction();
this.clock.tick(10 * 1000);
var next = adapter._getTimeFraction();
expect(current + 1).to.be.equal(next);
});
it('should change time every 2h and 24min by default', function() {
var current = adapter._getTimeFraction();
this.clock.tick( (2 * 60 + 24) * 60 * 1000 );
var next = adapter._getTimeFraction();
expect(current + 1).to.be.equal(next);
});
});
});
SocketSocket SOC 2 Logo

Product

  • Package Alerts
  • Integrations
  • Docs
  • Pricing
  • FAQ
  • Roadmap

Stay in touch

Get open source security insights delivered straight into your inbox.


  • Terms
  • Privacy
  • Security

Made with ⚡️ by Socket Inc