Socket
Socket
Sign inDemoInstall

primus-cluster

Package Overview
Dependencies
30
Maintainers
6
Versions
5
Alerts
File Explorer

Advanced tools

Install Socket

Detect and block malicious and high-risk dependencies

Install

Comparing version 1.1.0 to 2.0.0

lib/index.js

166

lib/adapter.js
/**
* Module dependencies.
*/
var _ = require('lodash');
/**
* Module exports.
*/
module.exports = Adapter;
exports.Adapter = Adapter;

@@ -17,3 +11,4 @@ /**

function Adapter(options){
function Adapter(options) {
this.prefix = options.prefix ?? "";
this.ttl = options.ttl || 86400;

@@ -25,4 +20,3 @@ this.publish = options.publish;

this.numberOfSets = options.numberOfSets || 10;
var ttlMs = this.ttl * 1000;
this.interval = ttlMs / this.numberOfSets;
this.interval = (this.ttl * 1000) / this.numberOfSets;
}

@@ -42,12 +36,10 @@

var multi = this.client.multi();
const multi = this.client.multi();
multi
.sadd(this._socketKey(id), room)
.sadd(this._roomKey(room), id);
multi.sadd(this._socketKey(id), room).sadd(this._roomKey(room), id);
if (this.ttl) {
multi
.expire(this._socketKey(id), this.ttl)
.expire(this._roomKey(room), this.ttl);
.expire(this._socketKey(id), this.ttl)
.expire(this._roomKey(room), this.ttl);
}

@@ -71,19 +63,17 @@

if (id) {
var socketKeys = this._getSocketKeys(id);
const socketKeys = this._getSocketKeys(id);
this.client.sunion(socketKeys, cb);
return ;
return;
}
this.client.keys('room:*', function (err, rooms) {
this.client.keys(`${this.prefix}room:*`, (err, rooms) => {
if (err) return cb(err);
rooms = rooms.map(function (room) {
return room.slice(5).replace(/\:(\d*)$/g, '');
});
rooms = rooms.map((room) =>
room.slice(`${this.prefix}room:`.length).replace(/\:(\d*)$/g, "")
);
rooms = _.uniq(rooms);
rooms = Array.from(new Set(rooms));
cb(undefined, rooms);
});
};

@@ -109,5 +99,5 @@

var i = 0;
for( ; i < this.numberOfSets ; i++) {
for (; i < this.numberOfSets; i++) {
multi.srem(this._roomKey(room, i), id);
multi.srem(this._socketKey(id, i), room)
multi.srem(this._socketKey(id, i), room);
}

@@ -136,24 +126,25 @@

cb = cb || noop;
var rooms = opts.rooms = opts.rooms || [];
var except = opts.except = opts.except || [];
var method = opts.method = opts.method || 'write';
var ids = {};
var self = this;
const rooms = (opts.rooms = opts.rooms || []);
const except = (opts.except = opts.except || []);
const method = (opts.method = opts.method || "write");
const ids = {};
this.publish(data, 'room', opts);
this.publish(data, "room", opts);
if (rooms.length) {
var multi = this.client.multi();
rooms.forEach(function (room) {
var roomKeys = self._getRoomKeys(room);
const multi = this.client.multi();
rooms.forEach((room) => {
const roomKeys = this._getRoomKeys(room);
multi.sunion(roomKeys);
});
multi.exec(function (err, replies) {
if (err) return cb(err);
multi.exec((err, replies) => {
if (err) {
return cb(err);
}
replies.forEach(function (roomIds) {
roomIds.forEach(function (id) {
replies.forEach((roomIds) => {
roomIds.forEach((id) => {
if (ids[id] || except.indexOf(id) !== -1) return;
if (! clients[id]) return ;
if (!clients[id]) return;
clients[id][method].apply(clients[id], data);

@@ -168,12 +159,14 @@ ids[id] = true;

// TODO replace this `KEYS` should be only used for debugging
this.client.keys('socket:*', function (err, sockets) {
if (err) return cb(err);
this.client.keys("socket:*", (err, sockets) => {
if (err) {
return cb(err);
}
var ids = sockets.map(function(key) {
return key.replace(/^socket\:/, '').replace(/\:(\d+)$/, '');
});
ids = _.uniq(ids);
ids.forEach(function (id) {
let ids = sockets.map((key) =>
key.slice(`${this.prefix}socket:`.length).replace(/\:(\d+)$/, "")
);
ids = Array.from(new Set(ids));
ids.forEach((id) => {
if (except.indexOf(id) !== -1) return;
if (! clients[id]) return ;
if (!clients[id]) return;
clients[id][method].apply(clients[id], data);

@@ -196,3 +189,3 @@ });

Adapter.prototype.clients = function clients(room, cb) {
var roomKeys = this._getRoomKeys(room)
const roomKeys = this._getRoomKeys(room);
this.client.sunion(roomKeys, cb);

@@ -210,5 +203,5 @@ };

Adapter.prototype.empty = function empty(room, cb) {
var multi = this.client.multi();
var roomKeys = this._getRoomKeys(room);
roomKeys.forEach(function(key) {
const multi = this.client.multi();
const roomKeys = this._getRoomKeys(room);
roomKeys.forEach((key) => {
multi.del(key);

@@ -227,5 +220,10 @@ });

Adapter.prototype.isEmpty = function isEmpty(room, cb) {
var roomKeys = this._getRoomKeys(room);
this.client.sunion(roomKeys, function (err, clients) {
cb(err, clients.length === 0);
const roomKeys = this._getRoomKeys(room);
this.client.sunion(roomKeys, (err, clients) => {
if (err) {
cb(err);
return;
}
cb(null, clients.length === 0);
});

@@ -243,13 +241,14 @@ };

cb = cb || noop;
var self = this;
// Get rooms.
this.get(id, function (err, rooms) {
if (err) return cb(err);
this.get(id, (err, rooms) => {
if (err) {
return cb(err);
}
var multi = this.client.multi();
const multi = this.client.multi();
// Remove id from each rooms.
rooms.forEach(function (room) {
var roomKeys = self._getRoomKeys(room);
roomKeys.forEach(function(key) {
rooms.forEach((room) => {
const roomKeys = this._getRoomKeys(room);
roomKeys.forEach((key) => {
multi.srem(key, id);

@@ -259,6 +258,6 @@ });

var socketKeys = self._getSocketKeys(id);
const socketKeys = this._getSocketKeys(id);
// Remove id key.
socketKeys.forEach(function(key) {
socketKeys.forEach((key) => {
multi.del(key);

@@ -268,9 +267,8 @@ });

multi.exec(cb);
}.bind(this));
});
};
Adapter.prototype._getRoomKeys = function(room) {
var i = 0;
var roomKeys = [];
for ( ; i< this.numberOfSets; i++) {
Adapter.prototype._getRoomKeys = function (room) {
const roomKeys = [];
for (let i = 0; i < this.numberOfSets; i++) {
roomKeys.push(this._roomKey(room, i));

@@ -281,11 +279,10 @@ }

Adapter.prototype._roomKey = function(room, offset) {
var time = this._getTimeFraction(offset);
return ['room', room, time].join(':');
Adapter.prototype._roomKey = function (room, offset) {
const time = this._getTimeFraction(offset);
return `${this.prefix}room:${room}:${time}`;
};
Adapter.prototype._getSocketKeys = function(room) {
var i = 0;
var roomKeys = [];
for ( ; i< this.numberOfSets; i++) {
Adapter.prototype._getSocketKeys = function (room) {
const roomKeys = [];
for (let i = 0; i < this.numberOfSets; i++) {
roomKeys.push(this._socketKey(room, i));

@@ -296,12 +293,11 @@ }

Adapter.prototype._socketKey = function(id, offset) {
var time = this._getTimeFraction(offset);
return ['socket', id, time].join(':');
Adapter.prototype._socketKey = function (id, offset) {
const time = this._getTimeFraction(offset);
return `${this.prefix}socket:${id}:${time}`;
};
Adapter.prototype._getTimeFraction = function(offset) {
var now = Date.now();
Adapter.prototype._getTimeFraction = function (offset) {
const now = Date.now();
offset = offset || 0;
var interval = this.interval;
const interval = this.interval;
return Math.floor(now / interval) - offset;

@@ -308,0 +304,0 @@ };

@@ -5,4 +5,3 @@ /**

var _ = require('lodash');
var Adapter = require('./adapter');
const { Adapter } = require("./adapter");

@@ -13,3 +12,3 @@ /**

module.exports = function (primus, options) {
exports.server = function (primus, options) {
return new PrimusCluster(primus, options);

@@ -26,11 +25,15 @@ };

function PrimusCluster(primus, options) {
options = options || {};
options.cluster = _.defaults(options.cluster || {}, {
channel: 'primus',
ttl: 86400
});
function PrimusCluster(
primus,
{
cluster: {
prefix = "primus-cluster:",
channel = `${prefix}pubsub`,
ttl = 86400,
redis = {},
} = {},
} = {}
) {
this.primus = primus;
this.channel = options.cluster.channel;
this.channel = channel;
this.silent = false;

@@ -41,8 +44,8 @@

this.initializeClients(options.cluster.redis);
this.initializeAdapter(options.cluster.ttl);
this.initializeClients(redis);
this.initializeAdapter({ ttl, prefix });
this.wrapPrimusMethods();
this.initializeMessageDispatcher();
this.primus.on('close', this.close.bind(this));
this.primus.on("close", this.close.bind(this));
}

@@ -54,21 +57,23 @@

PrimusCluster.prototype.initializeClients = function initializeClients(options) {
options = _.defaults(options || {}, {
host: '127.0.0.1',
port: 6379
});
PrimusCluster.prototype.initializeClients = function initializeClients(
options
) {
this.clients = {};
// Create redis clients.
['pub', 'sub', 'storage'].forEach(function (name) {
var client = createClient();
["pub", "sub", "storage"].forEach(
function (name) {
var client = createClient();
// Forward errors to Primus.
client.on('error', function (err) {
this.primus.emit('error', err);
}.bind(this));
// Forward errors to Primus.
client.on(
"error",
function (err) {
this.primus.emit("error", err);
}.bind(this)
);
this.clients[name] = client;
}.bind(this));
this.clients[name] = client;
}.bind(this)
);

@@ -82,25 +87,22 @@ /**

function createClient() {
if (_.isFunction(options)) return options();
if (typeof options === "function") return options();
try {
return require('redis').createClient(options.port, options.host, _.omit(options, 'port', 'host'));
return require("redis").createClient(options);
} catch (err) {
throw new Error("You must add redis as dependency.");
}
catch(err) {
throw new Error('You must add redis as dependency.');
}
}
};
/**
* Initialize the room adapter.
*
* @param {Number} ttl TTL in second
*/
PrimusCluster.prototype.initializeAdapter = function initializeAdapter(ttl) {
PrimusCluster.prototype.initializeAdapter = function initializeAdapter({
ttl,
prefix,
}) {
// Create adapter.
var adapter = new Adapter({
const adapter = new Adapter({
publish: this.publish.bind(this),
client: this.clients.storage,
ttl: ttl
ttl: ttl,
prefix,
});

@@ -122,12 +124,12 @@

PrimusCluster.prototype.wrapPrimusMethods = function wrapPrimusMethods() {
['write', 'send'].forEach(wrapMethod.bind(this));
function wrapMethod(method) {
if (! this.primus[method]) return ;
this.primus['__original' + method] = this.primus[method];
this.primus[method] = function () {
this.publish(arguments, 'primus', { method: method });
this.primus['__original' + method].apply(this.primus, arguments);
}.bind(this);
}
["write", "send"].forEach((method) => {
if (!this.primus[method]) return;
this.primus["__original" + method] = this.primus[method];
Object.defineProperty(this.primus, method, {
value: (...args) => {
this.publish(args, "primus", { method: method });
this.primus["__original" + method].apply(this.primus, args);
},
});
});
};

@@ -139,9 +141,10 @@

PrimusCluster.prototype.initializeMessageDispatcher = function initializeMessageDispatcher() {
this.clients.sub.subscribe(this.channel);
PrimusCluster.prototype.initializeMessageDispatcher =
function initializeMessageDispatcher() {
this.clients.sub.subscribe(this.channel);
this.clients.sub.on('message', function (channel, message) {
this.dispatchMessage(message);
}.bind(this));
};
this.clients.sub.on("message", (channel, message) => {
this.dispatchMessage(message);
});
};

@@ -155,16 +158,20 @@ /**

PrimusCluster.prototype.dispatchMessage = function dispatchMessage(msg) {
this.primus.decoder(msg, function (err, msg) {
this.primus.decoder(msg, (err, msg) => {
// Do a "save" emit('error') when we fail to parse a message. We don't
// want to throw here as listening to errors should be optional.
if (err) return this.primus.listeners('error').length && this.primus.emit('error', err);
if (err) {
if (this.primus.listeners("error").length) {
this.primus.emit("error", err);
}
return;
}
// If message have no type, we ignore it.
if (! msg.type) return ;
if (!msg.type) return;
// If we are the emitter, we ignore it.
if (msg.id === this.id) return ;
if (msg.id === this.id) return;
this.callDispatcher(msg);
}.bind(this));
});
};

@@ -183,3 +190,3 @@

// Call the dispatcher.
this[msg.type + 'MessageDispatcher'](msg);
this[msg.type + "MessageDispatcher"](msg);

@@ -197,7 +204,9 @@ // Exit silent mode.

PrimusCluster.prototype.roomMessageDispatcher = function roomMessageDispatcher(msg) {
_.each(msg.opts.rooms, function (room) {
var rooms = this.primus.room(room).except(msg.opts.except);
rooms[msg.opts.method].apply(rooms, _.toArray(msg.data));
}, this);
PrimusCluster.prototype.roomMessageDispatcher = function roomMessageDispatcher(
msg
) {
msg.opts.rooms.forEach((room) => {
const rooms = this.primus.room(room).except(msg.opts.except);
rooms[msg.opts.method].apply(rooms, Array.from(msg.data));
});
};

@@ -212,5 +221,9 @@

PrimusCluster.prototype.primusMessageDispatcher = function primusMessageDispatcher(msg) {
this.primus['__original' + msg.opts.method].apply(this.primus, _.toArray(msg.data));
};
PrimusCluster.prototype.primusMessageDispatcher =
function primusMessageDispatcher(msg) {
this.primus["__original" + msg.opts.method].apply(
this.primus,
Array.from(msg.data)
);
};

@@ -225,23 +238,25 @@ /**

PrimusCluster.prototype.publish = function publish(data, type, opts) {
opts = opts || {};
PrimusCluster.prototype.publish = function publish(data, type, opts = {}) {
// In silent mode, we do nothing.
if (this.silent) return ;
if (this.silent) return;
var message = {
const message = {
id: this.id,
data: data,
type: type,
opts: opts
opts: opts,
};
this.primus.encoder(message, function (err, msg) {
this.primus.encoder(message, (err, msg) => {
// Do a "save" emit('error') when we fail to parse a message. We don't
// want to throw here as listening to errors should be optional.
if (err) return this.primus.listeners('error').length && this.primus.emit('error', err);
if (err) {
if (this.primus.listeners("error").length) {
this.primus.emit("error", err);
}
return;
}
this.clients.pub.publish(this.channel, msg);
}.bind(this));
});
};

@@ -255,3 +270,5 @@

PrimusCluster.prototype.close = function close() {
_.invoke(this.clients, 'quit');
};
Object.values(this.clients).forEach((client) => {
client.quit();
});
};
{
"name": "primus-cluster",
"version": "1.1.0",
"version": "2.0.0",
"description": "Scale Primus across multiple servers.",
"dependencies": {
"lodash": "~2.4.1"
},
"main": "./lib/index.js",
"devDependencies": {
"redis": "~0.10.0",
"mocha": "~1.17.0",
"chai": "~1.8.1",
"async": "~0.2.9",
"sinon-chai": "~2.4.0",
"sinon": "~1.7.3",
"primus": "~2.0.2",
"ws": "~0.4.31",
"primus-emitter": "~3.0.0",
"primus-rooms": "~3.0.1"
"async": "^3.2.1",
"conventional-github-releaser": "^3.1.5",
"jest": "^27.1.0",
"standard-version": "^9.1.0",
"primus": "^8.0.5",
"primus-emitter": "^3.1.1",
"primus-rooms": "^3.4.1",
"redis": "^3.1.2",
"ws": "^8.2.2"
},
"peerDependencies": {
"primus": ">=1.0.0"
"primus": ">=8.0.0"
},
"repository": {
"type": "git",
"url": "https://github.com/neoziro/primus-cluster.git"
},
"repository": "github:lemonde/primus-cluster",
"scripts": {
"test": "mocha --recursive"
"test": "jest",
"release": "standard-version && conventional-github-releaser --preset angular"
},
"main": ".",
"author": "Greg Bergé <berge.greg@gmail.com>",
"license": "MIT"
}
}

@@ -1,3 +0,5 @@

# primus-cluster [![Build Status](https://travis-ci.org/neoziro/primus-cluster.png)](https://travis-ci.org/neoziro/primus-cluster)
# primus-cluster
![CI](https://github.com/lemonde/primus-cluster/workflows/CI/badge.svg)
Primus cluster runs Primus accross multiple servers, it use Redis to store data and distribute messages across Primus instances. For more informations you can see [Redis Pub/Sub](http://redis.io/topics/pubsub).

@@ -10,14 +12,13 @@

## Usage
```js
var http = require('http');
var Primus = require('primus');
var PrimusCluster = require('primus-cluster');
const http = require("http");
const Primus = require("primus");
const PrimusCluster = require("primus-cluster");
const
const server = http.createServer();
const primus = new Primus(server);
var server = http.createServer();
var primus = new Primus(server);
primus.use('cluster', PrimusCluster);
primus.plugin("cluster", PrimusCluster);
```

@@ -39,7 +40,7 @@

port: 6379,
host: '127.0.0.1',
connect_timeout: 200
}
}
})
host: "127.0.0.1",
connect_timeout: 200,
},
},
});
```

@@ -50,12 +51,12 @@

```js
var redis = require('redis');
const redis = require("redis");
new Primus(server, {
cluster: {
redis: createClient
}
})
redis: createClient,
},
});
function createClient() {
var client = redis.createClient();
const client = redis.createClient();
client.select(1); // Choose a custom database.

@@ -66,2 +67,16 @@ return client;

### prefix
Type: `String`
Prefix added to every redis key and default channel, default channel is "primus-cluster:".
```js
new Primus(server, {
cluster: {
prefix: "my-client:",
},
});
```
### channel

@@ -71,3 +86,3 @@

The name of the channel to use, the default channel is "primus".
The name of the channel to use, the default channel is `${prefix}pubsub`.

@@ -77,5 +92,5 @@ ```js

cluster: {
channel: 'primus'
}
})
channel: "my-client:awesome-channel",
},
});
```

@@ -92,5 +107,5 @@

cluster: {
ttl: 86400
}
})
ttl: 86400,
},
});
```

@@ -102,7 +117,6 @@

```js
primus.use('rooms', PrimusRooms);
primus.use('emitter', PrimusEmitter);
primus.use('cluster', PrimusCluster);
primus.plugin("rooms", PrimusRooms);
primus.plugin("emitter", PrimusEmitter);
primus.plugin("cluster", PrimusCluster);
```

@@ -109,0 +123,0 @@

SocketSocket SOC 2 Logo

Product

  • Package Alerts
  • Integrations
  • Docs
  • Pricing
  • FAQ
  • Roadmap

Stay in touch

Get open source security insights delivered straight into your inbox.


  • Terms
  • Privacy
  • Security

Made with ⚡️ by Socket Inc