New Case Study:See how Anthropic automated 95% of dependency reviews with Socket.Learn More
Socket
Sign inDemoInstall
Socket

emitter-pubsub-broker

Package Overview
Dependencies
Maintainers
1
Versions
9
Alerts
File Explorer

Advanced tools

Socket logo

Install Socket

Detect and block malicious and high-risk dependencies

Install

emitter-pubsub-broker - npm Package Compare versions

Comparing version 0.3.0 to 0.4.0

15

CHANGELOG.md

@@ -5,2 +5,17 @@ # Change Log

<a name="0.4.0"></a>
# [0.4.0](https://github.com/an-sh/emitter-pubsub-broker/compare/v0.3.0...v0.4.0) (2016-11-29)
### Chores
* use es2015-node4 preset ([5e05222](https://github.com/an-sh/emitter-pubsub-broker/commit/5e05222))
### BREAKING CHANGES
* Possible node 4.x regression due to the preset change.
<a name="0.3.0"></a>

@@ -7,0 +22,0 @@ # [0.3.0](https://github.com/an-sh/emitter-pubsub-broker/compare/v0.2.1...v0.3.0) (2016-10-21)

559

lib/EmitterPubsubBroker.js
'use strict';
var _toConsumableArray2 = require('babel-runtime/helpers/toConsumableArray');
function _toConsumableArray(arr) { if (Array.isArray(arr)) { for (var i = 0, arr2 = Array(arr.length); i < arr.length; i++) arr2[i] = arr[i]; return arr2; } else { return Array.from(arr); } }
var _toConsumableArray3 = _interopRequireDefault(_toConsumableArray2);
const Promise = require('bluebird');
const Redis = require('ioredis');
const msgpack = require('msgpack-lite');
var _getIterator2 = require('babel-runtime/core-js/get-iterator');
var _getIterator3 = _interopRequireDefault(_getIterator2);
var _set = require('babel-runtime/core-js/set');
var _set2 = _interopRequireDefault(_set);
var _map = require('babel-runtime/core-js/map');
var _map2 = _interopRequireDefault(_map);
var _getPrototypeOf = require('babel-runtime/core-js/object/get-prototype-of');
var _getPrototypeOf2 = _interopRequireDefault(_getPrototypeOf);
var _classCallCheck2 = require('babel-runtime/helpers/classCallCheck');
var _classCallCheck3 = _interopRequireDefault(_classCallCheck2);
var _createClass2 = require('babel-runtime/helpers/createClass');
var _createClass3 = _interopRequireDefault(_createClass2);
var _possibleConstructorReturn2 = require('babel-runtime/helpers/possibleConstructorReturn');
var _possibleConstructorReturn3 = _interopRequireDefault(_possibleConstructorReturn2);
var _inherits2 = require('babel-runtime/helpers/inherits');
var _inherits3 = _interopRequireDefault(_inherits2);
function _interopRequireDefault(obj) { return obj && obj.__esModule ? obj : { default: obj }; }
var Promise = require('bluebird');
var Redis = require('ioredis');
var msgpack = require('msgpack-lite');
var _require = require('eventemitter3');
var EventEmitter = _require.EventEmitter;
const EventEmitter = _require.EventEmitter;

@@ -107,85 +71,57 @@ /**

var RedisConnector = function (_EventEmitter) {
(0, _inherits3.default)(RedisConnector, _EventEmitter);
class RedisConnector extends EventEmitter {
constructor(options) {
super();
this.pub = new Redis(options);
this.sub = new Redis(options);
this.sub.subscribe().catchReturn();
this.sub.on('messageBuffer', this._onMessage.bind(this));
this.sub.on('error', this.emit.bind(this));
this.pub.on('error', this.emit.bind(this));
}
function RedisConnector(options) {
(0, _classCallCheck3.default)(this, RedisConnector);
_onMessage(buf, data) {
let ch = buf.toString();
this.emit('message', ch, data);
}
var _this = (0, _possibleConstructorReturn3.default)(this, (RedisConnector.__proto__ || (0, _getPrototypeOf2.default)(RedisConnector)).call(this));
publish(ch, data) {
return this.pub.publish(ch, data);
}
_this.pub = new Redis(options);
_this.sub = new Redis(options);
_this.sub.subscribe().catchReturn();
_this.sub.on('messageBuffer', _this._onMessage.bind(_this));
_this.sub.on('error', _this.emit.bind(_this));
_this.pub.on('error', _this.emit.bind(_this));
return _this;
subscribe(ch) {
return this.sub.subscribe(ch);
}
(0, _createClass3.default)(RedisConnector, [{
key: '_onMessage',
value: function _onMessage(buf, data) {
var ch = buf.toString();
this.emit('message', ch, data);
}
}, {
key: 'publish',
value: function publish(ch, data) {
return this.pub.publish(ch, data);
}
}, {
key: 'subscribe',
value: function subscribe(ch) {
return this.sub.subscribe(ch);
}
}, {
key: 'unsubscribe',
value: function unsubscribe(ch) {
return this.sub.unsubscribe(ch);
}
}, {
key: 'close',
value: function close() {
return Promise.all([this.pub.quit(), this.sub.quit()]);
}
}]);
return RedisConnector;
}(EventEmitter);
unsubscribe(ch) {
return this.sub.unsubscribe(ch);
}
var MemoryConnector = function (_EventEmitter2) {
(0, _inherits3.default)(MemoryConnector, _EventEmitter2);
close() {
return Promise.all([this.pub.quit(), this.sub.quit()]);
}
}
function MemoryConnector(options) {
(0, _classCallCheck3.default)(this, MemoryConnector);
return (0, _possibleConstructorReturn3.default)(this, (MemoryConnector.__proto__ || (0, _getPrototypeOf2.default)(MemoryConnector)).call(this));
class MemoryConnector extends EventEmitter {
constructor(options) {
super();
}
(0, _createClass3.default)(MemoryConnector, [{
key: 'publish',
value: function publish(ch, data) {
var _this3 = this;
publish(ch, data) {
return Promise.resolve().then(() => this.emit('message', ch, data));
}
return Promise.resolve().then(function () {
return _this3.emit('message', ch, data);
});
}
}, {
key: 'subscribe',
value: function subscribe(ch) {
return Promise.resolve();
}
}, {
key: 'unsubscribe',
value: function unsubscribe(ch) {
return Promise.resolve();
}
}, {
key: 'close',
value: function close() {
return Promise.resolve();
}
}]);
return MemoryConnector;
}(EventEmitter);
subscribe(ch) {
return Promise.resolve();
}
unsubscribe(ch) {
return Promise.resolve();
}
close() {
return Promise.resolve();
}
}
/**

@@ -204,7 +140,3 @@ * @typedef {Object} EmitterPubsubBroker.Options

*/
var EmitterPubsubBroker = function (_EventEmitter3) {
(0, _inherits3.default)(EmitterPubsubBroker, _EventEmitter3);
class EmitterPubsubBroker extends EventEmitter {
/**

@@ -217,22 +149,19 @@ * Creates a broker.

*/
function EmitterPubsubBroker(options) {
(0, _classCallCheck3.default)(this, EmitterPubsubBroker);
var _this4 = (0, _possibleConstructorReturn3.default)(this, (EmitterPubsubBroker.__proto__ || (0, _getPrototypeOf2.default)(EmitterPubsubBroker)).call(this));
constructor(options) {
super();
if (options == null || typeof options === 'string') {
options = { connect: options };
}
_this4.prefix = options.prefix || 'emitter-pubsub-broker:';
_this4.includeChannel = options.includeChannel;
_this4.clientChannels = new _map2.default();
_this4.channelClients = new _map2.default();
this.prefix = options.prefix || 'emitter-pubsub-broker:';
this.includeChannel = options.includeChannel;
this.clientChannels = new Map();
this.channelClients = new Map();
if (options.connect || options.connector) {
_this4.connector = options.connector || new RedisConnector(options.connect);
_this4.serialize = true;
this.connector = options.connector || new RedisConnector(options.connect);
this.serialize = true;
} else {
_this4.connector = new MemoryConnector();
_this4.serialize = false;
this.connector = new MemoryConnector();
this.serialize = false;
}
_this4.connector.on('message', _this4._dispatch.bind(_this4));
this.connector.on('message', this._dispatch.bind(this));
/**

@@ -245,247 +174,177 @@ * Connector error. Does not throw if there are no listeners.

*/
_this4.connector.on('error', _this4.emit.bind(_this4));
return _this4;
this.connector.on('error', this.emit.bind(this));
}
(0, _createClass3.default)(EmitterPubsubBroker, [{
key: '_channelAddClient',
value: function _channelAddClient(client, ch) {
var clients = this.channelClients.get(ch);
if (clients == null) {
clients = new _set2.default();
this.channelClients.set(ch, clients);
clients.add(client);
return this.connector.subscribe(ch);
} else {
clients.add(client);
return Promise.resolve();
}
_channelAddClient(client, ch) {
let clients = this.channelClients.get(ch);
if (clients == null) {
clients = new Set();
this.channelClients.set(ch, clients);
clients.add(client);
return this.connector.subscribe(ch);
} else {
clients.add(client);
return Promise.resolve();
}
}, {
key: '_channelRemoveClient',
value: function _channelRemoveClient(client, ch) {
var clients = this.channelClients.get(ch);
var nclients = void 0;
if (clients != null) {
clients.delete(client);
nclients = clients.size;
}
if (nclients === 0) {
return this.connector.unsubscribe(ch);
} else {
return Promise.resolve();
}
}
_channelRemoveClient(client, ch) {
let clients = this.channelClients.get(ch);
let nclients;
if (clients != null) {
clients.delete(client);
nclients = clients.size;
}
}, {
key: '_makeMessage',
value: function _makeMessage(msg) {
return this.serialize ? msgpack.encode(msg) : msg;
if (nclients === 0) {
return this.connector.unsubscribe(ch);
} else {
return Promise.resolve();
}
}
/**
* Subscribes emitter to a channel.
*
* @param {EventEmitter} client Emitter.
* @param {string} channel Channel.
* @return {Promise<undefined>}
*/
_makeMessage(msg) {
return this.serialize ? msgpack.encode(msg) : msg;
}
}, {
key: 'subscribe',
value: function subscribe(client, channel) {
var channels = this.clientChannels.get(client);
if (!channels) {
channels = new _set2.default();
this.clientChannels.set(client, channels);
}
var ch = this.prefix + channel;
channels.add(ch);
return this._channelAddClient(client, ch);
/**
* Subscribes emitter to a channel.
*
* @param {EventEmitter} client Emitter.
* @param {string} channel Channel.
* @return {Promise<undefined>}
*/
subscribe(client, channel) {
let channels = this.clientChannels.get(client);
if (!channels) {
channels = new Set();
this.clientChannels.set(client, channels);
}
let ch = this.prefix + channel;
channels.add(ch);
return this._channelAddClient(client, ch);
}
/**
* Unsubscribes emitter from a channel.
*
* @param {EventEmitter} client Emitter.
* @param {string} channel Channel.
* @return {Promise<undefined>}
*/
/**
* Unsubscribes emitter from a channel.
*
* @param {EventEmitter} client Emitter.
* @param {string} channel Channel.
* @return {Promise<undefined>}
*/
unsubscribe(client, channel) {
let channels = this.clientChannels.get(client);
let ch = this.prefix + channel;
if (channels) {
channels.delete(ch);
}
return this._channelRemoveClient(client, ch);
}
}, {
key: 'unsubscribe',
value: function unsubscribe(client, channel) {
var channels = this.clientChannels.get(client);
var ch = this.prefix + channel;
if (channels) {
channels.delete(ch);
}
return this._channelRemoveClient(client, ch);
/**
* Unsubscribes emitter from all channel.
*
* @param {EventEmitter} client Emitter.
* @return {Promise<undefined>}
*/
unsubscribeall(client) {
let channels = this.clientChannels.get(client);
this.clientChannels.delete(client);
if (channels) {
return Promise.each(channels, this._channelRemoveClient.bind(this, client));
} else {
return Promise.resolve();
}
}
/**
* Unsubscribes emitter from all channel.
*
* @param {EventEmitter} client Emitter.
* @return {Promise<undefined>}
*/
}, {
key: 'unsubscribeall',
value: function unsubscribeall(client) {
var channels = this.clientChannels.get(client);
this.clientChannels.delete(client);
if (channels) {
return Promise.each(channels, this._channelRemoveClient.bind(this, client));
} else {
return Promise.resolve();
}
/**
* Publish an event to a channel.
*
* @param {string} channel Channel.
* @param {string} name Event name.
* @param {*} args Arguments.
* @return {Promise<undefined>}
*/
publish(channel, name) {
for (var _len = arguments.length, args = Array(_len > 2 ? _len - 2 : 0), _key = 2; _key < _len; _key++) {
args[_key - 2] = arguments[_key];
}
/**
* Publish an event to a channel.
*
* @param {string} channel Channel.
* @param {string} name Event name.
* @param {*} args Arguments.
* @return {Promise<undefined>}
*/
let ch = this.prefix + channel;
let msg = this._makeMessage({ name: name, args: args });
return this.connector.publish(ch, msg);
}
}, {
key: 'publish',
value: function publish(channel, name) {
for (var _len = arguments.length, args = Array(_len > 2 ? _len - 2 : 0), _key = 2; _key < _len; _key++) {
args[_key - 2] = arguments[_key];
}
var ch = this.prefix + channel;
var msg = this._makeMessage({ name: name, args: args });
return this.connector.publish(ch, msg);
/**
* Publish an event to a channel, excluding the sender. The client
* object __MUST__ have an unique `id` field.
*
* @param {EventEmitter} client Emitter.
* @param {string} channel Channel.
* @param {string} name Event name.
* @param {*} args Arguments.
* @return {Promise<undefined>}
*/
send(client, channel, name) {
for (var _len2 = arguments.length, args = Array(_len2 > 3 ? _len2 - 3 : 0), _key2 = 3; _key2 < _len2; _key2++) {
args[_key2 - 3] = arguments[_key2];
}
/**
* Publish an event to a channel, excluding the sender. The client
* object __MUST__ have an unique `id` field.
*
* @param {EventEmitter} client Emitter.
* @param {string} channel Channel.
* @param {string} name Event name.
* @param {*} args Arguments.
* @return {Promise<undefined>}
*/
let ch = this.prefix + channel;
let sender = client.id;
let msg = this._makeMessage({ sender: sender, name: name, args: args });
return this.connector.publish(ch, msg);
}
}, {
key: 'send',
value: function send(client, channel, name) {
for (var _len2 = arguments.length, args = Array(_len2 > 3 ? _len2 - 3 : 0), _key2 = 3; _key2 < _len2; _key2++) {
args[_key2 - 3] = arguments[_key2];
/**
* Returns client subscriptions.
*
* @param {EventEmitter} client Emitter.
* @return {Array<string>}
*/
getSubscriptions(client) {
let channels = this.clientChannels.get(client);
let res = [];
if (channels) {
let plen = this.prefix.length;
for (let channel of channels) {
res.push(channel.slice(plen));
}
var ch = this.prefix + channel;
var sender = client.id;
var msg = this._makeMessage({ sender: sender, name: name, args: args });
return this.connector.publish(ch, msg);
}
return res;
}
/**
* Returns client subscriptions.
*
* @param {EventEmitter} client Emitter.
* @return {Array<string>}
*/
/**
* Closes broker.
*
* @return {Promise<undefined>}
*/
close() {
this.channelClients.clear();
this.clientChannels.clear();
return this.connector.close();
}
}, {
key: 'getSubscriptions',
value: function getSubscriptions(client) {
var channels = this.clientChannels.get(client);
var res = [];
if (channels) {
var plen = this.prefix.length;
var _iteratorNormalCompletion = true;
var _didIteratorError = false;
var _iteratorError = undefined;
try {
for (var _iterator = (0, _getIterator3.default)(channels), _step; !(_iteratorNormalCompletion = (_step = _iterator.next()).done); _iteratorNormalCompletion = true) {
var channel = _step.value;
res.push(channel.slice(plen));
}
} catch (err) {
_didIteratorError = true;
_iteratorError = err;
} finally {
try {
if (!_iteratorNormalCompletion && _iterator.return) {
_iterator.return();
}
} finally {
if (_didIteratorError) {
throw _iteratorError;
}
}
_dispatch(ch, data) {
let channel = ch.slice(this.prefix.length);
let message = this.serialize ? msgpack.decode(data) : data;
let clients = this.channelClients.get(ch);
/* istanbul ignore else */
if (clients) {
let args;
if (this.includeChannel) {
args = [message.name, channel].concat(_toConsumableArray(message.args));
} else {
args = [message.name].concat(_toConsumableArray(message.args));
}
for (let client of clients) {
if (!message.sender || client.id !== message.sender) {
client.emit.apply(client, _toConsumableArray(args));
}
}
return res;
}
}
/**
* Closes broker.
*
* @return {Promise<undefined>}
*/
}
}, {
key: 'close',
value: function close() {
this.channelClients.clear();
this.clientChannels.clear();
return this.connector.close();
}
}, {
key: '_dispatch',
value: function _dispatch(ch, data) {
var channel = ch.slice(this.prefix.length);
var message = this.serialize ? msgpack.decode(data) : data;
var clients = this.channelClients.get(ch);
/* istanbul ignore else */
if (clients) {
var args = void 0;
if (this.includeChannel) {
args = [message.name, channel].concat((0, _toConsumableArray3.default)(message.args));
} else {
args = [message.name].concat((0, _toConsumableArray3.default)(message.args));
}
var _iteratorNormalCompletion2 = true;
var _didIteratorError2 = false;
var _iteratorError2 = undefined;
try {
for (var _iterator2 = (0, _getIterator3.default)(clients), _step2; !(_iteratorNormalCompletion2 = (_step2 = _iterator2.next()).done); _iteratorNormalCompletion2 = true) {
var client = _step2.value;
if (!message.sender || client.id !== message.sender) {
client.emit.apply(client, (0, _toConsumableArray3.default)(args));
}
}
} catch (err) {
_didIteratorError2 = true;
_iteratorError2 = err;
} finally {
try {
if (!_iteratorNormalCompletion2 && _iterator2.return) {
_iterator2.return();
}
} finally {
if (_didIteratorError2) {
throw _iteratorError2;
}
}
}
}
}
}]);
return EmitterPubsubBroker;
}(EventEmitter);
module.exports = EmitterPubsubBroker;
//# sourceMappingURL=data:application/json;base64,{"version":3,"sources":["../src/EmitterPubsubBroker.js"],"names":["Promise","require","Redis","msgpack","EventEmitter","RedisConnector","options","pub","sub","subscribe","catchReturn","on","_onMessage","bind","emit","buf","data","ch","toString","publish","unsubscribe","all","quit","MemoryConnector","resolve","then","EmitterPubsubBroker","connect","prefix","includeChannel","clientChannels","channelClients","connector","serialize","_dispatch","client","clients","get","set","add","nclients","delete","size","msg","encode","channel","channels","_channelAddClient","_channelRemoveClient","each","name","args","_makeMessage","sender","id","res","plen","length","push","slice","clear","close","message","decode","module","exports"],"mappings":"AAAA;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;AAEA,IAAMA,UAAUC,QAAQ,UAAR,CAAhB;AACA,IAAMC,QAAQD,QAAQ,SAAR,CAAd;AACA,IAAME,UAAUF,QAAQ,cAAR,CAAhB;;eACyBA,QAAQ,eAAR,C;;IAAjBG,Y,YAAAA,Y;;AAER;;;;;;;AAOA;;;;;;;;;;AAUA;;;;;;;;;AASA;;;;;;;;;AASA;;;;;;;;AAQA;;;;;;;AAOA;;;;;;;;IAQMC,c;;;AACJ,0BAAaC,OAAb,EAAsB;AAAA;;AAAA;;AAEpB,UAAKC,GAAL,GAAW,IAAIL,KAAJ,CAAUI,OAAV,CAAX;AACA,UAAKE,GAAL,GAAW,IAAIN,KAAJ,CAAUI,OAAV,CAAX;AACA,UAAKE,GAAL,CAASC,SAAT,GAAqBC,WAArB;AACA,UAAKF,GAAL,CAASG,EAAT,CAAY,eAAZ,EAA6B,MAAKC,UAAL,CAAgBC,IAAhB,OAA7B;AACA,UAAKL,GAAL,CAASG,EAAT,CAAY,OAAZ,EAAqB,MAAKG,IAAL,CAAUD,IAAV,OAArB;AACA,UAAKN,GAAL,CAASI,EAAT,CAAY,OAAZ,EAAqB,MAAKG,IAAL,CAAUD,IAAV,OAArB;AAPoB;AAQrB;;;;+BAEWE,G,EAAKC,I,EAAM;AACrB,UAAIC,KAAKF,IAAIG,QAAJ,EAAT;AACA,WAAKJ,IAAL,CAAU,SAAV,EAAqBG,EAArB,EAAyBD,IAAzB;AACD;;;4BAEQC,E,EAAID,I,EAAM;AACjB,aAAO,KAAKT,GAAL,CAASY,OAAT,CAAiBF,EAAjB,EAAqBD,IAArB,CAAP;AACD;;;8BAEUC,E,EAAI;AACb,aAAO,KAAKT,GAAL,CAASC,SAAT,CAAmBQ,EAAnB,CAAP;AACD;;;gCAEYA,E,EAAI;AACf,aAAO,KAAKT,GAAL,CAASY,WAAT,CAAqBH,EAArB,CAAP;AACD;;;4BAEQ;AACP,aAAOjB,QAAQqB,GAAR,CAAY,CAAC,KAAKd,GAAL,CAASe,IAAT,EAAD,EAAkB,KAAKd,GAAL,CAASc,IAAT,EAAlB,CAAZ,CAAP;AACD;;;EA9B0BlB,Y;;IAiCvBmB,e;;;AACJ,2BAAajB,OAAb,EAAsB;AAAA;AAAA;AAErB;;;;4BAEQW,E,EAAID,I,EAAM;AAAA;;AACjB,aAAOhB,QAAQwB,OAAR,GACJC,IADI,CACC;AAAA,eAAM,OAAKX,IAAL,CAAU,SAAV,EAAqBG,EAArB,EAAyBD,IAAzB,CAAN;AAAA,OADD,CAAP;AAED;;;8BAEUC,E,EAAI;AACb,aAAOjB,QAAQwB,OAAR,EAAP;AACD;;;gCAEYP,E,EAAI;AACf,aAAOjB,QAAQwB,OAAR,EAAP;AACD;;;4BAEQ;AACP,aAAOxB,QAAQwB,OAAR,EAAP;AACD;;;EApB2BpB,Y;;AAuB9B;;;;;;;;;;AAUA;;;;;IAGMsB,mB;;;AACJ;;;;;;;AAOA,+BAAapB,OAAb,EAAsB;AAAA;;AAAA;;AAEpB,QAAIA,WAAW,IAAX,IAAmB,OAAOA,OAAP,KAAmB,QAA1C,EAAoD;AAClDA,gBAAU,EAAEqB,SAASrB,OAAX,EAAV;AACD;AACD,WAAKsB,MAAL,GAActB,QAAQsB,MAAR,IAAkB,wBAAhC;AACA,WAAKC,cAAL,GAAsBvB,QAAQuB,cAA9B;AACA,WAAKC,cAAL,GAAsB,mBAAtB;AACA,WAAKC,cAAL,GAAsB,mBAAtB;AACA,QAAIzB,QAAQqB,OAAR,IAAmBrB,QAAQ0B,SAA/B,EAA0C;AACxC,aAAKA,SAAL,GAAiB1B,QAAQ0B,SAAR,IAAqB,IAAI3B,cAAJ,CAAmBC,QAAQqB,OAA3B,CAAtC;AACA,aAAKM,SAAL,GAAiB,IAAjB;AACD,KAHD,MAGO;AACL,aAAKD,SAAL,GAAiB,IAAIT,eAAJ,EAAjB;AACA,aAAKU,SAAL,GAAiB,KAAjB;AACD;AACD,WAAKD,SAAL,CAAerB,EAAf,CAAkB,SAAlB,EAA6B,OAAKuB,SAAL,CAAerB,IAAf,QAA7B;AACA;;;;;;;AAOA,WAAKmB,SAAL,CAAerB,EAAf,CAAkB,OAAlB,EAA2B,OAAKG,IAAL,CAAUD,IAAV,QAA3B;AAxBoB;AAyBrB;;;;sCAEkBsB,M,EAAQlB,E,EAAI;AAC7B,UAAImB,UAAU,KAAKL,cAAL,CAAoBM,GAApB,CAAwBpB,EAAxB,CAAd;AACA,UAAImB,WAAW,IAAf,EAAqB;AACnBA,kBAAU,mBAAV;AACA,aAAKL,cAAL,CAAoBO,GAApB,CAAwBrB,EAAxB,EAA4BmB,OAA5B;AACAA,gBAAQG,GAAR,CAAYJ,MAAZ;AACA,eAAO,KAAKH,SAAL,CAAevB,SAAf,CAAyBQ,EAAzB,CAAP;AACD,OALD,MAKO;AACLmB,gBAAQG,GAAR,CAAYJ,MAAZ;AACA,eAAOnC,QAAQwB,OAAR,EAAP;AACD;AACF;;;yCAEqBW,M,EAAQlB,E,EAAI;AAChC,UAAImB,UAAU,KAAKL,cAAL,CAAoBM,GAApB,CAAwBpB,EAAxB,CAAd;AACA,UAAIuB,iBAAJ;AACA,UAAIJ,WAAW,IAAf,EAAqB;AACnBA,gBAAQK,MAAR,CAAeN,MAAf;AACAK,mBAAWJ,QAAQM,IAAnB;AACD;AACD,UAAIF,aAAa,CAAjB,EAAoB;AAClB,eAAO,KAAKR,SAAL,CAAeZ,WAAf,CAA2BH,EAA3B,CAAP;AACD,OAFD,MAEO;AACL,eAAOjB,QAAQwB,OAAR,EAAP;AACD;AACF;;;iCAEamB,G,EAAK;AACjB,aAAO,KAAKV,SAAL,GAAiB9B,QAAQyC,MAAR,CAAeD,GAAf,CAAjB,GAAuCA,GAA9C;AACD;;AAED;;;;;;;;;;8BAOWR,M,EAAQU,O,EAAS;AAC1B,UAAIC,WAAW,KAAKhB,cAAL,CAAoBO,GAApB,CAAwBF,MAAxB,CAAf;AACA,UAAI,CAACW,QAAL,EAAe;AACbA,mBAAW,mBAAX;AACA,aAAKhB,cAAL,CAAoBQ,GAApB,CAAwBH,MAAxB,EAAgCW,QAAhC;AACD;AACD,UAAI7B,KAAK,KAAKW,MAAL,GAAciB,OAAvB;AACAC,eAASP,GAAT,CAAatB,EAAb;AACA,aAAO,KAAK8B,iBAAL,CAAuBZ,MAAvB,EAA+BlB,EAA/B,CAAP;AACD;;AAED;;;;;;;;;;gCAOakB,M,EAAQU,O,EAAS;AAC5B,UAAIC,WAAW,KAAKhB,cAAL,CAAoBO,GAApB,CAAwBF,MAAxB,CAAf;AACA,UAAIlB,KAAK,KAAKW,MAAL,GAAciB,OAAvB;AACA,UAAIC,QAAJ,EAAc;AACZA,iBAASL,MAAT,CAAgBxB,EAAhB;AACD;AACD,aAAO,KAAK+B,oBAAL,CAA0Bb,MAA1B,EAAkClB,EAAlC,CAAP;AACD;;AAED;;;;;;;;;mCAMgBkB,M,EAAQ;AACtB,UAAIW,WAAW,KAAKhB,cAAL,CAAoBO,GAApB,CAAwBF,MAAxB,CAAf;AACA,WAAKL,cAAL,CAAoBW,MAApB,CAA2BN,MAA3B;AACA,UAAIW,QAAJ,EAAc;AACZ,eAAO9C,QAAQiD,IAAR,CACLH,QADK,EACK,KAAKE,oBAAL,CAA0BnC,IAA1B,CAA+B,IAA/B,EAAqCsB,MAArC,CADL,CAAP;AAED,OAHD,MAGO;AACL,eAAOnC,QAAQwB,OAAR,EAAP;AACD;AACF;;AAED;;;;;;;;;;;4BAQSqB,O,EAASK,I,EAAe;AAAA,wCAANC,IAAM;AAANA,YAAM;AAAA;;AAC/B,UAAIlC,KAAK,KAAKW,MAAL,GAAciB,OAAvB;AACA,UAAIF,MAAM,KAAKS,YAAL,CAAkB,EAACF,UAAD,EAAOC,UAAP,EAAlB,CAAV;AACA,aAAO,KAAKnB,SAAL,CAAeb,OAAf,CAAuBF,EAAvB,EAA2B0B,GAA3B,CAAP;AACD;;AAED;;;;;;;;;;;;;yBAUMR,M,EAAQU,O,EAASK,I,EAAe;AAAA,yCAANC,IAAM;AAANA,YAAM;AAAA;;AACpC,UAAIlC,KAAK,KAAKW,MAAL,GAAciB,OAAvB;AACA,UAAIQ,SAASlB,OAAOmB,EAApB;AACA,UAAIX,MAAM,KAAKS,YAAL,CAAkB,EAACC,cAAD,EAASH,UAAT,EAAeC,UAAf,EAAlB,CAAV;AACA,aAAO,KAAKnB,SAAL,CAAeb,OAAf,CAAuBF,EAAvB,EAA2B0B,GAA3B,CAAP;AACD;;AAED;;;;;;;;;qCAMkBR,M,EAAQ;AACxB,UAAIW,WAAW,KAAKhB,cAAL,CAAoBO,GAApB,CAAwBF,MAAxB,CAAf;AACA,UAAIoB,MAAM,EAAV;AACA,UAAIT,QAAJ,EAAc;AACZ,YAAIU,OAAO,KAAK5B,MAAL,CAAY6B,MAAvB;AADY;AAAA;AAAA;;AAAA;AAEZ,0DAAoBX,QAApB,4GAA8B;AAAA,gBAArBD,OAAqB;;AAC5BU,gBAAIG,IAAJ,CAASb,QAAQc,KAAR,CAAcH,IAAd,CAAT;AACD;AAJW;AAAA;AAAA;AAAA;AAAA;AAAA;AAAA;AAAA;AAAA;AAAA;AAAA;AAAA;AAAA;AAAA;AAKb;AACD,aAAOD,GAAP;AACD;;AAED;;;;;;;;4BAKS;AACP,WAAKxB,cAAL,CAAoB6B,KAApB;AACA,WAAK9B,cAAL,CAAoB8B,KAApB;AACA,aAAO,KAAK5B,SAAL,CAAe6B,KAAf,EAAP;AACD;;;8BAEU5C,E,EAAID,I,EAAM;AACnB,UAAI6B,UAAU5B,GAAG0C,KAAH,CAAS,KAAK/B,MAAL,CAAY6B,MAArB,CAAd;AACA,UAAIK,UAAU,KAAK7B,SAAL,GAAiB9B,QAAQ4D,MAAR,CAAe/C,IAAf,CAAjB,GAAwCA,IAAtD;AACA,UAAIoB,UAAU,KAAKL,cAAL,CAAoBM,GAApB,CAAwBpB,EAAxB,CAAd;AACA;AACA,UAAImB,OAAJ,EAAa;AACX,YAAIe,aAAJ;AACA,YAAI,KAAKtB,cAAT,EAAyB;AACvBsB,kBAAQW,QAAQZ,IAAhB,EAAsBL,OAAtB,0CAAkCiB,QAAQX,IAA1C;AACD,SAFD,MAEO;AACLA,kBAAQW,QAAQZ,IAAhB,0CAAyBY,QAAQX,IAAjC;AACD;AANU;AAAA;AAAA;;AAAA;AAOX,2DAAmBf,OAAnB,iHAA4B;AAAA,gBAAnBD,MAAmB;;AAC1B,gBAAI,CAAC2B,QAAQT,MAAT,IAAmBlB,OAAOmB,EAAP,KAAcQ,QAAQT,MAA7C,EAAqD;AACnDlB,qBAAOrB,IAAP,gDAAeqC,IAAf;AACD;AACF;AAXU;AAAA;AAAA;AAAA;AAAA;AAAA;AAAA;AAAA;AAAA;AAAA;AAAA;AAAA;AAAA;AAAA;AAYZ;AACF;;;EAnM+B/C,Y;;AAuMlC4D,OAAOC,OAAP,GAAiBvC,mBAAjB","file":"EmitterPubsubBroker.js","sourcesContent":["'use strict'\n\nconst Promise = require('bluebird')\nconst Redis = require('ioredis')\nconst msgpack = require('msgpack-lite')\nconst { EventEmitter } = require('eventemitter3')\n\n/**\n * Interface for connector implementations.\n *\n * @interface Connector\n * @extends EventEmitter\n */\n\n/**\n * @method\n * @instance\n * @name publish\n * @memberOf Connector\n * @param {string} channel Channel.\n * @param {Buffer} data Data.\n * @return {Promise<undefined>}\n */\n\n/**\n * @method\n * @instance\n * @name subscribe\n * @memberOf Connector\n * @param {string} channel Channel.\n * @return {Promise<undefined>}\n */\n\n/**\n * @method\n * @instance\n * @name unsubscribe\n * @memberOf Connector\n * @param {string} channel Channel.\n * @return {Promise<undefined>}\n */\n\n/**\n * @method\n * @instance\n * @name close\n * @memberOf Connector\n * @return {Promise<undefined>}\n */\n\n/**\n * @event message\n * @memberOf Connector\n * @param {string} event Event name.\n * @param {Buffer} data Event data.\n */\n\n/**\n * Event will be listened by {@link EmitterPubsubBroker} instance.\n *\n * @event error\n * @memberOf Connector\n * @param {Error} error Error.\n */\n\nclass RedisConnector extends EventEmitter {\n  constructor (options) {\n    super()\n    this.pub = new Redis(options)\n    this.sub = new Redis(options)\n    this.sub.subscribe().catchReturn()\n    this.sub.on('messageBuffer', this._onMessage.bind(this))\n    this.sub.on('error', this.emit.bind(this))\n    this.pub.on('error', this.emit.bind(this))\n  }\n\n  _onMessage (buf, data) {\n    let ch = buf.toString()\n    this.emit('message', ch, data)\n  }\n\n  publish (ch, data) {\n    return this.pub.publish(ch, data)\n  }\n\n  subscribe (ch) {\n    return this.sub.subscribe(ch)\n  }\n\n  unsubscribe (ch) {\n    return this.sub.unsubscribe(ch)\n  }\n\n  close () {\n    return Promise.all([this.pub.quit(), this.sub.quit()])\n  }\n}\n\nclass MemoryConnector extends EventEmitter {\n  constructor (options) {\n    super()\n  }\n\n  publish (ch, data) {\n    return Promise.resolve()\n      .then(() => this.emit('message', ch, data))\n  }\n\n  subscribe (ch) {\n    return Promise.resolve()\n  }\n\n  unsubscribe (ch) {\n    return Promise.resolve()\n  }\n\n  close () {\n    return Promise.resolve()\n  }\n}\n\n/**\n * @typedef {Object} EmitterPubsubBroker.Options\n *\n * @property {string} connect Connect string.\n * @property {string} [prefix='emitter-pubsub-broker:'] Prefix.\n * @property {boolean} [includeChannel=false] Include channel as the\n * first argument.\n * @property {Connector} [connector] Custom connector implementation.\n */\n\n/**\n * @extends EventEmitter\n */\nclass EmitterPubsubBroker extends EventEmitter {\n  /**\n   * Creates a broker.\n   *\n   * @param {EmitterPubsubBroker.Options|string} options Options or a\n   * connect if a string. If connect string is empty, then an\n   * in-memory connector is used.\n   */\n  constructor (options) {\n    super()\n    if (options == null || typeof options === 'string') {\n      options = { connect: options }\n    }\n    this.prefix = options.prefix || 'emitter-pubsub-broker:'\n    this.includeChannel = options.includeChannel\n    this.clientChannels = new Map()\n    this.channelClients = new Map()\n    if (options.connect || options.connector) {\n      this.connector = options.connector || new RedisConnector(options.connect)\n      this.serialize = true\n    } else {\n      this.connector = new MemoryConnector()\n      this.serialize = false\n    }\n    this.connector.on('message', this._dispatch.bind(this))\n    /**\n     * Connector error. Does not throw if there are no listeners.\n     *\n     * @event error\n     * @memberOf EmitterPubsubBroker\n     * @param {Error} error Error.\n     */\n    this.connector.on('error', this.emit.bind(this))\n  }\n\n  _channelAddClient (client, ch) {\n    let clients = this.channelClients.get(ch)\n    if (clients == null) {\n      clients = new Set()\n      this.channelClients.set(ch, clients)\n      clients.add(client)\n      return this.connector.subscribe(ch)\n    } else {\n      clients.add(client)\n      return Promise.resolve()\n    }\n  }\n\n  _channelRemoveClient (client, ch) {\n    let clients = this.channelClients.get(ch)\n    let nclients\n    if (clients != null) {\n      clients.delete(client)\n      nclients = clients.size\n    }\n    if (nclients === 0) {\n      return this.connector.unsubscribe(ch)\n    } else {\n      return Promise.resolve()\n    }\n  }\n\n  _makeMessage (msg) {\n    return this.serialize ? msgpack.encode(msg) : msg\n  }\n\n  /**\n   * Subscribes emitter to a channel.\n   *\n   * @param {EventEmitter} client Emitter.\n   * @param {string} channel Channel.\n   * @return {Promise<undefined>}\n   */\n  subscribe (client, channel) {\n    let channels = this.clientChannels.get(client)\n    if (!channels) {\n      channels = new Set()\n      this.clientChannels.set(client, channels)\n    }\n    let ch = this.prefix + channel\n    channels.add(ch)\n    return this._channelAddClient(client, ch)\n  }\n\n  /**\n   * Unsubscribes emitter from a channel.\n   *\n   * @param {EventEmitter} client Emitter.\n   * @param {string} channel Channel.\n   * @return {Promise<undefined>}\n   */\n  unsubscribe (client, channel) {\n    let channels = this.clientChannels.get(client)\n    let ch = this.prefix + channel\n    if (channels) {\n      channels.delete(ch)\n    }\n    return this._channelRemoveClient(client, ch)\n  }\n\n  /**\n   * Unsubscribes emitter from all channel.\n   *\n   * @param {EventEmitter} client Emitter.\n   * @return {Promise<undefined>}\n   */\n  unsubscribeall (client) {\n    let channels = this.clientChannels.get(client)\n    this.clientChannels.delete(client)\n    if (channels) {\n      return Promise.each(\n        channels, this._channelRemoveClient.bind(this, client))\n    } else {\n      return Promise.resolve()\n    }\n  }\n\n  /**\n   * Publish an event to a channel.\n   *\n   * @param {string} channel Channel.\n   * @param {string} name Event name.\n   * @param {*} args Arguments.\n   * @return {Promise<undefined>}\n   */\n  publish (channel, name, ...args) {\n    let ch = this.prefix + channel\n    let msg = this._makeMessage({name, args})\n    return this.connector.publish(ch, msg)\n  }\n\n  /**\n   * Publish an event to a channel, excluding the sender. The client\n   * object __MUST__ have an unique `id` field.\n   *\n   * @param {EventEmitter} client Emitter.\n   * @param {string} channel Channel.\n   * @param {string} name Event name.\n   * @param {*} args Arguments.\n   * @return {Promise<undefined>}\n   */\n  send (client, channel, name, ...args) {\n    let ch = this.prefix + channel\n    let sender = client.id\n    let msg = this._makeMessage({sender, name, args})\n    return this.connector.publish(ch, msg)\n  }\n\n  /**\n   * Returns client subscriptions.\n   *\n   * @param {EventEmitter} client Emitter.\n   * @return {Array<string>}\n   */\n  getSubscriptions (client) {\n    let channels = this.clientChannels.get(client)\n    let res = []\n    if (channels) {\n      let plen = this.prefix.length\n      for (let channel of channels) {\n        res.push(channel.slice(plen))\n      }\n    }\n    return res\n  }\n\n  /**\n   * Closes broker.\n   *\n   * @return {Promise<undefined>}\n   */\n  close () {\n    this.channelClients.clear()\n    this.clientChannels.clear()\n    return this.connector.close()\n  }\n\n  _dispatch (ch, data) {\n    let channel = ch.slice(this.prefix.length)\n    let message = this.serialize ? msgpack.decode(data) : data\n    let clients = this.channelClients.get(ch)\n    /* istanbul ignore else */\n    if (clients) {\n      let args\n      if (this.includeChannel) {\n        args = [message.name, channel, ...message.args]\n      } else {\n        args = [message.name, ...message.args]\n      }\n      for (let client of clients) {\n        if (!message.sender || client.id !== message.sender) {\n          client.emit(...args)\n        }\n      }\n    }\n  }\n\n}\n\nmodule.exports = EmitterPubsubBroker\n"]}
//# sourceMappingURL=data:application/json;base64,{"version":3,"sources":["../src/EmitterPubsubBroker.js"],"names":["Promise","require","Redis","msgpack","EventEmitter","RedisConnector","constructor","options","pub","sub","subscribe","catchReturn","on","_onMessage","bind","emit","buf","data","ch","toString","publish","unsubscribe","close","all","quit","MemoryConnector","resolve","then","EmitterPubsubBroker","connect","prefix","includeChannel","clientChannels","Map","channelClients","connector","serialize","_dispatch","_channelAddClient","client","clients","get","Set","set","add","_channelRemoveClient","nclients","delete","size","_makeMessage","msg","encode","channel","channels","unsubscribeall","each","name","args","send","sender","id","getSubscriptions","res","plen","length","push","slice","clear","message","decode","module","exports"],"mappings":"AAAA;;;;AAEA,MAAMA,UAAUC,QAAQ,UAAR,CAAhB;AACA,MAAMC,QAAQD,QAAQ,SAAR,CAAd;AACA,MAAME,UAAUF,QAAQ,cAAR,CAAhB;;eACyBA,QAAQ,eAAR,C;;MAAjBG,Y,YAAAA,Y;;AAER;;;;;;;AAOA;;;;;;;;;;AAUA;;;;;;;;;AASA;;;;;;;;;AASA;;;;;;;;AAQA;;;;;;;AAOA;;;;;;;;AAQA,MAAMC,cAAN,SAA6BD,YAA7B,CAA0C;AACxCE,cAAaC,OAAb,EAAsB;AACpB;AACA,SAAKC,GAAL,GAAW,IAAIN,KAAJ,CAAUK,OAAV,CAAX;AACA,SAAKE,GAAL,GAAW,IAAIP,KAAJ,CAAUK,OAAV,CAAX;AACA,SAAKE,GAAL,CAASC,SAAT,GAAqBC,WAArB;AACA,SAAKF,GAAL,CAASG,EAAT,CAAY,eAAZ,EAA6B,KAAKC,UAAL,CAAgBC,IAAhB,CAAqB,IAArB,CAA7B;AACA,SAAKL,GAAL,CAASG,EAAT,CAAY,OAAZ,EAAqB,KAAKG,IAAL,CAAUD,IAAV,CAAe,IAAf,CAArB;AACA,SAAKN,GAAL,CAASI,EAAT,CAAY,OAAZ,EAAqB,KAAKG,IAAL,CAAUD,IAAV,CAAe,IAAf,CAArB;AACD;;AAEDD,aAAYG,GAAZ,EAAiBC,IAAjB,EAAuB;AACrB,QAAIC,KAAKF,IAAIG,QAAJ,EAAT;AACA,SAAKJ,IAAL,CAAU,SAAV,EAAqBG,EAArB,EAAyBD,IAAzB;AACD;;AAEDG,UAASF,EAAT,EAAaD,IAAb,EAAmB;AACjB,WAAO,KAAKT,GAAL,CAASY,OAAT,CAAiBF,EAAjB,EAAqBD,IAArB,CAAP;AACD;;AAEDP,YAAWQ,EAAX,EAAe;AACb,WAAO,KAAKT,GAAL,CAASC,SAAT,CAAmBQ,EAAnB,CAAP;AACD;;AAEDG,cAAaH,EAAb,EAAiB;AACf,WAAO,KAAKT,GAAL,CAASY,WAAT,CAAqBH,EAArB,CAAP;AACD;;AAEDI,UAAS;AACP,WAAOtB,QAAQuB,GAAR,CAAY,CAAC,KAAKf,GAAL,CAASgB,IAAT,EAAD,EAAkB,KAAKf,GAAL,CAASe,IAAT,EAAlB,CAAZ,CAAP;AACD;AA9BuC;;AAiC1C,MAAMC,eAAN,SAA8BrB,YAA9B,CAA2C;AACzCE,cAAaC,OAAb,EAAsB;AACpB;AACD;;AAEDa,UAASF,EAAT,EAAaD,IAAb,EAAmB;AACjB,WAAOjB,QAAQ0B,OAAR,GACJC,IADI,CACC,MAAM,KAAKZ,IAAL,CAAU,SAAV,EAAqBG,EAArB,EAAyBD,IAAzB,CADP,CAAP;AAED;;AAEDP,YAAWQ,EAAX,EAAe;AACb,WAAOlB,QAAQ0B,OAAR,EAAP;AACD;;AAEDL,cAAaH,EAAb,EAAiB;AACf,WAAOlB,QAAQ0B,OAAR,EAAP;AACD;;AAEDJ,UAAS;AACP,WAAOtB,QAAQ0B,OAAR,EAAP;AACD;AApBwC;;AAuB3C;;;;;;;;;;AAUA;;;AAGA,MAAME,mBAAN,SAAkCxB,YAAlC,CAA+C;AAC7C;;;;;;;AAOAE,cAAaC,OAAb,EAAsB;AACpB;AACA,QAAIA,WAAW,IAAX,IAAmB,OAAOA,OAAP,KAAmB,QAA1C,EAAoD;AAClDA,gBAAU,EAAEsB,SAAStB,OAAX,EAAV;AACD;AACD,SAAKuB,MAAL,GAAcvB,QAAQuB,MAAR,IAAkB,wBAAhC;AACA,SAAKC,cAAL,GAAsBxB,QAAQwB,cAA9B;AACA,SAAKC,cAAL,GAAsB,IAAIC,GAAJ,EAAtB;AACA,SAAKC,cAAL,GAAsB,IAAID,GAAJ,EAAtB;AACA,QAAI1B,QAAQsB,OAAR,IAAmBtB,QAAQ4B,SAA/B,EAA0C;AACxC,WAAKA,SAAL,GAAiB5B,QAAQ4B,SAAR,IAAqB,IAAI9B,cAAJ,CAAmBE,QAAQsB,OAA3B,CAAtC;AACA,WAAKO,SAAL,GAAiB,IAAjB;AACD,KAHD,MAGO;AACL,WAAKD,SAAL,GAAiB,IAAIV,eAAJ,EAAjB;AACA,WAAKW,SAAL,GAAiB,KAAjB;AACD;AACD,SAAKD,SAAL,CAAevB,EAAf,CAAkB,SAAlB,EAA6B,KAAKyB,SAAL,CAAevB,IAAf,CAAoB,IAApB,CAA7B;AACA;;;;;;;AAOA,SAAKqB,SAAL,CAAevB,EAAf,CAAkB,OAAlB,EAA2B,KAAKG,IAAL,CAAUD,IAAV,CAAe,IAAf,CAA3B;AACD;;AAEDwB,oBAAmBC,MAAnB,EAA2BrB,EAA3B,EAA+B;AAC7B,QAAIsB,UAAU,KAAKN,cAAL,CAAoBO,GAApB,CAAwBvB,EAAxB,CAAd;AACA,QAAIsB,WAAW,IAAf,EAAqB;AACnBA,gBAAU,IAAIE,GAAJ,EAAV;AACA,WAAKR,cAAL,CAAoBS,GAApB,CAAwBzB,EAAxB,EAA4BsB,OAA5B;AACAA,cAAQI,GAAR,CAAYL,MAAZ;AACA,aAAO,KAAKJ,SAAL,CAAezB,SAAf,CAAyBQ,EAAzB,CAAP;AACD,KALD,MAKO;AACLsB,cAAQI,GAAR,CAAYL,MAAZ;AACA,aAAOvC,QAAQ0B,OAAR,EAAP;AACD;AACF;;AAEDmB,uBAAsBN,MAAtB,EAA8BrB,EAA9B,EAAkC;AAChC,QAAIsB,UAAU,KAAKN,cAAL,CAAoBO,GAApB,CAAwBvB,EAAxB,CAAd;AACA,QAAI4B,QAAJ;AACA,QAAIN,WAAW,IAAf,EAAqB;AACnBA,cAAQO,MAAR,CAAeR,MAAf;AACAO,iBAAWN,QAAQQ,IAAnB;AACD;AACD,QAAIF,aAAa,CAAjB,EAAoB;AAClB,aAAO,KAAKX,SAAL,CAAed,WAAf,CAA2BH,EAA3B,CAAP;AACD,KAFD,MAEO;AACL,aAAOlB,QAAQ0B,OAAR,EAAP;AACD;AACF;;AAEDuB,eAAcC,GAAd,EAAmB;AACjB,WAAO,KAAKd,SAAL,GAAiBjC,QAAQgD,MAAR,CAAeD,GAAf,CAAjB,GAAuCA,GAA9C;AACD;;AAED;;;;;;;AAOAxC,YAAW6B,MAAX,EAAmBa,OAAnB,EAA4B;AAC1B,QAAIC,WAAW,KAAKrB,cAAL,CAAoBS,GAApB,CAAwBF,MAAxB,CAAf;AACA,QAAI,CAACc,QAAL,EAAe;AACbA,iBAAW,IAAIX,GAAJ,EAAX;AACA,WAAKV,cAAL,CAAoBW,GAApB,CAAwBJ,MAAxB,EAAgCc,QAAhC;AACD;AACD,QAAInC,KAAK,KAAKY,MAAL,GAAcsB,OAAvB;AACAC,aAAST,GAAT,CAAa1B,EAAb;AACA,WAAO,KAAKoB,iBAAL,CAAuBC,MAAvB,EAA+BrB,EAA/B,CAAP;AACD;;AAED;;;;;;;AAOAG,cAAakB,MAAb,EAAqBa,OAArB,EAA8B;AAC5B,QAAIC,WAAW,KAAKrB,cAAL,CAAoBS,GAApB,CAAwBF,MAAxB,CAAf;AACA,QAAIrB,KAAK,KAAKY,MAAL,GAAcsB,OAAvB;AACA,QAAIC,QAAJ,EAAc;AACZA,eAASN,MAAT,CAAgB7B,EAAhB;AACD;AACD,WAAO,KAAK2B,oBAAL,CAA0BN,MAA1B,EAAkCrB,EAAlC,CAAP;AACD;;AAED;;;;;;AAMAoC,iBAAgBf,MAAhB,EAAwB;AACtB,QAAIc,WAAW,KAAKrB,cAAL,CAAoBS,GAApB,CAAwBF,MAAxB,CAAf;AACA,SAAKP,cAAL,CAAoBe,MAApB,CAA2BR,MAA3B;AACA,QAAIc,QAAJ,EAAc;AACZ,aAAOrD,QAAQuD,IAAR,CACLF,QADK,EACK,KAAKR,oBAAL,CAA0B/B,IAA1B,CAA+B,IAA/B,EAAqCyB,MAArC,CADL,CAAP;AAED,KAHD,MAGO;AACL,aAAOvC,QAAQ0B,OAAR,EAAP;AACD;AACF;;AAED;;;;;;;;AAQAN,UAASgC,OAAT,EAAkBI,IAAlB,EAAiC;AAAA,sCAANC,IAAM;AAANA,UAAM;AAAA;;AAC/B,QAAIvC,KAAK,KAAKY,MAAL,GAAcsB,OAAvB;AACA,QAAIF,MAAM,KAAKD,YAAL,CAAkB,EAACO,UAAD,EAAOC,UAAP,EAAlB,CAAV;AACA,WAAO,KAAKtB,SAAL,CAAef,OAAf,CAAuBF,EAAvB,EAA2BgC,GAA3B,CAAP;AACD;;AAED;;;;;;;;;;AAUAQ,OAAMnB,MAAN,EAAca,OAAd,EAAuBI,IAAvB,EAAsC;AAAA,uCAANC,IAAM;AAANA,UAAM;AAAA;;AACpC,QAAIvC,KAAK,KAAKY,MAAL,GAAcsB,OAAvB;AACA,QAAIO,SAASpB,OAAOqB,EAApB;AACA,QAAIV,MAAM,KAAKD,YAAL,CAAkB,EAACU,cAAD,EAASH,UAAT,EAAeC,UAAf,EAAlB,CAAV;AACA,WAAO,KAAKtB,SAAL,CAAef,OAAf,CAAuBF,EAAvB,EAA2BgC,GAA3B,CAAP;AACD;;AAED;;;;;;AAMAW,mBAAkBtB,MAAlB,EAA0B;AACxB,QAAIc,WAAW,KAAKrB,cAAL,CAAoBS,GAApB,CAAwBF,MAAxB,CAAf;AACA,QAAIuB,MAAM,EAAV;AACA,QAAIT,QAAJ,EAAc;AACZ,UAAIU,OAAO,KAAKjC,MAAL,CAAYkC,MAAvB;AACA,WAAK,IAAIZ,OAAT,IAAoBC,QAApB,EAA8B;AAC5BS,YAAIG,IAAJ,CAASb,QAAQc,KAAR,CAAcH,IAAd,CAAT;AACD;AACF;AACD,WAAOD,GAAP;AACD;;AAED;;;;;AAKAxC,UAAS;AACP,SAAKY,cAAL,CAAoBiC,KAApB;AACA,SAAKnC,cAAL,CAAoBmC,KAApB;AACA,WAAO,KAAKhC,SAAL,CAAeb,KAAf,EAAP;AACD;;AAEDe,YAAWnB,EAAX,EAAeD,IAAf,EAAqB;AACnB,QAAImC,UAAUlC,GAAGgD,KAAH,CAAS,KAAKpC,MAAL,CAAYkC,MAArB,CAAd;AACA,QAAII,UAAU,KAAKhC,SAAL,GAAiBjC,QAAQkE,MAAR,CAAepD,IAAf,CAAjB,GAAwCA,IAAtD;AACA,QAAIuB,UAAU,KAAKN,cAAL,CAAoBO,GAApB,CAAwBvB,EAAxB,CAAd;AACA;AACA,QAAIsB,OAAJ,EAAa;AACX,UAAIiB,IAAJ;AACA,UAAI,KAAK1B,cAAT,EAAyB;AACvB0B,gBAAQW,QAAQZ,IAAhB,EAAsBJ,OAAtB,4BAAkCgB,QAAQX,IAA1C;AACD,OAFD,MAEO;AACLA,gBAAQW,QAAQZ,IAAhB,4BAAyBY,QAAQX,IAAjC;AACD;AACD,WAAK,IAAIlB,MAAT,IAAmBC,OAAnB,EAA4B;AAC1B,YAAI,CAAC4B,QAAQT,MAAT,IAAmBpB,OAAOqB,EAAP,KAAcQ,QAAQT,MAA7C,EAAqD;AACnDpB,iBAAOxB,IAAP,kCAAe0C,IAAf;AACD;AACF;AACF;AACF;;AAnM4C;;AAuM/Ca,OAAOC,OAAP,GAAiB3C,mBAAjB","file":"EmitterPubsubBroker.js","sourcesContent":["'use strict'\n\nconst Promise = require('bluebird')\nconst Redis = require('ioredis')\nconst msgpack = require('msgpack-lite')\nconst { EventEmitter } = require('eventemitter3')\n\n/**\n * Interface for connector implementations.\n *\n * @interface Connector\n * @extends EventEmitter\n */\n\n/**\n * @method\n * @instance\n * @name publish\n * @memberOf Connector\n * @param {string} channel Channel.\n * @param {Buffer} data Data.\n * @return {Promise<undefined>}\n */\n\n/**\n * @method\n * @instance\n * @name subscribe\n * @memberOf Connector\n * @param {string} channel Channel.\n * @return {Promise<undefined>}\n */\n\n/**\n * @method\n * @instance\n * @name unsubscribe\n * @memberOf Connector\n * @param {string} channel Channel.\n * @return {Promise<undefined>}\n */\n\n/**\n * @method\n * @instance\n * @name close\n * @memberOf Connector\n * @return {Promise<undefined>}\n */\n\n/**\n * @event message\n * @memberOf Connector\n * @param {string} event Event name.\n * @param {Buffer} data Event data.\n */\n\n/**\n * Event will be listened by {@link EmitterPubsubBroker} instance.\n *\n * @event error\n * @memberOf Connector\n * @param {Error} error Error.\n */\n\nclass RedisConnector extends EventEmitter {\n  constructor (options) {\n    super()\n    this.pub = new Redis(options)\n    this.sub = new Redis(options)\n    this.sub.subscribe().catchReturn()\n    this.sub.on('messageBuffer', this._onMessage.bind(this))\n    this.sub.on('error', this.emit.bind(this))\n    this.pub.on('error', this.emit.bind(this))\n  }\n\n  _onMessage (buf, data) {\n    let ch = buf.toString()\n    this.emit('message', ch, data)\n  }\n\n  publish (ch, data) {\n    return this.pub.publish(ch, data)\n  }\n\n  subscribe (ch) {\n    return this.sub.subscribe(ch)\n  }\n\n  unsubscribe (ch) {\n    return this.sub.unsubscribe(ch)\n  }\n\n  close () {\n    return Promise.all([this.pub.quit(), this.sub.quit()])\n  }\n}\n\nclass MemoryConnector extends EventEmitter {\n  constructor (options) {\n    super()\n  }\n\n  publish (ch, data) {\n    return Promise.resolve()\n      .then(() => this.emit('message', ch, data))\n  }\n\n  subscribe (ch) {\n    return Promise.resolve()\n  }\n\n  unsubscribe (ch) {\n    return Promise.resolve()\n  }\n\n  close () {\n    return Promise.resolve()\n  }\n}\n\n/**\n * @typedef {Object} EmitterPubsubBroker.Options\n *\n * @property {string} connect Connect string.\n * @property {string} [prefix='emitter-pubsub-broker:'] Prefix.\n * @property {boolean} [includeChannel=false] Include channel as the\n * first argument.\n * @property {Connector} [connector] Custom connector implementation.\n */\n\n/**\n * @extends EventEmitter\n */\nclass EmitterPubsubBroker extends EventEmitter {\n  /**\n   * Creates a broker.\n   *\n   * @param {EmitterPubsubBroker.Options|string} options Options or a\n   * connect if a string. If connect string is empty, then an\n   * in-memory connector is used.\n   */\n  constructor (options) {\n    super()\n    if (options == null || typeof options === 'string') {\n      options = { connect: options }\n    }\n    this.prefix = options.prefix || 'emitter-pubsub-broker:'\n    this.includeChannel = options.includeChannel\n    this.clientChannels = new Map()\n    this.channelClients = new Map()\n    if (options.connect || options.connector) {\n      this.connector = options.connector || new RedisConnector(options.connect)\n      this.serialize = true\n    } else {\n      this.connector = new MemoryConnector()\n      this.serialize = false\n    }\n    this.connector.on('message', this._dispatch.bind(this))\n    /**\n     * Connector error. Does not throw if there are no listeners.\n     *\n     * @event error\n     * @memberOf EmitterPubsubBroker\n     * @param {Error} error Error.\n     */\n    this.connector.on('error', this.emit.bind(this))\n  }\n\n  _channelAddClient (client, ch) {\n    let clients = this.channelClients.get(ch)\n    if (clients == null) {\n      clients = new Set()\n      this.channelClients.set(ch, clients)\n      clients.add(client)\n      return this.connector.subscribe(ch)\n    } else {\n      clients.add(client)\n      return Promise.resolve()\n    }\n  }\n\n  _channelRemoveClient (client, ch) {\n    let clients = this.channelClients.get(ch)\n    let nclients\n    if (clients != null) {\n      clients.delete(client)\n      nclients = clients.size\n    }\n    if (nclients === 0) {\n      return this.connector.unsubscribe(ch)\n    } else {\n      return Promise.resolve()\n    }\n  }\n\n  _makeMessage (msg) {\n    return this.serialize ? msgpack.encode(msg) : msg\n  }\n\n  /**\n   * Subscribes emitter to a channel.\n   *\n   * @param {EventEmitter} client Emitter.\n   * @param {string} channel Channel.\n   * @return {Promise<undefined>}\n   */\n  subscribe (client, channel) {\n    let channels = this.clientChannels.get(client)\n    if (!channels) {\n      channels = new Set()\n      this.clientChannels.set(client, channels)\n    }\n    let ch = this.prefix + channel\n    channels.add(ch)\n    return this._channelAddClient(client, ch)\n  }\n\n  /**\n   * Unsubscribes emitter from a channel.\n   *\n   * @param {EventEmitter} client Emitter.\n   * @param {string} channel Channel.\n   * @return {Promise<undefined>}\n   */\n  unsubscribe (client, channel) {\n    let channels = this.clientChannels.get(client)\n    let ch = this.prefix + channel\n    if (channels) {\n      channels.delete(ch)\n    }\n    return this._channelRemoveClient(client, ch)\n  }\n\n  /**\n   * Unsubscribes emitter from all channel.\n   *\n   * @param {EventEmitter} client Emitter.\n   * @return {Promise<undefined>}\n   */\n  unsubscribeall (client) {\n    let channels = this.clientChannels.get(client)\n    this.clientChannels.delete(client)\n    if (channels) {\n      return Promise.each(\n        channels, this._channelRemoveClient.bind(this, client))\n    } else {\n      return Promise.resolve()\n    }\n  }\n\n  /**\n   * Publish an event to a channel.\n   *\n   * @param {string} channel Channel.\n   * @param {string} name Event name.\n   * @param {*} args Arguments.\n   * @return {Promise<undefined>}\n   */\n  publish (channel, name, ...args) {\n    let ch = this.prefix + channel\n    let msg = this._makeMessage({name, args})\n    return this.connector.publish(ch, msg)\n  }\n\n  /**\n   * Publish an event to a channel, excluding the sender. The client\n   * object __MUST__ have an unique `id` field.\n   *\n   * @param {EventEmitter} client Emitter.\n   * @param {string} channel Channel.\n   * @param {string} name Event name.\n   * @param {*} args Arguments.\n   * @return {Promise<undefined>}\n   */\n  send (client, channel, name, ...args) {\n    let ch = this.prefix + channel\n    let sender = client.id\n    let msg = this._makeMessage({sender, name, args})\n    return this.connector.publish(ch, msg)\n  }\n\n  /**\n   * Returns client subscriptions.\n   *\n   * @param {EventEmitter} client Emitter.\n   * @return {Array<string>}\n   */\n  getSubscriptions (client) {\n    let channels = this.clientChannels.get(client)\n    let res = []\n    if (channels) {\n      let plen = this.prefix.length\n      for (let channel of channels) {\n        res.push(channel.slice(plen))\n      }\n    }\n    return res\n  }\n\n  /**\n   * Closes broker.\n   *\n   * @return {Promise<undefined>}\n   */\n  close () {\n    this.channelClients.clear()\n    this.clientChannels.clear()\n    return this.connector.close()\n  }\n\n  _dispatch (ch, data) {\n    let channel = ch.slice(this.prefix.length)\n    let message = this.serialize ? msgpack.decode(data) : data\n    let clients = this.channelClients.get(ch)\n    /* istanbul ignore else */\n    if (clients) {\n      let args\n      if (this.includeChannel) {\n        args = [message.name, channel, ...message.args]\n      } else {\n        args = [message.name, ...message.args]\n      }\n      for (let client of clients) {\n        if (!message.sender || client.id !== message.sender) {\n          client.emit(...args)\n        }\n      }\n    }\n  }\n\n}\n\nmodule.exports = EmitterPubsubBroker\n"]}
{
"name": "emitter-pubsub-broker",
"version": "0.3.0",
"version": "0.4.0",
"private": false,

@@ -35,3 +35,2 @@ "description": "An utility for connecting EventEmitters via a pubsub.",

"dependencies": {
"babel-runtime": "^6.9.2",
"bluebird": "^3.3.5",

@@ -45,4 +44,3 @@ "eventemitter3": "^2.0.2",

"babel-cli": "^6.11.4",
"babel-plugin-transform-runtime": "^6.9.0",
"babel-preset-es2015": "^6.9.0",
"babel-preset-es2015-node4": "^2.1.0",
"chai": "^3.5.0",

@@ -54,3 +52,3 @@ "codecov": "^1.0.1",

"mocha": "^3.0.0",
"nyc": "^8.1.0",
"nyc": "^10.0.0",
"standard": "^8.0.0"

@@ -57,0 +55,0 @@ },

@@ -49,3 +49,3 @@

[API](https://an-sh.github.io/emitter-pubsub-broker/0.3/index.html)
[API](https://an-sh.github.io/emitter-pubsub-broker/0.4/index.html)
documentation is available online.

@@ -52,0 +52,0 @@

Sorry, the diff of this file is not supported yet

SocketSocket SOC 2 Logo

Product

  • Package Alerts
  • Integrations
  • Docs
  • Pricing
  • FAQ
  • Roadmap
  • Changelog

Packages

npm

Stay in touch

Get open source security insights delivered straight into your inbox.


  • Terms
  • Privacy
  • Security

Made with ⚡️ by Socket Inc