primus-cluster
Advanced tools
Comparing version 1.1.0 to 2.0.0
/** | ||
* Module dependencies. | ||
*/ | ||
var _ = require('lodash'); | ||
/** | ||
* Module exports. | ||
*/ | ||
module.exports = Adapter; | ||
exports.Adapter = Adapter; | ||
@@ -17,3 +11,4 @@ /** | ||
function Adapter(options){ | ||
function Adapter(options) { | ||
this.prefix = options.prefix ?? ""; | ||
this.ttl = options.ttl || 86400; | ||
@@ -25,4 +20,3 @@ this.publish = options.publish; | ||
this.numberOfSets = options.numberOfSets || 10; | ||
var ttlMs = this.ttl * 1000; | ||
this.interval = ttlMs / this.numberOfSets; | ||
this.interval = (this.ttl * 1000) / this.numberOfSets; | ||
} | ||
@@ -42,12 +36,10 @@ | ||
var multi = this.client.multi(); | ||
const multi = this.client.multi(); | ||
multi | ||
.sadd(this._socketKey(id), room) | ||
.sadd(this._roomKey(room), id); | ||
multi.sadd(this._socketKey(id), room).sadd(this._roomKey(room), id); | ||
if (this.ttl) { | ||
multi | ||
.expire(this._socketKey(id), this.ttl) | ||
.expire(this._roomKey(room), this.ttl); | ||
.expire(this._socketKey(id), this.ttl) | ||
.expire(this._roomKey(room), this.ttl); | ||
} | ||
@@ -71,19 +63,17 @@ | ||
if (id) { | ||
var socketKeys = this._getSocketKeys(id); | ||
const socketKeys = this._getSocketKeys(id); | ||
this.client.sunion(socketKeys, cb); | ||
return ; | ||
return; | ||
} | ||
this.client.keys('room:*', function (err, rooms) { | ||
this.client.keys(`${this.prefix}room:*`, (err, rooms) => { | ||
if (err) return cb(err); | ||
rooms = rooms.map(function (room) { | ||
return room.slice(5).replace(/\:(\d*)$/g, ''); | ||
}); | ||
rooms = rooms.map((room) => | ||
room.slice(`${this.prefix}room:`.length).replace(/\:(\d*)$/g, "") | ||
); | ||
rooms = _.uniq(rooms); | ||
rooms = Array.from(new Set(rooms)); | ||
cb(undefined, rooms); | ||
}); | ||
}; | ||
@@ -109,5 +99,5 @@ | ||
var i = 0; | ||
for( ; i < this.numberOfSets ; i++) { | ||
for (; i < this.numberOfSets; i++) { | ||
multi.srem(this._roomKey(room, i), id); | ||
multi.srem(this._socketKey(id, i), room) | ||
multi.srem(this._socketKey(id, i), room); | ||
} | ||
@@ -136,24 +126,25 @@ | ||
cb = cb || noop; | ||
var rooms = opts.rooms = opts.rooms || []; | ||
var except = opts.except = opts.except || []; | ||
var method = opts.method = opts.method || 'write'; | ||
var ids = {}; | ||
var self = this; | ||
const rooms = (opts.rooms = opts.rooms || []); | ||
const except = (opts.except = opts.except || []); | ||
const method = (opts.method = opts.method || "write"); | ||
const ids = {}; | ||
this.publish(data, 'room', opts); | ||
this.publish(data, "room", opts); | ||
if (rooms.length) { | ||
var multi = this.client.multi(); | ||
rooms.forEach(function (room) { | ||
var roomKeys = self._getRoomKeys(room); | ||
const multi = this.client.multi(); | ||
rooms.forEach((room) => { | ||
const roomKeys = this._getRoomKeys(room); | ||
multi.sunion(roomKeys); | ||
}); | ||
multi.exec(function (err, replies) { | ||
if (err) return cb(err); | ||
multi.exec((err, replies) => { | ||
if (err) { | ||
return cb(err); | ||
} | ||
replies.forEach(function (roomIds) { | ||
roomIds.forEach(function (id) { | ||
replies.forEach((roomIds) => { | ||
roomIds.forEach((id) => { | ||
if (ids[id] || except.indexOf(id) !== -1) return; | ||
if (! clients[id]) return ; | ||
if (!clients[id]) return; | ||
clients[id][method].apply(clients[id], data); | ||
@@ -168,12 +159,14 @@ ids[id] = true; | ||
// TODO replace this `KEYS` should be only used for debugging | ||
this.client.keys('socket:*', function (err, sockets) { | ||
if (err) return cb(err); | ||
this.client.keys("socket:*", (err, sockets) => { | ||
if (err) { | ||
return cb(err); | ||
} | ||
var ids = sockets.map(function(key) { | ||
return key.replace(/^socket\:/, '').replace(/\:(\d+)$/, ''); | ||
}); | ||
ids = _.uniq(ids); | ||
ids.forEach(function (id) { | ||
let ids = sockets.map((key) => | ||
key.slice(`${this.prefix}socket:`.length).replace(/\:(\d+)$/, "") | ||
); | ||
ids = Array.from(new Set(ids)); | ||
ids.forEach((id) => { | ||
if (except.indexOf(id) !== -1) return; | ||
if (! clients[id]) return ; | ||
if (!clients[id]) return; | ||
clients[id][method].apply(clients[id], data); | ||
@@ -196,3 +189,3 @@ }); | ||
Adapter.prototype.clients = function clients(room, cb) { | ||
var roomKeys = this._getRoomKeys(room) | ||
const roomKeys = this._getRoomKeys(room); | ||
this.client.sunion(roomKeys, cb); | ||
@@ -210,5 +203,5 @@ }; | ||
Adapter.prototype.empty = function empty(room, cb) { | ||
var multi = this.client.multi(); | ||
var roomKeys = this._getRoomKeys(room); | ||
roomKeys.forEach(function(key) { | ||
const multi = this.client.multi(); | ||
const roomKeys = this._getRoomKeys(room); | ||
roomKeys.forEach((key) => { | ||
multi.del(key); | ||
@@ -227,5 +220,10 @@ }); | ||
Adapter.prototype.isEmpty = function isEmpty(room, cb) { | ||
var roomKeys = this._getRoomKeys(room); | ||
this.client.sunion(roomKeys, function (err, clients) { | ||
cb(err, clients.length === 0); | ||
const roomKeys = this._getRoomKeys(room); | ||
this.client.sunion(roomKeys, (err, clients) => { | ||
if (err) { | ||
cb(err); | ||
return; | ||
} | ||
cb(null, clients.length === 0); | ||
}); | ||
@@ -243,13 +241,14 @@ }; | ||
cb = cb || noop; | ||
var self = this; | ||
// Get rooms. | ||
this.get(id, function (err, rooms) { | ||
if (err) return cb(err); | ||
this.get(id, (err, rooms) => { | ||
if (err) { | ||
return cb(err); | ||
} | ||
var multi = this.client.multi(); | ||
const multi = this.client.multi(); | ||
// Remove id from each rooms. | ||
rooms.forEach(function (room) { | ||
var roomKeys = self._getRoomKeys(room); | ||
roomKeys.forEach(function(key) { | ||
rooms.forEach((room) => { | ||
const roomKeys = this._getRoomKeys(room); | ||
roomKeys.forEach((key) => { | ||
multi.srem(key, id); | ||
@@ -259,6 +258,6 @@ }); | ||
var socketKeys = self._getSocketKeys(id); | ||
const socketKeys = this._getSocketKeys(id); | ||
// Remove id key. | ||
socketKeys.forEach(function(key) { | ||
socketKeys.forEach((key) => { | ||
multi.del(key); | ||
@@ -268,9 +267,8 @@ }); | ||
multi.exec(cb); | ||
}.bind(this)); | ||
}); | ||
}; | ||
Adapter.prototype._getRoomKeys = function(room) { | ||
var i = 0; | ||
var roomKeys = []; | ||
for ( ; i< this.numberOfSets; i++) { | ||
Adapter.prototype._getRoomKeys = function (room) { | ||
const roomKeys = []; | ||
for (let i = 0; i < this.numberOfSets; i++) { | ||
roomKeys.push(this._roomKey(room, i)); | ||
@@ -281,11 +279,10 @@ } | ||
Adapter.prototype._roomKey = function(room, offset) { | ||
var time = this._getTimeFraction(offset); | ||
return ['room', room, time].join(':'); | ||
Adapter.prototype._roomKey = function (room, offset) { | ||
const time = this._getTimeFraction(offset); | ||
return `${this.prefix}room:${room}:${time}`; | ||
}; | ||
Adapter.prototype._getSocketKeys = function(room) { | ||
var i = 0; | ||
var roomKeys = []; | ||
for ( ; i< this.numberOfSets; i++) { | ||
Adapter.prototype._getSocketKeys = function (room) { | ||
const roomKeys = []; | ||
for (let i = 0; i < this.numberOfSets; i++) { | ||
roomKeys.push(this._socketKey(room, i)); | ||
@@ -296,12 +293,11 @@ } | ||
Adapter.prototype._socketKey = function(id, offset) { | ||
var time = this._getTimeFraction(offset); | ||
return ['socket', id, time].join(':'); | ||
Adapter.prototype._socketKey = function (id, offset) { | ||
const time = this._getTimeFraction(offset); | ||
return `${this.prefix}socket:${id}:${time}`; | ||
}; | ||
Adapter.prototype._getTimeFraction = function(offset) { | ||
var now = Date.now(); | ||
Adapter.prototype._getTimeFraction = function (offset) { | ||
const now = Date.now(); | ||
offset = offset || 0; | ||
var interval = this.interval; | ||
const interval = this.interval; | ||
return Math.floor(now / interval) - offset; | ||
@@ -308,0 +304,0 @@ }; |
@@ -5,4 +5,3 @@ /** | ||
var _ = require('lodash'); | ||
var Adapter = require('./adapter'); | ||
const { Adapter } = require("./adapter"); | ||
@@ -13,3 +12,3 @@ /** | ||
module.exports = function (primus, options) { | ||
exports.server = function (primus, options) { | ||
return new PrimusCluster(primus, options); | ||
@@ -26,11 +25,15 @@ }; | ||
function PrimusCluster(primus, options) { | ||
options = options || {}; | ||
options.cluster = _.defaults(options.cluster || {}, { | ||
channel: 'primus', | ||
ttl: 86400 | ||
}); | ||
function PrimusCluster( | ||
primus, | ||
{ | ||
cluster: { | ||
prefix = "primus-cluster:", | ||
channel = `${prefix}pubsub`, | ||
ttl = 86400, | ||
redis = {}, | ||
} = {}, | ||
} = {} | ||
) { | ||
this.primus = primus; | ||
this.channel = options.cluster.channel; | ||
this.channel = channel; | ||
this.silent = false; | ||
@@ -41,8 +44,8 @@ | ||
this.initializeClients(options.cluster.redis); | ||
this.initializeAdapter(options.cluster.ttl); | ||
this.initializeClients(redis); | ||
this.initializeAdapter({ ttl, prefix }); | ||
this.wrapPrimusMethods(); | ||
this.initializeMessageDispatcher(); | ||
this.primus.on('close', this.close.bind(this)); | ||
this.primus.on("close", this.close.bind(this)); | ||
} | ||
@@ -54,21 +57,23 @@ | ||
PrimusCluster.prototype.initializeClients = function initializeClients(options) { | ||
options = _.defaults(options || {}, { | ||
host: '127.0.0.1', | ||
port: 6379 | ||
}); | ||
PrimusCluster.prototype.initializeClients = function initializeClients( | ||
options | ||
) { | ||
this.clients = {}; | ||
// Create redis clients. | ||
['pub', 'sub', 'storage'].forEach(function (name) { | ||
var client = createClient(); | ||
["pub", "sub", "storage"].forEach( | ||
function (name) { | ||
var client = createClient(); | ||
// Forward errors to Primus. | ||
client.on('error', function (err) { | ||
this.primus.emit('error', err); | ||
}.bind(this)); | ||
// Forward errors to Primus. | ||
client.on( | ||
"error", | ||
function (err) { | ||
this.primus.emit("error", err); | ||
}.bind(this) | ||
); | ||
this.clients[name] = client; | ||
}.bind(this)); | ||
this.clients[name] = client; | ||
}.bind(this) | ||
); | ||
@@ -82,25 +87,22 @@ /** | ||
function createClient() { | ||
if (_.isFunction(options)) return options(); | ||
if (typeof options === "function") return options(); | ||
try { | ||
return require('redis').createClient(options.port, options.host, _.omit(options, 'port', 'host')); | ||
return require("redis").createClient(options); | ||
} catch (err) { | ||
throw new Error("You must add redis as dependency."); | ||
} | ||
catch(err) { | ||
throw new Error('You must add redis as dependency.'); | ||
} | ||
} | ||
}; | ||
/** | ||
* Initialize the room adapter. | ||
* | ||
* @param {Number} ttl TTL in second | ||
*/ | ||
PrimusCluster.prototype.initializeAdapter = function initializeAdapter(ttl) { | ||
PrimusCluster.prototype.initializeAdapter = function initializeAdapter({ | ||
ttl, | ||
prefix, | ||
}) { | ||
// Create adapter. | ||
var adapter = new Adapter({ | ||
const adapter = new Adapter({ | ||
publish: this.publish.bind(this), | ||
client: this.clients.storage, | ||
ttl: ttl | ||
ttl: ttl, | ||
prefix, | ||
}); | ||
@@ -122,12 +124,12 @@ | ||
PrimusCluster.prototype.wrapPrimusMethods = function wrapPrimusMethods() { | ||
['write', 'send'].forEach(wrapMethod.bind(this)); | ||
function wrapMethod(method) { | ||
if (! this.primus[method]) return ; | ||
this.primus['__original' + method] = this.primus[method]; | ||
this.primus[method] = function () { | ||
this.publish(arguments, 'primus', { method: method }); | ||
this.primus['__original' + method].apply(this.primus, arguments); | ||
}.bind(this); | ||
} | ||
["write", "send"].forEach((method) => { | ||
if (!this.primus[method]) return; | ||
this.primus["__original" + method] = this.primus[method]; | ||
Object.defineProperty(this.primus, method, { | ||
value: (...args) => { | ||
this.publish(args, "primus", { method: method }); | ||
this.primus["__original" + method].apply(this.primus, args); | ||
}, | ||
}); | ||
}); | ||
}; | ||
@@ -139,9 +141,10 @@ | ||
PrimusCluster.prototype.initializeMessageDispatcher = function initializeMessageDispatcher() { | ||
this.clients.sub.subscribe(this.channel); | ||
PrimusCluster.prototype.initializeMessageDispatcher = | ||
function initializeMessageDispatcher() { | ||
this.clients.sub.subscribe(this.channel); | ||
this.clients.sub.on('message', function (channel, message) { | ||
this.dispatchMessage(message); | ||
}.bind(this)); | ||
}; | ||
this.clients.sub.on("message", (channel, message) => { | ||
this.dispatchMessage(message); | ||
}); | ||
}; | ||
@@ -155,16 +158,20 @@ /** | ||
PrimusCluster.prototype.dispatchMessage = function dispatchMessage(msg) { | ||
this.primus.decoder(msg, function (err, msg) { | ||
this.primus.decoder(msg, (err, msg) => { | ||
// Do a "save" emit('error') when we fail to parse a message. We don't | ||
// want to throw here as listening to errors should be optional. | ||
if (err) return this.primus.listeners('error').length && this.primus.emit('error', err); | ||
if (err) { | ||
if (this.primus.listeners("error").length) { | ||
this.primus.emit("error", err); | ||
} | ||
return; | ||
} | ||
// If message have no type, we ignore it. | ||
if (! msg.type) return ; | ||
if (!msg.type) return; | ||
// If we are the emitter, we ignore it. | ||
if (msg.id === this.id) return ; | ||
if (msg.id === this.id) return; | ||
this.callDispatcher(msg); | ||
}.bind(this)); | ||
}); | ||
}; | ||
@@ -183,3 +190,3 @@ | ||
// Call the dispatcher. | ||
this[msg.type + 'MessageDispatcher'](msg); | ||
this[msg.type + "MessageDispatcher"](msg); | ||
@@ -197,7 +204,9 @@ // Exit silent mode. | ||
PrimusCluster.prototype.roomMessageDispatcher = function roomMessageDispatcher(msg) { | ||
_.each(msg.opts.rooms, function (room) { | ||
var rooms = this.primus.room(room).except(msg.opts.except); | ||
rooms[msg.opts.method].apply(rooms, _.toArray(msg.data)); | ||
}, this); | ||
PrimusCluster.prototype.roomMessageDispatcher = function roomMessageDispatcher( | ||
msg | ||
) { | ||
msg.opts.rooms.forEach((room) => { | ||
const rooms = this.primus.room(room).except(msg.opts.except); | ||
rooms[msg.opts.method].apply(rooms, Array.from(msg.data)); | ||
}); | ||
}; | ||
@@ -212,5 +221,9 @@ | ||
PrimusCluster.prototype.primusMessageDispatcher = function primusMessageDispatcher(msg) { | ||
this.primus['__original' + msg.opts.method].apply(this.primus, _.toArray(msg.data)); | ||
}; | ||
PrimusCluster.prototype.primusMessageDispatcher = | ||
function primusMessageDispatcher(msg) { | ||
this.primus["__original" + msg.opts.method].apply( | ||
this.primus, | ||
Array.from(msg.data) | ||
); | ||
}; | ||
@@ -225,23 +238,25 @@ /** | ||
PrimusCluster.prototype.publish = function publish(data, type, opts) { | ||
opts = opts || {}; | ||
PrimusCluster.prototype.publish = function publish(data, type, opts = {}) { | ||
// In silent mode, we do nothing. | ||
if (this.silent) return ; | ||
if (this.silent) return; | ||
var message = { | ||
const message = { | ||
id: this.id, | ||
data: data, | ||
type: type, | ||
opts: opts | ||
opts: opts, | ||
}; | ||
this.primus.encoder(message, function (err, msg) { | ||
this.primus.encoder(message, (err, msg) => { | ||
// Do a "save" emit('error') when we fail to parse a message. We don't | ||
// want to throw here as listening to errors should be optional. | ||
if (err) return this.primus.listeners('error').length && this.primus.emit('error', err); | ||
if (err) { | ||
if (this.primus.listeners("error").length) { | ||
this.primus.emit("error", err); | ||
} | ||
return; | ||
} | ||
this.clients.pub.publish(this.channel, msg); | ||
}.bind(this)); | ||
}); | ||
}; | ||
@@ -255,3 +270,5 @@ | ||
PrimusCluster.prototype.close = function close() { | ||
_.invoke(this.clients, 'quit'); | ||
}; | ||
Object.values(this.clients).forEach((client) => { | ||
client.quit(); | ||
}); | ||
}; |
{ | ||
"name": "primus-cluster", | ||
"version": "1.1.0", | ||
"version": "2.0.0", | ||
"description": "Scale Primus across multiple servers.", | ||
"dependencies": { | ||
"lodash": "~2.4.1" | ||
}, | ||
"main": "./lib/index.js", | ||
"devDependencies": { | ||
"redis": "~0.10.0", | ||
"mocha": "~1.17.0", | ||
"chai": "~1.8.1", | ||
"async": "~0.2.9", | ||
"sinon-chai": "~2.4.0", | ||
"sinon": "~1.7.3", | ||
"primus": "~2.0.2", | ||
"ws": "~0.4.31", | ||
"primus-emitter": "~3.0.0", | ||
"primus-rooms": "~3.0.1" | ||
"async": "^3.2.1", | ||
"conventional-github-releaser": "^3.1.5", | ||
"jest": "^27.1.0", | ||
"standard-version": "^9.1.0", | ||
"primus": "^8.0.5", | ||
"primus-emitter": "^3.1.1", | ||
"primus-rooms": "^3.4.1", | ||
"redis": "^3.1.2", | ||
"ws": "^8.2.2" | ||
}, | ||
"peerDependencies": { | ||
"primus": ">=1.0.0" | ||
"primus": ">=8.0.0" | ||
}, | ||
"repository": { | ||
"type": "git", | ||
"url": "https://github.com/neoziro/primus-cluster.git" | ||
}, | ||
"repository": "github:lemonde/primus-cluster", | ||
"scripts": { | ||
"test": "mocha --recursive" | ||
"test": "jest", | ||
"release": "standard-version && conventional-github-releaser --preset angular" | ||
}, | ||
"main": ".", | ||
"author": "Greg Bergé <berge.greg@gmail.com>", | ||
"license": "MIT" | ||
} | ||
} |
@@ -1,3 +0,5 @@ | ||
# primus-cluster [![Build Status](https://travis-ci.org/neoziro/primus-cluster.png)](https://travis-ci.org/neoziro/primus-cluster) | ||
# primus-cluster | ||
![CI](https://github.com/lemonde/primus-cluster/workflows/CI/badge.svg) | ||
Primus cluster runs Primus accross multiple servers, it use Redis to store data and distribute messages across Primus instances. For more informations you can see [Redis Pub/Sub](http://redis.io/topics/pubsub). | ||
@@ -10,14 +12,13 @@ | ||
## Usage | ||
```js | ||
var http = require('http'); | ||
var Primus = require('primus'); | ||
var PrimusCluster = require('primus-cluster'); | ||
const http = require("http"); | ||
const Primus = require("primus"); | ||
const PrimusCluster = require("primus-cluster"); | ||
const | ||
const server = http.createServer(); | ||
const primus = new Primus(server); | ||
var server = http.createServer(); | ||
var primus = new Primus(server); | ||
primus.use('cluster', PrimusCluster); | ||
primus.plugin("cluster", PrimusCluster); | ||
``` | ||
@@ -39,7 +40,7 @@ | ||
port: 6379, | ||
host: '127.0.0.1', | ||
connect_timeout: 200 | ||
} | ||
} | ||
}) | ||
host: "127.0.0.1", | ||
connect_timeout: 200, | ||
}, | ||
}, | ||
}); | ||
``` | ||
@@ -50,12 +51,12 @@ | ||
```js | ||
var redis = require('redis'); | ||
const redis = require("redis"); | ||
new Primus(server, { | ||
cluster: { | ||
redis: createClient | ||
} | ||
}) | ||
redis: createClient, | ||
}, | ||
}); | ||
function createClient() { | ||
var client = redis.createClient(); | ||
const client = redis.createClient(); | ||
client.select(1); // Choose a custom database. | ||
@@ -66,2 +67,16 @@ return client; | ||
### prefix | ||
Type: `String` | ||
Prefix added to every redis key and default channel, default channel is "primus-cluster:". | ||
```js | ||
new Primus(server, { | ||
cluster: { | ||
prefix: "my-client:", | ||
}, | ||
}); | ||
``` | ||
### channel | ||
@@ -71,3 +86,3 @@ | ||
The name of the channel to use, the default channel is "primus". | ||
The name of the channel to use, the default channel is `${prefix}pubsub`. | ||
@@ -77,5 +92,5 @@ ```js | ||
cluster: { | ||
channel: 'primus' | ||
} | ||
}) | ||
channel: "my-client:awesome-channel", | ||
}, | ||
}); | ||
``` | ||
@@ -92,5 +107,5 @@ | ||
cluster: { | ||
ttl: 86400 | ||
} | ||
}) | ||
ttl: 86400, | ||
}, | ||
}); | ||
``` | ||
@@ -102,7 +117,6 @@ | ||
```js | ||
primus.use('rooms', PrimusRooms); | ||
primus.use('emitter', PrimusEmitter); | ||
primus.use('cluster', PrimusCluster); | ||
primus.plugin("rooms", PrimusRooms); | ||
primus.plugin("emitter", PrimusEmitter); | ||
primus.plugin("cluster", PrimusCluster); | ||
``` | ||
@@ -109,0 +123,0 @@ |
Major refactor
Supply chain riskPackage has recently undergone a major refactor. It may be unstable or indicate significant internal changes. Use caution when updating to versions that include significant changes.
Found 1 instance in 1 package
Network access
Supply chain riskThis module accesses the network.
Found 1 instance in 1 package
No contributors or author data
MaintenancePackage does not specify a list of contributors or an author in package.json.
Found 1 instance in 1 package
1
9
0
119
15749
5
462
- Removedlodash@~2.4.1
- Removedlodash@2.4.2(transitive)