emitter-pubsub-broker
Advanced tools
Comparing version 0.4.0 to 0.5.0
@@ -5,2 +5,32 @@ # Change Log | ||
<a name="0.5.0"></a> | ||
# [0.5.0](https://github.com/an-sh/emitter-pubsub-broker/compare/v0.4.0...v0.5.0) (2017-01-06) | ||
### Bug Fixes | ||
* ignore broadcast errors ([e895e96](https://github.com/an-sh/emitter-pubsub-broker/commit/e895e96)) | ||
### Code Refactoring | ||
* change getSubscriptions return type ([338a47a](https://github.com/an-sh/emitter-pubsub-broker/commit/338a47a)) | ||
* rename unsubscribeall to unsubscribeAll ([3a25f07](https://github.com/an-sh/emitter-pubsub-broker/commit/3a25f07)) | ||
### Features | ||
* add custom serialisation support ([c3f218d](https://github.com/an-sh/emitter-pubsub-broker/commit/c3f218d)) | ||
* add getClients ([f728105](https://github.com/an-sh/emitter-pubsub-broker/commit/f728105)) | ||
* support broadcast data encoding ([392c173](https://github.com/an-sh/emitter-pubsub-broker/commit/392c173)) | ||
### BREAKING CHANGES | ||
* Now getSubscriptions method returns a shared Set | ||
instead of a fresh Array. | ||
* Rename unsubscribeall to unsubscribeAll. | ||
<a name="0.4.0"></a> | ||
@@ -7,0 +37,0 @@ # [0.4.0](https://github.com/an-sh/emitter-pubsub-broker/compare/v0.3.0...v0.4.0) (2016-11-29) |
@@ -127,8 +127,47 @@ 'use strict'; | ||
/** | ||
* Messages encoder. Encoded messages will be used to emit messages | ||
* via event emitters. May also return promises for an asynchronous | ||
* execution. | ||
* | ||
* @callback EmitterPubsubBroker.Encoder | ||
* @param {*} args Emit arguments. | ||
* @return {Promise<Object>|Object} Data to send. | ||
*/ | ||
/** | ||
* Messages serialisation. Serialised messages will be used internally | ||
* via a communication {@link Connector}. May also return promises | ||
* for an asynchronous execution. | ||
* | ||
* @callback EmitterPubsubBroker.Serialize | ||
* @param {Object} data Data. | ||
* @return {Promise<Object>|Object} Serialised data. | ||
*/ | ||
/** | ||
* Messages deserialisation. Serialised messages will be used | ||
* internally via a communication {@link Connector}. May also return | ||
* promises for an asynchronous execution. | ||
* | ||
* @callback EmitterPubsubBroker.Deserialize | ||
* @param {Object} data Serialised data. | ||
* @return {Promise<Object>|Object} Data. | ||
*/ | ||
/** | ||
* @typedef {Object} EmitterPubsubBroker.Options | ||
* | ||
* @property {string} connect Connect string. | ||
* @property {string} [prefix='emitter-pubsub-broker:'] Prefix. | ||
* @property {string} [connect] Connect string for a connector. | ||
* @property {string} [prefix='emitter-pubsub-broker:'] Prefix for a connector. | ||
* @property {boolean} [includeChannel=false] Include channel as the | ||
* first argument. | ||
* @property {EmitterPubsubBroker.Encoder} [encoder] Optional encoder | ||
* to run before broadcasting. | ||
* @property {string} [method='emit'] An alternative emit method. | ||
* @property {EmitterPubsubBroker.Serialize} | ||
* [serialize=msgpack.encode] Serialisation function to use with a | ||
* connector. | ||
* @property {EmitterPubsubBroker.Deserialize} | ||
* [deserialize=msgpack.decode] Deserialisation function to use with a | ||
* connector. | ||
* @property {Connector} [connector] Custom connector implementation. | ||
@@ -154,2 +193,6 @@ */ | ||
this.prefix = options.prefix || 'emitter-pubsub-broker:'; | ||
this.method = options.method || 'emit'; | ||
this.serialize = options.serialize || msgpack.encode; | ||
this.deserialize = options.deserialize || msgpack.decode; | ||
this.encoder = options.encoder; | ||
this.includeChannel = options.includeChannel; | ||
@@ -160,6 +203,6 @@ this.clientChannels = new Map(); | ||
this.connector = options.connector || new RedisConnector(options.connect); | ||
this.serialize = true; | ||
this.useSerialization = true; | ||
} else { | ||
this.connector = new MemoryConnector(); | ||
this.serialize = false; | ||
this.useSerialization = false; | ||
} | ||
@@ -177,8 +220,9 @@ this.connector.on('message', this._dispatch.bind(this)); | ||
_channelAddClient(client, ch) { | ||
let clients = this.channelClients.get(ch); | ||
_channelAddClient(client, channel) { | ||
let clients = this.channelClients.get(channel); | ||
if (clients == null) { | ||
clients = new Set(); | ||
this.channelClients.set(ch, clients); | ||
this.channelClients.set(channel, clients); | ||
clients.add(client); | ||
let ch = this.prefix + channel; | ||
return this.connector.subscribe(ch); | ||
@@ -191,4 +235,4 @@ } else { | ||
_channelRemoveClient(client, ch) { | ||
let clients = this.channelClients.get(ch); | ||
_channelRemoveClient(client, channel) { | ||
let clients = this.channelClients.get(channel); | ||
let nclients; | ||
@@ -200,2 +244,3 @@ if (clients != null) { | ||
if (nclients === 0) { | ||
let ch = this.prefix + channel; | ||
return this.connector.unsubscribe(ch); | ||
@@ -208,5 +253,9 @@ } else { | ||
_makeMessage(msg) { | ||
return this.serialize ? msgpack.encode(msg) : msg; | ||
return Promise.try(() => this.useSerialization ? this.serialize(msg) : msg); | ||
} | ||
_unpackMessage(data) { | ||
return Promise.try(() => this.useSerialization ? this.deserialize(data) : data); | ||
} | ||
/** | ||
@@ -225,5 +274,4 @@ * Subscribes emitter to a channel. | ||
} | ||
let ch = this.prefix + channel; | ||
channels.add(ch); | ||
return this._channelAddClient(client, ch); | ||
channels.add(channel); | ||
return this._channelAddClient(client, channel); | ||
} | ||
@@ -240,11 +288,10 @@ | ||
let channels = this.clientChannels.get(client); | ||
let ch = this.prefix + channel; | ||
if (channels) { | ||
channels.delete(ch); | ||
channels.delete(channel); | ||
} | ||
return this._channelRemoveClient(client, ch); | ||
return this._channelRemoveClient(client, channel); | ||
} | ||
/** | ||
* Unsubscribes emitter from all channel. | ||
* Unsubscribes emitter from all channels. | ||
* | ||
@@ -254,3 +301,3 @@ * @param {EventEmitter} client Emitter. | ||
*/ | ||
unsubscribeall(client) { | ||
unsubscribeAll(client) { | ||
let channels = this.clientChannels.get(client); | ||
@@ -279,4 +326,3 @@ this.clientChannels.delete(client); | ||
let ch = this.prefix + channel; | ||
let msg = this._makeMessage({ name: name, args: args }); | ||
return this.connector.publish(ch, msg); | ||
return this._makeMessage({ name: name, args: args }).then(msg => this.connector.publish(ch, msg)); | ||
} | ||
@@ -301,25 +347,28 @@ | ||
let sender = client.id; | ||
let msg = this._makeMessage({ sender: sender, name: name, args: args }); | ||
return this.connector.publish(ch, msg); | ||
return this._makeMessage({ sender: sender, name: name, args: args }).then(msg => this.connector.publish(ch, msg)); | ||
} | ||
/** | ||
* Returns client subscriptions. | ||
* Returns set of client subscriptions. The result __MUST NOT__ be | ||
* modified. | ||
* | ||
* @param {EventEmitter} client Emitter. | ||
* @return {Array<string>} | ||
* @return {Set<string>|undefined} | ||
*/ | ||
getSubscriptions(client) { | ||
let channels = this.clientChannels.get(client); | ||
let res = []; | ||
if (channels) { | ||
let plen = this.prefix.length; | ||
for (let channel of channels) { | ||
res.push(channel.slice(plen)); | ||
} | ||
} | ||
return res; | ||
return this.clientChannels.get(client); | ||
} | ||
/** | ||
* Returns _internal_ set of channel clients of EmitterPubsubBroker | ||
* instance. The result __MUST NOT__ be modified. | ||
* | ||
* @param {string} channel Channel. | ||
* @return {Set<EventEmitter>|undefined} | ||
*/ | ||
getClients(channel) { | ||
return this.channelClients.get(channel); | ||
} | ||
/** | ||
* Closes broker. | ||
@@ -336,19 +385,20 @@ * | ||
_dispatch(ch, data) { | ||
let channel = ch.slice(this.prefix.length); | ||
let message = this.serialize ? msgpack.decode(data) : data; | ||
let clients = this.channelClients.get(ch); | ||
/* istanbul ignore else */ | ||
if (clients) { | ||
let args; | ||
if (this.includeChannel) { | ||
args = [message.name, channel].concat(_toConsumableArray(message.args)); | ||
} else { | ||
args = [message.name].concat(_toConsumableArray(message.args)); | ||
this._unpackMessage(data).then(message => { | ||
let channel = ch.slice(this.prefix.length); | ||
let clients = this.channelClients.get(channel); | ||
/* istanbul ignore else */ | ||
if (clients) { | ||
let args = this.includeChannel ? [message.name, channel].concat(_toConsumableArray(message.args)) : [message.name].concat(_toConsumableArray(message.args)); | ||
Promise.try(() => this.encoder ? this.encoder(args) : args).then(data => { | ||
const method = this.method; | ||
const encoder = this.encoder; | ||
const sender = message.sender; | ||
clients.forEach(client => { | ||
if (!sender || client.id !== sender) { | ||
Promise.try(() => encoder ? client[method](data) : client[method].apply(client, _toConsumableArray(data))).catchReturn(); | ||
} | ||
}); | ||
}); | ||
} | ||
for (let client of clients) { | ||
if (!message.sender || client.id !== message.sender) { | ||
client.emit.apply(client, _toConsumableArray(args)); | ||
} | ||
} | ||
} | ||
}); | ||
} | ||
@@ -358,3 +408,6 @@ | ||
// compatibility | ||
EmitterPubsubBroker.prototype.unsubscribeall = EmitterPubsubBroker.prototype.unsubscribeAll; | ||
module.exports = EmitterPubsubBroker; | ||
//# sourceMappingURL=data:application/json;base64,{"version":3,"sources":["../src/EmitterPubsubBroker.js"],"names":["Promise","require","Redis","msgpack","EventEmitter","RedisConnector","constructor","options","pub","sub","subscribe","catchReturn","on","_onMessage","bind","emit","buf","data","ch","toString","publish","unsubscribe","close","all","quit","MemoryConnector","resolve","then","EmitterPubsubBroker","connect","prefix","includeChannel","clientChannels","Map","channelClients","connector","serialize","_dispatch","_channelAddClient","client","clients","get","Set","set","add","_channelRemoveClient","nclients","delete","size","_makeMessage","msg","encode","channel","channels","unsubscribeall","each","name","args","send","sender","id","getSubscriptions","res","plen","length","push","slice","clear","message","decode","module","exports"],"mappings":"AAAA;;;;AAEA,MAAMA,UAAUC,QAAQ,UAAR,CAAhB;AACA,MAAMC,QAAQD,QAAQ,SAAR,CAAd;AACA,MAAME,UAAUF,QAAQ,cAAR,CAAhB;;eACyBA,QAAQ,eAAR,C;;MAAjBG,Y,YAAAA,Y;;AAER;;;;;;;AAOA;;;;;;;;;;AAUA;;;;;;;;;AASA;;;;;;;;;AASA;;;;;;;;AAQA;;;;;;;AAOA;;;;;;;;AAQA,MAAMC,cAAN,SAA6BD,YAA7B,CAA0C;AACxCE,cAAaC,OAAb,EAAsB;AACpB;AACA,SAAKC,GAAL,GAAW,IAAIN,KAAJ,CAAUK,OAAV,CAAX;AACA,SAAKE,GAAL,GAAW,IAAIP,KAAJ,CAAUK,OAAV,CAAX;AACA,SAAKE,GAAL,CAASC,SAAT,GAAqBC,WAArB;AACA,SAAKF,GAAL,CAASG,EAAT,CAAY,eAAZ,EAA6B,KAAKC,UAAL,CAAgBC,IAAhB,CAAqB,IAArB,CAA7B;AACA,SAAKL,GAAL,CAASG,EAAT,CAAY,OAAZ,EAAqB,KAAKG,IAAL,CAAUD,IAAV,CAAe,IAAf,CAArB;AACA,SAAKN,GAAL,CAASI,EAAT,CAAY,OAAZ,EAAqB,KAAKG,IAAL,CAAUD,IAAV,CAAe,IAAf,CAArB;AACD;;AAEDD,aAAYG,GAAZ,EAAiBC,IAAjB,EAAuB;AACrB,QAAIC,KAAKF,IAAIG,QAAJ,EAAT;AACA,SAAKJ,IAAL,CAAU,SAAV,EAAqBG,EAArB,EAAyBD,IAAzB;AACD;;AAEDG,UAASF,EAAT,EAAaD,IAAb,EAAmB;AACjB,WAAO,KAAKT,GAAL,CAASY,OAAT,CAAiBF,EAAjB,EAAqBD,IAArB,CAAP;AACD;;AAEDP,YAAWQ,EAAX,EAAe;AACb,WAAO,KAAKT,GAAL,CAASC,SAAT,CAAmBQ,EAAnB,CAAP;AACD;;AAEDG,cAAaH,EAAb,EAAiB;AACf,WAAO,KAAKT,GAAL,CAASY,WAAT,CAAqBH,EAArB,CAAP;AACD;;AAEDI,UAAS;AACP,WAAOtB,QAAQuB,GAAR,CAAY,CAAC,KAAKf,GAAL,CAASgB,IAAT,EAAD,EAAkB,KAAKf,GAAL,CAASe,IAAT,EAAlB,CAAZ,CAAP;AACD;AA9BuC;;AAiC1C,MAAMC,eAAN,SAA8BrB,YAA9B,CAA2C;AACzCE,cAAaC,OAAb,EAAsB;AACpB;AACD;;AAEDa,UAASF,EAAT,EAAaD,IAAb,EAAmB;AACjB,WAAOjB,QAAQ0B,OAAR,GACJC,IADI,CACC,MAAM,KAAKZ,IAAL,CAAU,SAAV,EAAqBG,EAArB,EAAyBD,IAAzB,CADP,CAAP;AAED;;AAEDP,YAAWQ,EAAX,EAAe;AACb,WAAOlB,QAAQ0B,OAAR,EAAP;AACD;;AAEDL,cAAaH,EAAb,EAAiB;AACf,WAAOlB,QAAQ0B,OAAR,EAAP;AACD;;AAEDJ,UAAS;AACP,WAAOtB,QAAQ0B,OAAR,EAAP;AACD;AApBwC;;AAuB3C;;;;;;;;;;AAUA;;;AAGA,MAAME,mBAAN,SAAkCxB,YAAlC,CAA+C;AAC7C;;;;;;;AAOAE,cAAaC,OAAb,EAAsB;AACpB;AACA,QAAIA,WAAW,IAAX,IAAmB,OAAOA,OAAP,KAAmB,QAA1C,EAAoD;AAClDA,gBAAU,EAAEsB,SAAStB,OAAX,EAAV;AACD;AACD,SAAKuB,MAAL,GAAcvB,QAAQuB,MAAR,IAAkB,wBAAhC;AACA,SAAKC,cAAL,GAAsBxB,QAAQwB,cAA9B;AACA,SAAKC,cAAL,GAAsB,IAAIC,GAAJ,EAAtB;AACA,SAAKC,cAAL,GAAsB,IAAID,GAAJ,EAAtB;AACA,QAAI1B,QAAQsB,OAAR,IAAmBtB,QAAQ4B,SAA/B,EAA0C;AACxC,WAAKA,SAAL,GAAiB5B,QAAQ4B,SAAR,IAAqB,IAAI9B,cAAJ,CAAmBE,QAAQsB,OAA3B,CAAtC;AACA,WAAKO,SAAL,GAAiB,IAAjB;AACD,KAHD,MAGO;AACL,WAAKD,SAAL,GAAiB,IAAIV,eAAJ,EAAjB;AACA,WAAKW,SAAL,GAAiB,KAAjB;AACD;AACD,SAAKD,SAAL,CAAevB,EAAf,CAAkB,SAAlB,EAA6B,KAAKyB,SAAL,CAAevB,IAAf,CAAoB,IAApB,CAA7B;AACA;;;;;;;AAOA,SAAKqB,SAAL,CAAevB,EAAf,CAAkB,OAAlB,EAA2B,KAAKG,IAAL,CAAUD,IAAV,CAAe,IAAf,CAA3B;AACD;;AAEDwB,oBAAmBC,MAAnB,EAA2BrB,EAA3B,EAA+B;AAC7B,QAAIsB,UAAU,KAAKN,cAAL,CAAoBO,GAApB,CAAwBvB,EAAxB,CAAd;AACA,QAAIsB,WAAW,IAAf,EAAqB;AACnBA,gBAAU,IAAIE,GAAJ,EAAV;AACA,WAAKR,cAAL,CAAoBS,GAApB,CAAwBzB,EAAxB,EAA4BsB,OAA5B;AACAA,cAAQI,GAAR,CAAYL,MAAZ;AACA,aAAO,KAAKJ,SAAL,CAAezB,SAAf,CAAyBQ,EAAzB,CAAP;AACD,KALD,MAKO;AACLsB,cAAQI,GAAR,CAAYL,MAAZ;AACA,aAAOvC,QAAQ0B,OAAR,EAAP;AACD;AACF;;AAEDmB,uBAAsBN,MAAtB,EAA8BrB,EAA9B,EAAkC;AAChC,QAAIsB,UAAU,KAAKN,cAAL,CAAoBO,GAApB,CAAwBvB,EAAxB,CAAd;AACA,QAAI4B,QAAJ;AACA,QAAIN,WAAW,IAAf,EAAqB;AACnBA,cAAQO,MAAR,CAAeR,MAAf;AACAO,iBAAWN,QAAQQ,IAAnB;AACD;AACD,QAAIF,aAAa,CAAjB,EAAoB;AAClB,aAAO,KAAKX,SAAL,CAAed,WAAf,CAA2BH,EAA3B,CAAP;AACD,KAFD,MAEO;AACL,aAAOlB,QAAQ0B,OAAR,EAAP;AACD;AACF;;AAEDuB,eAAcC,GAAd,EAAmB;AACjB,WAAO,KAAKd,SAAL,GAAiBjC,QAAQgD,MAAR,CAAeD,GAAf,CAAjB,GAAuCA,GAA9C;AACD;;AAED;;;;;;;AAOAxC,YAAW6B,MAAX,EAAmBa,OAAnB,EAA4B;AAC1B,QAAIC,WAAW,KAAKrB,cAAL,CAAoBS,GAApB,CAAwBF,MAAxB,CAAf;AACA,QAAI,CAACc,QAAL,EAAe;AACbA,iBAAW,IAAIX,GAAJ,EAAX;AACA,WAAKV,cAAL,CAAoBW,GAApB,CAAwBJ,MAAxB,EAAgCc,QAAhC;AACD;AACD,QAAInC,KAAK,KAAKY,MAAL,GAAcsB,OAAvB;AACAC,aAAST,GAAT,CAAa1B,EAAb;AACA,WAAO,KAAKoB,iBAAL,CAAuBC,MAAvB,EAA+BrB,EAA/B,CAAP;AACD;;AAED;;;;;;;AAOAG,cAAakB,MAAb,EAAqBa,OAArB,EAA8B;AAC5B,QAAIC,WAAW,KAAKrB,cAAL,CAAoBS,GAApB,CAAwBF,MAAxB,CAAf;AACA,QAAIrB,KAAK,KAAKY,MAAL,GAAcsB,OAAvB;AACA,QAAIC,QAAJ,EAAc;AACZA,eAASN,MAAT,CAAgB7B,EAAhB;AACD;AACD,WAAO,KAAK2B,oBAAL,CAA0BN,MAA1B,EAAkCrB,EAAlC,CAAP;AACD;;AAED;;;;;;AAMAoC,iBAAgBf,MAAhB,EAAwB;AACtB,QAAIc,WAAW,KAAKrB,cAAL,CAAoBS,GAApB,CAAwBF,MAAxB,CAAf;AACA,SAAKP,cAAL,CAAoBe,MAApB,CAA2BR,MAA3B;AACA,QAAIc,QAAJ,EAAc;AACZ,aAAOrD,QAAQuD,IAAR,CACLF,QADK,EACK,KAAKR,oBAAL,CAA0B/B,IAA1B,CAA+B,IAA/B,EAAqCyB,MAArC,CADL,CAAP;AAED,KAHD,MAGO;AACL,aAAOvC,QAAQ0B,OAAR,EAAP;AACD;AACF;;AAED;;;;;;;;AAQAN,UAASgC,OAAT,EAAkBI,IAAlB,EAAiC;AAAA,sCAANC,IAAM;AAANA,UAAM;AAAA;;AAC/B,QAAIvC,KAAK,KAAKY,MAAL,GAAcsB,OAAvB;AACA,QAAIF,MAAM,KAAKD,YAAL,CAAkB,EAACO,UAAD,EAAOC,UAAP,EAAlB,CAAV;AACA,WAAO,KAAKtB,SAAL,CAAef,OAAf,CAAuBF,EAAvB,EAA2BgC,GAA3B,CAAP;AACD;;AAED;;;;;;;;;;AAUAQ,OAAMnB,MAAN,EAAca,OAAd,EAAuBI,IAAvB,EAAsC;AAAA,uCAANC,IAAM;AAANA,UAAM;AAAA;;AACpC,QAAIvC,KAAK,KAAKY,MAAL,GAAcsB,OAAvB;AACA,QAAIO,SAASpB,OAAOqB,EAApB;AACA,QAAIV,MAAM,KAAKD,YAAL,CAAkB,EAACU,cAAD,EAASH,UAAT,EAAeC,UAAf,EAAlB,CAAV;AACA,WAAO,KAAKtB,SAAL,CAAef,OAAf,CAAuBF,EAAvB,EAA2BgC,GAA3B,CAAP;AACD;;AAED;;;;;;AAMAW,mBAAkBtB,MAAlB,EAA0B;AACxB,QAAIc,WAAW,KAAKrB,cAAL,CAAoBS,GAApB,CAAwBF,MAAxB,CAAf;AACA,QAAIuB,MAAM,EAAV;AACA,QAAIT,QAAJ,EAAc;AACZ,UAAIU,OAAO,KAAKjC,MAAL,CAAYkC,MAAvB;AACA,WAAK,IAAIZ,OAAT,IAAoBC,QAApB,EAA8B;AAC5BS,YAAIG,IAAJ,CAASb,QAAQc,KAAR,CAAcH,IAAd,CAAT;AACD;AACF;AACD,WAAOD,GAAP;AACD;;AAED;;;;;AAKAxC,UAAS;AACP,SAAKY,cAAL,CAAoBiC,KAApB;AACA,SAAKnC,cAAL,CAAoBmC,KAApB;AACA,WAAO,KAAKhC,SAAL,CAAeb,KAAf,EAAP;AACD;;AAEDe,YAAWnB,EAAX,EAAeD,IAAf,EAAqB;AACnB,QAAImC,UAAUlC,GAAGgD,KAAH,CAAS,KAAKpC,MAAL,CAAYkC,MAArB,CAAd;AACA,QAAII,UAAU,KAAKhC,SAAL,GAAiBjC,QAAQkE,MAAR,CAAepD,IAAf,CAAjB,GAAwCA,IAAtD;AACA,QAAIuB,UAAU,KAAKN,cAAL,CAAoBO,GAApB,CAAwBvB,EAAxB,CAAd;AACA;AACA,QAAIsB,OAAJ,EAAa;AACX,UAAIiB,IAAJ;AACA,UAAI,KAAK1B,cAAT,EAAyB;AACvB0B,gBAAQW,QAAQZ,IAAhB,EAAsBJ,OAAtB,4BAAkCgB,QAAQX,IAA1C;AACD,OAFD,MAEO;AACLA,gBAAQW,QAAQZ,IAAhB,4BAAyBY,QAAQX,IAAjC;AACD;AACD,WAAK,IAAIlB,MAAT,IAAmBC,OAAnB,EAA4B;AAC1B,YAAI,CAAC4B,QAAQT,MAAT,IAAmBpB,OAAOqB,EAAP,KAAcQ,QAAQT,MAA7C,EAAqD;AACnDpB,iBAAOxB,IAAP,kCAAe0C,IAAf;AACD;AACF;AACF;AACF;;AAnM4C;;AAuM/Ca,OAAOC,OAAP,GAAiB3C,mBAAjB","file":"EmitterPubsubBroker.js","sourcesContent":["'use strict'\n\nconst Promise = require('bluebird')\nconst Redis = require('ioredis')\nconst msgpack = require('msgpack-lite')\nconst { EventEmitter } = require('eventemitter3')\n\n/**\n * Interface for connector implementations.\n *\n * @interface Connector\n * @extends EventEmitter\n */\n\n/**\n * @method\n * @instance\n * @name publish\n * @memberOf Connector\n * @param {string} channel Channel.\n * @param {Buffer} data Data.\n * @return {Promise<undefined>}\n */\n\n/**\n * @method\n * @instance\n * @name subscribe\n * @memberOf Connector\n * @param {string} channel Channel.\n * @return {Promise<undefined>}\n */\n\n/**\n * @method\n * @instance\n * @name unsubscribe\n * @memberOf Connector\n * @param {string} channel Channel.\n * @return {Promise<undefined>}\n */\n\n/**\n * @method\n * @instance\n * @name close\n * @memberOf Connector\n * @return {Promise<undefined>}\n */\n\n/**\n * @event message\n * @memberOf Connector\n * @param {string} event Event name.\n * @param {Buffer} data Event data.\n */\n\n/**\n * Event will be listened by {@link EmitterPubsubBroker} instance.\n *\n * @event error\n * @memberOf Connector\n * @param {Error} error Error.\n */\n\nclass RedisConnector extends EventEmitter {\n  constructor (options) {\n    super()\n    this.pub = new Redis(options)\n    this.sub = new Redis(options)\n    this.sub.subscribe().catchReturn()\n    this.sub.on('messageBuffer', this._onMessage.bind(this))\n    this.sub.on('error', this.emit.bind(this))\n    this.pub.on('error', this.emit.bind(this))\n  }\n\n  _onMessage (buf, data) {\n    let ch = buf.toString()\n    this.emit('message', ch, data)\n  }\n\n  publish (ch, data) {\n    return this.pub.publish(ch, data)\n  }\n\n  subscribe (ch) {\n    return this.sub.subscribe(ch)\n  }\n\n  unsubscribe (ch) {\n    return this.sub.unsubscribe(ch)\n  }\n\n  close () {\n    return Promise.all([this.pub.quit(), this.sub.quit()])\n  }\n}\n\nclass MemoryConnector extends EventEmitter {\n  constructor (options) {\n    super()\n  }\n\n  publish (ch, data) {\n    return Promise.resolve()\n      .then(() => this.emit('message', ch, data))\n  }\n\n  subscribe (ch) {\n    return Promise.resolve()\n  }\n\n  unsubscribe (ch) {\n    return Promise.resolve()\n  }\n\n  close () {\n    return Promise.resolve()\n  }\n}\n\n/**\n * @typedef {Object} EmitterPubsubBroker.Options\n *\n * @property {string} connect Connect string.\n * @property {string} [prefix='emitter-pubsub-broker:'] Prefix.\n * @property {boolean} [includeChannel=false] Include channel as the\n * first argument.\n * @property {Connector} [connector] Custom connector implementation.\n */\n\n/**\n * @extends EventEmitter\n */\nclass EmitterPubsubBroker extends EventEmitter {\n  /**\n   * Creates a broker.\n   *\n   * @param {EmitterPubsubBroker.Options|string} options Options or a\n   * connect if a string. If connect string is empty, then an\n   * in-memory connector is used.\n   */\n  constructor (options) {\n    super()\n    if (options == null || typeof options === 'string') {\n      options = { connect: options }\n    }\n    this.prefix = options.prefix || 'emitter-pubsub-broker:'\n    this.includeChannel = options.includeChannel\n    this.clientChannels = new Map()\n    this.channelClients = new Map()\n    if (options.connect || options.connector) {\n      this.connector = options.connector || new RedisConnector(options.connect)\n      this.serialize = true\n    } else {\n      this.connector = new MemoryConnector()\n      this.serialize = false\n    }\n    this.connector.on('message', this._dispatch.bind(this))\n    /**\n     * Connector error. Does not throw if there are no listeners.\n     *\n     * @event error\n     * @memberOf EmitterPubsubBroker\n     * @param {Error} error Error.\n     */\n    this.connector.on('error', this.emit.bind(this))\n  }\n\n  _channelAddClient (client, ch) {\n    let clients = this.channelClients.get(ch)\n    if (clients == null) {\n      clients = new Set()\n      this.channelClients.set(ch, clients)\n      clients.add(client)\n      return this.connector.subscribe(ch)\n    } else {\n      clients.add(client)\n      return Promise.resolve()\n    }\n  }\n\n  _channelRemoveClient (client, ch) {\n    let clients = this.channelClients.get(ch)\n    let nclients\n    if (clients != null) {\n      clients.delete(client)\n      nclients = clients.size\n    }\n    if (nclients === 0) {\n      return this.connector.unsubscribe(ch)\n    } else {\n      return Promise.resolve()\n    }\n  }\n\n  _makeMessage (msg) {\n    return this.serialize ? msgpack.encode(msg) : msg\n  }\n\n  /**\n   * Subscribes emitter to a channel.\n   *\n   * @param {EventEmitter} client Emitter.\n   * @param {string} channel Channel.\n   * @return {Promise<undefined>}\n   */\n  subscribe (client, channel) {\n    let channels = this.clientChannels.get(client)\n    if (!channels) {\n      channels = new Set()\n      this.clientChannels.set(client, channels)\n    }\n    let ch = this.prefix + channel\n    channels.add(ch)\n    return this._channelAddClient(client, ch)\n  }\n\n  /**\n   * Unsubscribes emitter from a channel.\n   *\n   * @param {EventEmitter} client Emitter.\n   * @param {string} channel Channel.\n   * @return {Promise<undefined>}\n   */\n  unsubscribe (client, channel) {\n    let channels = this.clientChannels.get(client)\n    let ch = this.prefix + channel\n    if (channels) {\n      channels.delete(ch)\n    }\n    return this._channelRemoveClient(client, ch)\n  }\n\n  /**\n   * Unsubscribes emitter from all channel.\n   *\n   * @param {EventEmitter} client Emitter.\n   * @return {Promise<undefined>}\n   */\n  unsubscribeall (client) {\n    let channels = this.clientChannels.get(client)\n    this.clientChannels.delete(client)\n    if (channels) {\n      return Promise.each(\n        channels, this._channelRemoveClient.bind(this, client))\n    } else {\n      return Promise.resolve()\n    }\n  }\n\n  /**\n   * Publish an event to a channel.\n   *\n   * @param {string} channel Channel.\n   * @param {string} name Event name.\n   * @param {*} args Arguments.\n   * @return {Promise<undefined>}\n   */\n  publish (channel, name, ...args) {\n    let ch = this.prefix + channel\n    let msg = this._makeMessage({name, args})\n    return this.connector.publish(ch, msg)\n  }\n\n  /**\n   * Publish an event to a channel, excluding the sender. The client\n   * object __MUST__ have an unique `id` field.\n   *\n   * @param {EventEmitter} client Emitter.\n   * @param {string} channel Channel.\n   * @param {string} name Event name.\n   * @param {*} args Arguments.\n   * @return {Promise<undefined>}\n   */\n  send (client, channel, name, ...args) {\n    let ch = this.prefix + channel\n    let sender = client.id\n    let msg = this._makeMessage({sender, name, args})\n    return this.connector.publish(ch, msg)\n  }\n\n  /**\n   * Returns client subscriptions.\n   *\n   * @param {EventEmitter} client Emitter.\n   * @return {Array<string>}\n   */\n  getSubscriptions (client) {\n    let channels = this.clientChannels.get(client)\n    let res = []\n    if (channels) {\n      let plen = this.prefix.length\n      for (let channel of channels) {\n        res.push(channel.slice(plen))\n      }\n    }\n    return res\n  }\n\n  /**\n   * Closes broker.\n   *\n   * @return {Promise<undefined>}\n   */\n  close () {\n    this.channelClients.clear()\n    this.clientChannels.clear()\n    return this.connector.close()\n  }\n\n  _dispatch (ch, data) {\n    let channel = ch.slice(this.prefix.length)\n    let message = this.serialize ? msgpack.decode(data) : data\n    let clients = this.channelClients.get(ch)\n    /* istanbul ignore else */\n    if (clients) {\n      let args\n      if (this.includeChannel) {\n        args = [message.name, channel, ...message.args]\n      } else {\n        args = [message.name, ...message.args]\n      }\n      for (let client of clients) {\n        if (!message.sender || client.id !== message.sender) {\n          client.emit(...args)\n        }\n      }\n    }\n  }\n\n}\n\nmodule.exports = EmitterPubsubBroker\n"]} | ||
//# sourceMappingURL=data:application/json;base64,{"version":3,"sources":["../src/EmitterPubsubBroker.js"],"names":["Promise","require","Redis","msgpack","EventEmitter","RedisConnector","constructor","options","pub","sub","subscribe","catchReturn","on","_onMessage","bind","emit","buf","data","ch","toString","publish","unsubscribe","close","all","quit","MemoryConnector","resolve","then","EmitterPubsubBroker","connect","prefix","method","serialize","encode","deserialize","decode","encoder","includeChannel","clientChannels","Map","channelClients","connector","useSerialization","_dispatch","_channelAddClient","client","channel","clients","get","Set","set","add","_channelRemoveClient","nclients","delete","size","_makeMessage","msg","try","_unpackMessage","channels","unsubscribeAll","each","name","args","send","sender","id","getSubscriptions","getClients","clear","message","slice","length","forEach","prototype","unsubscribeall","module","exports"],"mappings":"AAAA;;;;AAEA,MAAMA,UAAUC,QAAQ,UAAR,CAAhB;AACA,MAAMC,QAAQD,QAAQ,SAAR,CAAd;AACA,MAAME,UAAUF,QAAQ,cAAR,CAAhB;;eACyBA,QAAQ,eAAR,C;;MAAjBG,Y,YAAAA,Y;;AAER;;;;;;;AAOA;;;;;;;;;;AAUA;;;;;;;;;AASA;;;;;;;;;AASA;;;;;;;;AAQA;;;;;;;AAOA;;;;;;;;AAQA,MAAMC,cAAN,SAA6BD,YAA7B,CAA0C;AACxCE,cAAaC,OAAb,EAAsB;AACpB;AACA,SAAKC,GAAL,GAAW,IAAIN,KAAJ,CAAUK,OAAV,CAAX;AACA,SAAKE,GAAL,GAAW,IAAIP,KAAJ,CAAUK,OAAV,CAAX;AACA,SAAKE,GAAL,CAASC,SAAT,GAAqBC,WAArB;AACA,SAAKF,GAAL,CAASG,EAAT,CAAY,eAAZ,EAA6B,KAAKC,UAAL,CAAgBC,IAAhB,CAAqB,IAArB,CAA7B;AACA,SAAKL,GAAL,CAASG,EAAT,CAAY,OAAZ,EAAqB,KAAKG,IAAL,CAAUD,IAAV,CAAe,IAAf,CAArB;AACA,SAAKN,GAAL,CAASI,EAAT,CAAY,OAAZ,EAAqB,KAAKG,IAAL,CAAUD,IAAV,CAAe,IAAf,CAArB;AACD;;AAEDD,aAAYG,GAAZ,EAAiBC,IAAjB,EAAuB;AACrB,QAAIC,KAAKF,IAAIG,QAAJ,EAAT;AACA,SAAKJ,IAAL,CAAU,SAAV,EAAqBG,EAArB,EAAyBD,IAAzB;AACD;;AAEDG,UAASF,EAAT,EAAaD,IAAb,EAAmB;AACjB,WAAO,KAAKT,GAAL,CAASY,OAAT,CAAiBF,EAAjB,EAAqBD,IAArB,CAAP;AACD;;AAEDP,YAAWQ,EAAX,EAAe;AACb,WAAO,KAAKT,GAAL,CAASC,SAAT,CAAmBQ,EAAnB,CAAP;AACD;;AAEDG,cAAaH,EAAb,EAAiB;AACf,WAAO,KAAKT,GAAL,CAASY,WAAT,CAAqBH,EAArB,CAAP;AACD;;AAEDI,UAAS;AACP,WAAOtB,QAAQuB,GAAR,CAAY,CAAC,KAAKf,GAAL,CAASgB,IAAT,EAAD,EAAkB,KAAKf,GAAL,CAASe,IAAT,EAAlB,CAAZ,CAAP;AACD;AA9BuC;;AAiC1C,MAAMC,eAAN,SAA8BrB,YAA9B,CAA2C;AACzCE,cAAaC,OAAb,EAAsB;AACpB;AACD;;AAEDa,UAASF,EAAT,EAAaD,IAAb,EAAmB;AACjB,WAAOjB,QAAQ0B,OAAR,GACJC,IADI,CACC,MAAM,KAAKZ,IAAL,CAAU,SAAV,EAAqBG,EAArB,EAAyBD,IAAzB,CADP,CAAP;AAED;;AAEDP,YAAWQ,EAAX,EAAe;AACb,WAAOlB,QAAQ0B,OAAR,EAAP;AACD;;AAEDL,cAAaH,EAAb,EAAiB;AACf,WAAOlB,QAAQ0B,OAAR,EAAP;AACD;;AAEDJ,UAAS;AACP,WAAOtB,QAAQ0B,OAAR,EAAP;AACD;AApBwC;;AAuB3C;;;;;;;;;;AAUA;;;;;;;;;;AAUA;;;;;;;;;;AAUA;;;;;;;;;;;;;;;;;;;AAmBA;;;AAGA,MAAME,mBAAN,SAAkCxB,YAAlC,CAA+C;AAC7C;;;;;;;AAOAE,cAAaC,OAAb,EAAsB;AACpB;AACA,QAAIA,WAAW,IAAX,IAAmB,OAAOA,OAAP,KAAmB,QAA1C,EAAoD;AAClDA,gBAAU,EAAEsB,SAAStB,OAAX,EAAV;AACD;AACD,SAAKuB,MAAL,GAAcvB,QAAQuB,MAAR,IAAkB,wBAAhC;AACA,SAAKC,MAAL,GAAcxB,QAAQwB,MAAR,IAAkB,MAAhC;AACA,SAAKC,SAAL,GAAiBzB,QAAQyB,SAAR,IAAqB7B,QAAQ8B,MAA9C;AACA,SAAKC,WAAL,GAAmB3B,QAAQ2B,WAAR,IAAuB/B,QAAQgC,MAAlD;AACA,SAAKC,OAAL,GAAe7B,QAAQ6B,OAAvB;AACA,SAAKC,cAAL,GAAsB9B,QAAQ8B,cAA9B;AACA,SAAKC,cAAL,GAAsB,IAAIC,GAAJ,EAAtB;AACA,SAAKC,cAAL,GAAsB,IAAID,GAAJ,EAAtB;AACA,QAAIhC,QAAQsB,OAAR,IAAmBtB,QAAQkC,SAA/B,EAA0C;AACxC,WAAKA,SAAL,GAAiBlC,QAAQkC,SAAR,IAAqB,IAAIpC,cAAJ,CAAmBE,QAAQsB,OAA3B,CAAtC;AACA,WAAKa,gBAAL,GAAwB,IAAxB;AACD,KAHD,MAGO;AACL,WAAKD,SAAL,GAAiB,IAAIhB,eAAJ,EAAjB;AACA,WAAKiB,gBAAL,GAAwB,KAAxB;AACD;AACD,SAAKD,SAAL,CAAe7B,EAAf,CAAkB,SAAlB,EAA6B,KAAK+B,SAAL,CAAe7B,IAAf,CAAoB,IAApB,CAA7B;AACA;;;;;;;AAOA,SAAK2B,SAAL,CAAe7B,EAAf,CAAkB,OAAlB,EAA2B,KAAKG,IAAL,CAAUD,IAAV,CAAe,IAAf,CAA3B;AACD;;AAED8B,oBAAmBC,MAAnB,EAA2BC,OAA3B,EAAoC;AAClC,QAAIC,UAAU,KAAKP,cAAL,CAAoBQ,GAApB,CAAwBF,OAAxB,CAAd;AACA,QAAIC,WAAW,IAAf,EAAqB;AACnBA,gBAAU,IAAIE,GAAJ,EAAV;AACA,WAAKT,cAAL,CAAoBU,GAApB,CAAwBJ,OAAxB,EAAiCC,OAAjC;AACAA,cAAQI,GAAR,CAAYN,MAAZ;AACA,UAAI3B,KAAK,KAAKY,MAAL,GAAcgB,OAAvB;AACA,aAAO,KAAKL,SAAL,CAAe/B,SAAf,CAAyBQ,EAAzB,CAAP;AACD,KAND,MAMO;AACL6B,cAAQI,GAAR,CAAYN,MAAZ;AACA,aAAO7C,QAAQ0B,OAAR,EAAP;AACD;AACF;;AAED0B,uBAAsBP,MAAtB,EAA8BC,OAA9B,EAAuC;AACrC,QAAIC,UAAU,KAAKP,cAAL,CAAoBQ,GAApB,CAAwBF,OAAxB,CAAd;AACA,QAAIO,QAAJ;AACA,QAAIN,WAAW,IAAf,EAAqB;AACnBA,cAAQO,MAAR,CAAeT,MAAf;AACAQ,iBAAWN,QAAQQ,IAAnB;AACD;AACD,QAAIF,aAAa,CAAjB,EAAoB;AAClB,UAAInC,KAAK,KAAKY,MAAL,GAAcgB,OAAvB;AACA,aAAO,KAAKL,SAAL,CAAepB,WAAf,CAA2BH,EAA3B,CAAP;AACD,KAHD,MAGO;AACL,aAAOlB,QAAQ0B,OAAR,EAAP;AACD;AACF;;AAED8B,eAAcC,GAAd,EAAmB;AACjB,WAAOzD,QAAQ0D,GAAR,CAAY,MAAM,KAAKhB,gBAAL,GAAwB,KAAKV,SAAL,CAAeyB,GAAf,CAAxB,GAA8CA,GAAhE,CAAP;AACD;;AAEDE,iBAAgB1C,IAAhB,EAAsB;AACpB,WAAOjB,QAAQ0D,GAAR,CAAY,MAAM,KAAKhB,gBAAL,GAAwB,KAAKR,WAAL,CAAiBjB,IAAjB,CAAxB,GAAiDA,IAAnE,CAAP;AACD;;AAED;;;;;;;AAOAP,YAAWmC,MAAX,EAAmBC,OAAnB,EAA4B;AAC1B,QAAIc,WAAW,KAAKtB,cAAL,CAAoBU,GAApB,CAAwBH,MAAxB,CAAf;AACA,QAAI,CAACe,QAAL,EAAe;AACbA,iBAAW,IAAIX,GAAJ,EAAX;AACA,WAAKX,cAAL,CAAoBY,GAApB,CAAwBL,MAAxB,EAAgCe,QAAhC;AACD;AACDA,aAAST,GAAT,CAAaL,OAAb;AACA,WAAO,KAAKF,iBAAL,CAAuBC,MAAvB,EAA+BC,OAA/B,CAAP;AACD;;AAED;;;;;;;AAOAzB,cAAawB,MAAb,EAAqBC,OAArB,EAA8B;AAC5B,QAAIc,WAAW,KAAKtB,cAAL,CAAoBU,GAApB,CAAwBH,MAAxB,CAAf;AACA,QAAIe,QAAJ,EAAc;AACZA,eAASN,MAAT,CAAgBR,OAAhB;AACD;AACD,WAAO,KAAKM,oBAAL,CAA0BP,MAA1B,EAAkCC,OAAlC,CAAP;AACD;;AAED;;;;;;AAMAe,iBAAgBhB,MAAhB,EAAwB;AACtB,QAAIe,WAAW,KAAKtB,cAAL,CAAoBU,GAApB,CAAwBH,MAAxB,CAAf;AACA,SAAKP,cAAL,CAAoBgB,MAApB,CAA2BT,MAA3B;AACA,QAAIe,QAAJ,EAAc;AACZ,aAAO5D,QAAQ8D,IAAR,CACLF,QADK,EACK,KAAKR,oBAAL,CAA0BtC,IAA1B,CAA+B,IAA/B,EAAqC+B,MAArC,CADL,CAAP;AAED,KAHD,MAGO;AACL,aAAO7C,QAAQ0B,OAAR,EAAP;AACD;AACF;;AAED;;;;;;;;AAQAN,UAAS0B,OAAT,EAAkBiB,IAAlB,EAAiC;AAAA,sCAANC,IAAM;AAANA,UAAM;AAAA;;AAC/B,QAAI9C,KAAK,KAAKY,MAAL,GAAcgB,OAAvB;AACA,WAAO,KAAKU,YAAL,CAAkB,EAACO,UAAD,EAAOC,UAAP,EAAlB,EACJrC,IADI,CACC8B,OAAO,KAAKhB,SAAL,CAAerB,OAAf,CAAuBF,EAAvB,EAA2BuC,GAA3B,CADR,CAAP;AAED;;AAED;;;;;;;;;;AAUAQ,OAAMpB,MAAN,EAAcC,OAAd,EAAuBiB,IAAvB,EAAsC;AAAA,uCAANC,IAAM;AAANA,UAAM;AAAA;;AACpC,QAAI9C,KAAK,KAAKY,MAAL,GAAcgB,OAAvB;AACA,QAAIoB,SAASrB,OAAOsB,EAApB;AACA,WAAO,KAAKX,YAAL,CAAkB,EAACU,cAAD,EAASH,UAAT,EAAeC,UAAf,EAAlB,EACJrC,IADI,CACC8B,OAAO,KAAKhB,SAAL,CAAerB,OAAf,CAAuBF,EAAvB,EAA2BuC,GAA3B,CADR,CAAP;AAED;;AAED;;;;;;;AAOAW,mBAAkBvB,MAAlB,EAA0B;AACxB,WAAO,KAAKP,cAAL,CAAoBU,GAApB,CAAwBH,MAAxB,CAAP;AACD;;AAED;;;;;;;AAOAwB,aAAYvB,OAAZ,EAAqB;AACnB,WAAO,KAAKN,cAAL,CAAoBQ,GAApB,CAAwBF,OAAxB,CAAP;AACD;;AAED;;;;;AAKAxB,UAAS;AACP,SAAKkB,cAAL,CAAoB8B,KAApB;AACA,SAAKhC,cAAL,CAAoBgC,KAApB;AACA,WAAO,KAAK7B,SAAL,CAAenB,KAAf,EAAP;AACD;;AAEDqB,YAAWzB,EAAX,EAAeD,IAAf,EAAqB;AACnB,SAAK0C,cAAL,CAAoB1C,IAApB,EAA0BU,IAA1B,CAA+B4C,WAAW;AACxC,UAAIzB,UAAU5B,GAAGsD,KAAH,CAAS,KAAK1C,MAAL,CAAY2C,MAArB,CAAd;AACA,UAAI1B,UAAU,KAAKP,cAAL,CAAoBQ,GAApB,CAAwBF,OAAxB,CAAd;AACA;AACA,UAAIC,OAAJ,EAAa;AACX,YAAIiB,OAAO,KAAK3B,cAAL,IACNkC,QAAQR,IADF,EACQjB,OADR,4BACoByB,QAAQP,IAD5B,MAENO,QAAQR,IAFF,4BAEWQ,QAAQP,IAFnB,EAAX;AAGAhE,gBAAQ0D,GAAR,CAAY,MAAM,KAAKtB,OAAL,GAAe,KAAKA,OAAL,CAAa4B,IAAb,CAAf,GAAoCA,IAAtD,EAA4DrC,IAA5D,CAAiEV,QAAQ;AACvE,gBAAMc,SAAS,KAAKA,MAApB;AACA,gBAAMK,UAAU,KAAKA,OAArB;AACA,gBAAM8B,SAASK,QAAQL,MAAvB;AACAnB,kBAAQ2B,OAAR,CAAgB7B,UAAU;AACxB,gBAAI,CAACqB,MAAD,IAAWrB,OAAOsB,EAAP,KAAcD,MAA7B,EAAqC;AACnClE,sBACG0D,GADH,CACO,MAAMtB,UAAUS,OAAOd,MAAP,EAAed,IAAf,CAAV,GAAiC4B,OAAOd,MAAP,mCAAkBd,IAAlB,EAD9C,EAEGN,WAFH;AAGD;AACF,WAND;AAOD,SAXD;AAYD;AACF,KArBD;AAsBD;;AApN4C;;AAwN/C;AACAiB,oBAAoB+C,SAApB,CAA8BC,cAA9B,GAA+ChD,oBAAoB+C,SAApB,CAA8Bd,cAA7E;;AAEAgB,OAAOC,OAAP,GAAiBlD,mBAAjB","file":"EmitterPubsubBroker.js","sourcesContent":["'use strict'\n\nconst Promise = require('bluebird')\nconst Redis = require('ioredis')\nconst msgpack = require('msgpack-lite')\nconst { EventEmitter } = require('eventemitter3')\n\n/**\n * Interface for connector implementations.\n *\n * @interface Connector\n * @extends EventEmitter\n */\n\n/**\n * @method\n * @instance\n * @name publish\n * @memberOf Connector\n * @param {string} channel Channel.\n * @param {Buffer} data Data.\n * @return {Promise<undefined>}\n */\n\n/**\n * @method\n * @instance\n * @name subscribe\n * @memberOf Connector\n * @param {string} channel Channel.\n * @return {Promise<undefined>}\n */\n\n/**\n * @method\n * @instance\n * @name unsubscribe\n * @memberOf Connector\n * @param {string} channel Channel.\n * @return {Promise<undefined>}\n */\n\n/**\n * @method\n * @instance\n * @name close\n * @memberOf Connector\n * @return {Promise<undefined>}\n */\n\n/**\n * @event message\n * @memberOf Connector\n * @param {string} event Event name.\n * @param {Buffer} data Event data.\n */\n\n/**\n * Event will be listened by {@link EmitterPubsubBroker} instance.\n *\n * @event error\n * @memberOf Connector\n * @param {Error} error Error.\n */\n\nclass RedisConnector extends EventEmitter {\n  constructor (options) {\n    super()\n    this.pub = new Redis(options)\n    this.sub = new Redis(options)\n    this.sub.subscribe().catchReturn()\n    this.sub.on('messageBuffer', this._onMessage.bind(this))\n    this.sub.on('error', this.emit.bind(this))\n    this.pub.on('error', this.emit.bind(this))\n  }\n\n  _onMessage (buf, data) {\n    let ch = buf.toString()\n    this.emit('message', ch, data)\n  }\n\n  publish (ch, data) {\n    return this.pub.publish(ch, data)\n  }\n\n  subscribe (ch) {\n    return this.sub.subscribe(ch)\n  }\n\n  unsubscribe (ch) {\n    return this.sub.unsubscribe(ch)\n  }\n\n  close () {\n    return Promise.all([this.pub.quit(), this.sub.quit()])\n  }\n}\n\nclass MemoryConnector extends EventEmitter {\n  constructor (options) {\n    super()\n  }\n\n  publish (ch, data) {\n    return Promise.resolve()\n      .then(() => this.emit('message', ch, data))\n  }\n\n  subscribe (ch) {\n    return Promise.resolve()\n  }\n\n  unsubscribe (ch) {\n    return Promise.resolve()\n  }\n\n  close () {\n    return Promise.resolve()\n  }\n}\n\n/**\n * Messages encoder. Encoded messages will be used to emit messages\n * via event emitters. May also return promises for an asynchronous\n * execution.\n *\n * @callback EmitterPubsubBroker.Encoder\n * @param {*} args Emit arguments.\n * @return {Promise<Object>|Object} Data to send.\n */\n\n/**\n * Messages serialisation. Serialised messages will be used internally\n * via a communication {@link Connector}.  May also return promises\n * for an asynchronous execution.\n *\n * @callback EmitterPubsubBroker.Serialize\n * @param {Object} data Data.\n * @return {Promise<Object>|Object} Serialised data.\n */\n\n/**\n * Messages deserialisation. Serialised messages will be used\n * internally via a communication {@link Connector}.  May also return\n * promises for an asynchronous execution.\n *\n * @callback EmitterPubsubBroker.Deserialize\n * @param {Object} data Serialised data.\n * @return {Promise<Object>|Object} Data.\n */\n\n/**\n * @typedef {Object} EmitterPubsubBroker.Options\n *\n * @property {string} [connect] Connect string for a connector.\n * @property {string} [prefix='emitter-pubsub-broker:'] Prefix for a connector.\n * @property {boolean} [includeChannel=false] Include channel as the\n * first argument.\n * @property {EmitterPubsubBroker.Encoder} [encoder] Optional encoder\n * to run before broadcasting.\n * @property {string} [method='emit'] An alternative emit method.\n * @property {EmitterPubsubBroker.Serialize}\n * [serialize=msgpack.encode] Serialisation function to use with a\n * connector.\n * @property {EmitterPubsubBroker.Deserialize}\n * [deserialize=msgpack.decode] Deserialisation function to use with a\n * connector.\n * @property {Connector} [connector] Custom connector implementation.\n */\n\n/**\n * @extends EventEmitter\n */\nclass EmitterPubsubBroker extends EventEmitter {\n  /**\n   * Creates a broker.\n   *\n   * @param {EmitterPubsubBroker.Options|string} options Options or a\n   * connect if a string. If connect string is empty, then an\n   * in-memory connector is used.\n   */\n  constructor (options) {\n    super()\n    if (options == null || typeof options === 'string') {\n      options = { connect: options }\n    }\n    this.prefix = options.prefix || 'emitter-pubsub-broker:'\n    this.method = options.method || 'emit'\n    this.serialize = options.serialize || msgpack.encode\n    this.deserialize = options.deserialize || msgpack.decode\n    this.encoder = options.encoder\n    this.includeChannel = options.includeChannel\n    this.clientChannels = new Map()\n    this.channelClients = new Map()\n    if (options.connect || options.connector) {\n      this.connector = options.connector || new RedisConnector(options.connect)\n      this.useSerialization = true\n    } else {\n      this.connector = new MemoryConnector()\n      this.useSerialization = false\n    }\n    this.connector.on('message', this._dispatch.bind(this))\n    /**\n     * Connector error. Does not throw if there are no listeners.\n     *\n     * @event error\n     * @memberOf EmitterPubsubBroker\n     * @param {Error} error Error.\n     */\n    this.connector.on('error', this.emit.bind(this))\n  }\n\n  _channelAddClient (client, channel) {\n    let clients = this.channelClients.get(channel)\n    if (clients == null) {\n      clients = new Set()\n      this.channelClients.set(channel, clients)\n      clients.add(client)\n      let ch = this.prefix + channel\n      return this.connector.subscribe(ch)\n    } else {\n      clients.add(client)\n      return Promise.resolve()\n    }\n  }\n\n  _channelRemoveClient (client, channel) {\n    let clients = this.channelClients.get(channel)\n    let nclients\n    if (clients != null) {\n      clients.delete(client)\n      nclients = clients.size\n    }\n    if (nclients === 0) {\n      let ch = this.prefix + channel\n      return this.connector.unsubscribe(ch)\n    } else {\n      return Promise.resolve()\n    }\n  }\n\n  _makeMessage (msg) {\n    return Promise.try(() => this.useSerialization ? this.serialize(msg) : msg)\n  }\n\n  _unpackMessage (data) {\n    return Promise.try(() => this.useSerialization ? this.deserialize(data) : data)\n  }\n\n  /**\n   * Subscribes emitter to a channel.\n   *\n   * @param {EventEmitter} client Emitter.\n   * @param {string} channel Channel.\n   * @return {Promise<undefined>}\n   */\n  subscribe (client, channel) {\n    let channels = this.clientChannels.get(client)\n    if (!channels) {\n      channels = new Set()\n      this.clientChannels.set(client, channels)\n    }\n    channels.add(channel)\n    return this._channelAddClient(client, channel)\n  }\n\n  /**\n   * Unsubscribes emitter from a channel.\n   *\n   * @param {EventEmitter} client Emitter.\n   * @param {string} channel Channel.\n   * @return {Promise<undefined>}\n   */\n  unsubscribe (client, channel) {\n    let channels = this.clientChannels.get(client)\n    if (channels) {\n      channels.delete(channel)\n    }\n    return this._channelRemoveClient(client, channel)\n  }\n\n  /**\n   * Unsubscribes emitter from all channels.\n   *\n   * @param {EventEmitter} client Emitter.\n   * @return {Promise<undefined>}\n   */\n  unsubscribeAll (client) {\n    let channels = this.clientChannels.get(client)\n    this.clientChannels.delete(client)\n    if (channels) {\n      return Promise.each(\n        channels, this._channelRemoveClient.bind(this, client))\n    } else {\n      return Promise.resolve()\n    }\n  }\n\n  /**\n   * Publish an event to a channel.\n   *\n   * @param {string} channel Channel.\n   * @param {string} name Event name.\n   * @param {*} args Arguments.\n   * @return {Promise<undefined>}\n   */\n  publish (channel, name, ...args) {\n    let ch = this.prefix + channel\n    return this._makeMessage({name, args})\n      .then(msg => this.connector.publish(ch, msg))\n  }\n\n  /**\n   * Publish an event to a channel, excluding the sender. The client\n   * object __MUST__ have an unique `id` field.\n   *\n   * @param {EventEmitter} client Emitter.\n   * @param {string} channel Channel.\n   * @param {string} name Event name.\n   * @param {*} args Arguments.\n   * @return {Promise<undefined>}\n   */\n  send (client, channel, name, ...args) {\n    let ch = this.prefix + channel\n    let sender = client.id\n    return this._makeMessage({sender, name, args})\n      .then(msg => this.connector.publish(ch, msg))\n  }\n\n  /**\n   * Returns set of client subscriptions. The result __MUST NOT__ be\n   * modified.\n   *\n   * @param {EventEmitter} client Emitter.\n   * @return {Set<string>|undefined}\n   */\n  getSubscriptions (client) {\n    return this.clientChannels.get(client)\n  }\n\n  /**\n   * Returns _internal_ set of channel clients of EmitterPubsubBroker\n   * instance. The result __MUST NOT__ be modified.\n   *\n   * @param {string} channel Channel.\n   * @return {Set<EventEmitter>|undefined}\n   */\n  getClients (channel) {\n    return this.channelClients.get(channel)\n  }\n\n  /**\n   * Closes broker.\n   *\n   * @return {Promise<undefined>}\n   */\n  close () {\n    this.channelClients.clear()\n    this.clientChannels.clear()\n    return this.connector.close()\n  }\n\n  _dispatch (ch, data) {\n    this._unpackMessage(data).then(message => {\n      let channel = ch.slice(this.prefix.length)\n      let clients = this.channelClients.get(channel)\n      /* istanbul ignore else */\n      if (clients) {\n        let args = this.includeChannel\n          ? [message.name, channel, ...message.args]\n          : [message.name, ...message.args]\n        Promise.try(() => this.encoder ? this.encoder(args) : args).then(data => {\n          const method = this.method\n          const encoder = this.encoder\n          const sender = message.sender\n          clients.forEach(client => {\n            if (!sender || client.id !== sender) {\n              Promise\n                .try(() => encoder ? client[method](data) : client[method](...data))\n                .catchReturn()\n            }\n          })\n        })\n      }\n    })\n  }\n\n}\n\n// compatibility\nEmitterPubsubBroker.prototype.unsubscribeall = EmitterPubsubBroker.prototype.unsubscribeAll\n\nmodule.exports = EmitterPubsubBroker\n"]} |
{ | ||
"name": "emitter-pubsub-broker", | ||
"version": "0.4.0", | ||
"version": "0.5.0", | ||
"private": false, | ||
@@ -5,0 +5,0 @@ "description": "An utility for connecting EventEmitters via a pubsub.", |
@@ -49,3 +49,3 @@ | ||
[API](https://an-sh.github.io/emitter-pubsub-broker/0.4/index.html) | ||
[API](https://an-sh.github.io/emitter-pubsub-broker/0.5/index.html) | ||
documentation is available online. | ||
@@ -52,0 +52,0 @@ |
@@ -123,8 +123,47 @@ 'use strict' | ||
/** | ||
* Messages encoder. Encoded messages will be used to emit messages | ||
* via event emitters. May also return promises for an asynchronous | ||
* execution. | ||
* | ||
* @callback EmitterPubsubBroker.Encoder | ||
* @param {*} args Emit arguments. | ||
* @return {Promise<Object>|Object} Data to send. | ||
*/ | ||
/** | ||
* Messages serialisation. Serialised messages will be used internally | ||
* via a communication {@link Connector}. May also return promises | ||
* for an asynchronous execution. | ||
* | ||
* @callback EmitterPubsubBroker.Serialize | ||
* @param {Object} data Data. | ||
* @return {Promise<Object>|Object} Serialised data. | ||
*/ | ||
/** | ||
* Messages deserialisation. Serialised messages will be used | ||
* internally via a communication {@link Connector}. May also return | ||
* promises for an asynchronous execution. | ||
* | ||
* @callback EmitterPubsubBroker.Deserialize | ||
* @param {Object} data Serialised data. | ||
* @return {Promise<Object>|Object} Data. | ||
*/ | ||
/** | ||
* @typedef {Object} EmitterPubsubBroker.Options | ||
* | ||
* @property {string} connect Connect string. | ||
* @property {string} [prefix='emitter-pubsub-broker:'] Prefix. | ||
* @property {string} [connect] Connect string for a connector. | ||
* @property {string} [prefix='emitter-pubsub-broker:'] Prefix for a connector. | ||
* @property {boolean} [includeChannel=false] Include channel as the | ||
* first argument. | ||
* @property {EmitterPubsubBroker.Encoder} [encoder] Optional encoder | ||
* to run before broadcasting. | ||
* @property {string} [method='emit'] An alternative emit method. | ||
* @property {EmitterPubsubBroker.Serialize} | ||
* [serialize=msgpack.encode] Serialisation function to use with a | ||
* connector. | ||
* @property {EmitterPubsubBroker.Deserialize} | ||
* [deserialize=msgpack.decode] Deserialisation function to use with a | ||
* connector. | ||
* @property {Connector} [connector] Custom connector implementation. | ||
@@ -150,2 +189,6 @@ */ | ||
this.prefix = options.prefix || 'emitter-pubsub-broker:' | ||
this.method = options.method || 'emit' | ||
this.serialize = options.serialize || msgpack.encode | ||
this.deserialize = options.deserialize || msgpack.decode | ||
this.encoder = options.encoder | ||
this.includeChannel = options.includeChannel | ||
@@ -156,6 +199,6 @@ this.clientChannels = new Map() | ||
this.connector = options.connector || new RedisConnector(options.connect) | ||
this.serialize = true | ||
this.useSerialization = true | ||
} else { | ||
this.connector = new MemoryConnector() | ||
this.serialize = false | ||
this.useSerialization = false | ||
} | ||
@@ -173,8 +216,9 @@ this.connector.on('message', this._dispatch.bind(this)) | ||
_channelAddClient (client, ch) { | ||
let clients = this.channelClients.get(ch) | ||
_channelAddClient (client, channel) { | ||
let clients = this.channelClients.get(channel) | ||
if (clients == null) { | ||
clients = new Set() | ||
this.channelClients.set(ch, clients) | ||
this.channelClients.set(channel, clients) | ||
clients.add(client) | ||
let ch = this.prefix + channel | ||
return this.connector.subscribe(ch) | ||
@@ -187,4 +231,4 @@ } else { | ||
_channelRemoveClient (client, ch) { | ||
let clients = this.channelClients.get(ch) | ||
_channelRemoveClient (client, channel) { | ||
let clients = this.channelClients.get(channel) | ||
let nclients | ||
@@ -196,2 +240,3 @@ if (clients != null) { | ||
if (nclients === 0) { | ||
let ch = this.prefix + channel | ||
return this.connector.unsubscribe(ch) | ||
@@ -204,5 +249,9 @@ } else { | ||
_makeMessage (msg) { | ||
return this.serialize ? msgpack.encode(msg) : msg | ||
return Promise.try(() => this.useSerialization ? this.serialize(msg) : msg) | ||
} | ||
_unpackMessage (data) { | ||
return Promise.try(() => this.useSerialization ? this.deserialize(data) : data) | ||
} | ||
/** | ||
@@ -221,5 +270,4 @@ * Subscribes emitter to a channel. | ||
} | ||
let ch = this.prefix + channel | ||
channels.add(ch) | ||
return this._channelAddClient(client, ch) | ||
channels.add(channel) | ||
return this._channelAddClient(client, channel) | ||
} | ||
@@ -236,11 +284,10 @@ | ||
let channels = this.clientChannels.get(client) | ||
let ch = this.prefix + channel | ||
if (channels) { | ||
channels.delete(ch) | ||
channels.delete(channel) | ||
} | ||
return this._channelRemoveClient(client, ch) | ||
return this._channelRemoveClient(client, channel) | ||
} | ||
/** | ||
* Unsubscribes emitter from all channel. | ||
* Unsubscribes emitter from all channels. | ||
* | ||
@@ -250,3 +297,3 @@ * @param {EventEmitter} client Emitter. | ||
*/ | ||
unsubscribeall (client) { | ||
unsubscribeAll (client) { | ||
let channels = this.clientChannels.get(client) | ||
@@ -272,4 +319,4 @@ this.clientChannels.delete(client) | ||
let ch = this.prefix + channel | ||
let msg = this._makeMessage({name, args}) | ||
return this.connector.publish(ch, msg) | ||
return this._makeMessage({name, args}) | ||
.then(msg => this.connector.publish(ch, msg)) | ||
} | ||
@@ -290,25 +337,29 @@ | ||
let sender = client.id | ||
let msg = this._makeMessage({sender, name, args}) | ||
return this.connector.publish(ch, msg) | ||
return this._makeMessage({sender, name, args}) | ||
.then(msg => this.connector.publish(ch, msg)) | ||
} | ||
/** | ||
* Returns client subscriptions. | ||
* Returns set of client subscriptions. The result __MUST NOT__ be | ||
* modified. | ||
* | ||
* @param {EventEmitter} client Emitter. | ||
* @return {Array<string>} | ||
* @return {Set<string>|undefined} | ||
*/ | ||
getSubscriptions (client) { | ||
let channels = this.clientChannels.get(client) | ||
let res = [] | ||
if (channels) { | ||
let plen = this.prefix.length | ||
for (let channel of channels) { | ||
res.push(channel.slice(plen)) | ||
} | ||
} | ||
return res | ||
return this.clientChannels.get(client) | ||
} | ||
/** | ||
* Returns _internal_ set of channel clients of EmitterPubsubBroker | ||
* instance. The result __MUST NOT__ be modified. | ||
* | ||
* @param {string} channel Channel. | ||
* @return {Set<EventEmitter>|undefined} | ||
*/ | ||
getClients (channel) { | ||
return this.channelClients.get(channel) | ||
} | ||
/** | ||
* Closes broker. | ||
@@ -325,19 +376,24 @@ * | ||
_dispatch (ch, data) { | ||
let channel = ch.slice(this.prefix.length) | ||
let message = this.serialize ? msgpack.decode(data) : data | ||
let clients = this.channelClients.get(ch) | ||
/* istanbul ignore else */ | ||
if (clients) { | ||
let args | ||
if (this.includeChannel) { | ||
args = [message.name, channel, ...message.args] | ||
} else { | ||
args = [message.name, ...message.args] | ||
this._unpackMessage(data).then(message => { | ||
let channel = ch.slice(this.prefix.length) | ||
let clients = this.channelClients.get(channel) | ||
/* istanbul ignore else */ | ||
if (clients) { | ||
let args = this.includeChannel | ||
? [message.name, channel, ...message.args] | ||
: [message.name, ...message.args] | ||
Promise.try(() => this.encoder ? this.encoder(args) : args).then(data => { | ||
const method = this.method | ||
const encoder = this.encoder | ||
const sender = message.sender | ||
clients.forEach(client => { | ||
if (!sender || client.id !== sender) { | ||
Promise | ||
.try(() => encoder ? client[method](data) : client[method](...data)) | ||
.catchReturn() | ||
} | ||
}) | ||
}) | ||
} | ||
for (let client of clients) { | ||
if (!message.sender || client.id !== message.sender) { | ||
client.emit(...args) | ||
} | ||
} | ||
} | ||
}) | ||
} | ||
@@ -347,2 +403,5 @@ | ||
// compatibility | ||
EmitterPubsubBroker.prototype.unsubscribeall = EmitterPubsubBroker.prototype.unsubscribeAll | ||
module.exports = EmitterPubsubBroker |
@@ -6,2 +6,4 @@ 'use strict' | ||
const eventToPromise = require('event-to-promise') | ||
const msgpack = require('msgpack-lite') | ||
const Promise = require('bluebird') | ||
const { expect } = require('chai') | ||
@@ -46,2 +48,32 @@ const { EventEmitter } = require('events') | ||
it('should emit published encoded messages', function (done) { | ||
broker = new EmitterPubsubBroker({connect, encoder: JSON.stringify, method: 'send'}) | ||
let client = new EventEmitter() | ||
client.send = function (args) { | ||
let [ev, x, y] = JSON.parse(args) | ||
expect(ev).equal('myEvent') | ||
expect(x).equal(1) | ||
expect(y).equal('2') | ||
done() | ||
} | ||
broker.subscribe(client, 'my-channel').then(() => { | ||
broker.publish('my-channel', 'myEvent', 1, '2') | ||
}) | ||
}) | ||
it('should use custom serialisation', function () { | ||
let serialize = (data) => Promise.try(() => msgpack.encode(data)) | ||
let deserialize = (data) => Promise.try(() => msgpack.decode(data)) | ||
broker = new EmitterPubsubBroker({connect, serialize, deserialize}) | ||
let client = new EventEmitter() | ||
return broker.subscribe(client, 'my-channel').then(() => { | ||
broker.publish('my-channel', 'myEvent', 1, '2') | ||
return eventToPromise(client, 'myEvent', {array: true}).then(args => { | ||
let [x, y] = args | ||
expect(x).equal(1) | ||
expect(y).equal('2') | ||
}) | ||
}) | ||
}) | ||
it('should prepend a channel argument', function () { | ||
@@ -87,4 +119,5 @@ broker = new EmitterPubsubBroker({connect, includeChannel: true}) | ||
eventToPromise(client1, 'myEvent').then(notReachable) | ||
return Promise.all([eventToPromise(client2, 'myEvent'), | ||
new Promise(resolve => setTimeout(resolve, 1000))]) | ||
return Promise.all([ | ||
eventToPromise(client2, 'myEvent'), | ||
new Promise(resolve => setTimeout(resolve, 1000))]) | ||
}) | ||
@@ -96,12 +129,28 @@ }) | ||
let client1 = new EventEmitter() | ||
return Promise.all([broker.subscribe(client1, 'my-channel'), | ||
broker.subscribe(client1, 'channel')]) | ||
return Promise.all([ | ||
broker.subscribe(client1, 'my-channel'), | ||
broker.subscribe(client1, 'channel')]) | ||
.then(() => { | ||
let subs = broker.getSubscriptions(client1) | ||
expect(subs).lengthOf(2) | ||
expect(subs).include('my-channel') | ||
expect(subs).include('channel') | ||
expect(subs.size).equal(2) | ||
expect(subs.has('my-channel')).true | ||
expect(subs.has('channel')).true | ||
}) | ||
}) | ||
it('should get clients set', function () { | ||
broker = new EmitterPubsubBroker(connect) | ||
let client1 = new EventEmitter() | ||
let client2 = new EventEmitter() | ||
return Promise.all([ | ||
broker.subscribe(client1, 'channel'), | ||
broker.subscribe(client2, 'channel')]) | ||
.then(() => { | ||
let subs = broker.getClients('channel') | ||
expect(subs.size).equal(2) | ||
expect(subs.has(client1)).true | ||
expect(subs.has(client2)).true | ||
}) | ||
}) | ||
it('should unsubscribe all', function () { | ||
@@ -112,5 +161,6 @@ this.timeout(4000) | ||
let client1 = new EventEmitter() | ||
return Promise.all([broker.subscribe(client1, 'my-channel'), | ||
broker.subscribe(client1, 'channel')]) | ||
.then(() => broker.unsubscribeall(client1)) | ||
return Promise.all([ | ||
broker.subscribe(client1, 'my-channel'), | ||
broker.subscribe(client1, 'channel')]) | ||
.then(() => broker.unsubscribeAll(client1)) | ||
.then(() => { | ||
@@ -130,8 +180,8 @@ expect(broker.getSubscriptions(client1)).empty | ||
it('should handle non-existent unsubscribeall', function () { | ||
it('should handle non-existent unsubscribeAll', function () { | ||
broker = new EmitterPubsubBroker(connect) | ||
let client1 = new EventEmitter() | ||
return broker.unsubscribeall(client1) | ||
return broker.unsubscribeAll(client1) | ||
}) | ||
})) | ||
}) |
License Policy Violation
LicenseThis package is not allowed per your license policy. Review the package's license to ensure compliance.
Found 1 instance in 1 package
License Policy Violation
LicenseThis package is not allowed per your license policy. Review the package's license to ensure compliance.
Found 1 instance in 1 package
59703
912