emitter-pubsub-broker
Advanced tools
Comparing version 0.4.0 to 0.5.0
@@ -5,2 +5,32 @@ # Change Log | ||
<a name="0.5.0"></a> | ||
# [0.5.0](https://github.com/an-sh/emitter-pubsub-broker/compare/v0.4.0...v0.5.0) (2017-01-06) | ||
### Bug Fixes | ||
* ignore broadcast errors ([e895e96](https://github.com/an-sh/emitter-pubsub-broker/commit/e895e96)) | ||
### Code Refactoring | ||
* change getSubscriptions return type ([338a47a](https://github.com/an-sh/emitter-pubsub-broker/commit/338a47a)) | ||
* rename unsubscribeall to unsubscribeAll ([3a25f07](https://github.com/an-sh/emitter-pubsub-broker/commit/3a25f07)) | ||
### Features | ||
* add custom serialisation support ([c3f218d](https://github.com/an-sh/emitter-pubsub-broker/commit/c3f218d)) | ||
* add getClients ([f728105](https://github.com/an-sh/emitter-pubsub-broker/commit/f728105)) | ||
* support broadcast data encoding ([392c173](https://github.com/an-sh/emitter-pubsub-broker/commit/392c173)) | ||
### BREAKING CHANGES | ||
* Now getSubscriptions method returns a shared Set | ||
instead of a fresh Array. | ||
* Rename unsubscribeall to unsubscribeAll. | ||
<a name="0.4.0"></a> | ||
@@ -7,0 +37,0 @@ # [0.4.0](https://github.com/an-sh/emitter-pubsub-broker/compare/v0.3.0...v0.4.0) (2016-11-29) |
@@ -127,8 +127,47 @@ 'use strict'; | ||
/** | ||
* Messages encoder. Encoded messages will be used to emit messages | ||
* via event emitters. May also return promises for an asynchronous | ||
* execution. | ||
* | ||
* @callback EmitterPubsubBroker.Encoder | ||
* @param {*} args Emit arguments. | ||
* @return {Promise<Object>|Object} Data to send. | ||
*/ | ||
/** | ||
* Messages serialisation. Serialised messages will be used internally | ||
* via a communication {@link Connector}. May also return promises | ||
* for an asynchronous execution. | ||
* | ||
* @callback EmitterPubsubBroker.Serialize | ||
* @param {Object} data Data. | ||
* @return {Promise<Object>|Object} Serialised data. | ||
*/ | ||
/** | ||
* Messages deserialisation. Serialised messages will be used | ||
* internally via a communication {@link Connector}. May also return | ||
* promises for an asynchronous execution. | ||
* | ||
* @callback EmitterPubsubBroker.Deserialize | ||
* @param {Object} data Serialised data. | ||
* @return {Promise<Object>|Object} Data. | ||
*/ | ||
/** | ||
* @typedef {Object} EmitterPubsubBroker.Options | ||
* | ||
* @property {string} connect Connect string. | ||
* @property {string} [prefix='emitter-pubsub-broker:'] Prefix. | ||
* @property {string} [connect] Connect string for a connector. | ||
* @property {string} [prefix='emitter-pubsub-broker:'] Prefix for a connector. | ||
* @property {boolean} [includeChannel=false] Include channel as the | ||
* first argument. | ||
* @property {EmitterPubsubBroker.Encoder} [encoder] Optional encoder | ||
* to run before broadcasting. | ||
* @property {string} [method='emit'] An alternative emit method. | ||
* @property {EmitterPubsubBroker.Serialize} | ||
* [serialize=msgpack.encode] Serialisation function to use with a | ||
* connector. | ||
* @property {EmitterPubsubBroker.Deserialize} | ||
* [deserialize=msgpack.decode] Deserialisation function to use with a | ||
* connector. | ||
* @property {Connector} [connector] Custom connector implementation. | ||
@@ -154,2 +193,6 @@ */ | ||
this.prefix = options.prefix || 'emitter-pubsub-broker:'; | ||
this.method = options.method || 'emit'; | ||
this.serialize = options.serialize || msgpack.encode; | ||
this.deserialize = options.deserialize || msgpack.decode; | ||
this.encoder = options.encoder; | ||
this.includeChannel = options.includeChannel; | ||
@@ -160,6 +203,6 @@ this.clientChannels = new Map(); | ||
this.connector = options.connector || new RedisConnector(options.connect); | ||
this.serialize = true; | ||
this.useSerialization = true; | ||
} else { | ||
this.connector = new MemoryConnector(); | ||
this.serialize = false; | ||
this.useSerialization = false; | ||
} | ||
@@ -177,8 +220,9 @@ this.connector.on('message', this._dispatch.bind(this)); | ||
_channelAddClient(client, ch) { | ||
let clients = this.channelClients.get(ch); | ||
_channelAddClient(client, channel) { | ||
let clients = this.channelClients.get(channel); | ||
if (clients == null) { | ||
clients = new Set(); | ||
this.channelClients.set(ch, clients); | ||
this.channelClients.set(channel, clients); | ||
clients.add(client); | ||
let ch = this.prefix + channel; | ||
return this.connector.subscribe(ch); | ||
@@ -191,4 +235,4 @@ } else { | ||
_channelRemoveClient(client, ch) { | ||
let clients = this.channelClients.get(ch); | ||
_channelRemoveClient(client, channel) { | ||
let clients = this.channelClients.get(channel); | ||
let nclients; | ||
@@ -200,2 +244,3 @@ if (clients != null) { | ||
if (nclients === 0) { | ||
let ch = this.prefix + channel; | ||
return this.connector.unsubscribe(ch); | ||
@@ -208,5 +253,9 @@ } else { | ||
_makeMessage(msg) { | ||
return this.serialize ? msgpack.encode(msg) : msg; | ||
return Promise.try(() => this.useSerialization ? this.serialize(msg) : msg); | ||
} | ||
_unpackMessage(data) { | ||
return Promise.try(() => this.useSerialization ? this.deserialize(data) : data); | ||
} | ||
/** | ||
@@ -225,5 +274,4 @@ * Subscribes emitter to a channel. | ||
} | ||
let ch = this.prefix + channel; | ||
channels.add(ch); | ||
return this._channelAddClient(client, ch); | ||
channels.add(channel); | ||
return this._channelAddClient(client, channel); | ||
} | ||
@@ -240,11 +288,10 @@ | ||
let channels = this.clientChannels.get(client); | ||
let ch = this.prefix + channel; | ||
if (channels) { | ||
channels.delete(ch); | ||
channels.delete(channel); | ||
} | ||
return this._channelRemoveClient(client, ch); | ||
return this._channelRemoveClient(client, channel); | ||
} | ||
/** | ||
* Unsubscribes emitter from all channel. | ||
* Unsubscribes emitter from all channels. | ||
* | ||
@@ -254,3 +301,3 @@ * @param {EventEmitter} client Emitter. | ||
*/ | ||
unsubscribeall(client) { | ||
unsubscribeAll(client) { | ||
let channels = this.clientChannels.get(client); | ||
@@ -279,4 +326,3 @@ this.clientChannels.delete(client); | ||
let ch = this.prefix + channel; | ||
let msg = this._makeMessage({ name: name, args: args }); | ||
return this.connector.publish(ch, msg); | ||
return this._makeMessage({ name: name, args: args }).then(msg => this.connector.publish(ch, msg)); | ||
} | ||
@@ -301,25 +347,28 @@ | ||
let sender = client.id; | ||
let msg = this._makeMessage({ sender: sender, name: name, args: args }); | ||
return this.connector.publish(ch, msg); | ||
return this._makeMessage({ sender: sender, name: name, args: args }).then(msg => this.connector.publish(ch, msg)); | ||
} | ||
/** | ||
* Returns client subscriptions. | ||
* Returns set of client subscriptions. The result __MUST NOT__ be | ||
* modified. | ||
* | ||
* @param {EventEmitter} client Emitter. | ||
* @return {Array<string>} | ||
* @return {Set<string>|undefined} | ||
*/ | ||
getSubscriptions(client) { | ||
let channels = this.clientChannels.get(client); | ||
let res = []; | ||
if (channels) { | ||
let plen = this.prefix.length; | ||
for (let channel of channels) { | ||
res.push(channel.slice(plen)); | ||
} | ||
} | ||
return res; | ||
return this.clientChannels.get(client); | ||
} | ||
/** | ||
* Returns _internal_ set of channel clients of EmitterPubsubBroker | ||
* instance. The result __MUST NOT__ be modified. | ||
* | ||
* @param {string} channel Channel. | ||
* @return {Set<EventEmitter>|undefined} | ||
*/ | ||
getClients(channel) { | ||
return this.channelClients.get(channel); | ||
} | ||
/** | ||
* Closes broker. | ||
@@ -336,19 +385,20 @@ * | ||
_dispatch(ch, data) { | ||
let channel = ch.slice(this.prefix.length); | ||
let message = this.serialize ? msgpack.decode(data) : data; | ||
let clients = this.channelClients.get(ch); | ||
/* istanbul ignore else */ | ||
if (clients) { | ||
let args; | ||
if (this.includeChannel) { | ||
args = [message.name, channel].concat(_toConsumableArray(message.args)); | ||
} else { | ||
args = [message.name].concat(_toConsumableArray(message.args)); | ||
this._unpackMessage(data).then(message => { | ||
let channel = ch.slice(this.prefix.length); | ||
let clients = this.channelClients.get(channel); | ||
/* istanbul ignore else */ | ||
if (clients) { | ||
let args = this.includeChannel ? [message.name, channel].concat(_toConsumableArray(message.args)) : [message.name].concat(_toConsumableArray(message.args)); | ||
Promise.try(() => this.encoder ? this.encoder(args) : args).then(data => { | ||
const method = this.method; | ||
const encoder = this.encoder; | ||
const sender = message.sender; | ||
clients.forEach(client => { | ||
if (!sender || client.id !== sender) { | ||
Promise.try(() => encoder ? client[method](data) : client[method].apply(client, _toConsumableArray(data))).catchReturn(); | ||
} | ||
}); | ||
}); | ||
} | ||
for (let client of clients) { | ||
if (!message.sender || client.id !== message.sender) { | ||
client.emit.apply(client, _toConsumableArray(args)); | ||
} | ||
} | ||
} | ||
}); | ||
} | ||
@@ -358,3 +408,6 @@ | ||
// compatibility | ||
EmitterPubsubBroker.prototype.unsubscribeall = EmitterPubsubBroker.prototype.unsubscribeAll; | ||
module.exports = EmitterPubsubBroker; | ||
//# sourceMappingURL=data:application/json;base64, | ||
//# sourceMappingURL=data:application/json;base64, |
{ | ||
"name": "emitter-pubsub-broker", | ||
"version": "0.4.0", | ||
"version": "0.5.0", | ||
"private": false, | ||
@@ -5,0 +5,0 @@ "description": "An utility for connecting EventEmitters via a pubsub.", |
@@ -49,3 +49,3 @@ | ||
[API](https://an-sh.github.io/emitter-pubsub-broker/0.4/index.html) | ||
[API](https://an-sh.github.io/emitter-pubsub-broker/0.5/index.html) | ||
documentation is available online. | ||
@@ -52,0 +52,0 @@ |
@@ -123,8 +123,47 @@ 'use strict' | ||
/** | ||
* Messages encoder. Encoded messages will be used to emit messages | ||
* via event emitters. May also return promises for an asynchronous | ||
* execution. | ||
* | ||
* @callback EmitterPubsubBroker.Encoder | ||
* @param {*} args Emit arguments. | ||
* @return {Promise<Object>|Object} Data to send. | ||
*/ | ||
/** | ||
* Messages serialisation. Serialised messages will be used internally | ||
* via a communication {@link Connector}. May also return promises | ||
* for an asynchronous execution. | ||
* | ||
* @callback EmitterPubsubBroker.Serialize | ||
* @param {Object} data Data. | ||
* @return {Promise<Object>|Object} Serialised data. | ||
*/ | ||
/** | ||
* Messages deserialisation. Serialised messages will be used | ||
* internally via a communication {@link Connector}. May also return | ||
* promises for an asynchronous execution. | ||
* | ||
* @callback EmitterPubsubBroker.Deserialize | ||
* @param {Object} data Serialised data. | ||
* @return {Promise<Object>|Object} Data. | ||
*/ | ||
/** | ||
* @typedef {Object} EmitterPubsubBroker.Options | ||
* | ||
* @property {string} connect Connect string. | ||
* @property {string} [prefix='emitter-pubsub-broker:'] Prefix. | ||
* @property {string} [connect] Connect string for a connector. | ||
* @property {string} [prefix='emitter-pubsub-broker:'] Prefix for a connector. | ||
* @property {boolean} [includeChannel=false] Include channel as the | ||
* first argument. | ||
* @property {EmitterPubsubBroker.Encoder} [encoder] Optional encoder | ||
* to run before broadcasting. | ||
* @property {string} [method='emit'] An alternative emit method. | ||
* @property {EmitterPubsubBroker.Serialize} | ||
* [serialize=msgpack.encode] Serialisation function to use with a | ||
* connector. | ||
* @property {EmitterPubsubBroker.Deserialize} | ||
* [deserialize=msgpack.decode] Deserialisation function to use with a | ||
* connector. | ||
* @property {Connector} [connector] Custom connector implementation. | ||
@@ -150,2 +189,6 @@ */ | ||
this.prefix = options.prefix || 'emitter-pubsub-broker:' | ||
this.method = options.method || 'emit' | ||
this.serialize = options.serialize || msgpack.encode | ||
this.deserialize = options.deserialize || msgpack.decode | ||
this.encoder = options.encoder | ||
this.includeChannel = options.includeChannel | ||
@@ -156,6 +199,6 @@ this.clientChannels = new Map() | ||
this.connector = options.connector || new RedisConnector(options.connect) | ||
this.serialize = true | ||
this.useSerialization = true | ||
} else { | ||
this.connector = new MemoryConnector() | ||
this.serialize = false | ||
this.useSerialization = false | ||
} | ||
@@ -173,8 +216,9 @@ this.connector.on('message', this._dispatch.bind(this)) | ||
_channelAddClient (client, ch) { | ||
let clients = this.channelClients.get(ch) | ||
_channelAddClient (client, channel) { | ||
let clients = this.channelClients.get(channel) | ||
if (clients == null) { | ||
clients = new Set() | ||
this.channelClients.set(ch, clients) | ||
this.channelClients.set(channel, clients) | ||
clients.add(client) | ||
let ch = this.prefix + channel | ||
return this.connector.subscribe(ch) | ||
@@ -187,4 +231,4 @@ } else { | ||
_channelRemoveClient (client, ch) { | ||
let clients = this.channelClients.get(ch) | ||
_channelRemoveClient (client, channel) { | ||
let clients = this.channelClients.get(channel) | ||
let nclients | ||
@@ -196,2 +240,3 @@ if (clients != null) { | ||
if (nclients === 0) { | ||
let ch = this.prefix + channel | ||
return this.connector.unsubscribe(ch) | ||
@@ -204,5 +249,9 @@ } else { | ||
_makeMessage (msg) { | ||
return this.serialize ? msgpack.encode(msg) : msg | ||
return Promise.try(() => this.useSerialization ? this.serialize(msg) : msg) | ||
} | ||
_unpackMessage (data) { | ||
return Promise.try(() => this.useSerialization ? this.deserialize(data) : data) | ||
} | ||
/** | ||
@@ -221,5 +270,4 @@ * Subscribes emitter to a channel. | ||
} | ||
let ch = this.prefix + channel | ||
channels.add(ch) | ||
return this._channelAddClient(client, ch) | ||
channels.add(channel) | ||
return this._channelAddClient(client, channel) | ||
} | ||
@@ -236,11 +284,10 @@ | ||
let channels = this.clientChannels.get(client) | ||
let ch = this.prefix + channel | ||
if (channels) { | ||
channels.delete(ch) | ||
channels.delete(channel) | ||
} | ||
return this._channelRemoveClient(client, ch) | ||
return this._channelRemoveClient(client, channel) | ||
} | ||
/** | ||
* Unsubscribes emitter from all channel. | ||
* Unsubscribes emitter from all channels. | ||
* | ||
@@ -250,3 +297,3 @@ * @param {EventEmitter} client Emitter. | ||
*/ | ||
unsubscribeall (client) { | ||
unsubscribeAll (client) { | ||
let channels = this.clientChannels.get(client) | ||
@@ -272,4 +319,4 @@ this.clientChannels.delete(client) | ||
let ch = this.prefix + channel | ||
let msg = this._makeMessage({name, args}) | ||
return this.connector.publish(ch, msg) | ||
return this._makeMessage({name, args}) | ||
.then(msg => this.connector.publish(ch, msg)) | ||
} | ||
@@ -290,25 +337,29 @@ | ||
let sender = client.id | ||
let msg = this._makeMessage({sender, name, args}) | ||
return this.connector.publish(ch, msg) | ||
return this._makeMessage({sender, name, args}) | ||
.then(msg => this.connector.publish(ch, msg)) | ||
} | ||
/** | ||
* Returns client subscriptions. | ||
* Returns set of client subscriptions. The result __MUST NOT__ be | ||
* modified. | ||
* | ||
* @param {EventEmitter} client Emitter. | ||
* @return {Array<string>} | ||
* @return {Set<string>|undefined} | ||
*/ | ||
getSubscriptions (client) { | ||
let channels = this.clientChannels.get(client) | ||
let res = [] | ||
if (channels) { | ||
let plen = this.prefix.length | ||
for (let channel of channels) { | ||
res.push(channel.slice(plen)) | ||
} | ||
} | ||
return res | ||
return this.clientChannels.get(client) | ||
} | ||
/** | ||
* Returns _internal_ set of channel clients of EmitterPubsubBroker | ||
* instance. The result __MUST NOT__ be modified. | ||
* | ||
* @param {string} channel Channel. | ||
* @return {Set<EventEmitter>|undefined} | ||
*/ | ||
getClients (channel) { | ||
return this.channelClients.get(channel) | ||
} | ||
/** | ||
* Closes broker. | ||
@@ -325,19 +376,24 @@ * | ||
_dispatch (ch, data) { | ||
let channel = ch.slice(this.prefix.length) | ||
let message = this.serialize ? msgpack.decode(data) : data | ||
let clients = this.channelClients.get(ch) | ||
/* istanbul ignore else */ | ||
if (clients) { | ||
let args | ||
if (this.includeChannel) { | ||
args = [message.name, channel, ...message.args] | ||
} else { | ||
args = [message.name, ...message.args] | ||
this._unpackMessage(data).then(message => { | ||
let channel = ch.slice(this.prefix.length) | ||
let clients = this.channelClients.get(channel) | ||
/* istanbul ignore else */ | ||
if (clients) { | ||
let args = this.includeChannel | ||
? [message.name, channel, ...message.args] | ||
: [message.name, ...message.args] | ||
Promise.try(() => this.encoder ? this.encoder(args) : args).then(data => { | ||
const method = this.method | ||
const encoder = this.encoder | ||
const sender = message.sender | ||
clients.forEach(client => { | ||
if (!sender || client.id !== sender) { | ||
Promise | ||
.try(() => encoder ? client[method](data) : client[method](...data)) | ||
.catchReturn() | ||
} | ||
}) | ||
}) | ||
} | ||
for (let client of clients) { | ||
if (!message.sender || client.id !== message.sender) { | ||
client.emit(...args) | ||
} | ||
} | ||
} | ||
}) | ||
} | ||
@@ -347,2 +403,5 @@ | ||
// compatibility | ||
EmitterPubsubBroker.prototype.unsubscribeall = EmitterPubsubBroker.prototype.unsubscribeAll | ||
module.exports = EmitterPubsubBroker |
@@ -6,2 +6,4 @@ 'use strict' | ||
const eventToPromise = require('event-to-promise') | ||
const msgpack = require('msgpack-lite') | ||
const Promise = require('bluebird') | ||
const { expect } = require('chai') | ||
@@ -46,2 +48,32 @@ const { EventEmitter } = require('events') | ||
it('should emit published encoded messages', function (done) { | ||
broker = new EmitterPubsubBroker({connect, encoder: JSON.stringify, method: 'send'}) | ||
let client = new EventEmitter() | ||
client.send = function (args) { | ||
let [ev, x, y] = JSON.parse(args) | ||
expect(ev).equal('myEvent') | ||
expect(x).equal(1) | ||
expect(y).equal('2') | ||
done() | ||
} | ||
broker.subscribe(client, 'my-channel').then(() => { | ||
broker.publish('my-channel', 'myEvent', 1, '2') | ||
}) | ||
}) | ||
it('should use custom serialisation', function () { | ||
let serialize = (data) => Promise.try(() => msgpack.encode(data)) | ||
let deserialize = (data) => Promise.try(() => msgpack.decode(data)) | ||
broker = new EmitterPubsubBroker({connect, serialize, deserialize}) | ||
let client = new EventEmitter() | ||
return broker.subscribe(client, 'my-channel').then(() => { | ||
broker.publish('my-channel', 'myEvent', 1, '2') | ||
return eventToPromise(client, 'myEvent', {array: true}).then(args => { | ||
let [x, y] = args | ||
expect(x).equal(1) | ||
expect(y).equal('2') | ||
}) | ||
}) | ||
}) | ||
it('should prepend a channel argument', function () { | ||
@@ -87,4 +119,5 @@ broker = new EmitterPubsubBroker({connect, includeChannel: true}) | ||
eventToPromise(client1, 'myEvent').then(notReachable) | ||
return Promise.all([eventToPromise(client2, 'myEvent'), | ||
new Promise(resolve => setTimeout(resolve, 1000))]) | ||
return Promise.all([ | ||
eventToPromise(client2, 'myEvent'), | ||
new Promise(resolve => setTimeout(resolve, 1000))]) | ||
}) | ||
@@ -96,12 +129,28 @@ }) | ||
let client1 = new EventEmitter() | ||
return Promise.all([broker.subscribe(client1, 'my-channel'), | ||
broker.subscribe(client1, 'channel')]) | ||
return Promise.all([ | ||
broker.subscribe(client1, 'my-channel'), | ||
broker.subscribe(client1, 'channel')]) | ||
.then(() => { | ||
let subs = broker.getSubscriptions(client1) | ||
expect(subs).lengthOf(2) | ||
expect(subs).include('my-channel') | ||
expect(subs).include('channel') | ||
expect(subs.size).equal(2) | ||
expect(subs.has('my-channel')).true | ||
expect(subs.has('channel')).true | ||
}) | ||
}) | ||
it('should get clients set', function () { | ||
broker = new EmitterPubsubBroker(connect) | ||
let client1 = new EventEmitter() | ||
let client2 = new EventEmitter() | ||
return Promise.all([ | ||
broker.subscribe(client1, 'channel'), | ||
broker.subscribe(client2, 'channel')]) | ||
.then(() => { | ||
let subs = broker.getClients('channel') | ||
expect(subs.size).equal(2) | ||
expect(subs.has(client1)).true | ||
expect(subs.has(client2)).true | ||
}) | ||
}) | ||
it('should unsubscribe all', function () { | ||
@@ -112,5 +161,6 @@ this.timeout(4000) | ||
let client1 = new EventEmitter() | ||
return Promise.all([broker.subscribe(client1, 'my-channel'), | ||
broker.subscribe(client1, 'channel')]) | ||
.then(() => broker.unsubscribeall(client1)) | ||
return Promise.all([ | ||
broker.subscribe(client1, 'my-channel'), | ||
broker.subscribe(client1, 'channel')]) | ||
.then(() => broker.unsubscribeAll(client1)) | ||
.then(() => { | ||
@@ -130,8 +180,8 @@ expect(broker.getSubscriptions(client1)).empty | ||
it('should handle non-existent unsubscribeall', function () { | ||
it('should handle non-existent unsubscribeAll', function () { | ||
broker = new EmitterPubsubBroker(connect) | ||
let client1 = new EventEmitter() | ||
return broker.unsubscribeall(client1) | ||
return broker.unsubscribeAll(client1) | ||
}) | ||
})) | ||
}) |
License Policy Violation
LicenseThis package is not allowed per your license policy. Review the package's license to ensure compliance.
Found 1 instance in 1 package
License Policy Violation
LicenseThis package is not allowed per your license policy. Review the package's license to ensure compliance.
Found 1 instance in 1 package
59703
912