aedes-cached-persistence
Advanced tools
Comparing version
39
index.js
'use strict' | ||
var Qlobber = require('qlobber').Qlobber | ||
var QlobberSub = require('qlobber/aedes/qlobber-sub') | ||
var Packet = require('aedes-packet') | ||
@@ -29,3 +29,3 @@ var EE = require('events').EventEmitter | ||
this._parallel = parallel() | ||
this._matcher = new Qlobber(QlobberOpts) | ||
this._trie = new QlobberSub(QlobberOpts) | ||
this._waiting = {} | ||
@@ -46,10 +46,9 @@ | ||
if (packet.topic === newSubTopic) { | ||
if (!checkSubsForClient(sub, that._matcher.match(sub.topic))) { | ||
that._matcher.add(sub.topic, sub) | ||
if (sub.qos > 0) { | ||
that._trie.add(sub.topic, sub) | ||
} else { | ||
that._trie.remove(sub.topic, sub) | ||
} | ||
} else if (packet.topic === rmSubTopic) { | ||
that._matcher | ||
.match(sub.topic) | ||
.filter(matching, sub) | ||
.forEach(rmSub, that._matcher) | ||
that._trie.remove(sub.topic, sub) | ||
} | ||
@@ -67,10 +66,2 @@ } | ||
function matching (sub) { | ||
return sub.topic === this.topic && sub.clientId === this.clientId | ||
} | ||
function rmSub (sub) { | ||
this.remove(sub.topic, sub) | ||
} | ||
inherits(CachedPersistence, EE) | ||
@@ -99,3 +90,2 @@ | ||
subs = subs.filter(qosGreaterThanOne) | ||
if (subs.length === 0) { | ||
@@ -120,6 +110,2 @@ return cb(null, client) | ||
function qosGreaterThanOne (sub) { | ||
return sub.qos > 0 | ||
} | ||
function brokerPublish (subs, cb) { | ||
@@ -173,3 +159,3 @@ var encoded = JSON.stringify({clientId: this.client.id, subs: subs}) | ||
cb(null, this._matcher.match(topic)) | ||
cb(null, this._trie.match(topic)) | ||
} | ||
@@ -234,12 +220,3 @@ | ||
function checkSubsForClient (sub, savedSubs) { | ||
for (var i = 0; i < savedSubs.length; i++) { | ||
if (sub.topic === savedSubs[i].topic && sub.clientId === savedSubs[i].clientId) { | ||
return true | ||
} | ||
} | ||
return false | ||
} | ||
module.exports = CachedPersistence | ||
module.exports.Packet = Packet |
{ | ||
"name": "aedes-cached-persistence", | ||
"version": "5.1.0", | ||
"version": "6.0.0", | ||
"description": "Abstract class to write an Aedes persistence with in-process caching of subscriptions", | ||
@@ -29,4 +29,4 @@ "main": "index.js", | ||
"mqemitter": "^2.0.0", | ||
"standard": "^10.0.0", | ||
"tape": "^4.7.0", | ||
"standard": "^10.0.3", | ||
"tape": "^4.8.0", | ||
"through2": "^2.0.1" | ||
@@ -36,7 +36,7 @@ }, | ||
"aedes-packet": "^1.0.0", | ||
"aedes-persistence": "^5.0.2", | ||
"aedes-persistence": "^5.1.1", | ||
"fastparallel": "^2.3.0", | ||
"multistream": "^2.1.0", | ||
"qlobber": "^0.8.0" | ||
"qlobber": "^1.8.0" | ||
} | ||
} |
74
test.js
@@ -16,4 +16,3 @@ 'use strict' | ||
this._retained = [] | ||
this._subscriptions = [] | ||
this._subscriptionsCount = 0 | ||
this._subscriptions = new Map() | ||
this._clientsCount = 0 | ||
@@ -41,8 +40,7 @@ this._outgoing = {} | ||
MyPersistence.prototype.addSubscriptions = function (client, subs, cb) { | ||
var that = this | ||
var stored = this._subscriptions[client.id] | ||
var stored = this._subscriptions.get(client.id) | ||
if (!stored) { | ||
stored = [] | ||
this._subscriptions[client.id] = stored | ||
stored = new Map() | ||
this._subscriptions.set(client.id, stored) | ||
this._clientsCount++ | ||
@@ -52,2 +50,3 @@ } | ||
var subsObjs = subs.map(function mapSub (sub) { | ||
stored.set(sub.topic, sub.qos) | ||
return { | ||
@@ -60,16 +59,2 @@ clientId: client.id, | ||
subsObjs.forEach(function eachSub (sub) { | ||
if (sub.qos > 0) { | ||
if (!checkSubsForClient(sub, that._matcher.match(sub.topic))) { | ||
that._subscriptionsCount++ | ||
that._matcher.add(sub.topic, sub) | ||
stored.push(sub) | ||
} | ||
} else { | ||
if (!checkSubsForClient(sub, that._subscriptions[client.id])) { | ||
stored.push(sub) | ||
} | ||
} | ||
}) | ||
this._addedSubscriptions(client, subsObjs, cb) | ||
@@ -79,27 +64,25 @@ } | ||
MyPersistence.prototype.removeSubscriptions = function (client, subs, cb) { | ||
var that = this | ||
var stored = this._subscriptions[client.id] | ||
var stored = this._subscriptions.get(client.id) | ||
var removed = [] | ||
if (!stored) { | ||
stored = [] | ||
this._subscriptions[client.id] = stored | ||
} | ||
if (stored) { | ||
for (var i = 0; i < subs.length; i += 1) { | ||
var topic = subs[i] | ||
var qos = stored.get(topic) | ||
if (qos !== undefined) { | ||
if (qos > 0) { | ||
removed.push({ | ||
clientId: client.id, | ||
topic: topic, | ||
qos: qos | ||
}) | ||
} | ||
stored.delete(topic) | ||
} | ||
} | ||
this._subscriptions[client.id] = stored.filter(function noSub (storedSub) { | ||
var toKeep = subs.indexOf(storedSub.topic) < 0 | ||
if (!toKeep) { | ||
that._subscriptionsCount-- | ||
removed.push({ | ||
clientId: client.id, | ||
topic: storedSub.topic, | ||
qos: storedSub.qos | ||
}) | ||
if (stored.size === 0) { | ||
this._clientsCount-- | ||
this._subscriptions.delete(client.id) | ||
} | ||
return toKeep | ||
}) | ||
if (this._subscriptions[client.id].length === 0) { | ||
this._clientsCount-- | ||
delete this._subscriptions[client.id] | ||
} | ||
@@ -110,11 +93,2 @@ | ||
function checkSubsForClient (sub, savedSubs) { | ||
for (var i = 0; i < savedSubs.length; i++) { | ||
if (sub.topic === savedSubs[i].topic && sub.clientId === savedSubs[i].clientId) { | ||
return true | ||
} | ||
} | ||
return false | ||
} | ||
abs({ | ||
@@ -121,0 +95,0 @@ test: test, |
13072
-8.54%263
-13.77%- Removed
Updated
Updated