🚀 Big News: Socket Acquires Coana to Bring Reachability Analysis to Every Appsec Team.Learn more
Socket
DemoInstallSign in
Socket

aedes-cached-persistence

Package Overview
Dependencies
Maintainers
1
Versions
18
Alerts
File Explorer

Advanced tools

Socket logo

Install Socket

Detect and block malicious and high-risk dependencies

Install

aedes-cached-persistence - npm Package Compare versions

Comparing version

to
6.0.0

39

index.js
'use strict'
var Qlobber = require('qlobber').Qlobber
var QlobberSub = require('qlobber/aedes/qlobber-sub')
var Packet = require('aedes-packet')

@@ -29,3 +29,3 @@ var EE = require('events').EventEmitter

this._parallel = parallel()
this._matcher = new Qlobber(QlobberOpts)
this._trie = new QlobberSub(QlobberOpts)
this._waiting = {}

@@ -46,10 +46,9 @@

if (packet.topic === newSubTopic) {
if (!checkSubsForClient(sub, that._matcher.match(sub.topic))) {
that._matcher.add(sub.topic, sub)
if (sub.qos > 0) {
that._trie.add(sub.topic, sub)
} else {
that._trie.remove(sub.topic, sub)
}
} else if (packet.topic === rmSubTopic) {
that._matcher
.match(sub.topic)
.filter(matching, sub)
.forEach(rmSub, that._matcher)
that._trie.remove(sub.topic, sub)
}

@@ -67,10 +66,2 @@ }

function matching (sub) {
return sub.topic === this.topic && sub.clientId === this.clientId
}
function rmSub (sub) {
this.remove(sub.topic, sub)
}
inherits(CachedPersistence, EE)

@@ -99,3 +90,2 @@

subs = subs.filter(qosGreaterThanOne)
if (subs.length === 0) {

@@ -120,6 +110,2 @@ return cb(null, client)

function qosGreaterThanOne (sub) {
return sub.qos > 0
}
function brokerPublish (subs, cb) {

@@ -173,3 +159,3 @@ var encoded = JSON.stringify({clientId: this.client.id, subs: subs})

cb(null, this._matcher.match(topic))
cb(null, this._trie.match(topic))
}

@@ -234,12 +220,3 @@

function checkSubsForClient (sub, savedSubs) {
for (var i = 0; i < savedSubs.length; i++) {
if (sub.topic === savedSubs[i].topic && sub.clientId === savedSubs[i].clientId) {
return true
}
}
return false
}
module.exports = CachedPersistence
module.exports.Packet = Packet
{
"name": "aedes-cached-persistence",
"version": "5.1.0",
"version": "6.0.0",
"description": "Abstract class to write an Aedes persistence with in-process caching of subscriptions",

@@ -29,4 +29,4 @@ "main": "index.js",

"mqemitter": "^2.0.0",
"standard": "^10.0.0",
"tape": "^4.7.0",
"standard": "^10.0.3",
"tape": "^4.8.0",
"through2": "^2.0.1"

@@ -36,7 +36,7 @@ },

"aedes-packet": "^1.0.0",
"aedes-persistence": "^5.0.2",
"aedes-persistence": "^5.1.1",
"fastparallel": "^2.3.0",
"multistream": "^2.1.0",
"qlobber": "^0.8.0"
"qlobber": "^1.8.0"
}
}

@@ -16,4 +16,3 @@ 'use strict'

this._retained = []
this._subscriptions = []
this._subscriptionsCount = 0
this._subscriptions = new Map()
this._clientsCount = 0

@@ -41,8 +40,7 @@ this._outgoing = {}

MyPersistence.prototype.addSubscriptions = function (client, subs, cb) {
var that = this
var stored = this._subscriptions[client.id]
var stored = this._subscriptions.get(client.id)
if (!stored) {
stored = []
this._subscriptions[client.id] = stored
stored = new Map()
this._subscriptions.set(client.id, stored)
this._clientsCount++

@@ -52,2 +50,3 @@ }

var subsObjs = subs.map(function mapSub (sub) {
stored.set(sub.topic, sub.qos)
return {

@@ -60,16 +59,2 @@ clientId: client.id,

subsObjs.forEach(function eachSub (sub) {
if (sub.qos > 0) {
if (!checkSubsForClient(sub, that._matcher.match(sub.topic))) {
that._subscriptionsCount++
that._matcher.add(sub.topic, sub)
stored.push(sub)
}
} else {
if (!checkSubsForClient(sub, that._subscriptions[client.id])) {
stored.push(sub)
}
}
})
this._addedSubscriptions(client, subsObjs, cb)

@@ -79,27 +64,25 @@ }

MyPersistence.prototype.removeSubscriptions = function (client, subs, cb) {
var that = this
var stored = this._subscriptions[client.id]
var stored = this._subscriptions.get(client.id)
var removed = []
if (!stored) {
stored = []
this._subscriptions[client.id] = stored
}
if (stored) {
for (var i = 0; i < subs.length; i += 1) {
var topic = subs[i]
var qos = stored.get(topic)
if (qos !== undefined) {
if (qos > 0) {
removed.push({
clientId: client.id,
topic: topic,
qos: qos
})
}
stored.delete(topic)
}
}
this._subscriptions[client.id] = stored.filter(function noSub (storedSub) {
var toKeep = subs.indexOf(storedSub.topic) < 0
if (!toKeep) {
that._subscriptionsCount--
removed.push({
clientId: client.id,
topic: storedSub.topic,
qos: storedSub.qos
})
if (stored.size === 0) {
this._clientsCount--
this._subscriptions.delete(client.id)
}
return toKeep
})
if (this._subscriptions[client.id].length === 0) {
this._clientsCount--
delete this._subscriptions[client.id]
}

@@ -110,11 +93,2 @@

function checkSubsForClient (sub, savedSubs) {
for (var i = 0; i < savedSubs.length; i++) {
if (sub.topic === savedSubs[i].topic && sub.clientId === savedSubs[i].clientId) {
return true
}
}
return false
}
abs({

@@ -121,0 +95,0 @@ test: test,