Huge News!Announcing our $40M Series B led by Abstract Ventures.Learn More
Socket
Sign inDemoInstall
Socket

emitter-pubsub-broker

Package Overview
Dependencies
Maintainers
1
Versions
9
Alerts
File Explorer

Advanced tools

Socket logo

Install Socket

Detect and block malicious and high-risk dependencies

Install

emitter-pubsub-broker - npm Package Compare versions

Comparing version 0.4.0 to 0.5.0

30

CHANGELOG.md

@@ -5,2 +5,32 @@ # Change Log

<a name="0.5.0"></a>
# [0.5.0](https://github.com/an-sh/emitter-pubsub-broker/compare/v0.4.0...v0.5.0) (2017-01-06)
### Bug Fixes
* ignore broadcast errors ([e895e96](https://github.com/an-sh/emitter-pubsub-broker/commit/e895e96))
### Code Refactoring
* change getSubscriptions return type ([338a47a](https://github.com/an-sh/emitter-pubsub-broker/commit/338a47a))
* rename unsubscribeall to unsubscribeAll ([3a25f07](https://github.com/an-sh/emitter-pubsub-broker/commit/3a25f07))
### Features
* add custom serialisation support ([c3f218d](https://github.com/an-sh/emitter-pubsub-broker/commit/c3f218d))
* add getClients ([f728105](https://github.com/an-sh/emitter-pubsub-broker/commit/f728105))
* support broadcast data encoding ([392c173](https://github.com/an-sh/emitter-pubsub-broker/commit/392c173))
### BREAKING CHANGES
* Now getSubscriptions method returns a shared Set
instead of a fresh Array.
* Rename unsubscribeall to unsubscribeAll.
<a name="0.4.0"></a>

@@ -7,0 +37,0 @@ # [0.4.0](https://github.com/an-sh/emitter-pubsub-broker/compare/v0.3.0...v0.4.0) (2016-11-29)

153

lib/EmitterPubsubBroker.js

@@ -127,8 +127,47 @@ 'use strict';

/**
* Messages encoder. Encoded messages will be used to emit messages
* via event emitters. May also return promises for an asynchronous
* execution.
*
* @callback EmitterPubsubBroker.Encoder
* @param {*} args Emit arguments.
* @return {Promise<Object>|Object} Data to send.
*/
/**
* Messages serialisation. Serialised messages will be used internally
* via a communication {@link Connector}. May also return promises
* for an asynchronous execution.
*
* @callback EmitterPubsubBroker.Serialize
* @param {Object} data Data.
* @return {Promise<Object>|Object} Serialised data.
*/
/**
* Messages deserialisation. Serialised messages will be used
* internally via a communication {@link Connector}. May also return
* promises for an asynchronous execution.
*
* @callback EmitterPubsubBroker.Deserialize
* @param {Object} data Serialised data.
* @return {Promise<Object>|Object} Data.
*/
/**
* @typedef {Object} EmitterPubsubBroker.Options
*
* @property {string} connect Connect string.
* @property {string} [prefix='emitter-pubsub-broker:'] Prefix.
* @property {string} [connect] Connect string for a connector.
* @property {string} [prefix='emitter-pubsub-broker:'] Prefix for a connector.
* @property {boolean} [includeChannel=false] Include channel as the
* first argument.
* @property {EmitterPubsubBroker.Encoder} [encoder] Optional encoder
* to run before broadcasting.
* @property {string} [method='emit'] An alternative emit method.
* @property {EmitterPubsubBroker.Serialize}
* [serialize=msgpack.encode] Serialisation function to use with a
* connector.
* @property {EmitterPubsubBroker.Deserialize}
* [deserialize=msgpack.decode] Deserialisation function to use with a
* connector.
* @property {Connector} [connector] Custom connector implementation.

@@ -154,2 +193,6 @@ */

this.prefix = options.prefix || 'emitter-pubsub-broker:';
this.method = options.method || 'emit';
this.serialize = options.serialize || msgpack.encode;
this.deserialize = options.deserialize || msgpack.decode;
this.encoder = options.encoder;
this.includeChannel = options.includeChannel;

@@ -160,6 +203,6 @@ this.clientChannels = new Map();

this.connector = options.connector || new RedisConnector(options.connect);
this.serialize = true;
this.useSerialization = true;
} else {
this.connector = new MemoryConnector();
this.serialize = false;
this.useSerialization = false;
}

@@ -177,8 +220,9 @@ this.connector.on('message', this._dispatch.bind(this));

_channelAddClient(client, ch) {
let clients = this.channelClients.get(ch);
_channelAddClient(client, channel) {
let clients = this.channelClients.get(channel);
if (clients == null) {
clients = new Set();
this.channelClients.set(ch, clients);
this.channelClients.set(channel, clients);
clients.add(client);
let ch = this.prefix + channel;
return this.connector.subscribe(ch);

@@ -191,4 +235,4 @@ } else {

_channelRemoveClient(client, ch) {
let clients = this.channelClients.get(ch);
_channelRemoveClient(client, channel) {
let clients = this.channelClients.get(channel);
let nclients;

@@ -200,2 +244,3 @@ if (clients != null) {

if (nclients === 0) {
let ch = this.prefix + channel;
return this.connector.unsubscribe(ch);

@@ -208,5 +253,9 @@ } else {

_makeMessage(msg) {
return this.serialize ? msgpack.encode(msg) : msg;
return Promise.try(() => this.useSerialization ? this.serialize(msg) : msg);
}
_unpackMessage(data) {
return Promise.try(() => this.useSerialization ? this.deserialize(data) : data);
}
/**

@@ -225,5 +274,4 @@ * Subscribes emitter to a channel.

}
let ch = this.prefix + channel;
channels.add(ch);
return this._channelAddClient(client, ch);
channels.add(channel);
return this._channelAddClient(client, channel);
}

@@ -240,11 +288,10 @@

let channels = this.clientChannels.get(client);
let ch = this.prefix + channel;
if (channels) {
channels.delete(ch);
channels.delete(channel);
}
return this._channelRemoveClient(client, ch);
return this._channelRemoveClient(client, channel);
}
/**
* Unsubscribes emitter from all channel.
* Unsubscribes emitter from all channels.
*

@@ -254,3 +301,3 @@ * @param {EventEmitter} client Emitter.

*/
unsubscribeall(client) {
unsubscribeAll(client) {
let channels = this.clientChannels.get(client);

@@ -279,4 +326,3 @@ this.clientChannels.delete(client);

let ch = this.prefix + channel;
let msg = this._makeMessage({ name: name, args: args });
return this.connector.publish(ch, msg);
return this._makeMessage({ name: name, args: args }).then(msg => this.connector.publish(ch, msg));
}

@@ -301,25 +347,28 @@

let sender = client.id;
let msg = this._makeMessage({ sender: sender, name: name, args: args });
return this.connector.publish(ch, msg);
return this._makeMessage({ sender: sender, name: name, args: args }).then(msg => this.connector.publish(ch, msg));
}
/**
* Returns client subscriptions.
* Returns set of client subscriptions. The result __MUST NOT__ be
* modified.
*
* @param {EventEmitter} client Emitter.
* @return {Array<string>}
* @return {Set<string>|undefined}
*/
getSubscriptions(client) {
let channels = this.clientChannels.get(client);
let res = [];
if (channels) {
let plen = this.prefix.length;
for (let channel of channels) {
res.push(channel.slice(plen));
}
}
return res;
return this.clientChannels.get(client);
}
/**
* Returns _internal_ set of channel clients of EmitterPubsubBroker
* instance. The result __MUST NOT__ be modified.
*
* @param {string} channel Channel.
* @return {Set<EventEmitter>|undefined}
*/
getClients(channel) {
return this.channelClients.get(channel);
}
/**
* Closes broker.

@@ -336,19 +385,20 @@ *

_dispatch(ch, data) {
let channel = ch.slice(this.prefix.length);
let message = this.serialize ? msgpack.decode(data) : data;
let clients = this.channelClients.get(ch);
/* istanbul ignore else */
if (clients) {
let args;
if (this.includeChannel) {
args = [message.name, channel].concat(_toConsumableArray(message.args));
} else {
args = [message.name].concat(_toConsumableArray(message.args));
this._unpackMessage(data).then(message => {
let channel = ch.slice(this.prefix.length);
let clients = this.channelClients.get(channel);
/* istanbul ignore else */
if (clients) {
let args = this.includeChannel ? [message.name, channel].concat(_toConsumableArray(message.args)) : [message.name].concat(_toConsumableArray(message.args));
Promise.try(() => this.encoder ? this.encoder(args) : args).then(data => {
const method = this.method;
const encoder = this.encoder;
const sender = message.sender;
clients.forEach(client => {
if (!sender || client.id !== sender) {
Promise.try(() => encoder ? client[method](data) : client[method].apply(client, _toConsumableArray(data))).catchReturn();
}
});
});
}
for (let client of clients) {
if (!message.sender || client.id !== message.sender) {
client.emit.apply(client, _toConsumableArray(args));
}
}
}
});
}

@@ -358,3 +408,6 @@

// compatibility
EmitterPubsubBroker.prototype.unsubscribeall = EmitterPubsubBroker.prototype.unsubscribeAll;
module.exports = EmitterPubsubBroker;
//# sourceMappingURL=data:application/json;base64,{"version":3,"sources":["../src/EmitterPubsubBroker.js"],"names":["Promise","require","Redis","msgpack","EventEmitter","RedisConnector","constructor","options","pub","sub","subscribe","catchReturn","on","_onMessage","bind","emit","buf","data","ch","toString","publish","unsubscribe","close","all","quit","MemoryConnector","resolve","then","EmitterPubsubBroker","connect","prefix","includeChannel","clientChannels","Map","channelClients","connector","serialize","_dispatch","_channelAddClient","client","clients","get","Set","set","add","_channelRemoveClient","nclients","delete","size","_makeMessage","msg","encode","channel","channels","unsubscribeall","each","name","args","send","sender","id","getSubscriptions","res","plen","length","push","slice","clear","message","decode","module","exports"],"mappings":"AAAA;;;;AAEA,MAAMA,UAAUC,QAAQ,UAAR,CAAhB;AACA,MAAMC,QAAQD,QAAQ,SAAR,CAAd;AACA,MAAME,UAAUF,QAAQ,cAAR,CAAhB;;eACyBA,QAAQ,eAAR,C;;MAAjBG,Y,YAAAA,Y;;AAER;;;;;;;AAOA;;;;;;;;;;AAUA;;;;;;;;;AASA;;;;;;;;;AASA;;;;;;;;AAQA;;;;;;;AAOA;;;;;;;;AAQA,MAAMC,cAAN,SAA6BD,YAA7B,CAA0C;AACxCE,cAAaC,OAAb,EAAsB;AACpB;AACA,SAAKC,GAAL,GAAW,IAAIN,KAAJ,CAAUK,OAAV,CAAX;AACA,SAAKE,GAAL,GAAW,IAAIP,KAAJ,CAAUK,OAAV,CAAX;AACA,SAAKE,GAAL,CAASC,SAAT,GAAqBC,WAArB;AACA,SAAKF,GAAL,CAASG,EAAT,CAAY,eAAZ,EAA6B,KAAKC,UAAL,CAAgBC,IAAhB,CAAqB,IAArB,CAA7B;AACA,SAAKL,GAAL,CAASG,EAAT,CAAY,OAAZ,EAAqB,KAAKG,IAAL,CAAUD,IAAV,CAAe,IAAf,CAArB;AACA,SAAKN,GAAL,CAASI,EAAT,CAAY,OAAZ,EAAqB,KAAKG,IAAL,CAAUD,IAAV,CAAe,IAAf,CAArB;AACD;;AAEDD,aAAYG,GAAZ,EAAiBC,IAAjB,EAAuB;AACrB,QAAIC,KAAKF,IAAIG,QAAJ,EAAT;AACA,SAAKJ,IAAL,CAAU,SAAV,EAAqBG,EAArB,EAAyBD,IAAzB;AACD;;AAEDG,UAASF,EAAT,EAAaD,IAAb,EAAmB;AACjB,WAAO,KAAKT,GAAL,CAASY,OAAT,CAAiBF,EAAjB,EAAqBD,IAArB,CAAP;AACD;;AAEDP,YAAWQ,EAAX,EAAe;AACb,WAAO,KAAKT,GAAL,CAASC,SAAT,CAAmBQ,EAAnB,CAAP;AACD;;AAEDG,cAAaH,EAAb,EAAiB;AACf,WAAO,KAAKT,GAAL,CAASY,WAAT,CAAqBH,EAArB,CAAP;AACD;;AAEDI,UAAS;AACP,WAAOtB,QAAQuB,GAAR,CAAY,CAAC,KAAKf,GAAL,CAASgB,IAAT,EAAD,EAAkB,KAAKf,GAAL,CAASe,IAAT,EAAlB,CAAZ,CAAP;AACD;AA9BuC;;AAiC1C,MAAMC,eAAN,SAA8BrB,YAA9B,CAA2C;AACzCE,cAAaC,OAAb,EAAsB;AACpB;AACD;;AAEDa,UAASF,EAAT,EAAaD,IAAb,EAAmB;AACjB,WAAOjB,QAAQ0B,OAAR,GACJC,IADI,CACC,MAAM,KAAKZ,IAAL,CAAU,SAAV,EAAqBG,EAArB,EAAyBD,IAAzB,CADP,CAAP;AAED;;AAEDP,YAAWQ,EAAX,EAAe;AACb,WAAOlB,QAAQ0B,OAAR,EAAP;AACD;;AAEDL,cAAaH,EAAb,EAAiB;AACf,WAAOlB,QAAQ0B,OAAR,EAAP;AACD;;AAEDJ,UAAS;AACP,WAAOtB,QAAQ0B,OAAR,EAAP;AACD;AApBwC;;AAuB3C;;;;;;;;;;AAUA;;;AAGA,MAAME,mBAAN,SAAkCxB,YAAlC,CAA+C;AAC7C;;;;;;;AAOAE,cAAaC,OAAb,EAAsB;AACpB;AACA,QAAIA,WAAW,IAAX,IAAmB,OAAOA,OAAP,KAAmB,QAA1C,EAAoD;AAClDA,gBAAU,EAAEsB,SAAStB,OAAX,EAAV;AACD;AACD,SAAKuB,MAAL,GAAcvB,QAAQuB,MAAR,IAAkB,wBAAhC;AACA,SAAKC,cAAL,GAAsBxB,QAAQwB,cAA9B;AACA,SAAKC,cAAL,GAAsB,IAAIC,GAAJ,EAAtB;AACA,SAAKC,cAAL,GAAsB,IAAID,GAAJ,EAAtB;AACA,QAAI1B,QAAQsB,OAAR,IAAmBtB,QAAQ4B,SAA/B,EAA0C;AACxC,WAAKA,SAAL,GAAiB5B,QAAQ4B,SAAR,IAAqB,IAAI9B,cAAJ,CAAmBE,QAAQsB,OAA3B,CAAtC;AACA,WAAKO,SAAL,GAAiB,IAAjB;AACD,KAHD,MAGO;AACL,WAAKD,SAAL,GAAiB,IAAIV,eAAJ,EAAjB;AACA,WAAKW,SAAL,GAAiB,KAAjB;AACD;AACD,SAAKD,SAAL,CAAevB,EAAf,CAAkB,SAAlB,EAA6B,KAAKyB,SAAL,CAAevB,IAAf,CAAoB,IAApB,CAA7B;AACA;;;;;;;AAOA,SAAKqB,SAAL,CAAevB,EAAf,CAAkB,OAAlB,EAA2B,KAAKG,IAAL,CAAUD,IAAV,CAAe,IAAf,CAA3B;AACD;;AAEDwB,oBAAmBC,MAAnB,EAA2BrB,EAA3B,EAA+B;AAC7B,QAAIsB,UAAU,KAAKN,cAAL,CAAoBO,GAApB,CAAwBvB,EAAxB,CAAd;AACA,QAAIsB,WAAW,IAAf,EAAqB;AACnBA,gBAAU,IAAIE,GAAJ,EAAV;AACA,WAAKR,cAAL,CAAoBS,GAApB,CAAwBzB,EAAxB,EAA4BsB,OAA5B;AACAA,cAAQI,GAAR,CAAYL,MAAZ;AACA,aAAO,KAAKJ,SAAL,CAAezB,SAAf,CAAyBQ,EAAzB,CAAP;AACD,KALD,MAKO;AACLsB,cAAQI,GAAR,CAAYL,MAAZ;AACA,aAAOvC,QAAQ0B,OAAR,EAAP;AACD;AACF;;AAEDmB,uBAAsBN,MAAtB,EAA8BrB,EAA9B,EAAkC;AAChC,QAAIsB,UAAU,KAAKN,cAAL,CAAoBO,GAApB,CAAwBvB,EAAxB,CAAd;AACA,QAAI4B,QAAJ;AACA,QAAIN,WAAW,IAAf,EAAqB;AACnBA,cAAQO,MAAR,CAAeR,MAAf;AACAO,iBAAWN,QAAQQ,IAAnB;AACD;AACD,QAAIF,aAAa,CAAjB,EAAoB;AAClB,aAAO,KAAKX,SAAL,CAAed,WAAf,CAA2BH,EAA3B,CAAP;AACD,KAFD,MAEO;AACL,aAAOlB,QAAQ0B,OAAR,EAAP;AACD;AACF;;AAEDuB,eAAcC,GAAd,EAAmB;AACjB,WAAO,KAAKd,SAAL,GAAiBjC,QAAQgD,MAAR,CAAeD,GAAf,CAAjB,GAAuCA,GAA9C;AACD;;AAED;;;;;;;AAOAxC,YAAW6B,MAAX,EAAmBa,OAAnB,EAA4B;AAC1B,QAAIC,WAAW,KAAKrB,cAAL,CAAoBS,GAApB,CAAwBF,MAAxB,CAAf;AACA,QAAI,CAACc,QAAL,EAAe;AACbA,iBAAW,IAAIX,GAAJ,EAAX;AACA,WAAKV,cAAL,CAAoBW,GAApB,CAAwBJ,MAAxB,EAAgCc,QAAhC;AACD;AACD,QAAInC,KAAK,KAAKY,MAAL,GAAcsB,OAAvB;AACAC,aAAST,GAAT,CAAa1B,EAAb;AACA,WAAO,KAAKoB,iBAAL,CAAuBC,MAAvB,EAA+BrB,EAA/B,CAAP;AACD;;AAED;;;;;;;AAOAG,cAAakB,MAAb,EAAqBa,OAArB,EAA8B;AAC5B,QAAIC,WAAW,KAAKrB,cAAL,CAAoBS,GAApB,CAAwBF,MAAxB,CAAf;AACA,QAAIrB,KAAK,KAAKY,MAAL,GAAcsB,OAAvB;AACA,QAAIC,QAAJ,EAAc;AACZA,eAASN,MAAT,CAAgB7B,EAAhB;AACD;AACD,WAAO,KAAK2B,oBAAL,CAA0BN,MAA1B,EAAkCrB,EAAlC,CAAP;AACD;;AAED;;;;;;AAMAoC,iBAAgBf,MAAhB,EAAwB;AACtB,QAAIc,WAAW,KAAKrB,cAAL,CAAoBS,GAApB,CAAwBF,MAAxB,CAAf;AACA,SAAKP,cAAL,CAAoBe,MAApB,CAA2BR,MAA3B;AACA,QAAIc,QAAJ,EAAc;AACZ,aAAOrD,QAAQuD,IAAR,CACLF,QADK,EACK,KAAKR,oBAAL,CAA0B/B,IAA1B,CAA+B,IAA/B,EAAqCyB,MAArC,CADL,CAAP;AAED,KAHD,MAGO;AACL,aAAOvC,QAAQ0B,OAAR,EAAP;AACD;AACF;;AAED;;;;;;;;AAQAN,UAASgC,OAAT,EAAkBI,IAAlB,EAAiC;AAAA,sCAANC,IAAM;AAANA,UAAM;AAAA;;AAC/B,QAAIvC,KAAK,KAAKY,MAAL,GAAcsB,OAAvB;AACA,QAAIF,MAAM,KAAKD,YAAL,CAAkB,EAACO,UAAD,EAAOC,UAAP,EAAlB,CAAV;AACA,WAAO,KAAKtB,SAAL,CAAef,OAAf,CAAuBF,EAAvB,EAA2BgC,GAA3B,CAAP;AACD;;AAED;;;;;;;;;;AAUAQ,OAAMnB,MAAN,EAAca,OAAd,EAAuBI,IAAvB,EAAsC;AAAA,uCAANC,IAAM;AAANA,UAAM;AAAA;;AACpC,QAAIvC,KAAK,KAAKY,MAAL,GAAcsB,OAAvB;AACA,QAAIO,SAASpB,OAAOqB,EAApB;AACA,QAAIV,MAAM,KAAKD,YAAL,CAAkB,EAACU,cAAD,EAASH,UAAT,EAAeC,UAAf,EAAlB,CAAV;AACA,WAAO,KAAKtB,SAAL,CAAef,OAAf,CAAuBF,EAAvB,EAA2BgC,GAA3B,CAAP;AACD;;AAED;;;;;;AAMAW,mBAAkBtB,MAAlB,EAA0B;AACxB,QAAIc,WAAW,KAAKrB,cAAL,CAAoBS,GAApB,CAAwBF,MAAxB,CAAf;AACA,QAAIuB,MAAM,EAAV;AACA,QAAIT,QAAJ,EAAc;AACZ,UAAIU,OAAO,KAAKjC,MAAL,CAAYkC,MAAvB;AACA,WAAK,IAAIZ,OAAT,IAAoBC,QAApB,EAA8B;AAC5BS,YAAIG,IAAJ,CAASb,QAAQc,KAAR,CAAcH,IAAd,CAAT;AACD;AACF;AACD,WAAOD,GAAP;AACD;;AAED;;;;;AAKAxC,UAAS;AACP,SAAKY,cAAL,CAAoBiC,KAApB;AACA,SAAKnC,cAAL,CAAoBmC,KAApB;AACA,WAAO,KAAKhC,SAAL,CAAeb,KAAf,EAAP;AACD;;AAEDe,YAAWnB,EAAX,EAAeD,IAAf,EAAqB;AACnB,QAAImC,UAAUlC,GAAGgD,KAAH,CAAS,KAAKpC,MAAL,CAAYkC,MAArB,CAAd;AACA,QAAII,UAAU,KAAKhC,SAAL,GAAiBjC,QAAQkE,MAAR,CAAepD,IAAf,CAAjB,GAAwCA,IAAtD;AACA,QAAIuB,UAAU,KAAKN,cAAL,CAAoBO,GAApB,CAAwBvB,EAAxB,CAAd;AACA;AACA,QAAIsB,OAAJ,EAAa;AACX,UAAIiB,IAAJ;AACA,UAAI,KAAK1B,cAAT,EAAyB;AACvB0B,gBAAQW,QAAQZ,IAAhB,EAAsBJ,OAAtB,4BAAkCgB,QAAQX,IAA1C;AACD,OAFD,MAEO;AACLA,gBAAQW,QAAQZ,IAAhB,4BAAyBY,QAAQX,IAAjC;AACD;AACD,WAAK,IAAIlB,MAAT,IAAmBC,OAAnB,EAA4B;AAC1B,YAAI,CAAC4B,QAAQT,MAAT,IAAmBpB,OAAOqB,EAAP,KAAcQ,QAAQT,MAA7C,EAAqD;AACnDpB,iBAAOxB,IAAP,kCAAe0C,IAAf;AACD;AACF;AACF;AACF;;AAnM4C;;AAuM/Ca,OAAOC,OAAP,GAAiB3C,mBAAjB","file":"EmitterPubsubBroker.js","sourcesContent":["'use strict'\n\nconst Promise = require('bluebird')\nconst Redis = require('ioredis')\nconst msgpack = require('msgpack-lite')\nconst { EventEmitter } = require('eventemitter3')\n\n/**\n * Interface for connector implementations.\n *\n * @interface Connector\n * @extends EventEmitter\n */\n\n/**\n * @method\n * @instance\n * @name publish\n * @memberOf Connector\n * @param {string} channel Channel.\n * @param {Buffer} data Data.\n * @return {Promise<undefined>}\n */\n\n/**\n * @method\n * @instance\n * @name subscribe\n * @memberOf Connector\n * @param {string} channel Channel.\n * @return {Promise<undefined>}\n */\n\n/**\n * @method\n * @instance\n * @name unsubscribe\n * @memberOf Connector\n * @param {string} channel Channel.\n * @return {Promise<undefined>}\n */\n\n/**\n * @method\n * @instance\n * @name close\n * @memberOf Connector\n * @return {Promise<undefined>}\n */\n\n/**\n * @event message\n * @memberOf Connector\n * @param {string} event Event name.\n * @param {Buffer} data Event data.\n */\n\n/**\n * Event will be listened by {@link EmitterPubsubBroker} instance.\n *\n * @event error\n * @memberOf Connector\n * @param {Error} error Error.\n */\n\nclass RedisConnector extends EventEmitter {\n  constructor (options) {\n    super()\n    this.pub = new Redis(options)\n    this.sub = new Redis(options)\n    this.sub.subscribe().catchReturn()\n    this.sub.on('messageBuffer', this._onMessage.bind(this))\n    this.sub.on('error', this.emit.bind(this))\n    this.pub.on('error', this.emit.bind(this))\n  }\n\n  _onMessage (buf, data) {\n    let ch = buf.toString()\n    this.emit('message', ch, data)\n  }\n\n  publish (ch, data) {\n    return this.pub.publish(ch, data)\n  }\n\n  subscribe (ch) {\n    return this.sub.subscribe(ch)\n  }\n\n  unsubscribe (ch) {\n    return this.sub.unsubscribe(ch)\n  }\n\n  close () {\n    return Promise.all([this.pub.quit(), this.sub.quit()])\n  }\n}\n\nclass MemoryConnector extends EventEmitter {\n  constructor (options) {\n    super()\n  }\n\n  publish (ch, data) {\n    return Promise.resolve()\n      .then(() => this.emit('message', ch, data))\n  }\n\n  subscribe (ch) {\n    return Promise.resolve()\n  }\n\n  unsubscribe (ch) {\n    return Promise.resolve()\n  }\n\n  close () {\n    return Promise.resolve()\n  }\n}\n\n/**\n * @typedef {Object} EmitterPubsubBroker.Options\n *\n * @property {string} connect Connect string.\n * @property {string} [prefix='emitter-pubsub-broker:'] Prefix.\n * @property {boolean} [includeChannel=false] Include channel as the\n * first argument.\n * @property {Connector} [connector] Custom connector implementation.\n */\n\n/**\n * @extends EventEmitter\n */\nclass EmitterPubsubBroker extends EventEmitter {\n  /**\n   * Creates a broker.\n   *\n   * @param {EmitterPubsubBroker.Options|string} options Options or a\n   * connect if a string. If connect string is empty, then an\n   * in-memory connector is used.\n   */\n  constructor (options) {\n    super()\n    if (options == null || typeof options === 'string') {\n      options = { connect: options }\n    }\n    this.prefix = options.prefix || 'emitter-pubsub-broker:'\n    this.includeChannel = options.includeChannel\n    this.clientChannels = new Map()\n    this.channelClients = new Map()\n    if (options.connect || options.connector) {\n      this.connector = options.connector || new RedisConnector(options.connect)\n      this.serialize = true\n    } else {\n      this.connector = new MemoryConnector()\n      this.serialize = false\n    }\n    this.connector.on('message', this._dispatch.bind(this))\n    /**\n     * Connector error. Does not throw if there are no listeners.\n     *\n     * @event error\n     * @memberOf EmitterPubsubBroker\n     * @param {Error} error Error.\n     */\n    this.connector.on('error', this.emit.bind(this))\n  }\n\n  _channelAddClient (client, ch) {\n    let clients = this.channelClients.get(ch)\n    if (clients == null) {\n      clients = new Set()\n      this.channelClients.set(ch, clients)\n      clients.add(client)\n      return this.connector.subscribe(ch)\n    } else {\n      clients.add(client)\n      return Promise.resolve()\n    }\n  }\n\n  _channelRemoveClient (client, ch) {\n    let clients = this.channelClients.get(ch)\n    let nclients\n    if (clients != null) {\n      clients.delete(client)\n      nclients = clients.size\n    }\n    if (nclients === 0) {\n      return this.connector.unsubscribe(ch)\n    } else {\n      return Promise.resolve()\n    }\n  }\n\n  _makeMessage (msg) {\n    return this.serialize ? msgpack.encode(msg) : msg\n  }\n\n  /**\n   * Subscribes emitter to a channel.\n   *\n   * @param {EventEmitter} client Emitter.\n   * @param {string} channel Channel.\n   * @return {Promise<undefined>}\n   */\n  subscribe (client, channel) {\n    let channels = this.clientChannels.get(client)\n    if (!channels) {\n      channels = new Set()\n      this.clientChannels.set(client, channels)\n    }\n    let ch = this.prefix + channel\n    channels.add(ch)\n    return this._channelAddClient(client, ch)\n  }\n\n  /**\n   * Unsubscribes emitter from a channel.\n   *\n   * @param {EventEmitter} client Emitter.\n   * @param {string} channel Channel.\n   * @return {Promise<undefined>}\n   */\n  unsubscribe (client, channel) {\n    let channels = this.clientChannels.get(client)\n    let ch = this.prefix + channel\n    if (channels) {\n      channels.delete(ch)\n    }\n    return this._channelRemoveClient(client, ch)\n  }\n\n  /**\n   * Unsubscribes emitter from all channel.\n   *\n   * @param {EventEmitter} client Emitter.\n   * @return {Promise<undefined>}\n   */\n  unsubscribeall (client) {\n    let channels = this.clientChannels.get(client)\n    this.clientChannels.delete(client)\n    if (channels) {\n      return Promise.each(\n        channels, this._channelRemoveClient.bind(this, client))\n    } else {\n      return Promise.resolve()\n    }\n  }\n\n  /**\n   * Publish an event to a channel.\n   *\n   * @param {string} channel Channel.\n   * @param {string} name Event name.\n   * @param {*} args Arguments.\n   * @return {Promise<undefined>}\n   */\n  publish (channel, name, ...args) {\n    let ch = this.prefix + channel\n    let msg = this._makeMessage({name, args})\n    return this.connector.publish(ch, msg)\n  }\n\n  /**\n   * Publish an event to a channel, excluding the sender. The client\n   * object __MUST__ have an unique `id` field.\n   *\n   * @param {EventEmitter} client Emitter.\n   * @param {string} channel Channel.\n   * @param {string} name Event name.\n   * @param {*} args Arguments.\n   * @return {Promise<undefined>}\n   */\n  send (client, channel, name, ...args) {\n    let ch = this.prefix + channel\n    let sender = client.id\n    let msg = this._makeMessage({sender, name, args})\n    return this.connector.publish(ch, msg)\n  }\n\n  /**\n   * Returns client subscriptions.\n   *\n   * @param {EventEmitter} client Emitter.\n   * @return {Array<string>}\n   */\n  getSubscriptions (client) {\n    let channels = this.clientChannels.get(client)\n    let res = []\n    if (channels) {\n      let plen = this.prefix.length\n      for (let channel of channels) {\n        res.push(channel.slice(plen))\n      }\n    }\n    return res\n  }\n\n  /**\n   * Closes broker.\n   *\n   * @return {Promise<undefined>}\n   */\n  close () {\n    this.channelClients.clear()\n    this.clientChannels.clear()\n    return this.connector.close()\n  }\n\n  _dispatch (ch, data) {\n    let channel = ch.slice(this.prefix.length)\n    let message = this.serialize ? msgpack.decode(data) : data\n    let clients = this.channelClients.get(ch)\n    /* istanbul ignore else */\n    if (clients) {\n      let args\n      if (this.includeChannel) {\n        args = [message.name, channel, ...message.args]\n      } else {\n        args = [message.name, ...message.args]\n      }\n      for (let client of clients) {\n        if (!message.sender || client.id !== message.sender) {\n          client.emit(...args)\n        }\n      }\n    }\n  }\n\n}\n\nmodule.exports = EmitterPubsubBroker\n"]}
//# sourceMappingURL=data:application/json;base64,{"version":3,"sources":["../src/EmitterPubsubBroker.js"],"names":["Promise","require","Redis","msgpack","EventEmitter","RedisConnector","constructor","options","pub","sub","subscribe","catchReturn","on","_onMessage","bind","emit","buf","data","ch","toString","publish","unsubscribe","close","all","quit","MemoryConnector","resolve","then","EmitterPubsubBroker","connect","prefix","method","serialize","encode","deserialize","decode","encoder","includeChannel","clientChannels","Map","channelClients","connector","useSerialization","_dispatch","_channelAddClient","client","channel","clients","get","Set","set","add","_channelRemoveClient","nclients","delete","size","_makeMessage","msg","try","_unpackMessage","channels","unsubscribeAll","each","name","args","send","sender","id","getSubscriptions","getClients","clear","message","slice","length","forEach","prototype","unsubscribeall","module","exports"],"mappings":"AAAA;;;;AAEA,MAAMA,UAAUC,QAAQ,UAAR,CAAhB;AACA,MAAMC,QAAQD,QAAQ,SAAR,CAAd;AACA,MAAME,UAAUF,QAAQ,cAAR,CAAhB;;eACyBA,QAAQ,eAAR,C;;MAAjBG,Y,YAAAA,Y;;AAER;;;;;;;AAOA;;;;;;;;;;AAUA;;;;;;;;;AASA;;;;;;;;;AASA;;;;;;;;AAQA;;;;;;;AAOA;;;;;;;;AAQA,MAAMC,cAAN,SAA6BD,YAA7B,CAA0C;AACxCE,cAAaC,OAAb,EAAsB;AACpB;AACA,SAAKC,GAAL,GAAW,IAAIN,KAAJ,CAAUK,OAAV,CAAX;AACA,SAAKE,GAAL,GAAW,IAAIP,KAAJ,CAAUK,OAAV,CAAX;AACA,SAAKE,GAAL,CAASC,SAAT,GAAqBC,WAArB;AACA,SAAKF,GAAL,CAASG,EAAT,CAAY,eAAZ,EAA6B,KAAKC,UAAL,CAAgBC,IAAhB,CAAqB,IAArB,CAA7B;AACA,SAAKL,GAAL,CAASG,EAAT,CAAY,OAAZ,EAAqB,KAAKG,IAAL,CAAUD,IAAV,CAAe,IAAf,CAArB;AACA,SAAKN,GAAL,CAASI,EAAT,CAAY,OAAZ,EAAqB,KAAKG,IAAL,CAAUD,IAAV,CAAe,IAAf,CAArB;AACD;;AAEDD,aAAYG,GAAZ,EAAiBC,IAAjB,EAAuB;AACrB,QAAIC,KAAKF,IAAIG,QAAJ,EAAT;AACA,SAAKJ,IAAL,CAAU,SAAV,EAAqBG,EAArB,EAAyBD,IAAzB;AACD;;AAEDG,UAASF,EAAT,EAAaD,IAAb,EAAmB;AACjB,WAAO,KAAKT,GAAL,CAASY,OAAT,CAAiBF,EAAjB,EAAqBD,IAArB,CAAP;AACD;;AAEDP,YAAWQ,EAAX,EAAe;AACb,WAAO,KAAKT,GAAL,CAASC,SAAT,CAAmBQ,EAAnB,CAAP;AACD;;AAEDG,cAAaH,EAAb,EAAiB;AACf,WAAO,KAAKT,GAAL,CAASY,WAAT,CAAqBH,EAArB,CAAP;AACD;;AAEDI,UAAS;AACP,WAAOtB,QAAQuB,GAAR,CAAY,CAAC,KAAKf,GAAL,CAASgB,IAAT,EAAD,EAAkB,KAAKf,GAAL,CAASe,IAAT,EAAlB,CAAZ,CAAP;AACD;AA9BuC;;AAiC1C,MAAMC,eAAN,SAA8BrB,YAA9B,CAA2C;AACzCE,cAAaC,OAAb,EAAsB;AACpB;AACD;;AAEDa,UAASF,EAAT,EAAaD,IAAb,EAAmB;AACjB,WAAOjB,QAAQ0B,OAAR,GACJC,IADI,CACC,MAAM,KAAKZ,IAAL,CAAU,SAAV,EAAqBG,EAArB,EAAyBD,IAAzB,CADP,CAAP;AAED;;AAEDP,YAAWQ,EAAX,EAAe;AACb,WAAOlB,QAAQ0B,OAAR,EAAP;AACD;;AAEDL,cAAaH,EAAb,EAAiB;AACf,WAAOlB,QAAQ0B,OAAR,EAAP;AACD;;AAEDJ,UAAS;AACP,WAAOtB,QAAQ0B,OAAR,EAAP;AACD;AApBwC;;AAuB3C;;;;;;;;;;AAUA;;;;;;;;;;AAUA;;;;;;;;;;AAUA;;;;;;;;;;;;;;;;;;;AAmBA;;;AAGA,MAAME,mBAAN,SAAkCxB,YAAlC,CAA+C;AAC7C;;;;;;;AAOAE,cAAaC,OAAb,EAAsB;AACpB;AACA,QAAIA,WAAW,IAAX,IAAmB,OAAOA,OAAP,KAAmB,QAA1C,EAAoD;AAClDA,gBAAU,EAAEsB,SAAStB,OAAX,EAAV;AACD;AACD,SAAKuB,MAAL,GAAcvB,QAAQuB,MAAR,IAAkB,wBAAhC;AACA,SAAKC,MAAL,GAAcxB,QAAQwB,MAAR,IAAkB,MAAhC;AACA,SAAKC,SAAL,GAAiBzB,QAAQyB,SAAR,IAAqB7B,QAAQ8B,MAA9C;AACA,SAAKC,WAAL,GAAmB3B,QAAQ2B,WAAR,IAAuB/B,QAAQgC,MAAlD;AACA,SAAKC,OAAL,GAAe7B,QAAQ6B,OAAvB;AACA,SAAKC,cAAL,GAAsB9B,QAAQ8B,cAA9B;AACA,SAAKC,cAAL,GAAsB,IAAIC,GAAJ,EAAtB;AACA,SAAKC,cAAL,GAAsB,IAAID,GAAJ,EAAtB;AACA,QAAIhC,QAAQsB,OAAR,IAAmBtB,QAAQkC,SAA/B,EAA0C;AACxC,WAAKA,SAAL,GAAiBlC,QAAQkC,SAAR,IAAqB,IAAIpC,cAAJ,CAAmBE,QAAQsB,OAA3B,CAAtC;AACA,WAAKa,gBAAL,GAAwB,IAAxB;AACD,KAHD,MAGO;AACL,WAAKD,SAAL,GAAiB,IAAIhB,eAAJ,EAAjB;AACA,WAAKiB,gBAAL,GAAwB,KAAxB;AACD;AACD,SAAKD,SAAL,CAAe7B,EAAf,CAAkB,SAAlB,EAA6B,KAAK+B,SAAL,CAAe7B,IAAf,CAAoB,IAApB,CAA7B;AACA;;;;;;;AAOA,SAAK2B,SAAL,CAAe7B,EAAf,CAAkB,OAAlB,EAA2B,KAAKG,IAAL,CAAUD,IAAV,CAAe,IAAf,CAA3B;AACD;;AAED8B,oBAAmBC,MAAnB,EAA2BC,OAA3B,EAAoC;AAClC,QAAIC,UAAU,KAAKP,cAAL,CAAoBQ,GAApB,CAAwBF,OAAxB,CAAd;AACA,QAAIC,WAAW,IAAf,EAAqB;AACnBA,gBAAU,IAAIE,GAAJ,EAAV;AACA,WAAKT,cAAL,CAAoBU,GAApB,CAAwBJ,OAAxB,EAAiCC,OAAjC;AACAA,cAAQI,GAAR,CAAYN,MAAZ;AACA,UAAI3B,KAAK,KAAKY,MAAL,GAAcgB,OAAvB;AACA,aAAO,KAAKL,SAAL,CAAe/B,SAAf,CAAyBQ,EAAzB,CAAP;AACD,KAND,MAMO;AACL6B,cAAQI,GAAR,CAAYN,MAAZ;AACA,aAAO7C,QAAQ0B,OAAR,EAAP;AACD;AACF;;AAED0B,uBAAsBP,MAAtB,EAA8BC,OAA9B,EAAuC;AACrC,QAAIC,UAAU,KAAKP,cAAL,CAAoBQ,GAApB,CAAwBF,OAAxB,CAAd;AACA,QAAIO,QAAJ;AACA,QAAIN,WAAW,IAAf,EAAqB;AACnBA,cAAQO,MAAR,CAAeT,MAAf;AACAQ,iBAAWN,QAAQQ,IAAnB;AACD;AACD,QAAIF,aAAa,CAAjB,EAAoB;AAClB,UAAInC,KAAK,KAAKY,MAAL,GAAcgB,OAAvB;AACA,aAAO,KAAKL,SAAL,CAAepB,WAAf,CAA2BH,EAA3B,CAAP;AACD,KAHD,MAGO;AACL,aAAOlB,QAAQ0B,OAAR,EAAP;AACD;AACF;;AAED8B,eAAcC,GAAd,EAAmB;AACjB,WAAOzD,QAAQ0D,GAAR,CAAY,MAAM,KAAKhB,gBAAL,GAAwB,KAAKV,SAAL,CAAeyB,GAAf,CAAxB,GAA8CA,GAAhE,CAAP;AACD;;AAEDE,iBAAgB1C,IAAhB,EAAsB;AACpB,WAAOjB,QAAQ0D,GAAR,CAAY,MAAM,KAAKhB,gBAAL,GAAwB,KAAKR,WAAL,CAAiBjB,IAAjB,CAAxB,GAAiDA,IAAnE,CAAP;AACD;;AAED;;;;;;;AAOAP,YAAWmC,MAAX,EAAmBC,OAAnB,EAA4B;AAC1B,QAAIc,WAAW,KAAKtB,cAAL,CAAoBU,GAApB,CAAwBH,MAAxB,CAAf;AACA,QAAI,CAACe,QAAL,EAAe;AACbA,iBAAW,IAAIX,GAAJ,EAAX;AACA,WAAKX,cAAL,CAAoBY,GAApB,CAAwBL,MAAxB,EAAgCe,QAAhC;AACD;AACDA,aAAST,GAAT,CAAaL,OAAb;AACA,WAAO,KAAKF,iBAAL,CAAuBC,MAAvB,EAA+BC,OAA/B,CAAP;AACD;;AAED;;;;;;;AAOAzB,cAAawB,MAAb,EAAqBC,OAArB,EAA8B;AAC5B,QAAIc,WAAW,KAAKtB,cAAL,CAAoBU,GAApB,CAAwBH,MAAxB,CAAf;AACA,QAAIe,QAAJ,EAAc;AACZA,eAASN,MAAT,CAAgBR,OAAhB;AACD;AACD,WAAO,KAAKM,oBAAL,CAA0BP,MAA1B,EAAkCC,OAAlC,CAAP;AACD;;AAED;;;;;;AAMAe,iBAAgBhB,MAAhB,EAAwB;AACtB,QAAIe,WAAW,KAAKtB,cAAL,CAAoBU,GAApB,CAAwBH,MAAxB,CAAf;AACA,SAAKP,cAAL,CAAoBgB,MAApB,CAA2BT,MAA3B;AACA,QAAIe,QAAJ,EAAc;AACZ,aAAO5D,QAAQ8D,IAAR,CACLF,QADK,EACK,KAAKR,oBAAL,CAA0BtC,IAA1B,CAA+B,IAA/B,EAAqC+B,MAArC,CADL,CAAP;AAED,KAHD,MAGO;AACL,aAAO7C,QAAQ0B,OAAR,EAAP;AACD;AACF;;AAED;;;;;;;;AAQAN,UAAS0B,OAAT,EAAkBiB,IAAlB,EAAiC;AAAA,sCAANC,IAAM;AAANA,UAAM;AAAA;;AAC/B,QAAI9C,KAAK,KAAKY,MAAL,GAAcgB,OAAvB;AACA,WAAO,KAAKU,YAAL,CAAkB,EAACO,UAAD,EAAOC,UAAP,EAAlB,EACJrC,IADI,CACC8B,OAAO,KAAKhB,SAAL,CAAerB,OAAf,CAAuBF,EAAvB,EAA2BuC,GAA3B,CADR,CAAP;AAED;;AAED;;;;;;;;;;AAUAQ,OAAMpB,MAAN,EAAcC,OAAd,EAAuBiB,IAAvB,EAAsC;AAAA,uCAANC,IAAM;AAANA,UAAM;AAAA;;AACpC,QAAI9C,KAAK,KAAKY,MAAL,GAAcgB,OAAvB;AACA,QAAIoB,SAASrB,OAAOsB,EAApB;AACA,WAAO,KAAKX,YAAL,CAAkB,EAACU,cAAD,EAASH,UAAT,EAAeC,UAAf,EAAlB,EACJrC,IADI,CACC8B,OAAO,KAAKhB,SAAL,CAAerB,OAAf,CAAuBF,EAAvB,EAA2BuC,GAA3B,CADR,CAAP;AAED;;AAED;;;;;;;AAOAW,mBAAkBvB,MAAlB,EAA0B;AACxB,WAAO,KAAKP,cAAL,CAAoBU,GAApB,CAAwBH,MAAxB,CAAP;AACD;;AAED;;;;;;;AAOAwB,aAAYvB,OAAZ,EAAqB;AACnB,WAAO,KAAKN,cAAL,CAAoBQ,GAApB,CAAwBF,OAAxB,CAAP;AACD;;AAED;;;;;AAKAxB,UAAS;AACP,SAAKkB,cAAL,CAAoB8B,KAApB;AACA,SAAKhC,cAAL,CAAoBgC,KAApB;AACA,WAAO,KAAK7B,SAAL,CAAenB,KAAf,EAAP;AACD;;AAEDqB,YAAWzB,EAAX,EAAeD,IAAf,EAAqB;AACnB,SAAK0C,cAAL,CAAoB1C,IAApB,EAA0BU,IAA1B,CAA+B4C,WAAW;AACxC,UAAIzB,UAAU5B,GAAGsD,KAAH,CAAS,KAAK1C,MAAL,CAAY2C,MAArB,CAAd;AACA,UAAI1B,UAAU,KAAKP,cAAL,CAAoBQ,GAApB,CAAwBF,OAAxB,CAAd;AACA;AACA,UAAIC,OAAJ,EAAa;AACX,YAAIiB,OAAO,KAAK3B,cAAL,IACNkC,QAAQR,IADF,EACQjB,OADR,4BACoByB,QAAQP,IAD5B,MAENO,QAAQR,IAFF,4BAEWQ,QAAQP,IAFnB,EAAX;AAGAhE,gBAAQ0D,GAAR,CAAY,MAAM,KAAKtB,OAAL,GAAe,KAAKA,OAAL,CAAa4B,IAAb,CAAf,GAAoCA,IAAtD,EAA4DrC,IAA5D,CAAiEV,QAAQ;AACvE,gBAAMc,SAAS,KAAKA,MAApB;AACA,gBAAMK,UAAU,KAAKA,OAArB;AACA,gBAAM8B,SAASK,QAAQL,MAAvB;AACAnB,kBAAQ2B,OAAR,CAAgB7B,UAAU;AACxB,gBAAI,CAACqB,MAAD,IAAWrB,OAAOsB,EAAP,KAAcD,MAA7B,EAAqC;AACnClE,sBACG0D,GADH,CACO,MAAMtB,UAAUS,OAAOd,MAAP,EAAed,IAAf,CAAV,GAAiC4B,OAAOd,MAAP,mCAAkBd,IAAlB,EAD9C,EAEGN,WAFH;AAGD;AACF,WAND;AAOD,SAXD;AAYD;AACF,KArBD;AAsBD;;AApN4C;;AAwN/C;AACAiB,oBAAoB+C,SAApB,CAA8BC,cAA9B,GAA+ChD,oBAAoB+C,SAApB,CAA8Bd,cAA7E;;AAEAgB,OAAOC,OAAP,GAAiBlD,mBAAjB","file":"EmitterPubsubBroker.js","sourcesContent":["'use strict'\n\nconst Promise = require('bluebird')\nconst Redis = require('ioredis')\nconst msgpack = require('msgpack-lite')\nconst { EventEmitter } = require('eventemitter3')\n\n/**\n * Interface for connector implementations.\n *\n * @interface Connector\n * @extends EventEmitter\n */\n\n/**\n * @method\n * @instance\n * @name publish\n * @memberOf Connector\n * @param {string} channel Channel.\n * @param {Buffer} data Data.\n * @return {Promise<undefined>}\n */\n\n/**\n * @method\n * @instance\n * @name subscribe\n * @memberOf Connector\n * @param {string} channel Channel.\n * @return {Promise<undefined>}\n */\n\n/**\n * @method\n * @instance\n * @name unsubscribe\n * @memberOf Connector\n * @param {string} channel Channel.\n * @return {Promise<undefined>}\n */\n\n/**\n * @method\n * @instance\n * @name close\n * @memberOf Connector\n * @return {Promise<undefined>}\n */\n\n/**\n * @event message\n * @memberOf Connector\n * @param {string} event Event name.\n * @param {Buffer} data Event data.\n */\n\n/**\n * Event will be listened by {@link EmitterPubsubBroker} instance.\n *\n * @event error\n * @memberOf Connector\n * @param {Error} error Error.\n */\n\nclass RedisConnector extends EventEmitter {\n  constructor (options) {\n    super()\n    this.pub = new Redis(options)\n    this.sub = new Redis(options)\n    this.sub.subscribe().catchReturn()\n    this.sub.on('messageBuffer', this._onMessage.bind(this))\n    this.sub.on('error', this.emit.bind(this))\n    this.pub.on('error', this.emit.bind(this))\n  }\n\n  _onMessage (buf, data) {\n    let ch = buf.toString()\n    this.emit('message', ch, data)\n  }\n\n  publish (ch, data) {\n    return this.pub.publish(ch, data)\n  }\n\n  subscribe (ch) {\n    return this.sub.subscribe(ch)\n  }\n\n  unsubscribe (ch) {\n    return this.sub.unsubscribe(ch)\n  }\n\n  close () {\n    return Promise.all([this.pub.quit(), this.sub.quit()])\n  }\n}\n\nclass MemoryConnector extends EventEmitter {\n  constructor (options) {\n    super()\n  }\n\n  publish (ch, data) {\n    return Promise.resolve()\n      .then(() => this.emit('message', ch, data))\n  }\n\n  subscribe (ch) {\n    return Promise.resolve()\n  }\n\n  unsubscribe (ch) {\n    return Promise.resolve()\n  }\n\n  close () {\n    return Promise.resolve()\n  }\n}\n\n/**\n * Messages encoder. Encoded messages will be used to emit messages\n * via event emitters. May also return promises for an asynchronous\n * execution.\n *\n * @callback EmitterPubsubBroker.Encoder\n * @param {*} args Emit arguments.\n * @return {Promise<Object>|Object} Data to send.\n */\n\n/**\n * Messages serialisation. Serialised messages will be used internally\n * via a communication {@link Connector}.  May also return promises\n * for an asynchronous execution.\n *\n * @callback EmitterPubsubBroker.Serialize\n * @param {Object} data Data.\n * @return {Promise<Object>|Object} Serialised data.\n */\n\n/**\n * Messages deserialisation. Serialised messages will be used\n * internally via a communication {@link Connector}.  May also return\n * promises for an asynchronous execution.\n *\n * @callback EmitterPubsubBroker.Deserialize\n * @param {Object} data Serialised data.\n * @return {Promise<Object>|Object} Data.\n */\n\n/**\n * @typedef {Object} EmitterPubsubBroker.Options\n *\n * @property {string} [connect] Connect string for a connector.\n * @property {string} [prefix='emitter-pubsub-broker:'] Prefix for a connector.\n * @property {boolean} [includeChannel=false] Include channel as the\n * first argument.\n * @property {EmitterPubsubBroker.Encoder} [encoder] Optional encoder\n * to run before broadcasting.\n * @property {string} [method='emit'] An alternative emit method.\n * @property {EmitterPubsubBroker.Serialize}\n * [serialize=msgpack.encode] Serialisation function to use with a\n * connector.\n * @property {EmitterPubsubBroker.Deserialize}\n * [deserialize=msgpack.decode] Deserialisation function to use with a\n * connector.\n * @property {Connector} [connector] Custom connector implementation.\n */\n\n/**\n * @extends EventEmitter\n */\nclass EmitterPubsubBroker extends EventEmitter {\n  /**\n   * Creates a broker.\n   *\n   * @param {EmitterPubsubBroker.Options|string} options Options or a\n   * connect if a string. If connect string is empty, then an\n   * in-memory connector is used.\n   */\n  constructor (options) {\n    super()\n    if (options == null || typeof options === 'string') {\n      options = { connect: options }\n    }\n    this.prefix = options.prefix || 'emitter-pubsub-broker:'\n    this.method = options.method || 'emit'\n    this.serialize = options.serialize || msgpack.encode\n    this.deserialize = options.deserialize || msgpack.decode\n    this.encoder = options.encoder\n    this.includeChannel = options.includeChannel\n    this.clientChannels = new Map()\n    this.channelClients = new Map()\n    if (options.connect || options.connector) {\n      this.connector = options.connector || new RedisConnector(options.connect)\n      this.useSerialization = true\n    } else {\n      this.connector = new MemoryConnector()\n      this.useSerialization = false\n    }\n    this.connector.on('message', this._dispatch.bind(this))\n    /**\n     * Connector error. Does not throw if there are no listeners.\n     *\n     * @event error\n     * @memberOf EmitterPubsubBroker\n     * @param {Error} error Error.\n     */\n    this.connector.on('error', this.emit.bind(this))\n  }\n\n  _channelAddClient (client, channel) {\n    let clients = this.channelClients.get(channel)\n    if (clients == null) {\n      clients = new Set()\n      this.channelClients.set(channel, clients)\n      clients.add(client)\n      let ch = this.prefix + channel\n      return this.connector.subscribe(ch)\n    } else {\n      clients.add(client)\n      return Promise.resolve()\n    }\n  }\n\n  _channelRemoveClient (client, channel) {\n    let clients = this.channelClients.get(channel)\n    let nclients\n    if (clients != null) {\n      clients.delete(client)\n      nclients = clients.size\n    }\n    if (nclients === 0) {\n      let ch = this.prefix + channel\n      return this.connector.unsubscribe(ch)\n    } else {\n      return Promise.resolve()\n    }\n  }\n\n  _makeMessage (msg) {\n    return Promise.try(() => this.useSerialization ? this.serialize(msg) : msg)\n  }\n\n  _unpackMessage (data) {\n    return Promise.try(() => this.useSerialization ? this.deserialize(data) : data)\n  }\n\n  /**\n   * Subscribes emitter to a channel.\n   *\n   * @param {EventEmitter} client Emitter.\n   * @param {string} channel Channel.\n   * @return {Promise<undefined>}\n   */\n  subscribe (client, channel) {\n    let channels = this.clientChannels.get(client)\n    if (!channels) {\n      channels = new Set()\n      this.clientChannels.set(client, channels)\n    }\n    channels.add(channel)\n    return this._channelAddClient(client, channel)\n  }\n\n  /**\n   * Unsubscribes emitter from a channel.\n   *\n   * @param {EventEmitter} client Emitter.\n   * @param {string} channel Channel.\n   * @return {Promise<undefined>}\n   */\n  unsubscribe (client, channel) {\n    let channels = this.clientChannels.get(client)\n    if (channels) {\n      channels.delete(channel)\n    }\n    return this._channelRemoveClient(client, channel)\n  }\n\n  /**\n   * Unsubscribes emitter from all channels.\n   *\n   * @param {EventEmitter} client Emitter.\n   * @return {Promise<undefined>}\n   */\n  unsubscribeAll (client) {\n    let channels = this.clientChannels.get(client)\n    this.clientChannels.delete(client)\n    if (channels) {\n      return Promise.each(\n        channels, this._channelRemoveClient.bind(this, client))\n    } else {\n      return Promise.resolve()\n    }\n  }\n\n  /**\n   * Publish an event to a channel.\n   *\n   * @param {string} channel Channel.\n   * @param {string} name Event name.\n   * @param {*} args Arguments.\n   * @return {Promise<undefined>}\n   */\n  publish (channel, name, ...args) {\n    let ch = this.prefix + channel\n    return this._makeMessage({name, args})\n      .then(msg => this.connector.publish(ch, msg))\n  }\n\n  /**\n   * Publish an event to a channel, excluding the sender. The client\n   * object __MUST__ have an unique `id` field.\n   *\n   * @param {EventEmitter} client Emitter.\n   * @param {string} channel Channel.\n   * @param {string} name Event name.\n   * @param {*} args Arguments.\n   * @return {Promise<undefined>}\n   */\n  send (client, channel, name, ...args) {\n    let ch = this.prefix + channel\n    let sender = client.id\n    return this._makeMessage({sender, name, args})\n      .then(msg => this.connector.publish(ch, msg))\n  }\n\n  /**\n   * Returns set of client subscriptions. The result __MUST NOT__ be\n   * modified.\n   *\n   * @param {EventEmitter} client Emitter.\n   * @return {Set<string>|undefined}\n   */\n  getSubscriptions (client) {\n    return this.clientChannels.get(client)\n  }\n\n  /**\n   * Returns _internal_ set of channel clients of EmitterPubsubBroker\n   * instance. The result __MUST NOT__ be modified.\n   *\n   * @param {string} channel Channel.\n   * @return {Set<EventEmitter>|undefined}\n   */\n  getClients (channel) {\n    return this.channelClients.get(channel)\n  }\n\n  /**\n   * Closes broker.\n   *\n   * @return {Promise<undefined>}\n   */\n  close () {\n    this.channelClients.clear()\n    this.clientChannels.clear()\n    return this.connector.close()\n  }\n\n  _dispatch (ch, data) {\n    this._unpackMessage(data).then(message => {\n      let channel = ch.slice(this.prefix.length)\n      let clients = this.channelClients.get(channel)\n      /* istanbul ignore else */\n      if (clients) {\n        let args = this.includeChannel\n          ? [message.name, channel, ...message.args]\n          : [message.name, ...message.args]\n        Promise.try(() => this.encoder ? this.encoder(args) : args).then(data => {\n          const method = this.method\n          const encoder = this.encoder\n          const sender = message.sender\n          clients.forEach(client => {\n            if (!sender || client.id !== sender) {\n              Promise\n                .try(() => encoder ? client[method](data) : client[method](...data))\n                .catchReturn()\n            }\n          })\n        })\n      }\n    })\n  }\n\n}\n\n// compatibility\nEmitterPubsubBroker.prototype.unsubscribeall = EmitterPubsubBroker.prototype.unsubscribeAll\n\nmodule.exports = EmitterPubsubBroker\n"]}
{
"name": "emitter-pubsub-broker",
"version": "0.4.0",
"version": "0.5.0",
"private": false,

@@ -5,0 +5,0 @@ "description": "An utility for connecting EventEmitters via a pubsub.",

@@ -49,3 +49,3 @@

[API](https://an-sh.github.io/emitter-pubsub-broker/0.4/index.html)
[API](https://an-sh.github.io/emitter-pubsub-broker/0.5/index.html)
documentation is available online.

@@ -52,0 +52,0 @@

@@ -123,8 +123,47 @@ 'use strict'

/**
* Messages encoder. Encoded messages will be used to emit messages
* via event emitters. May also return promises for an asynchronous
* execution.
*
* @callback EmitterPubsubBroker.Encoder
* @param {*} args Emit arguments.
* @return {Promise<Object>|Object} Data to send.
*/
/**
* Messages serialisation. Serialised messages will be used internally
* via a communication {@link Connector}. May also return promises
* for an asynchronous execution.
*
* @callback EmitterPubsubBroker.Serialize
* @param {Object} data Data.
* @return {Promise<Object>|Object} Serialised data.
*/
/**
* Messages deserialisation. Serialised messages will be used
* internally via a communication {@link Connector}. May also return
* promises for an asynchronous execution.
*
* @callback EmitterPubsubBroker.Deserialize
* @param {Object} data Serialised data.
* @return {Promise<Object>|Object} Data.
*/
/**
* @typedef {Object} EmitterPubsubBroker.Options
*
* @property {string} connect Connect string.
* @property {string} [prefix='emitter-pubsub-broker:'] Prefix.
* @property {string} [connect] Connect string for a connector.
* @property {string} [prefix='emitter-pubsub-broker:'] Prefix for a connector.
* @property {boolean} [includeChannel=false] Include channel as the
* first argument.
* @property {EmitterPubsubBroker.Encoder} [encoder] Optional encoder
* to run before broadcasting.
* @property {string} [method='emit'] An alternative emit method.
* @property {EmitterPubsubBroker.Serialize}
* [serialize=msgpack.encode] Serialisation function to use with a
* connector.
* @property {EmitterPubsubBroker.Deserialize}
* [deserialize=msgpack.decode] Deserialisation function to use with a
* connector.
* @property {Connector} [connector] Custom connector implementation.

@@ -150,2 +189,6 @@ */

this.prefix = options.prefix || 'emitter-pubsub-broker:'
this.method = options.method || 'emit'
this.serialize = options.serialize || msgpack.encode
this.deserialize = options.deserialize || msgpack.decode
this.encoder = options.encoder
this.includeChannel = options.includeChannel

@@ -156,6 +199,6 @@ this.clientChannels = new Map()

this.connector = options.connector || new RedisConnector(options.connect)
this.serialize = true
this.useSerialization = true
} else {
this.connector = new MemoryConnector()
this.serialize = false
this.useSerialization = false
}

@@ -173,8 +216,9 @@ this.connector.on('message', this._dispatch.bind(this))

_channelAddClient (client, ch) {
let clients = this.channelClients.get(ch)
_channelAddClient (client, channel) {
let clients = this.channelClients.get(channel)
if (clients == null) {
clients = new Set()
this.channelClients.set(ch, clients)
this.channelClients.set(channel, clients)
clients.add(client)
let ch = this.prefix + channel
return this.connector.subscribe(ch)

@@ -187,4 +231,4 @@ } else {

_channelRemoveClient (client, ch) {
let clients = this.channelClients.get(ch)
_channelRemoveClient (client, channel) {
let clients = this.channelClients.get(channel)
let nclients

@@ -196,2 +240,3 @@ if (clients != null) {

if (nclients === 0) {
let ch = this.prefix + channel
return this.connector.unsubscribe(ch)

@@ -204,5 +249,9 @@ } else {

_makeMessage (msg) {
return this.serialize ? msgpack.encode(msg) : msg
return Promise.try(() => this.useSerialization ? this.serialize(msg) : msg)
}
_unpackMessage (data) {
return Promise.try(() => this.useSerialization ? this.deserialize(data) : data)
}
/**

@@ -221,5 +270,4 @@ * Subscribes emitter to a channel.

}
let ch = this.prefix + channel
channels.add(ch)
return this._channelAddClient(client, ch)
channels.add(channel)
return this._channelAddClient(client, channel)
}

@@ -236,11 +284,10 @@

let channels = this.clientChannels.get(client)
let ch = this.prefix + channel
if (channels) {
channels.delete(ch)
channels.delete(channel)
}
return this._channelRemoveClient(client, ch)
return this._channelRemoveClient(client, channel)
}
/**
* Unsubscribes emitter from all channel.
* Unsubscribes emitter from all channels.
*

@@ -250,3 +297,3 @@ * @param {EventEmitter} client Emitter.

*/
unsubscribeall (client) {
unsubscribeAll (client) {
let channels = this.clientChannels.get(client)

@@ -272,4 +319,4 @@ this.clientChannels.delete(client)

let ch = this.prefix + channel
let msg = this._makeMessage({name, args})
return this.connector.publish(ch, msg)
return this._makeMessage({name, args})
.then(msg => this.connector.publish(ch, msg))
}

@@ -290,25 +337,29 @@

let sender = client.id
let msg = this._makeMessage({sender, name, args})
return this.connector.publish(ch, msg)
return this._makeMessage({sender, name, args})
.then(msg => this.connector.publish(ch, msg))
}
/**
* Returns client subscriptions.
* Returns set of client subscriptions. The result __MUST NOT__ be
* modified.
*
* @param {EventEmitter} client Emitter.
* @return {Array<string>}
* @return {Set<string>|undefined}
*/
getSubscriptions (client) {
let channels = this.clientChannels.get(client)
let res = []
if (channels) {
let plen = this.prefix.length
for (let channel of channels) {
res.push(channel.slice(plen))
}
}
return res
return this.clientChannels.get(client)
}
/**
* Returns _internal_ set of channel clients of EmitterPubsubBroker
* instance. The result __MUST NOT__ be modified.
*
* @param {string} channel Channel.
* @return {Set<EventEmitter>|undefined}
*/
getClients (channel) {
return this.channelClients.get(channel)
}
/**
* Closes broker.

@@ -325,19 +376,24 @@ *

_dispatch (ch, data) {
let channel = ch.slice(this.prefix.length)
let message = this.serialize ? msgpack.decode(data) : data
let clients = this.channelClients.get(ch)
/* istanbul ignore else */
if (clients) {
let args
if (this.includeChannel) {
args = [message.name, channel, ...message.args]
} else {
args = [message.name, ...message.args]
this._unpackMessage(data).then(message => {
let channel = ch.slice(this.prefix.length)
let clients = this.channelClients.get(channel)
/* istanbul ignore else */
if (clients) {
let args = this.includeChannel
? [message.name, channel, ...message.args]
: [message.name, ...message.args]
Promise.try(() => this.encoder ? this.encoder(args) : args).then(data => {
const method = this.method
const encoder = this.encoder
const sender = message.sender
clients.forEach(client => {
if (!sender || client.id !== sender) {
Promise
.try(() => encoder ? client[method](data) : client[method](...data))
.catchReturn()
}
})
})
}
for (let client of clients) {
if (!message.sender || client.id !== message.sender) {
client.emit(...args)
}
}
}
})
}

@@ -347,2 +403,5 @@

// compatibility
EmitterPubsubBroker.prototype.unsubscribeall = EmitterPubsubBroker.prototype.unsubscribeAll
module.exports = EmitterPubsubBroker

@@ -6,2 +6,4 @@ 'use strict'

const eventToPromise = require('event-to-promise')
const msgpack = require('msgpack-lite')
const Promise = require('bluebird')
const { expect } = require('chai')

@@ -46,2 +48,32 @@ const { EventEmitter } = require('events')

it('should emit published encoded messages', function (done) {
broker = new EmitterPubsubBroker({connect, encoder: JSON.stringify, method: 'send'})
let client = new EventEmitter()
client.send = function (args) {
let [ev, x, y] = JSON.parse(args)
expect(ev).equal('myEvent')
expect(x).equal(1)
expect(y).equal('2')
done()
}
broker.subscribe(client, 'my-channel').then(() => {
broker.publish('my-channel', 'myEvent', 1, '2')
})
})
it('should use custom serialisation', function () {
let serialize = (data) => Promise.try(() => msgpack.encode(data))
let deserialize = (data) => Promise.try(() => msgpack.decode(data))
broker = new EmitterPubsubBroker({connect, serialize, deserialize})
let client = new EventEmitter()
return broker.subscribe(client, 'my-channel').then(() => {
broker.publish('my-channel', 'myEvent', 1, '2')
return eventToPromise(client, 'myEvent', {array: true}).then(args => {
let [x, y] = args
expect(x).equal(1)
expect(y).equal('2')
})
})
})
it('should prepend a channel argument', function () {

@@ -87,4 +119,5 @@ broker = new EmitterPubsubBroker({connect, includeChannel: true})

eventToPromise(client1, 'myEvent').then(notReachable)
return Promise.all([eventToPromise(client2, 'myEvent'),
new Promise(resolve => setTimeout(resolve, 1000))])
return Promise.all([
eventToPromise(client2, 'myEvent'),
new Promise(resolve => setTimeout(resolve, 1000))])
})

@@ -96,12 +129,28 @@ })

let client1 = new EventEmitter()
return Promise.all([broker.subscribe(client1, 'my-channel'),
broker.subscribe(client1, 'channel')])
return Promise.all([
broker.subscribe(client1, 'my-channel'),
broker.subscribe(client1, 'channel')])
.then(() => {
let subs = broker.getSubscriptions(client1)
expect(subs).lengthOf(2)
expect(subs).include('my-channel')
expect(subs).include('channel')
expect(subs.size).equal(2)
expect(subs.has('my-channel')).true
expect(subs.has('channel')).true
})
})
it('should get clients set', function () {
broker = new EmitterPubsubBroker(connect)
let client1 = new EventEmitter()
let client2 = new EventEmitter()
return Promise.all([
broker.subscribe(client1, 'channel'),
broker.subscribe(client2, 'channel')])
.then(() => {
let subs = broker.getClients('channel')
expect(subs.size).equal(2)
expect(subs.has(client1)).true
expect(subs.has(client2)).true
})
})
it('should unsubscribe all', function () {

@@ -112,5 +161,6 @@ this.timeout(4000)

let client1 = new EventEmitter()
return Promise.all([broker.subscribe(client1, 'my-channel'),
broker.subscribe(client1, 'channel')])
.then(() => broker.unsubscribeall(client1))
return Promise.all([
broker.subscribe(client1, 'my-channel'),
broker.subscribe(client1, 'channel')])
.then(() => broker.unsubscribeAll(client1))
.then(() => {

@@ -130,8 +180,8 @@ expect(broker.getSubscriptions(client1)).empty

it('should handle non-existent unsubscribeall', function () {
it('should handle non-existent unsubscribeAll', function () {
broker = new EmitterPubsubBroker(connect)
let client1 = new EventEmitter()
return broker.unsubscribeall(client1)
return broker.unsubscribeAll(client1)
})
}))
})
SocketSocket SOC 2 Logo

Product

  • Package Alerts
  • Integrations
  • Docs
  • Pricing
  • FAQ
  • Roadmap
  • Changelog

Packages

npm

Stay in touch

Get open source security insights delivered straight into your inbox.


  • Terms
  • Privacy
  • Security

Made with ⚡️ by Socket Inc