emitter-pubsub-broker
Advanced tools
Comparing version 0.3.0 to 0.4.0
@@ -5,2 +5,17 @@ # Change Log | ||
<a name="0.4.0"></a> | ||
# [0.4.0](https://github.com/an-sh/emitter-pubsub-broker/compare/v0.3.0...v0.4.0) (2016-11-29) | ||
### Chores | ||
* use es2015-node4 preset ([5e05222](https://github.com/an-sh/emitter-pubsub-broker/commit/5e05222)) | ||
### BREAKING CHANGES | ||
* Possible node 4.x regression due to the preset change. | ||
<a name="0.3.0"></a> | ||
@@ -7,0 +22,0 @@ # [0.3.0](https://github.com/an-sh/emitter-pubsub-broker/compare/v0.2.1...v0.3.0) (2016-10-21) |
'use strict'; | ||
var _toConsumableArray2 = require('babel-runtime/helpers/toConsumableArray'); | ||
function _toConsumableArray(arr) { if (Array.isArray(arr)) { for (var i = 0, arr2 = Array(arr.length); i < arr.length; i++) arr2[i] = arr[i]; return arr2; } else { return Array.from(arr); } } | ||
var _toConsumableArray3 = _interopRequireDefault(_toConsumableArray2); | ||
const Promise = require('bluebird'); | ||
const Redis = require('ioredis'); | ||
const msgpack = require('msgpack-lite'); | ||
var _getIterator2 = require('babel-runtime/core-js/get-iterator'); | ||
var _getIterator3 = _interopRequireDefault(_getIterator2); | ||
var _set = require('babel-runtime/core-js/set'); | ||
var _set2 = _interopRequireDefault(_set); | ||
var _map = require('babel-runtime/core-js/map'); | ||
var _map2 = _interopRequireDefault(_map); | ||
var _getPrototypeOf = require('babel-runtime/core-js/object/get-prototype-of'); | ||
var _getPrototypeOf2 = _interopRequireDefault(_getPrototypeOf); | ||
var _classCallCheck2 = require('babel-runtime/helpers/classCallCheck'); | ||
var _classCallCheck3 = _interopRequireDefault(_classCallCheck2); | ||
var _createClass2 = require('babel-runtime/helpers/createClass'); | ||
var _createClass3 = _interopRequireDefault(_createClass2); | ||
var _possibleConstructorReturn2 = require('babel-runtime/helpers/possibleConstructorReturn'); | ||
var _possibleConstructorReturn3 = _interopRequireDefault(_possibleConstructorReturn2); | ||
var _inherits2 = require('babel-runtime/helpers/inherits'); | ||
var _inherits3 = _interopRequireDefault(_inherits2); | ||
function _interopRequireDefault(obj) { return obj && obj.__esModule ? obj : { default: obj }; } | ||
var Promise = require('bluebird'); | ||
var Redis = require('ioredis'); | ||
var msgpack = require('msgpack-lite'); | ||
var _require = require('eventemitter3'); | ||
var EventEmitter = _require.EventEmitter; | ||
const EventEmitter = _require.EventEmitter; | ||
@@ -107,85 +71,57 @@ /** | ||
var RedisConnector = function (_EventEmitter) { | ||
(0, _inherits3.default)(RedisConnector, _EventEmitter); | ||
class RedisConnector extends EventEmitter { | ||
constructor(options) { | ||
super(); | ||
this.pub = new Redis(options); | ||
this.sub = new Redis(options); | ||
this.sub.subscribe().catchReturn(); | ||
this.sub.on('messageBuffer', this._onMessage.bind(this)); | ||
this.sub.on('error', this.emit.bind(this)); | ||
this.pub.on('error', this.emit.bind(this)); | ||
} | ||
function RedisConnector(options) { | ||
(0, _classCallCheck3.default)(this, RedisConnector); | ||
_onMessage(buf, data) { | ||
let ch = buf.toString(); | ||
this.emit('message', ch, data); | ||
} | ||
var _this = (0, _possibleConstructorReturn3.default)(this, (RedisConnector.__proto__ || (0, _getPrototypeOf2.default)(RedisConnector)).call(this)); | ||
publish(ch, data) { | ||
return this.pub.publish(ch, data); | ||
} | ||
_this.pub = new Redis(options); | ||
_this.sub = new Redis(options); | ||
_this.sub.subscribe().catchReturn(); | ||
_this.sub.on('messageBuffer', _this._onMessage.bind(_this)); | ||
_this.sub.on('error', _this.emit.bind(_this)); | ||
_this.pub.on('error', _this.emit.bind(_this)); | ||
return _this; | ||
subscribe(ch) { | ||
return this.sub.subscribe(ch); | ||
} | ||
(0, _createClass3.default)(RedisConnector, [{ | ||
key: '_onMessage', | ||
value: function _onMessage(buf, data) { | ||
var ch = buf.toString(); | ||
this.emit('message', ch, data); | ||
} | ||
}, { | ||
key: 'publish', | ||
value: function publish(ch, data) { | ||
return this.pub.publish(ch, data); | ||
} | ||
}, { | ||
key: 'subscribe', | ||
value: function subscribe(ch) { | ||
return this.sub.subscribe(ch); | ||
} | ||
}, { | ||
key: 'unsubscribe', | ||
value: function unsubscribe(ch) { | ||
return this.sub.unsubscribe(ch); | ||
} | ||
}, { | ||
key: 'close', | ||
value: function close() { | ||
return Promise.all([this.pub.quit(), this.sub.quit()]); | ||
} | ||
}]); | ||
return RedisConnector; | ||
}(EventEmitter); | ||
unsubscribe(ch) { | ||
return this.sub.unsubscribe(ch); | ||
} | ||
var MemoryConnector = function (_EventEmitter2) { | ||
(0, _inherits3.default)(MemoryConnector, _EventEmitter2); | ||
close() { | ||
return Promise.all([this.pub.quit(), this.sub.quit()]); | ||
} | ||
} | ||
function MemoryConnector(options) { | ||
(0, _classCallCheck3.default)(this, MemoryConnector); | ||
return (0, _possibleConstructorReturn3.default)(this, (MemoryConnector.__proto__ || (0, _getPrototypeOf2.default)(MemoryConnector)).call(this)); | ||
class MemoryConnector extends EventEmitter { | ||
constructor(options) { | ||
super(); | ||
} | ||
(0, _createClass3.default)(MemoryConnector, [{ | ||
key: 'publish', | ||
value: function publish(ch, data) { | ||
var _this3 = this; | ||
publish(ch, data) { | ||
return Promise.resolve().then(() => this.emit('message', ch, data)); | ||
} | ||
return Promise.resolve().then(function () { | ||
return _this3.emit('message', ch, data); | ||
}); | ||
} | ||
}, { | ||
key: 'subscribe', | ||
value: function subscribe(ch) { | ||
return Promise.resolve(); | ||
} | ||
}, { | ||
key: 'unsubscribe', | ||
value: function unsubscribe(ch) { | ||
return Promise.resolve(); | ||
} | ||
}, { | ||
key: 'close', | ||
value: function close() { | ||
return Promise.resolve(); | ||
} | ||
}]); | ||
return MemoryConnector; | ||
}(EventEmitter); | ||
subscribe(ch) { | ||
return Promise.resolve(); | ||
} | ||
unsubscribe(ch) { | ||
return Promise.resolve(); | ||
} | ||
close() { | ||
return Promise.resolve(); | ||
} | ||
} | ||
/** | ||
@@ -204,7 +140,3 @@ * @typedef {Object} EmitterPubsubBroker.Options | ||
*/ | ||
var EmitterPubsubBroker = function (_EventEmitter3) { | ||
(0, _inherits3.default)(EmitterPubsubBroker, _EventEmitter3); | ||
class EmitterPubsubBroker extends EventEmitter { | ||
/** | ||
@@ -217,22 +149,19 @@ * Creates a broker. | ||
*/ | ||
function EmitterPubsubBroker(options) { | ||
(0, _classCallCheck3.default)(this, EmitterPubsubBroker); | ||
var _this4 = (0, _possibleConstructorReturn3.default)(this, (EmitterPubsubBroker.__proto__ || (0, _getPrototypeOf2.default)(EmitterPubsubBroker)).call(this)); | ||
constructor(options) { | ||
super(); | ||
if (options == null || typeof options === 'string') { | ||
options = { connect: options }; | ||
} | ||
_this4.prefix = options.prefix || 'emitter-pubsub-broker:'; | ||
_this4.includeChannel = options.includeChannel; | ||
_this4.clientChannels = new _map2.default(); | ||
_this4.channelClients = new _map2.default(); | ||
this.prefix = options.prefix || 'emitter-pubsub-broker:'; | ||
this.includeChannel = options.includeChannel; | ||
this.clientChannels = new Map(); | ||
this.channelClients = new Map(); | ||
if (options.connect || options.connector) { | ||
_this4.connector = options.connector || new RedisConnector(options.connect); | ||
_this4.serialize = true; | ||
this.connector = options.connector || new RedisConnector(options.connect); | ||
this.serialize = true; | ||
} else { | ||
_this4.connector = new MemoryConnector(); | ||
_this4.serialize = false; | ||
this.connector = new MemoryConnector(); | ||
this.serialize = false; | ||
} | ||
_this4.connector.on('message', _this4._dispatch.bind(_this4)); | ||
this.connector.on('message', this._dispatch.bind(this)); | ||
/** | ||
@@ -245,247 +174,177 @@ * Connector error. Does not throw if there are no listeners. | ||
*/ | ||
_this4.connector.on('error', _this4.emit.bind(_this4)); | ||
return _this4; | ||
this.connector.on('error', this.emit.bind(this)); | ||
} | ||
(0, _createClass3.default)(EmitterPubsubBroker, [{ | ||
key: '_channelAddClient', | ||
value: function _channelAddClient(client, ch) { | ||
var clients = this.channelClients.get(ch); | ||
if (clients == null) { | ||
clients = new _set2.default(); | ||
this.channelClients.set(ch, clients); | ||
clients.add(client); | ||
return this.connector.subscribe(ch); | ||
} else { | ||
clients.add(client); | ||
return Promise.resolve(); | ||
} | ||
_channelAddClient(client, ch) { | ||
let clients = this.channelClients.get(ch); | ||
if (clients == null) { | ||
clients = new Set(); | ||
this.channelClients.set(ch, clients); | ||
clients.add(client); | ||
return this.connector.subscribe(ch); | ||
} else { | ||
clients.add(client); | ||
return Promise.resolve(); | ||
} | ||
}, { | ||
key: '_channelRemoveClient', | ||
value: function _channelRemoveClient(client, ch) { | ||
var clients = this.channelClients.get(ch); | ||
var nclients = void 0; | ||
if (clients != null) { | ||
clients.delete(client); | ||
nclients = clients.size; | ||
} | ||
if (nclients === 0) { | ||
return this.connector.unsubscribe(ch); | ||
} else { | ||
return Promise.resolve(); | ||
} | ||
} | ||
_channelRemoveClient(client, ch) { | ||
let clients = this.channelClients.get(ch); | ||
let nclients; | ||
if (clients != null) { | ||
clients.delete(client); | ||
nclients = clients.size; | ||
} | ||
}, { | ||
key: '_makeMessage', | ||
value: function _makeMessage(msg) { | ||
return this.serialize ? msgpack.encode(msg) : msg; | ||
if (nclients === 0) { | ||
return this.connector.unsubscribe(ch); | ||
} else { | ||
return Promise.resolve(); | ||
} | ||
} | ||
/** | ||
* Subscribes emitter to a channel. | ||
* | ||
* @param {EventEmitter} client Emitter. | ||
* @param {string} channel Channel. | ||
* @return {Promise<undefined>} | ||
*/ | ||
_makeMessage(msg) { | ||
return this.serialize ? msgpack.encode(msg) : msg; | ||
} | ||
}, { | ||
key: 'subscribe', | ||
value: function subscribe(client, channel) { | ||
var channels = this.clientChannels.get(client); | ||
if (!channels) { | ||
channels = new _set2.default(); | ||
this.clientChannels.set(client, channels); | ||
} | ||
var ch = this.prefix + channel; | ||
channels.add(ch); | ||
return this._channelAddClient(client, ch); | ||
/** | ||
* Subscribes emitter to a channel. | ||
* | ||
* @param {EventEmitter} client Emitter. | ||
* @param {string} channel Channel. | ||
* @return {Promise<undefined>} | ||
*/ | ||
subscribe(client, channel) { | ||
let channels = this.clientChannels.get(client); | ||
if (!channels) { | ||
channels = new Set(); | ||
this.clientChannels.set(client, channels); | ||
} | ||
let ch = this.prefix + channel; | ||
channels.add(ch); | ||
return this._channelAddClient(client, ch); | ||
} | ||
/** | ||
* Unsubscribes emitter from a channel. | ||
* | ||
* @param {EventEmitter} client Emitter. | ||
* @param {string} channel Channel. | ||
* @return {Promise<undefined>} | ||
*/ | ||
/** | ||
* Unsubscribes emitter from a channel. | ||
* | ||
* @param {EventEmitter} client Emitter. | ||
* @param {string} channel Channel. | ||
* @return {Promise<undefined>} | ||
*/ | ||
unsubscribe(client, channel) { | ||
let channels = this.clientChannels.get(client); | ||
let ch = this.prefix + channel; | ||
if (channels) { | ||
channels.delete(ch); | ||
} | ||
return this._channelRemoveClient(client, ch); | ||
} | ||
}, { | ||
key: 'unsubscribe', | ||
value: function unsubscribe(client, channel) { | ||
var channels = this.clientChannels.get(client); | ||
var ch = this.prefix + channel; | ||
if (channels) { | ||
channels.delete(ch); | ||
} | ||
return this._channelRemoveClient(client, ch); | ||
/** | ||
* Unsubscribes emitter from all channel. | ||
* | ||
* @param {EventEmitter} client Emitter. | ||
* @return {Promise<undefined>} | ||
*/ | ||
unsubscribeall(client) { | ||
let channels = this.clientChannels.get(client); | ||
this.clientChannels.delete(client); | ||
if (channels) { | ||
return Promise.each(channels, this._channelRemoveClient.bind(this, client)); | ||
} else { | ||
return Promise.resolve(); | ||
} | ||
} | ||
/** | ||
* Unsubscribes emitter from all channel. | ||
* | ||
* @param {EventEmitter} client Emitter. | ||
* @return {Promise<undefined>} | ||
*/ | ||
}, { | ||
key: 'unsubscribeall', | ||
value: function unsubscribeall(client) { | ||
var channels = this.clientChannels.get(client); | ||
this.clientChannels.delete(client); | ||
if (channels) { | ||
return Promise.each(channels, this._channelRemoveClient.bind(this, client)); | ||
} else { | ||
return Promise.resolve(); | ||
} | ||
/** | ||
* Publish an event to a channel. | ||
* | ||
* @param {string} channel Channel. | ||
* @param {string} name Event name. | ||
* @param {*} args Arguments. | ||
* @return {Promise<undefined>} | ||
*/ | ||
publish(channel, name) { | ||
for (var _len = arguments.length, args = Array(_len > 2 ? _len - 2 : 0), _key = 2; _key < _len; _key++) { | ||
args[_key - 2] = arguments[_key]; | ||
} | ||
/** | ||
* Publish an event to a channel. | ||
* | ||
* @param {string} channel Channel. | ||
* @param {string} name Event name. | ||
* @param {*} args Arguments. | ||
* @return {Promise<undefined>} | ||
*/ | ||
let ch = this.prefix + channel; | ||
let msg = this._makeMessage({ name: name, args: args }); | ||
return this.connector.publish(ch, msg); | ||
} | ||
}, { | ||
key: 'publish', | ||
value: function publish(channel, name) { | ||
for (var _len = arguments.length, args = Array(_len > 2 ? _len - 2 : 0), _key = 2; _key < _len; _key++) { | ||
args[_key - 2] = arguments[_key]; | ||
} | ||
var ch = this.prefix + channel; | ||
var msg = this._makeMessage({ name: name, args: args }); | ||
return this.connector.publish(ch, msg); | ||
/** | ||
* Publish an event to a channel, excluding the sender. The client | ||
* object __MUST__ have an unique `id` field. | ||
* | ||
* @param {EventEmitter} client Emitter. | ||
* @param {string} channel Channel. | ||
* @param {string} name Event name. | ||
* @param {*} args Arguments. | ||
* @return {Promise<undefined>} | ||
*/ | ||
send(client, channel, name) { | ||
for (var _len2 = arguments.length, args = Array(_len2 > 3 ? _len2 - 3 : 0), _key2 = 3; _key2 < _len2; _key2++) { | ||
args[_key2 - 3] = arguments[_key2]; | ||
} | ||
/** | ||
* Publish an event to a channel, excluding the sender. The client | ||
* object __MUST__ have an unique `id` field. | ||
* | ||
* @param {EventEmitter} client Emitter. | ||
* @param {string} channel Channel. | ||
* @param {string} name Event name. | ||
* @param {*} args Arguments. | ||
* @return {Promise<undefined>} | ||
*/ | ||
let ch = this.prefix + channel; | ||
let sender = client.id; | ||
let msg = this._makeMessage({ sender: sender, name: name, args: args }); | ||
return this.connector.publish(ch, msg); | ||
} | ||
}, { | ||
key: 'send', | ||
value: function send(client, channel, name) { | ||
for (var _len2 = arguments.length, args = Array(_len2 > 3 ? _len2 - 3 : 0), _key2 = 3; _key2 < _len2; _key2++) { | ||
args[_key2 - 3] = arguments[_key2]; | ||
/** | ||
* Returns client subscriptions. | ||
* | ||
* @param {EventEmitter} client Emitter. | ||
* @return {Array<string>} | ||
*/ | ||
getSubscriptions(client) { | ||
let channels = this.clientChannels.get(client); | ||
let res = []; | ||
if (channels) { | ||
let plen = this.prefix.length; | ||
for (let channel of channels) { | ||
res.push(channel.slice(plen)); | ||
} | ||
var ch = this.prefix + channel; | ||
var sender = client.id; | ||
var msg = this._makeMessage({ sender: sender, name: name, args: args }); | ||
return this.connector.publish(ch, msg); | ||
} | ||
return res; | ||
} | ||
/** | ||
* Returns client subscriptions. | ||
* | ||
* @param {EventEmitter} client Emitter. | ||
* @return {Array<string>} | ||
*/ | ||
/** | ||
* Closes broker. | ||
* | ||
* @return {Promise<undefined>} | ||
*/ | ||
close() { | ||
this.channelClients.clear(); | ||
this.clientChannels.clear(); | ||
return this.connector.close(); | ||
} | ||
}, { | ||
key: 'getSubscriptions', | ||
value: function getSubscriptions(client) { | ||
var channels = this.clientChannels.get(client); | ||
var res = []; | ||
if (channels) { | ||
var plen = this.prefix.length; | ||
var _iteratorNormalCompletion = true; | ||
var _didIteratorError = false; | ||
var _iteratorError = undefined; | ||
try { | ||
for (var _iterator = (0, _getIterator3.default)(channels), _step; !(_iteratorNormalCompletion = (_step = _iterator.next()).done); _iteratorNormalCompletion = true) { | ||
var channel = _step.value; | ||
res.push(channel.slice(plen)); | ||
} | ||
} catch (err) { | ||
_didIteratorError = true; | ||
_iteratorError = err; | ||
} finally { | ||
try { | ||
if (!_iteratorNormalCompletion && _iterator.return) { | ||
_iterator.return(); | ||
} | ||
} finally { | ||
if (_didIteratorError) { | ||
throw _iteratorError; | ||
} | ||
} | ||
_dispatch(ch, data) { | ||
let channel = ch.slice(this.prefix.length); | ||
let message = this.serialize ? msgpack.decode(data) : data; | ||
let clients = this.channelClients.get(ch); | ||
/* istanbul ignore else */ | ||
if (clients) { | ||
let args; | ||
if (this.includeChannel) { | ||
args = [message.name, channel].concat(_toConsumableArray(message.args)); | ||
} else { | ||
args = [message.name].concat(_toConsumableArray(message.args)); | ||
} | ||
for (let client of clients) { | ||
if (!message.sender || client.id !== message.sender) { | ||
client.emit.apply(client, _toConsumableArray(args)); | ||
} | ||
} | ||
return res; | ||
} | ||
} | ||
/** | ||
* Closes broker. | ||
* | ||
* @return {Promise<undefined>} | ||
*/ | ||
} | ||
}, { | ||
key: 'close', | ||
value: function close() { | ||
this.channelClients.clear(); | ||
this.clientChannels.clear(); | ||
return this.connector.close(); | ||
} | ||
}, { | ||
key: '_dispatch', | ||
value: function _dispatch(ch, data) { | ||
var channel = ch.slice(this.prefix.length); | ||
var message = this.serialize ? msgpack.decode(data) : data; | ||
var clients = this.channelClients.get(ch); | ||
/* istanbul ignore else */ | ||
if (clients) { | ||
var args = void 0; | ||
if (this.includeChannel) { | ||
args = [message.name, channel].concat((0, _toConsumableArray3.default)(message.args)); | ||
} else { | ||
args = [message.name].concat((0, _toConsumableArray3.default)(message.args)); | ||
} | ||
var _iteratorNormalCompletion2 = true; | ||
var _didIteratorError2 = false; | ||
var _iteratorError2 = undefined; | ||
try { | ||
for (var _iterator2 = (0, _getIterator3.default)(clients), _step2; !(_iteratorNormalCompletion2 = (_step2 = _iterator2.next()).done); _iteratorNormalCompletion2 = true) { | ||
var client = _step2.value; | ||
if (!message.sender || client.id !== message.sender) { | ||
client.emit.apply(client, (0, _toConsumableArray3.default)(args)); | ||
} | ||
} | ||
} catch (err) { | ||
_didIteratorError2 = true; | ||
_iteratorError2 = err; | ||
} finally { | ||
try { | ||
if (!_iteratorNormalCompletion2 && _iterator2.return) { | ||
_iterator2.return(); | ||
} | ||
} finally { | ||
if (_didIteratorError2) { | ||
throw _iteratorError2; | ||
} | ||
} | ||
} | ||
} | ||
} | ||
}]); | ||
return EmitterPubsubBroker; | ||
}(EventEmitter); | ||
module.exports = EmitterPubsubBroker; | ||
//# sourceMappingURL=data:application/json;base64,{"version":3,"sources":["../src/EmitterPubsubBroker.js"],"names":["Promise","require","Redis","msgpack","EventEmitter","RedisConnector","options","pub","sub","subscribe","catchReturn","on","_onMessage","bind","emit","buf","data","ch","toString","publish","unsubscribe","all","quit","MemoryConnector","resolve","then","EmitterPubsubBroker","connect","prefix","includeChannel","clientChannels","channelClients","connector","serialize","_dispatch","client","clients","get","set","add","nclients","delete","size","msg","encode","channel","channels","_channelAddClient","_channelRemoveClient","each","name","args","_makeMessage","sender","id","res","plen","length","push","slice","clear","close","message","decode","module","exports"],"mappings":"AAAA;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;AAEA,IAAMA,UAAUC,QAAQ,UAAR,CAAhB;AACA,IAAMC,QAAQD,QAAQ,SAAR,CAAd;AACA,IAAME,UAAUF,QAAQ,cAAR,CAAhB;;eACyBA,QAAQ,eAAR,C;;IAAjBG,Y,YAAAA,Y;;AAER;;;;;;;AAOA;;;;;;;;;;AAUA;;;;;;;;;AASA;;;;;;;;;AASA;;;;;;;;AAQA;;;;;;;AAOA;;;;;;;;IAQMC,c;;;AACJ,0BAAaC,OAAb,EAAsB;AAAA;;AAAA;;AAEpB,UAAKC,GAAL,GAAW,IAAIL,KAAJ,CAAUI,OAAV,CAAX;AACA,UAAKE,GAAL,GAAW,IAAIN,KAAJ,CAAUI,OAAV,CAAX;AACA,UAAKE,GAAL,CAASC,SAAT,GAAqBC,WAArB;AACA,UAAKF,GAAL,CAASG,EAAT,CAAY,eAAZ,EAA6B,MAAKC,UAAL,CAAgBC,IAAhB,OAA7B;AACA,UAAKL,GAAL,CAASG,EAAT,CAAY,OAAZ,EAAqB,MAAKG,IAAL,CAAUD,IAAV,OAArB;AACA,UAAKN,GAAL,CAASI,EAAT,CAAY,OAAZ,EAAqB,MAAKG,IAAL,CAAUD,IAAV,OAArB;AAPoB;AAQrB;;;;+BAEWE,G,EAAKC,I,EAAM;AACrB,UAAIC,KAAKF,IAAIG,QAAJ,EAAT;AACA,WAAKJ,IAAL,CAAU,SAAV,EAAqBG,EAArB,EAAyBD,IAAzB;AACD;;;4BAEQC,E,EAAID,I,EAAM;AACjB,aAAO,KAAKT,GAAL,CAASY,OAAT,CAAiBF,EAAjB,EAAqBD,IAArB,CAAP;AACD;;;8BAEUC,E,EAAI;AACb,aAAO,KAAKT,GAAL,CAASC,SAAT,CAAmBQ,EAAnB,CAAP;AACD;;;gCAEYA,E,EAAI;AACf,aAAO,KAAKT,GAAL,CAASY,WAAT,CAAqBH,EAArB,CAAP;AACD;;;4BAEQ;AACP,aAAOjB,QAAQqB,GAAR,CAAY,CAAC,KAAKd,GAAL,CAASe,IAAT,EAAD,EAAkB,KAAKd,GAAL,CAASc,IAAT,EAAlB,CAAZ,CAAP;AACD;;;EA9B0BlB,Y;;IAiCvBmB,e;;;AACJ,2BAAajB,OAAb,EAAsB;AAAA;AAAA;AAErB;;;;4BAEQW,E,EAAID,I,EAAM;AAAA;;AACjB,aAAOhB,QAAQwB,OAAR,GACJC,IADI,CACC;AAAA,eAAM,OAAKX,IAAL,CAAU,SAAV,EAAqBG,EAArB,EAAyBD,IAAzB,CAAN;AAAA,OADD,CAAP;AAED;;;8BAEUC,E,EAAI;AACb,aAAOjB,QAAQwB,OAAR,EAAP;AACD;;;gCAEYP,E,EAAI;AACf,aAAOjB,QAAQwB,OAAR,EAAP;AACD;;;4BAEQ;AACP,aAAOxB,QAAQwB,OAAR,EAAP;AACD;;;EApB2BpB,Y;;AAuB9B;;;;;;;;;;AAUA;;;;;IAGMsB,mB;;;AACJ;;;;;;;AAOA,+BAAapB,OAAb,EAAsB;AAAA;;AAAA;;AAEpB,QAAIA,WAAW,IAAX,IAAmB,OAAOA,OAAP,KAAmB,QAA1C,EAAoD;AAClDA,gBAAU,EAAEqB,SAASrB,OAAX,EAAV;AACD;AACD,WAAKsB,MAAL,GAActB,QAAQsB,MAAR,IAAkB,wBAAhC;AACA,WAAKC,cAAL,GAAsBvB,QAAQuB,cAA9B;AACA,WAAKC,cAAL,GAAsB,mBAAtB;AACA,WAAKC,cAAL,GAAsB,mBAAtB;AACA,QAAIzB,QAAQqB,OAAR,IAAmBrB,QAAQ0B,SAA/B,EAA0C;AACxC,aAAKA,SAAL,GAAiB1B,QAAQ0B,SAAR,IAAqB,IAAI3B,cAAJ,CAAmBC,QAAQqB,OAA3B,CAAtC;AACA,aAAKM,SAAL,GAAiB,IAAjB;AACD,KAHD,MAGO;AACL,aAAKD,SAAL,GAAiB,IAAIT,eAAJ,EAAjB;AACA,aAAKU,SAAL,GAAiB,KAAjB;AACD;AACD,WAAKD,SAAL,CAAerB,EAAf,CAAkB,SAAlB,EAA6B,OAAKuB,SAAL,CAAerB,IAAf,QAA7B;AACA;;;;;;;AAOA,WAAKmB,SAAL,CAAerB,EAAf,CAAkB,OAAlB,EAA2B,OAAKG,IAAL,CAAUD,IAAV,QAA3B;AAxBoB;AAyBrB;;;;sCAEkBsB,M,EAAQlB,E,EAAI;AAC7B,UAAImB,UAAU,KAAKL,cAAL,CAAoBM,GAApB,CAAwBpB,EAAxB,CAAd;AACA,UAAImB,WAAW,IAAf,EAAqB;AACnBA,kBAAU,mBAAV;AACA,aAAKL,cAAL,CAAoBO,GAApB,CAAwBrB,EAAxB,EAA4BmB,OAA5B;AACAA,gBAAQG,GAAR,CAAYJ,MAAZ;AACA,eAAO,KAAKH,SAAL,CAAevB,SAAf,CAAyBQ,EAAzB,CAAP;AACD,OALD,MAKO;AACLmB,gBAAQG,GAAR,CAAYJ,MAAZ;AACA,eAAOnC,QAAQwB,OAAR,EAAP;AACD;AACF;;;yCAEqBW,M,EAAQlB,E,EAAI;AAChC,UAAImB,UAAU,KAAKL,cAAL,CAAoBM,GAApB,CAAwBpB,EAAxB,CAAd;AACA,UAAIuB,iBAAJ;AACA,UAAIJ,WAAW,IAAf,EAAqB;AACnBA,gBAAQK,MAAR,CAAeN,MAAf;AACAK,mBAAWJ,QAAQM,IAAnB;AACD;AACD,UAAIF,aAAa,CAAjB,EAAoB;AAClB,eAAO,KAAKR,SAAL,CAAeZ,WAAf,CAA2BH,EAA3B,CAAP;AACD,OAFD,MAEO;AACL,eAAOjB,QAAQwB,OAAR,EAAP;AACD;AACF;;;iCAEamB,G,EAAK;AACjB,aAAO,KAAKV,SAAL,GAAiB9B,QAAQyC,MAAR,CAAeD,GAAf,CAAjB,GAAuCA,GAA9C;AACD;;AAED;;;;;;;;;;8BAOWR,M,EAAQU,O,EAAS;AAC1B,UAAIC,WAAW,KAAKhB,cAAL,CAAoBO,GAApB,CAAwBF,MAAxB,CAAf;AACA,UAAI,CAACW,QAAL,EAAe;AACbA,mBAAW,mBAAX;AACA,aAAKhB,cAAL,CAAoBQ,GAApB,CAAwBH,MAAxB,EAAgCW,QAAhC;AACD;AACD,UAAI7B,KAAK,KAAKW,MAAL,GAAciB,OAAvB;AACAC,eAASP,GAAT,CAAatB,EAAb;AACA,aAAO,KAAK8B,iBAAL,CAAuBZ,MAAvB,EAA+BlB,EAA/B,CAAP;AACD;;AAED;;;;;;;;;;gCAOakB,M,EAAQU,O,EAAS;AAC5B,UAAIC,WAAW,KAAKhB,cAAL,CAAoBO,GAApB,CAAwBF,MAAxB,CAAf;AACA,UAAIlB,KAAK,KAAKW,MAAL,GAAciB,OAAvB;AACA,UAAIC,QAAJ,EAAc;AACZA,iBAASL,MAAT,CAAgBxB,EAAhB;AACD;AACD,aAAO,KAAK+B,oBAAL,CAA0Bb,MAA1B,EAAkClB,EAAlC,CAAP;AACD;;AAED;;;;;;;;;mCAMgBkB,M,EAAQ;AACtB,UAAIW,WAAW,KAAKhB,cAAL,CAAoBO,GAApB,CAAwBF,MAAxB,CAAf;AACA,WAAKL,cAAL,CAAoBW,MAApB,CAA2BN,MAA3B;AACA,UAAIW,QAAJ,EAAc;AACZ,eAAO9C,QAAQiD,IAAR,CACLH,QADK,EACK,KAAKE,oBAAL,CAA0BnC,IAA1B,CAA+B,IAA/B,EAAqCsB,MAArC,CADL,CAAP;AAED,OAHD,MAGO;AACL,eAAOnC,QAAQwB,OAAR,EAAP;AACD;AACF;;AAED;;;;;;;;;;;4BAQSqB,O,EAASK,I,EAAe;AAAA,wCAANC,IAAM;AAANA,YAAM;AAAA;;AAC/B,UAAIlC,KAAK,KAAKW,MAAL,GAAciB,OAAvB;AACA,UAAIF,MAAM,KAAKS,YAAL,CAAkB,EAACF,UAAD,EAAOC,UAAP,EAAlB,CAAV;AACA,aAAO,KAAKnB,SAAL,CAAeb,OAAf,CAAuBF,EAAvB,EAA2B0B,GAA3B,CAAP;AACD;;AAED;;;;;;;;;;;;;yBAUMR,M,EAAQU,O,EAASK,I,EAAe;AAAA,yCAANC,IAAM;AAANA,YAAM;AAAA;;AACpC,UAAIlC,KAAK,KAAKW,MAAL,GAAciB,OAAvB;AACA,UAAIQ,SAASlB,OAAOmB,EAApB;AACA,UAAIX,MAAM,KAAKS,YAAL,CAAkB,EAACC,cAAD,EAASH,UAAT,EAAeC,UAAf,EAAlB,CAAV;AACA,aAAO,KAAKnB,SAAL,CAAeb,OAAf,CAAuBF,EAAvB,EAA2B0B,GAA3B,CAAP;AACD;;AAED;;;;;;;;;qCAMkBR,M,EAAQ;AACxB,UAAIW,WAAW,KAAKhB,cAAL,CAAoBO,GAApB,CAAwBF,MAAxB,CAAf;AACA,UAAIoB,MAAM,EAAV;AACA,UAAIT,QAAJ,EAAc;AACZ,YAAIU,OAAO,KAAK5B,MAAL,CAAY6B,MAAvB;AADY;AAAA;AAAA;;AAAA;AAEZ,0DAAoBX,QAApB,4GAA8B;AAAA,gBAArBD,OAAqB;;AAC5BU,gBAAIG,IAAJ,CAASb,QAAQc,KAAR,CAAcH,IAAd,CAAT;AACD;AAJW;AAAA;AAAA;AAAA;AAAA;AAAA;AAAA;AAAA;AAAA;AAAA;AAAA;AAAA;AAAA;AAAA;AAKb;AACD,aAAOD,GAAP;AACD;;AAED;;;;;;;;4BAKS;AACP,WAAKxB,cAAL,CAAoB6B,KAApB;AACA,WAAK9B,cAAL,CAAoB8B,KAApB;AACA,aAAO,KAAK5B,SAAL,CAAe6B,KAAf,EAAP;AACD;;;8BAEU5C,E,EAAID,I,EAAM;AACnB,UAAI6B,UAAU5B,GAAG0C,KAAH,CAAS,KAAK/B,MAAL,CAAY6B,MAArB,CAAd;AACA,UAAIK,UAAU,KAAK7B,SAAL,GAAiB9B,QAAQ4D,MAAR,CAAe/C,IAAf,CAAjB,GAAwCA,IAAtD;AACA,UAAIoB,UAAU,KAAKL,cAAL,CAAoBM,GAApB,CAAwBpB,EAAxB,CAAd;AACA;AACA,UAAImB,OAAJ,EAAa;AACX,YAAIe,aAAJ;AACA,YAAI,KAAKtB,cAAT,EAAyB;AACvBsB,kBAAQW,QAAQZ,IAAhB,EAAsBL,OAAtB,0CAAkCiB,QAAQX,IAA1C;AACD,SAFD,MAEO;AACLA,kBAAQW,QAAQZ,IAAhB,0CAAyBY,QAAQX,IAAjC;AACD;AANU;AAAA;AAAA;;AAAA;AAOX,2DAAmBf,OAAnB,iHAA4B;AAAA,gBAAnBD,MAAmB;;AAC1B,gBAAI,CAAC2B,QAAQT,MAAT,IAAmBlB,OAAOmB,EAAP,KAAcQ,QAAQT,MAA7C,EAAqD;AACnDlB,qBAAOrB,IAAP,gDAAeqC,IAAf;AACD;AACF;AAXU;AAAA;AAAA;AAAA;AAAA;AAAA;AAAA;AAAA;AAAA;AAAA;AAAA;AAAA;AAAA;AAAA;AAYZ;AACF;;;EAnM+B/C,Y;;AAuMlC4D,OAAOC,OAAP,GAAiBvC,mBAAjB","file":"EmitterPubsubBroker.js","sourcesContent":["'use strict'\n\nconst Promise = require('bluebird')\nconst Redis = require('ioredis')\nconst msgpack = require('msgpack-lite')\nconst { EventEmitter } = require('eventemitter3')\n\n/**\n * Interface for connector implementations.\n *\n * @interface Connector\n * @extends EventEmitter\n */\n\n/**\n * @method\n * @instance\n * @name publish\n * @memberOf Connector\n * @param {string} channel Channel.\n * @param {Buffer} data Data.\n * @return {Promise<undefined>}\n */\n\n/**\n * @method\n * @instance\n * @name subscribe\n * @memberOf Connector\n * @param {string} channel Channel.\n * @return {Promise<undefined>}\n */\n\n/**\n * @method\n * @instance\n * @name unsubscribe\n * @memberOf Connector\n * @param {string} channel Channel.\n * @return {Promise<undefined>}\n */\n\n/**\n * @method\n * @instance\n * @name close\n * @memberOf Connector\n * @return {Promise<undefined>}\n */\n\n/**\n * @event message\n * @memberOf Connector\n * @param {string} event Event name.\n * @param {Buffer} data Event data.\n */\n\n/**\n * Event will be listened by {@link EmitterPubsubBroker} instance.\n *\n * @event error\n * @memberOf Connector\n * @param {Error} error Error.\n */\n\nclass RedisConnector extends EventEmitter {\n  constructor (options) {\n    super()\n    this.pub = new Redis(options)\n    this.sub = new Redis(options)\n    this.sub.subscribe().catchReturn()\n    this.sub.on('messageBuffer', this._onMessage.bind(this))\n    this.sub.on('error', this.emit.bind(this))\n    this.pub.on('error', this.emit.bind(this))\n  }\n\n  _onMessage (buf, data) {\n    let ch = buf.toString()\n    this.emit('message', ch, data)\n  }\n\n  publish (ch, data) {\n    return this.pub.publish(ch, data)\n  }\n\n  subscribe (ch) {\n    return this.sub.subscribe(ch)\n  }\n\n  unsubscribe (ch) {\n    return this.sub.unsubscribe(ch)\n  }\n\n  close () {\n    return Promise.all([this.pub.quit(), this.sub.quit()])\n  }\n}\n\nclass MemoryConnector extends EventEmitter {\n  constructor (options) {\n    super()\n  }\n\n  publish (ch, data) {\n    return Promise.resolve()\n      .then(() => this.emit('message', ch, data))\n  }\n\n  subscribe (ch) {\n    return Promise.resolve()\n  }\n\n  unsubscribe (ch) {\n    return Promise.resolve()\n  }\n\n  close () {\n    return Promise.resolve()\n  }\n}\n\n/**\n * @typedef {Object} EmitterPubsubBroker.Options\n *\n * @property {string} connect Connect string.\n * @property {string} [prefix='emitter-pubsub-broker:'] Prefix.\n * @property {boolean} [includeChannel=false] Include channel as the\n * first argument.\n * @property {Connector} [connector] Custom connector implementation.\n */\n\n/**\n * @extends EventEmitter\n */\nclass EmitterPubsubBroker extends EventEmitter {\n  /**\n   * Creates a broker.\n   *\n   * @param {EmitterPubsubBroker.Options|string} options Options or a\n   * connect if a string. If connect string is empty, then an\n   * in-memory connector is used.\n   */\n  constructor (options) {\n    super()\n    if (options == null || typeof options === 'string') {\n      options = { connect: options }\n    }\n    this.prefix = options.prefix || 'emitter-pubsub-broker:'\n    this.includeChannel = options.includeChannel\n    this.clientChannels = new Map()\n    this.channelClients = new Map()\n    if (options.connect || options.connector) {\n      this.connector = options.connector || new RedisConnector(options.connect)\n      this.serialize = true\n    } else {\n      this.connector = new MemoryConnector()\n      this.serialize = false\n    }\n    this.connector.on('message', this._dispatch.bind(this))\n    /**\n     * Connector error. Does not throw if there are no listeners.\n     *\n     * @event error\n     * @memberOf EmitterPubsubBroker\n     * @param {Error} error Error.\n     */\n    this.connector.on('error', this.emit.bind(this))\n  }\n\n  _channelAddClient (client, ch) {\n    let clients = this.channelClients.get(ch)\n    if (clients == null) {\n      clients = new Set()\n      this.channelClients.set(ch, clients)\n      clients.add(client)\n      return this.connector.subscribe(ch)\n    } else {\n      clients.add(client)\n      return Promise.resolve()\n    }\n  }\n\n  _channelRemoveClient (client, ch) {\n    let clients = this.channelClients.get(ch)\n    let nclients\n    if (clients != null) {\n      clients.delete(client)\n      nclients = clients.size\n    }\n    if (nclients === 0) {\n      return this.connector.unsubscribe(ch)\n    } else {\n      return Promise.resolve()\n    }\n  }\n\n  _makeMessage (msg) {\n    return this.serialize ? msgpack.encode(msg) : msg\n  }\n\n  /**\n   * Subscribes emitter to a channel.\n   *\n   * @param {EventEmitter} client Emitter.\n   * @param {string} channel Channel.\n   * @return {Promise<undefined>}\n   */\n  subscribe (client, channel) {\n    let channels = this.clientChannels.get(client)\n    if (!channels) {\n      channels = new Set()\n      this.clientChannels.set(client, channels)\n    }\n    let ch = this.prefix + channel\n    channels.add(ch)\n    return this._channelAddClient(client, ch)\n  }\n\n  /**\n   * Unsubscribes emitter from a channel.\n   *\n   * @param {EventEmitter} client Emitter.\n   * @param {string} channel Channel.\n   * @return {Promise<undefined>}\n   */\n  unsubscribe (client, channel) {\n    let channels = this.clientChannels.get(client)\n    let ch = this.prefix + channel\n    if (channels) {\n      channels.delete(ch)\n    }\n    return this._channelRemoveClient(client, ch)\n  }\n\n  /**\n   * Unsubscribes emitter from all channel.\n   *\n   * @param {EventEmitter} client Emitter.\n   * @return {Promise<undefined>}\n   */\n  unsubscribeall (client) {\n    let channels = this.clientChannels.get(client)\n    this.clientChannels.delete(client)\n    if (channels) {\n      return Promise.each(\n        channels, this._channelRemoveClient.bind(this, client))\n    } else {\n      return Promise.resolve()\n    }\n  }\n\n  /**\n   * Publish an event to a channel.\n   *\n   * @param {string} channel Channel.\n   * @param {string} name Event name.\n   * @param {*} args Arguments.\n   * @return {Promise<undefined>}\n   */\n  publish (channel, name, ...args) {\n    let ch = this.prefix + channel\n    let msg = this._makeMessage({name, args})\n    return this.connector.publish(ch, msg)\n  }\n\n  /**\n   * Publish an event to a channel, excluding the sender. The client\n   * object __MUST__ have an unique `id` field.\n   *\n   * @param {EventEmitter} client Emitter.\n   * @param {string} channel Channel.\n   * @param {string} name Event name.\n   * @param {*} args Arguments.\n   * @return {Promise<undefined>}\n   */\n  send (client, channel, name, ...args) {\n    let ch = this.prefix + channel\n    let sender = client.id\n    let msg = this._makeMessage({sender, name, args})\n    return this.connector.publish(ch, msg)\n  }\n\n  /**\n   * Returns client subscriptions.\n   *\n   * @param {EventEmitter} client Emitter.\n   * @return {Array<string>}\n   */\n  getSubscriptions (client) {\n    let channels = this.clientChannels.get(client)\n    let res = []\n    if (channels) {\n      let plen = this.prefix.length\n      for (let channel of channels) {\n        res.push(channel.slice(plen))\n      }\n    }\n    return res\n  }\n\n  /**\n   * Closes broker.\n   *\n   * @return {Promise<undefined>}\n   */\n  close () {\n    this.channelClients.clear()\n    this.clientChannels.clear()\n    return this.connector.close()\n  }\n\n  _dispatch (ch, data) {\n    let channel = ch.slice(this.prefix.length)\n    let message = this.serialize ? msgpack.decode(data) : data\n    let clients = this.channelClients.get(ch)\n    /* istanbul ignore else */\n    if (clients) {\n      let args\n      if (this.includeChannel) {\n        args = [message.name, channel, ...message.args]\n      } else {\n        args = [message.name, ...message.args]\n      }\n      for (let client of clients) {\n        if (!message.sender || client.id !== message.sender) {\n          client.emit(...args)\n        }\n      }\n    }\n  }\n\n}\n\nmodule.exports = EmitterPubsubBroker\n"]} | ||
//# sourceMappingURL=data:application/json;base64,{"version":3,"sources":["../src/EmitterPubsubBroker.js"],"names":["Promise","require","Redis","msgpack","EventEmitter","RedisConnector","constructor","options","pub","sub","subscribe","catchReturn","on","_onMessage","bind","emit","buf","data","ch","toString","publish","unsubscribe","close","all","quit","MemoryConnector","resolve","then","EmitterPubsubBroker","connect","prefix","includeChannel","clientChannels","Map","channelClients","connector","serialize","_dispatch","_channelAddClient","client","clients","get","Set","set","add","_channelRemoveClient","nclients","delete","size","_makeMessage","msg","encode","channel","channels","unsubscribeall","each","name","args","send","sender","id","getSubscriptions","res","plen","length","push","slice","clear","message","decode","module","exports"],"mappings":"AAAA;;;;AAEA,MAAMA,UAAUC,QAAQ,UAAR,CAAhB;AACA,MAAMC,QAAQD,QAAQ,SAAR,CAAd;AACA,MAAME,UAAUF,QAAQ,cAAR,CAAhB;;eACyBA,QAAQ,eAAR,C;;MAAjBG,Y,YAAAA,Y;;AAER;;;;;;;AAOA;;;;;;;;;;AAUA;;;;;;;;;AASA;;;;;;;;;AASA;;;;;;;;AAQA;;;;;;;AAOA;;;;;;;;AAQA,MAAMC,cAAN,SAA6BD,YAA7B,CAA0C;AACxCE,cAAaC,OAAb,EAAsB;AACpB;AACA,SAAKC,GAAL,GAAW,IAAIN,KAAJ,CAAUK,OAAV,CAAX;AACA,SAAKE,GAAL,GAAW,IAAIP,KAAJ,CAAUK,OAAV,CAAX;AACA,SAAKE,GAAL,CAASC,SAAT,GAAqBC,WAArB;AACA,SAAKF,GAAL,CAASG,EAAT,CAAY,eAAZ,EAA6B,KAAKC,UAAL,CAAgBC,IAAhB,CAAqB,IAArB,CAA7B;AACA,SAAKL,GAAL,CAASG,EAAT,CAAY,OAAZ,EAAqB,KAAKG,IAAL,CAAUD,IAAV,CAAe,IAAf,CAArB;AACA,SAAKN,GAAL,CAASI,EAAT,CAAY,OAAZ,EAAqB,KAAKG,IAAL,CAAUD,IAAV,CAAe,IAAf,CAArB;AACD;;AAEDD,aAAYG,GAAZ,EAAiBC,IAAjB,EAAuB;AACrB,QAAIC,KAAKF,IAAIG,QAAJ,EAAT;AACA,SAAKJ,IAAL,CAAU,SAAV,EAAqBG,EAArB,EAAyBD,IAAzB;AACD;;AAEDG,UAASF,EAAT,EAAaD,IAAb,EAAmB;AACjB,WAAO,KAAKT,GAAL,CAASY,OAAT,CAAiBF,EAAjB,EAAqBD,IAArB,CAAP;AACD;;AAEDP,YAAWQ,EAAX,EAAe;AACb,WAAO,KAAKT,GAAL,CAASC,SAAT,CAAmBQ,EAAnB,CAAP;AACD;;AAEDG,cAAaH,EAAb,EAAiB;AACf,WAAO,KAAKT,GAAL,CAASY,WAAT,CAAqBH,EAArB,CAAP;AACD;;AAEDI,UAAS;AACP,WAAOtB,QAAQuB,GAAR,CAAY,CAAC,KAAKf,GAAL,CAASgB,IAAT,EAAD,EAAkB,KAAKf,GAAL,CAASe,IAAT,EAAlB,CAAZ,CAAP;AACD;AA9BuC;;AAiC1C,MAAMC,eAAN,SAA8BrB,YAA9B,CAA2C;AACzCE,cAAaC,OAAb,EAAsB;AACpB;AACD;;AAEDa,UAASF,EAAT,EAAaD,IAAb,EAAmB;AACjB,WAAOjB,QAAQ0B,OAAR,GACJC,IADI,CACC,MAAM,KAAKZ,IAAL,CAAU,SAAV,EAAqBG,EAArB,EAAyBD,IAAzB,CADP,CAAP;AAED;;AAEDP,YAAWQ,EAAX,EAAe;AACb,WAAOlB,QAAQ0B,OAAR,EAAP;AACD;;AAEDL,cAAaH,EAAb,EAAiB;AACf,WAAOlB,QAAQ0B,OAAR,EAAP;AACD;;AAEDJ,UAAS;AACP,WAAOtB,QAAQ0B,OAAR,EAAP;AACD;AApBwC;;AAuB3C;;;;;;;;;;AAUA;;;AAGA,MAAME,mBAAN,SAAkCxB,YAAlC,CAA+C;AAC7C;;;;;;;AAOAE,cAAaC,OAAb,EAAsB;AACpB;AACA,QAAIA,WAAW,IAAX,IAAmB,OAAOA,OAAP,KAAmB,QAA1C,EAAoD;AAClDA,gBAAU,EAAEsB,SAAStB,OAAX,EAAV;AACD;AACD,SAAKuB,MAAL,GAAcvB,QAAQuB,MAAR,IAAkB,wBAAhC;AACA,SAAKC,cAAL,GAAsBxB,QAAQwB,cAA9B;AACA,SAAKC,cAAL,GAAsB,IAAIC,GAAJ,EAAtB;AACA,SAAKC,cAAL,GAAsB,IAAID,GAAJ,EAAtB;AACA,QAAI1B,QAAQsB,OAAR,IAAmBtB,QAAQ4B,SAA/B,EAA0C;AACxC,WAAKA,SAAL,GAAiB5B,QAAQ4B,SAAR,IAAqB,IAAI9B,cAAJ,CAAmBE,QAAQsB,OAA3B,CAAtC;AACA,WAAKO,SAAL,GAAiB,IAAjB;AACD,KAHD,MAGO;AACL,WAAKD,SAAL,GAAiB,IAAIV,eAAJ,EAAjB;AACA,WAAKW,SAAL,GAAiB,KAAjB;AACD;AACD,SAAKD,SAAL,CAAevB,EAAf,CAAkB,SAAlB,EAA6B,KAAKyB,SAAL,CAAevB,IAAf,CAAoB,IAApB,CAA7B;AACA;;;;;;;AAOA,SAAKqB,SAAL,CAAevB,EAAf,CAAkB,OAAlB,EAA2B,KAAKG,IAAL,CAAUD,IAAV,CAAe,IAAf,CAA3B;AACD;;AAEDwB,oBAAmBC,MAAnB,EAA2BrB,EAA3B,EAA+B;AAC7B,QAAIsB,UAAU,KAAKN,cAAL,CAAoBO,GAApB,CAAwBvB,EAAxB,CAAd;AACA,QAAIsB,WAAW,IAAf,EAAqB;AACnBA,gBAAU,IAAIE,GAAJ,EAAV;AACA,WAAKR,cAAL,CAAoBS,GAApB,CAAwBzB,EAAxB,EAA4BsB,OAA5B;AACAA,cAAQI,GAAR,CAAYL,MAAZ;AACA,aAAO,KAAKJ,SAAL,CAAezB,SAAf,CAAyBQ,EAAzB,CAAP;AACD,KALD,MAKO;AACLsB,cAAQI,GAAR,CAAYL,MAAZ;AACA,aAAOvC,QAAQ0B,OAAR,EAAP;AACD;AACF;;AAEDmB,uBAAsBN,MAAtB,EAA8BrB,EAA9B,EAAkC;AAChC,QAAIsB,UAAU,KAAKN,cAAL,CAAoBO,GAApB,CAAwBvB,EAAxB,CAAd;AACA,QAAI4B,QAAJ;AACA,QAAIN,WAAW,IAAf,EAAqB;AACnBA,cAAQO,MAAR,CAAeR,MAAf;AACAO,iBAAWN,QAAQQ,IAAnB;AACD;AACD,QAAIF,aAAa,CAAjB,EAAoB;AAClB,aAAO,KAAKX,SAAL,CAAed,WAAf,CAA2BH,EAA3B,CAAP;AACD,KAFD,MAEO;AACL,aAAOlB,QAAQ0B,OAAR,EAAP;AACD;AACF;;AAEDuB,eAAcC,GAAd,EAAmB;AACjB,WAAO,KAAKd,SAAL,GAAiBjC,QAAQgD,MAAR,CAAeD,GAAf,CAAjB,GAAuCA,GAA9C;AACD;;AAED;;;;;;;AAOAxC,YAAW6B,MAAX,EAAmBa,OAAnB,EAA4B;AAC1B,QAAIC,WAAW,KAAKrB,cAAL,CAAoBS,GAApB,CAAwBF,MAAxB,CAAf;AACA,QAAI,CAACc,QAAL,EAAe;AACbA,iBAAW,IAAIX,GAAJ,EAAX;AACA,WAAKV,cAAL,CAAoBW,GAApB,CAAwBJ,MAAxB,EAAgCc,QAAhC;AACD;AACD,QAAInC,KAAK,KAAKY,MAAL,GAAcsB,OAAvB;AACAC,aAAST,GAAT,CAAa1B,EAAb;AACA,WAAO,KAAKoB,iBAAL,CAAuBC,MAAvB,EAA+BrB,EAA/B,CAAP;AACD;;AAED;;;;;;;AAOAG,cAAakB,MAAb,EAAqBa,OAArB,EAA8B;AAC5B,QAAIC,WAAW,KAAKrB,cAAL,CAAoBS,GAApB,CAAwBF,MAAxB,CAAf;AACA,QAAIrB,KAAK,KAAKY,MAAL,GAAcsB,OAAvB;AACA,QAAIC,QAAJ,EAAc;AACZA,eAASN,MAAT,CAAgB7B,EAAhB;AACD;AACD,WAAO,KAAK2B,oBAAL,CAA0BN,MAA1B,EAAkCrB,EAAlC,CAAP;AACD;;AAED;;;;;;AAMAoC,iBAAgBf,MAAhB,EAAwB;AACtB,QAAIc,WAAW,KAAKrB,cAAL,CAAoBS,GAApB,CAAwBF,MAAxB,CAAf;AACA,SAAKP,cAAL,CAAoBe,MAApB,CAA2BR,MAA3B;AACA,QAAIc,QAAJ,EAAc;AACZ,aAAOrD,QAAQuD,IAAR,CACLF,QADK,EACK,KAAKR,oBAAL,CAA0B/B,IAA1B,CAA+B,IAA/B,EAAqCyB,MAArC,CADL,CAAP;AAED,KAHD,MAGO;AACL,aAAOvC,QAAQ0B,OAAR,EAAP;AACD;AACF;;AAED;;;;;;;;AAQAN,UAASgC,OAAT,EAAkBI,IAAlB,EAAiC;AAAA,sCAANC,IAAM;AAANA,UAAM;AAAA;;AAC/B,QAAIvC,KAAK,KAAKY,MAAL,GAAcsB,OAAvB;AACA,QAAIF,MAAM,KAAKD,YAAL,CAAkB,EAACO,UAAD,EAAOC,UAAP,EAAlB,CAAV;AACA,WAAO,KAAKtB,SAAL,CAAef,OAAf,CAAuBF,EAAvB,EAA2BgC,GAA3B,CAAP;AACD;;AAED;;;;;;;;;;AAUAQ,OAAMnB,MAAN,EAAca,OAAd,EAAuBI,IAAvB,EAAsC;AAAA,uCAANC,IAAM;AAANA,UAAM;AAAA;;AACpC,QAAIvC,KAAK,KAAKY,MAAL,GAAcsB,OAAvB;AACA,QAAIO,SAASpB,OAAOqB,EAApB;AACA,QAAIV,MAAM,KAAKD,YAAL,CAAkB,EAACU,cAAD,EAASH,UAAT,EAAeC,UAAf,EAAlB,CAAV;AACA,WAAO,KAAKtB,SAAL,CAAef,OAAf,CAAuBF,EAAvB,EAA2BgC,GAA3B,CAAP;AACD;;AAED;;;;;;AAMAW,mBAAkBtB,MAAlB,EAA0B;AACxB,QAAIc,WAAW,KAAKrB,cAAL,CAAoBS,GAApB,CAAwBF,MAAxB,CAAf;AACA,QAAIuB,MAAM,EAAV;AACA,QAAIT,QAAJ,EAAc;AACZ,UAAIU,OAAO,KAAKjC,MAAL,CAAYkC,MAAvB;AACA,WAAK,IAAIZ,OAAT,IAAoBC,QAApB,EAA8B;AAC5BS,YAAIG,IAAJ,CAASb,QAAQc,KAAR,CAAcH,IAAd,CAAT;AACD;AACF;AACD,WAAOD,GAAP;AACD;;AAED;;;;;AAKAxC,UAAS;AACP,SAAKY,cAAL,CAAoBiC,KAApB;AACA,SAAKnC,cAAL,CAAoBmC,KAApB;AACA,WAAO,KAAKhC,SAAL,CAAeb,KAAf,EAAP;AACD;;AAEDe,YAAWnB,EAAX,EAAeD,IAAf,EAAqB;AACnB,QAAImC,UAAUlC,GAAGgD,KAAH,CAAS,KAAKpC,MAAL,CAAYkC,MAArB,CAAd;AACA,QAAII,UAAU,KAAKhC,SAAL,GAAiBjC,QAAQkE,MAAR,CAAepD,IAAf,CAAjB,GAAwCA,IAAtD;AACA,QAAIuB,UAAU,KAAKN,cAAL,CAAoBO,GAApB,CAAwBvB,EAAxB,CAAd;AACA;AACA,QAAIsB,OAAJ,EAAa;AACX,UAAIiB,IAAJ;AACA,UAAI,KAAK1B,cAAT,EAAyB;AACvB0B,gBAAQW,QAAQZ,IAAhB,EAAsBJ,OAAtB,4BAAkCgB,QAAQX,IAA1C;AACD,OAFD,MAEO;AACLA,gBAAQW,QAAQZ,IAAhB,4BAAyBY,QAAQX,IAAjC;AACD;AACD,WAAK,IAAIlB,MAAT,IAAmBC,OAAnB,EAA4B;AAC1B,YAAI,CAAC4B,QAAQT,MAAT,IAAmBpB,OAAOqB,EAAP,KAAcQ,QAAQT,MAA7C,EAAqD;AACnDpB,iBAAOxB,IAAP,kCAAe0C,IAAf;AACD;AACF;AACF;AACF;;AAnM4C;;AAuM/Ca,OAAOC,OAAP,GAAiB3C,mBAAjB","file":"EmitterPubsubBroker.js","sourcesContent":["'use strict'\n\nconst Promise = require('bluebird')\nconst Redis = require('ioredis')\nconst msgpack = require('msgpack-lite')\nconst { EventEmitter } = require('eventemitter3')\n\n/**\n * Interface for connector implementations.\n *\n * @interface Connector\n * @extends EventEmitter\n */\n\n/**\n * @method\n * @instance\n * @name publish\n * @memberOf Connector\n * @param {string} channel Channel.\n * @param {Buffer} data Data.\n * @return {Promise<undefined>}\n */\n\n/**\n * @method\n * @instance\n * @name subscribe\n * @memberOf Connector\n * @param {string} channel Channel.\n * @return {Promise<undefined>}\n */\n\n/**\n * @method\n * @instance\n * @name unsubscribe\n * @memberOf Connector\n * @param {string} channel Channel.\n * @return {Promise<undefined>}\n */\n\n/**\n * @method\n * @instance\n * @name close\n * @memberOf Connector\n * @return {Promise<undefined>}\n */\n\n/**\n * @event message\n * @memberOf Connector\n * @param {string} event Event name.\n * @param {Buffer} data Event data.\n */\n\n/**\n * Event will be listened by {@link EmitterPubsubBroker} instance.\n *\n * @event error\n * @memberOf Connector\n * @param {Error} error Error.\n */\n\nclass RedisConnector extends EventEmitter {\n  constructor (options) {\n    super()\n    this.pub = new Redis(options)\n    this.sub = new Redis(options)\n    this.sub.subscribe().catchReturn()\n    this.sub.on('messageBuffer', this._onMessage.bind(this))\n    this.sub.on('error', this.emit.bind(this))\n    this.pub.on('error', this.emit.bind(this))\n  }\n\n  _onMessage (buf, data) {\n    let ch = buf.toString()\n    this.emit('message', ch, data)\n  }\n\n  publish (ch, data) {\n    return this.pub.publish(ch, data)\n  }\n\n  subscribe (ch) {\n    return this.sub.subscribe(ch)\n  }\n\n  unsubscribe (ch) {\n    return this.sub.unsubscribe(ch)\n  }\n\n  close () {\n    return Promise.all([this.pub.quit(), this.sub.quit()])\n  }\n}\n\nclass MemoryConnector extends EventEmitter {\n  constructor (options) {\n    super()\n  }\n\n  publish (ch, data) {\n    return Promise.resolve()\n      .then(() => this.emit('message', ch, data))\n  }\n\n  subscribe (ch) {\n    return Promise.resolve()\n  }\n\n  unsubscribe (ch) {\n    return Promise.resolve()\n  }\n\n  close () {\n    return Promise.resolve()\n  }\n}\n\n/**\n * @typedef {Object} EmitterPubsubBroker.Options\n *\n * @property {string} connect Connect string.\n * @property {string} [prefix='emitter-pubsub-broker:'] Prefix.\n * @property {boolean} [includeChannel=false] Include channel as the\n * first argument.\n * @property {Connector} [connector] Custom connector implementation.\n */\n\n/**\n * @extends EventEmitter\n */\nclass EmitterPubsubBroker extends EventEmitter {\n  /**\n   * Creates a broker.\n   *\n   * @param {EmitterPubsubBroker.Options|string} options Options or a\n   * connect if a string. If connect string is empty, then an\n   * in-memory connector is used.\n   */\n  constructor (options) {\n    super()\n    if (options == null || typeof options === 'string') {\n      options = { connect: options }\n    }\n    this.prefix = options.prefix || 'emitter-pubsub-broker:'\n    this.includeChannel = options.includeChannel\n    this.clientChannels = new Map()\n    this.channelClients = new Map()\n    if (options.connect || options.connector) {\n      this.connector = options.connector || new RedisConnector(options.connect)\n      this.serialize = true\n    } else {\n      this.connector = new MemoryConnector()\n      this.serialize = false\n    }\n    this.connector.on('message', this._dispatch.bind(this))\n    /**\n     * Connector error. Does not throw if there are no listeners.\n     *\n     * @event error\n     * @memberOf EmitterPubsubBroker\n     * @param {Error} error Error.\n     */\n    this.connector.on('error', this.emit.bind(this))\n  }\n\n  _channelAddClient (client, ch) {\n    let clients = this.channelClients.get(ch)\n    if (clients == null) {\n      clients = new Set()\n      this.channelClients.set(ch, clients)\n      clients.add(client)\n      return this.connector.subscribe(ch)\n    } else {\n      clients.add(client)\n      return Promise.resolve()\n    }\n  }\n\n  _channelRemoveClient (client, ch) {\n    let clients = this.channelClients.get(ch)\n    let nclients\n    if (clients != null) {\n      clients.delete(client)\n      nclients = clients.size\n    }\n    if (nclients === 0) {\n      return this.connector.unsubscribe(ch)\n    } else {\n      return Promise.resolve()\n    }\n  }\n\n  _makeMessage (msg) {\n    return this.serialize ? msgpack.encode(msg) : msg\n  }\n\n  /**\n   * Subscribes emitter to a channel.\n   *\n   * @param {EventEmitter} client Emitter.\n   * @param {string} channel Channel.\n   * @return {Promise<undefined>}\n   */\n  subscribe (client, channel) {\n    let channels = this.clientChannels.get(client)\n    if (!channels) {\n      channels = new Set()\n      this.clientChannels.set(client, channels)\n    }\n    let ch = this.prefix + channel\n    channels.add(ch)\n    return this._channelAddClient(client, ch)\n  }\n\n  /**\n   * Unsubscribes emitter from a channel.\n   *\n   * @param {EventEmitter} client Emitter.\n   * @param {string} channel Channel.\n   * @return {Promise<undefined>}\n   */\n  unsubscribe (client, channel) {\n    let channels = this.clientChannels.get(client)\n    let ch = this.prefix + channel\n    if (channels) {\n      channels.delete(ch)\n    }\n    return this._channelRemoveClient(client, ch)\n  }\n\n  /**\n   * Unsubscribes emitter from all channel.\n   *\n   * @param {EventEmitter} client Emitter.\n   * @return {Promise<undefined>}\n   */\n  unsubscribeall (client) {\n    let channels = this.clientChannels.get(client)\n    this.clientChannels.delete(client)\n    if (channels) {\n      return Promise.each(\n        channels, this._channelRemoveClient.bind(this, client))\n    } else {\n      return Promise.resolve()\n    }\n  }\n\n  /**\n   * Publish an event to a channel.\n   *\n   * @param {string} channel Channel.\n   * @param {string} name Event name.\n   * @param {*} args Arguments.\n   * @return {Promise<undefined>}\n   */\n  publish (channel, name, ...args) {\n    let ch = this.prefix + channel\n    let msg = this._makeMessage({name, args})\n    return this.connector.publish(ch, msg)\n  }\n\n  /**\n   * Publish an event to a channel, excluding the sender. The client\n   * object __MUST__ have an unique `id` field.\n   *\n   * @param {EventEmitter} client Emitter.\n   * @param {string} channel Channel.\n   * @param {string} name Event name.\n   * @param {*} args Arguments.\n   * @return {Promise<undefined>}\n   */\n  send (client, channel, name, ...args) {\n    let ch = this.prefix + channel\n    let sender = client.id\n    let msg = this._makeMessage({sender, name, args})\n    return this.connector.publish(ch, msg)\n  }\n\n  /**\n   * Returns client subscriptions.\n   *\n   * @param {EventEmitter} client Emitter.\n   * @return {Array<string>}\n   */\n  getSubscriptions (client) {\n    let channels = this.clientChannels.get(client)\n    let res = []\n    if (channels) {\n      let plen = this.prefix.length\n      for (let channel of channels) {\n        res.push(channel.slice(plen))\n      }\n    }\n    return res\n  }\n\n  /**\n   * Closes broker.\n   *\n   * @return {Promise<undefined>}\n   */\n  close () {\n    this.channelClients.clear()\n    this.clientChannels.clear()\n    return this.connector.close()\n  }\n\n  _dispatch (ch, data) {\n    let channel = ch.slice(this.prefix.length)\n    let message = this.serialize ? msgpack.decode(data) : data\n    let clients = this.channelClients.get(ch)\n    /* istanbul ignore else */\n    if (clients) {\n      let args\n      if (this.includeChannel) {\n        args = [message.name, channel, ...message.args]\n      } else {\n        args = [message.name, ...message.args]\n      }\n      for (let client of clients) {\n        if (!message.sender || client.id !== message.sender) {\n          client.emit(...args)\n        }\n      }\n    }\n  }\n\n}\n\nmodule.exports = EmitterPubsubBroker\n"]} |
{ | ||
"name": "emitter-pubsub-broker", | ||
"version": "0.3.0", | ||
"version": "0.4.0", | ||
"private": false, | ||
@@ -35,3 +35,2 @@ "description": "An utility for connecting EventEmitters via a pubsub.", | ||
"dependencies": { | ||
"babel-runtime": "^6.9.2", | ||
"bluebird": "^3.3.5", | ||
@@ -45,4 +44,3 @@ "eventemitter3": "^2.0.2", | ||
"babel-cli": "^6.11.4", | ||
"babel-plugin-transform-runtime": "^6.9.0", | ||
"babel-preset-es2015": "^6.9.0", | ||
"babel-preset-es2015-node4": "^2.1.0", | ||
"chai": "^3.5.0", | ||
@@ -54,3 +52,3 @@ "codecov": "^1.0.1", | ||
"mocha": "^3.0.0", | ||
"nyc": "^8.1.0", | ||
"nyc": "^10.0.0", | ||
"standard": "^8.0.0" | ||
@@ -57,0 +55,0 @@ }, |
@@ -49,3 +49,3 @@ | ||
[API](https://an-sh.github.io/emitter-pubsub-broker/0.3/index.html) | ||
[API](https://an-sh.github.io/emitter-pubsub-broker/0.4/index.html) | ||
documentation is available online. | ||
@@ -52,0 +52,0 @@ |
Sorry, the diff of this file is not supported yet
5
10
48235
765
- Removedbabel-runtime@^6.9.2
- Removedbabel-runtime@6.26.0(transitive)
- Removedcore-js@2.6.12(transitive)
- Removedregenerator-runtime@0.11.1(transitive)