Huge News!Announcing our $40M Series B led by Abstract Ventures.Learn More
Socket
Sign inDemoInstall
Socket

aedes-persistence

Package Overview
Dependencies
Maintainers
4
Versions
44
Alerts
File Explorer

Advanced tools

Socket logo

Install Socket

Detect and block malicious and high-risk dependencies

Install

aedes-persistence - npm Package Compare versions

Comparing version 8.1.3 to 9.0.0

490

abstract.js

@@ -1,6 +0,2 @@

'use strict'
const concat = require('concat-stream')
const pump = require('pump')
const through = require('through2')
const { Readable } = require('stream')
const Packet = require('aedes-packet')

@@ -10,3 +6,3 @@

const test = opts.test
var _persistence = opts.persistence
let _persistence = opts.persistence
const waitForReady = opts.waitForReady

@@ -35,3 +31,3 @@

_persistence(function (err, instance) {
_persistence((err, instance) => {
if (instance) {

@@ -44,3 +40,3 @@ // Wait for ready event, if applicable, to ensure the persistence isn't

// can result in 'ready' being emitted.
instance.on('ready', function () {
instance.on('ready', () => {
instance.removeListener('error', cb)

@@ -61,2 +57,25 @@ cb(null, instance)

// legacy third party streams are typically not iterable
function iterableStream (stream) {
if (typeof stream[Symbol.iterator] !== 'function') {
return new Readable({ objectMode: true }).wrap(stream)
}
return stream
}
// end of legacy third party streams support
async function getArrayFromStream (stream) {
const list = []
for await (const item of iterableStream(stream)) {
list.push(item)
}
return list
}
async function streamForEach (stream, fn) {
for await (const item of iterableStream(stream)) {
fn(item)
}
}
function storeRetained (instance, opts, cb) {

@@ -74,3 +93,3 @@ opts = opts || {}

instance.storeRetained(packet, function (err) {
instance.storeRetained(packet, err => {
cb(err, packet)

@@ -81,8 +100,8 @@ })

function matchRetainedWithPattern (t, pattern, opts) {
persistence(function (err, instance) {
persistence((err, instance) => {
if (err) { throw err }
storeRetained(instance, opts, function (err, packet) {
storeRetained(instance, opts, (err, packet) => {
t.notOk(err, 'no error')
var stream
let stream
if (Array.isArray(pattern)) {

@@ -94,6 +113,6 @@ stream = instance.createRetainedStreamCombi(pattern)

stream.pipe(concat(function (list) {
getArrayFromStream(stream).then(list => {
t.deepEqual(list, [packet], 'must return the packet')
instance.destroy(t.end.bind(t))
}))
})
})

@@ -104,4 +123,4 @@ })

function testInstance (title, cb) {
test(title, function (t) {
persistence(function (err, instance) {
test(title, t => {
persistence((err, instance) => {
if (err) { throw err }

@@ -119,25 +138,25 @@ cb(t, instance)

test('store and look up retained messages', function (t) {
test('store and look up retained messages', t => {
matchRetainedWithPattern(t, 'hello/world')
})
test('look up retained messages with a # pattern', function (t) {
test('look up retained messages with a # pattern', t => {
matchRetainedWithPattern(t, '#')
})
test('look up retained messages with a hello/world/# pattern', function (t) {
test('look up retained messages with a hello/world/# pattern', t => {
matchRetainedWithPattern(t, 'hello/world/#')
})
test('look up retained messages with a + pattern', function (t) {
test('look up retained messages with a + pattern', t => {
matchRetainedWithPattern(t, 'hello/+')
})
test('look up retained messages with multiple patterns', function (t) {
test('look up retained messages with multiple patterns', t => {
matchRetainedWithPattern(t, ['hello/+', 'other/hello'])
})
testInstance('store multiple retained messages in order', function (t, instance) {
testInstance('store multiple retained messages in order', (t, instance) => {
const totalMessages = 1000
var done = 0
let done = 0

@@ -155,3 +174,3 @@ const retained = {

instance.storeRetained(packet, function (err) {
instance.storeRetained(packet, err => {
t.notOk(err, 'no error')

@@ -170,16 +189,15 @@ t.equal(packet.brokerCounter, index + 1, 'packet stored in order')

testInstance('remove retained message', function (t, instance) {
storeRetained(instance, {}, function (err, packet) {
testInstance('remove retained message', (t, instance) => {
storeRetained(instance, {}, (err, packet) => {
t.notOk(err, 'no error')
storeRetained(instance, {
payload: Buffer.alloc(0)
}, function (err) {
}, err => {
t.notOk(err, 'no error')
const stream = instance.createRetainedStream('#')
stream.pipe(concat(function (list) {
getArrayFromStream(stream).then(list => {
t.deepEqual(list, [], 'must return an empty list')
instance.destroy(t.end.bind(t))
}))
})
})

@@ -189,8 +207,8 @@ })

testInstance('storing twice a retained message should keep only the last', function (t, instance) {
storeRetained(instance, {}, function (err, packet) {
testInstance('storing twice a retained message should keep only the last', (t, instance) => {
storeRetained(instance, {}, (err, packet) => {
t.notOk(err, 'no error')
storeRetained(instance, {
payload: Buffer.from('ahah')
}, function (err, packet) {
}, (err, packet) => {
t.notOk(err, 'no error')

@@ -200,6 +218,6 @@

stream.pipe(concat(function (list) {
getArrayFromStream(stream).then(list => {
t.deepEqual(list, [packet], 'must return the last packet')
instance.destroy(t.end.bind(t))
}))
})
})

@@ -209,3 +227,3 @@ })

testInstance('Create a new packet while storing a retained message', function (t, instance) {
testInstance('Create a new packet while storing a retained message', (t, instance) => {
const packet = {

@@ -221,3 +239,3 @@ cmd: 'publish',

instance.storeRetained(packet, function (err) {
instance.storeRetained(packet, err => {
t.notOk(err, 'no error')

@@ -228,10 +246,10 @@ // packet reference change to check if a new packet is stored always

stream.pipe(concat(function (list) {
getArrayFromStream(stream).then(list => {
t.deepEqual(list, [newPacket], 'must return the last packet')
instance.destroy(t.end.bind(t))
}))
})
})
})
testInstance('store and look up subscriptions by client', function (t, instance) {
testInstance('store and look up subscriptions by client', (t, instance) => {
const client = { id: 'abcde' }

@@ -249,6 +267,6 @@ const subs = [{

instance.addSubscriptions(client, subs, function (err, reClient) {
instance.addSubscriptions(client, subs, (err, reClient) => {
t.equal(reClient, client, 'client must be the same')
t.notOk(err, 'no error')
instance.subscriptionsByClient(client, function (err, resubs, reReClient) {
instance.subscriptionsByClient(client, (err, resubs, reReClient) => {
t.equal(reReClient, client, 'client must be the same')

@@ -262,3 +280,3 @@ t.notOk(err, 'no error')

testInstance('remove subscriptions by client', function (t, instance) {
testInstance('remove subscriptions by client', (t, instance) => {
const client = { id: 'abcde' }

@@ -273,8 +291,8 @@ const subs = [{

instance.addSubscriptions(client, subs, function (err, reClient) {
instance.addSubscriptions(client, subs, (err, reClient) => {
t.notOk(err, 'no error')
instance.removeSubscriptions(client, ['hello'], function (err, reClient) {
instance.removeSubscriptions(client, ['hello'], (err, reClient) => {
t.notOk(err, 'no error')
t.equal(reClient, client, 'client must be the same')
instance.subscriptionsByClient(client, function (err, resubs, reClient) {
instance.subscriptionsByClient(client, (err, resubs, reClient) => {
t.equal(reClient, client, 'client must be the same')

@@ -292,3 +310,3 @@ t.notOk(err, 'no error')

testInstance('store and look up subscriptions by topic', function (t, instance) {
testInstance('store and look up subscriptions by topic', (t, instance) => {
const client = { id: 'abcde' }

@@ -306,5 +324,5 @@ const subs = [{

instance.addSubscriptions(client, subs, function (err) {
instance.addSubscriptions(client, subs, err => {
t.notOk(err, 'no error')
instance.subscriptionsByTopic('hello', function (err, resubs) {
instance.subscriptionsByTopic('hello', (err, resubs) => {
t.notOk(err, 'no error')

@@ -325,3 +343,3 @@ t.deepEqual(resubs, [{

testInstance('get client list after subscriptions', function (t, instance) {
testInstance('get client list after subscriptions', (t, instance) => {
const client1 = { id: 'abcde' }

@@ -334,11 +352,11 @@ const client2 = { id: 'efghi' }

instance.addSubscriptions(client1, subs, function (err) {
instance.addSubscriptions(client1, subs, err => {
t.notOk(err, 'no error for client 1')
instance.addSubscriptions(client2, subs, function (err) {
instance.addSubscriptions(client2, subs, err => {
t.notOk(err, 'no error for client 2')
const stream = instance.getClientList(subs[0].topic)
stream.pipe(concat({ encoding: 'object' }, function (out) {
getArrayFromStream(stream).then(out => {
t.deepEqual(out, [client1.id, client2.id])
instance.destroy(t.end.bind(t))
}))
})
})

@@ -348,3 +366,3 @@ })

testInstance('get client list after an unsubscribe', function (t, instance) {
testInstance('get client list after an unsubscribe', (t, instance) => {
const client1 = { id: 'abcde' }

@@ -357,13 +375,13 @@ const client2 = { id: 'efghi' }

instance.addSubscriptions(client1, subs, function (err) {
instance.addSubscriptions(client1, subs, err => {
t.notOk(err, 'no error for client 1')
instance.addSubscriptions(client2, subs, function (err) {
instance.addSubscriptions(client2, subs, err => {
t.notOk(err, 'no error for client 2')
instance.removeSubscriptions(client2, [subs[0].topic], function (err, reClient) {
instance.removeSubscriptions(client2, [subs[0].topic], (err, reClient) => {
t.notOk(err, 'no error for removeSubscriptions')
const stream = instance.getClientList(subs[0].topic)
stream.pipe(concat({ encoding: 'object' }, function (out) {
getArrayFromStream(stream).then(out => {
t.deepEqual(out, [client1.id])
instance.destroy(t.end.bind(t))
}))
})
})

@@ -374,3 +392,3 @@ })

testInstance('get subscriptions list after an unsubscribe', function (t, instance) {
testInstance('get subscriptions list after an unsubscribe', (t, instance) => {
const client1 = { id: 'abcde' }

@@ -383,9 +401,9 @@ const client2 = { id: 'efghi' }

instance.addSubscriptions(client1, subs, function (err) {
instance.addSubscriptions(client1, subs, err => {
t.notOk(err, 'no error for client 1')
instance.addSubscriptions(client2, subs, function (err) {
instance.addSubscriptions(client2, subs, err => {
t.notOk(err, 'no error for client 2')
instance.removeSubscriptions(client2, [subs[0].topic], function (err, reClient) {
instance.removeSubscriptions(client2, [subs[0].topic], (err, reClient) => {
t.notOk(err, 'no error for removeSubscriptions')
instance.subscriptionsByTopic(subs[0].topic, function (err, clients) {
instance.subscriptionsByTopic(subs[0].topic, (err, clients) => {
t.notOk(err, 'no error getting subscriptions by topic')

@@ -400,3 +418,3 @@ t.deepEqual(clients[0].clientId, client1.id)

testInstance('QoS 0 subscriptions, restored but not matched', function (t, instance) {
testInstance('QoS 0 subscriptions, restored but not matched', (t, instance) => {
const client = { id: 'abcde' }

@@ -414,8 +432,8 @@ const subs = [{

instance.addSubscriptions(client, subs, function (err) {
instance.addSubscriptions(client, subs, err => {
t.notOk(err, 'no error')
instance.subscriptionsByClient(client, function (err, resubs) {
instance.subscriptionsByClient(client, (err, resubs) => {
t.notOk(err, 'no error')
t.deepEqual(resubs, subs)
instance.subscriptionsByTopic('hello', function (err, resubs2) {
instance.subscriptionsByTopic('hello', (err, resubs2) => {
t.notOk(err, 'no error')

@@ -433,3 +451,3 @@ t.deepEqual(resubs2, [{

testInstance('clean subscriptions', function (t, instance) {
testInstance('clean subscriptions', (t, instance) => {
const client = { id: 'abcde' }

@@ -444,15 +462,15 @@ const subs = [{

instance.addSubscriptions(client, subs, function (err) {
instance.addSubscriptions(client, subs, err => {
t.notOk(err, 'no error')
instance.cleanSubscriptions(client, function (err) {
instance.cleanSubscriptions(client, err => {
t.notOk(err, 'no error')
instance.subscriptionsByTopic('hello', function (err, resubs) {
instance.subscriptionsByTopic('hello', (err, resubs) => {
t.notOk(err, 'no error')
t.deepEqual(resubs, [], 'no subscriptions')
instance.subscriptionsByClient(client, function (err, resubs) {
instance.subscriptionsByClient(client, (err, resubs) => {
t.error(err)
t.deepEqual(resubs, null, 'no subscriptions')
instance.countOffline(function (err, subsCount, clientsCount) {
instance.countOffline((err, subsCount, clientsCount) => {
t.error(err, 'no error')

@@ -470,16 +488,16 @@ t.equal(subsCount, 0, 'no subscriptions added')

testInstance('clean subscriptions with no active subscriptions', function (t, instance) {
testInstance('clean subscriptions with no active subscriptions', (t, instance) => {
const client = { id: 'abcde' }
instance.cleanSubscriptions(client, function (err) {
instance.cleanSubscriptions(client, err => {
t.notOk(err, 'no error')
instance.subscriptionsByTopic('hello', function (err, resubs) {
instance.subscriptionsByTopic('hello', (err, resubs) => {
t.notOk(err, 'no error')
t.deepEqual(resubs, [], 'no subscriptions')
instance.subscriptionsByClient(client, function (err, resubs) {
instance.subscriptionsByClient(client, (err, resubs) => {
t.error(err)
t.deepEqual(resubs, null, 'no subscriptions')
instance.countOffline(function (err, subsCount, clientsCount) {
instance.countOffline((err, subsCount, clientsCount) => {
t.error(err, 'no error')

@@ -496,3 +514,3 @@ t.equal(subsCount, 0, 'no subscriptions added')

testInstance('same topic, different QoS', function (t, instance) {
testInstance('same topic, different QoS', (t, instance) => {
const client = { id: 'abcde' }

@@ -507,7 +525,7 @@ const subs = [{

instance.addSubscriptions(client, subs, function (err, reClient) {
instance.addSubscriptions(client, subs, (err, reClient) => {
t.equal(reClient, client, 'client must be the same')
t.error(err, 'no error')
instance.subscriptionsByClient(client, function (err, subsForClient, client) {
instance.subscriptionsByClient(client, (err, subsForClient, client) => {
t.error(err, 'no error')

@@ -519,3 +537,3 @@ t.deepEqual(subsForClient, [{

instance.subscriptionsByTopic('hello', function (err, subsForTopic) {
instance.subscriptionsByTopic('hello', (err, subsForTopic) => {
t.error(err, 'no error')

@@ -528,3 +546,3 @@ t.deepEqual(subsForTopic, [{

instance.countOffline(function (err, subsCount, clientsCount) {
instance.countOffline((err, subsCount, clientsCount) => {
t.error(err, 'no error')

@@ -541,3 +559,3 @@ t.equal(subsCount, 1, 'one subscription added')

testInstance('replace subscriptions', function (t, instance) {
testInstance('replace subscriptions', (t, instance) => {
const client = { id: 'abcde' }

@@ -550,12 +568,12 @@ const topic = 'hello'

sub.qos = subByTopic.qos = qos
instance.addSubscriptions(client, [sub], function (err, reClient) {
instance.addSubscriptions(client, [sub], (err, reClient) => {
t.equal(reClient, client, 'client must be the same')
t.error(err, 'no error')
instance.subscriptionsByClient(client, function (err, subsForClient, client) {
instance.subscriptionsByClient(client, (err, subsForClient, client) => {
t.error(err, 'no error')
t.deepEqual(subsForClient, [sub])
instance.subscriptionsByTopic(topic, function (err, subsForTopic) {
instance.subscriptionsByTopic(topic, (err, subsForTopic) => {
t.error(err, 'no error')
t.deepEqual(subsForTopic, qos === 0 ? [] : [subByTopic])
instance.countOffline(function (err, subsCount, clientsCount) {
instance.countOffline((err, subsCount, clientsCount) => {
t.error(err, 'no error')

@@ -575,7 +593,7 @@ if (qos === 0) {

check(0, function () {
check(1, function () {
check(2, function () {
check(1, function () {
check(0, function () {
check(0, () => {
check(1, () => {
check(2, () => {
check(1, () => {
check(0, () => {
instance.destroy(t.end.bind(t))

@@ -589,3 +607,3 @@ })

testInstance('replace subscriptions in same call', function (t, instance) {
testInstance('replace subscriptions in same call', (t, instance) => {
const client = { id: 'abcde' }

@@ -600,12 +618,12 @@ const topic = 'hello'

]
instance.addSubscriptions(client, subs, function (err, reClient) {
instance.addSubscriptions(client, subs, (err, reClient) => {
t.equal(reClient, client, 'client must be the same')
t.error(err, 'no error')
instance.subscriptionsByClient(client, function (err, subsForClient, client) {
instance.subscriptionsByClient(client, (err, subsForClient, client) => {
t.error(err, 'no error')
t.deepEqual(subsForClient, [{ topic, qos: 0 }])
instance.subscriptionsByTopic(topic, function (err, subsForTopic) {
instance.subscriptionsByTopic(topic, (err, subsForTopic) => {
t.error(err, 'no error')
t.deepEqual(subsForTopic, [])
instance.countOffline(function (err, subsCount, clientsCount) {
instance.countOffline((err, subsCount, clientsCount) => {
t.error(err, 'no error')

@@ -621,3 +639,3 @@ t.equal(subsCount, 0, 'no subscriptions added')

testInstance('store and count subscriptions', function (t, instance) {
testInstance('store and count subscriptions', (t, instance) => {
const client = { id: 'abcde' }

@@ -635,7 +653,7 @@ const subs = [{

instance.addSubscriptions(client, subs, function (err, reClient) {
instance.addSubscriptions(client, subs, (err, reClient) => {
t.equal(reClient, client, 'client must be the same')
t.error(err, 'no error')
instance.countOffline(function (err, subsCount, clientsCount) {
instance.countOffline((err, subsCount, clientsCount) => {
t.error(err, 'no error')

@@ -645,6 +663,6 @@ t.equal(subsCount, 2, 'two subscriptions added')

instance.removeSubscriptions(client, ['hello'], function (err, reClient) {
instance.removeSubscriptions(client, ['hello'], (err, reClient) => {
t.error(err, 'no error')
instance.countOffline(function (err, subsCount, clientsCount) {
instance.countOffline((err, subsCount, clientsCount) => {
t.error(err, 'no error')

@@ -654,6 +672,6 @@ t.equal(subsCount, 1, 'one subscription added')

instance.removeSubscriptions(client, ['matteo'], function (err, reClient) {
instance.removeSubscriptions(client, ['matteo'], (err, reClient) => {
t.error(err, 'no error')
instance.countOffline(function (err, subsCount, clientsCount) {
instance.countOffline((err, subsCount, clientsCount) => {
t.error(err, 'no error')

@@ -663,6 +681,6 @@ t.equal(subsCount, 0, 'zero subscriptions added')

instance.removeSubscriptions(client, ['noqos'], function (err, reClient) {
instance.removeSubscriptions(client, ['noqos'], (err, reClient) => {
t.error(err, 'no error')
instance.countOffline(function (err, subsCount, clientsCount) {
instance.countOffline((err, subsCount, clientsCount) => {
t.error(err, 'no error')

@@ -672,6 +690,6 @@ t.equal(subsCount, 0, 'zero subscriptions added')

instance.removeSubscriptions(client, ['noqos'], function (err, reClient) {
instance.removeSubscriptions(client, ['noqos'], (err, reClient) => {
t.error(err, 'no error')
instance.countOffline(function (err, subsCount, clientsCount) {
instance.countOffline((err, subsCount, clientsCount) => {
t.error(err, 'no error')

@@ -694,3 +712,3 @@ t.equal(subsCount, 0, 'zero subscriptions added')

testInstance('count subscriptions with two clients', function (t, instance) {
testInstance('count subscriptions with two clients', (t, instance) => {
const client1 = { id: 'abcde' }

@@ -710,7 +728,7 @@ const client2 = { id: 'fghij' }

function remove (client, subs, expectedSubs, expectedClients, cb) {
instance.removeSubscriptions(client, subs, function (err, reClient) {
instance.removeSubscriptions(client, subs, (err, reClient) => {
t.error(err, 'no error')
t.equal(reClient, client, 'client must be the same')
instance.countOffline(function (err, subsCount, clientsCount) {
instance.countOffline((err, subsCount, clientsCount) => {
t.error(err, 'no error')

@@ -725,18 +743,18 @@ t.equal(subsCount, expectedSubs, 'subscriptions added')

instance.addSubscriptions(client1, subs, function (err, reClient) {
instance.addSubscriptions(client1, subs, (err, reClient) => {
t.equal(reClient, client1, 'client must be the same')
t.error(err, 'no error')
instance.addSubscriptions(client2, subs, function (err, reClient) {
instance.addSubscriptions(client2, subs, (err, reClient) => {
t.equal(reClient, client2, 'client must be the same')
t.error(err, 'no error')
remove(client1, ['foobar'], 4, 2, function () {
remove(client1, ['hello'], 3, 2, function () {
remove(client1, ['hello'], 3, 2, function () {
remove(client1, ['matteo'], 2, 2, function () {
remove(client1, ['noqos'], 2, 1, function () {
remove(client2, ['hello'], 1, 1, function () {
remove(client2, ['matteo'], 0, 1, function () {
remove(client2, ['noqos'], 0, 0, function () {
remove(client1, ['foobar'], 4, 2, () => {
remove(client1, ['hello'], 3, 2, () => {
remove(client1, ['hello'], 3, 2, () => {
remove(client1, ['matteo'], 2, 2, () => {
remove(client1, ['noqos'], 2, 1, () => {
remove(client2, ['hello'], 1, 1, () => {
remove(client2, ['matteo'], 0, 1, () => {
remove(client2, ['noqos'], 0, 0, () => {
instance.destroy(t.end.bind(t))

@@ -755,3 +773,3 @@ })

testInstance('add duplicate subs to persistence for qos > 0', function (t, instance) {
testInstance('add duplicate subs to persistence for qos > 0', (t, instance) => {
const client = { id: 'abcde' }

@@ -764,11 +782,11 @@ const topic = 'hello'

instance.addSubscriptions(client, subs, function (err, reClient) {
instance.addSubscriptions(client, subs, (err, reClient) => {
t.equal(reClient, client, 'client must be the same')
t.error(err, 'no error')
instance.addSubscriptions(client, subs, function (err, resCLient) {
instance.addSubscriptions(client, subs, (err, resCLient) => {
t.equal(resCLient, client, 'client must be the same')
t.error(err, 'no error')
subs[0].clientId = client.id
instance.subscriptionsByTopic(topic, function (err, subsForTopic) {
instance.subscriptionsByTopic(topic, (err, subsForTopic) => {
t.error(err, 'no error')

@@ -782,3 +800,3 @@ t.deepEqual(subsForTopic, subs)

testInstance('add duplicate subs to persistence for qos 0', function (t, instance) {
testInstance('add duplicate subs to persistence for qos 0', (t, instance) => {
const client = { id: 'abcde' }

@@ -791,10 +809,10 @@ const topic = 'hello'

instance.addSubscriptions(client, subs, function (err, reClient) {
instance.addSubscriptions(client, subs, (err, reClient) => {
t.equal(reClient, client, 'client must be the same')
t.error(err, 'no error')
instance.addSubscriptions(client, subs, function (err, resCLient) {
instance.addSubscriptions(client, subs, (err, resCLient) => {
t.equal(resCLient, client, 'client must be the same')
t.error(err, 'no error')
instance.subscriptionsByClient(client, function (err, subsForClient, client) {
instance.subscriptionsByClient(client, (err, subsForClient, client) => {
t.error(err, 'no error')

@@ -808,3 +826,3 @@ t.deepEqual(subsForClient, subs)

testInstance('get topic list after concurrent subscriptions of a client', function (t, instance) {
testInstance('get topic list after concurrent subscriptions of a client', (t, instance) => {
const client = { id: 'abcde' }

@@ -819,7 +837,7 @@ const subs1 = [{

}]
var calls = 2
let calls = 2
function done () {
if (!--calls) {
instance.subscriptionsByClient(client, function (err, resubs) {
instance.subscriptionsByClient(client, (err, resubs) => {
t.notOk(err, 'no error')

@@ -833,7 +851,7 @@ resubs.sort((a, b) => b.topic.localeCompare(b.topic, 'en'))

instance.addSubscriptions(client, subs1, function (err) {
instance.addSubscriptions(client, subs1, err => {
t.notOk(err, 'no error for hello1')
done()
})
instance.addSubscriptions(client, subs2, function (err) {
instance.addSubscriptions(client, subs2, err => {
t.notOk(err, 'no error for hello2')

@@ -844,3 +862,3 @@ done()

testInstance('add outgoing packet and stream it', function (t, instance) {
testInstance('add outgoing packet and stream it', (t, instance) => {
const sub = {

@@ -877,15 +895,15 @@ clientId: 'abcde',

instance.outgoingEnqueue(sub, packet, function (err) {
instance.outgoingEnqueue(sub, packet, err => {
t.error(err)
const stream = instance.outgoingStream(client)
stream.pipe(concat(function (list) {
var packet = list[0]
getArrayFromStream(stream).then(list => {
const packet = list[0]
testPacket(t, packet, expected)
instance.destroy(t.end.bind(t))
}))
})
})
})
testInstance('add outgoing packet for multiple subs and stream to all', function (t, instance) {
testInstance('add outgoing packet for multiple subs and stream to all', (t, instance) => {
const sub = {

@@ -931,20 +949,20 @@ clientId: 'abcde',

instance.outgoingEnqueueCombi(subs, packet, function (err) {
instance.outgoingEnqueueCombi(subs, packet, err => {
t.error(err)
const stream = instance.outgoingStream(client)
stream.pipe(concat(function (list) {
var packet = list[0]
getArrayFromStream(stream).then(list => {
const packet = list[0]
testPacket(t, packet, expected)
const stream2 = instance.outgoingStream(client2)
stream2.pipe(concat(function (list) {
var packet = list[0]
getArrayFromStream(stream2).then(list2 => {
const packet = list2[0]
testPacket(t, packet, expected)
instance.destroy(t.end.bind(t))
}))
}))
})
})
})
})
testInstance('add outgoing packet as a string and pump', function (t, instance) {
testInstance('add outgoing packet as a string and pump', (t, instance) => {
const sub = {

@@ -977,13 +995,14 @@ clientId: 'abcde',

const queue = []
enqueueAndUpdate(t, instance, client, sub, packet1, 42, function (updated1) {
enqueueAndUpdate(t, instance, client, sub, packet2, 43, function (updated2) {
enqueueAndUpdate(t, instance, client, sub, packet1, 42, updated1 => {
enqueueAndUpdate(t, instance, client, sub, packet2, 43, updated2 => {
const stream = instance.outgoingStream(client)
pump(stream, through.obj(function clearQueue (data, enc, next) {
function clearQueue (data) {
instance.outgoingUpdate(client, data,
function (err, client, packet) {
(err, client, packet) => {
t.notOk(err, 'no error')
queue.push(packet)
next()
})
}), function done () {
}
streamForEach(stream, clearQueue).then(function done () {
t.equal(queue.length, 2)

@@ -1000,3 +1019,3 @@ if (queue.length === 2) {

testInstance('add outgoing packet as a string and stream', function (t, instance) {
testInstance('add outgoing packet as a string and stream', (t, instance) => {
const sub = {

@@ -1033,15 +1052,15 @@ clientId: 'abcde',

instance.outgoingEnqueueCombi([sub], packet, function (err) {
instance.outgoingEnqueueCombi([sub], packet, err => {
t.error(err)
const stream = instance.outgoingStream(client)
stream.pipe(concat(function (list) {
var packet = list[0]
getArrayFromStream(stream).then(list => {
const packet = list[0]
testPacket(t, packet, expected)
instance.destroy(t.end.bind(t))
}))
})
})
})
testInstance('add outgoing packet and stream it twice', function (t, instance) {
testInstance('add outgoing packet and stream it twice', (t, instance) => {
const sub = {

@@ -1079,19 +1098,19 @@ clientId: 'abcde',

instance.outgoingEnqueueCombi([sub], packet, function (err) {
instance.outgoingEnqueueCombi([sub], packet, err => {
t.error(err)
const stream = instance.outgoingStream(client)
stream.pipe(concat(function (list) {
var packet = list[0]
getArrayFromStream(stream).then(list => {
const packet = list[0]
testPacket(t, packet, expected)
const stream = instance.outgoingStream(client)
const stream2 = instance.outgoingStream(client)
stream.pipe(concat(function (list) {
var packet = list[0]
getArrayFromStream(stream2).then(list2 => {
const packet = list2[0]
testPacket(t, packet, expected)
t.notEqual(packet, expected, 'packet must be a different object')
instance.destroy(t.end.bind(t))
}))
}))
})
})
})

@@ -1101,3 +1120,3 @@ })

function enqueueAndUpdate (t, instance, client, sub, packet, messageId, callback) {
instance.outgoingEnqueueCombi([sub], packet, function (err) {
instance.outgoingEnqueueCombi([sub], packet, err => {
t.error(err)

@@ -1107,3 +1126,3 @@ const updated = new Packet(packet)

instance.outgoingUpdate(client, updated, function (err, reclient, repacket) {
instance.outgoingUpdate(client, updated, (err, reclient, repacket) => {
t.error(err)

@@ -1117,3 +1136,3 @@ t.equal(reclient, client, 'client matches')

testInstance('add outgoing packet and update messageId', function (t, instance) {
testInstance('add outgoing packet and update messageId', (t, instance) => {
const sub = {

@@ -1137,6 +1156,6 @@ clientId: 'abcde', topic: 'hello', qos: 1

enqueueAndUpdate(t, instance, client, sub, packet, 42, function (updated) {
enqueueAndUpdate(t, instance, client, sub, packet, 42, updated => {
const stream = instance.outgoingStream(client)
delete updated.messageId
stream.pipe(concat(function (list) {
getArrayFromStream(stream).then(list => {
delete list[0].messageId

@@ -1146,7 +1165,7 @@ t.notEqual(list[0], updated, 'must not be the same object')

instance.destroy(t.end.bind(t))
}))
})
})
})
testInstance('add 2 outgoing packet and clear messageId', function (t, instance) {
testInstance('add 2 outgoing packet and clear messageId', (t, instance) => {
const sub = {

@@ -1181,5 +1200,5 @@ clientId: 'abcde', topic: 'hello', qos: 1

enqueueAndUpdate(t, instance, client, sub, packet1, 42, function (updated1) {
enqueueAndUpdate(t, instance, client, sub, packet2, 43, function (updated2) {
instance.outgoingClearMessageId(client, updated1, function (err, packet) {
enqueueAndUpdate(t, instance, client, sub, packet1, 42, updated1 => {
enqueueAndUpdate(t, instance, client, sub, packet2, 43, updated2 => {
instance.outgoingClearMessageId(client, updated1, (err, packet) => {
t.error(err)

@@ -1191,3 +1210,3 @@ t.deepEqual(packet.messageId, 42, 'must have the same messageId')

delete updated2.messageId
stream.pipe(concat(function (list) {
getArrayFromStream(stream).then(list => {
delete list[0].messageId

@@ -1197,3 +1216,3 @@ t.notEqual(list[0], updated2, 'must not be the same object')

instance.destroy(t.end.bind(t))
}))
})
})

@@ -1204,3 +1223,3 @@ })

testInstance('update to publish w/ same messageId', function (t, instance) {
testInstance('update to publish w/ same messageId', (t, instance) => {
const sub = {

@@ -1237,8 +1256,8 @@ clientId: 'abcde', topic: 'hello', qos: 1

instance.outgoingEnqueue(sub, packet1, function () {
instance.outgoingEnqueue(sub, packet2, function () {
instance.outgoingUpdate(client, packet1, function () {
instance.outgoingUpdate(client, packet2, function () {
instance.outgoingEnqueue(sub, packet1, () => {
instance.outgoingEnqueue(sub, packet2, () => {
instance.outgoingUpdate(client, packet1, () => {
instance.outgoingUpdate(client, packet2, () => {
const stream = instance.outgoingStream(client)
stream.pipe(concat(function (list) {
getArrayFromStream(stream).then(list => {
t.equal(list.length, 2, 'must have two items in queue')

@@ -1250,3 +1269,3 @@ t.equal(list[0].brokerCounter, packet1.brokerCounter, 'brokerCounter must match')

instance.destroy(t.end.bind(t))
}))
})
})

@@ -1258,3 +1277,3 @@ })

testInstance('update to pubrel', function (t, instance) {
testInstance('update to pubrel', (t, instance) => {
const sub = {

@@ -1278,3 +1297,3 @@ clientId: 'abcde', topic: 'hello', qos: 1

instance.outgoingEnqueueCombi([sub], packet, function (err) {
instance.outgoingEnqueueCombi([sub], packet, err => {
t.error(err)

@@ -1284,3 +1303,3 @@ const updated = new Packet(packet)

instance.outgoingUpdate(client, updated, function (err, reclient, repacket) {
instance.outgoingUpdate(client, updated, (err, reclient, repacket) => {
t.error(err)

@@ -1295,3 +1314,3 @@ t.equal(reclient, client, 'client matches')

instance.outgoingUpdate(client, pubrel, function (err) {
instance.outgoingUpdate(client, pubrel, err => {
t.error(err)

@@ -1301,6 +1320,6 @@

stream.pipe(concat(function (list) {
getArrayFromStream(stream).then(list => {
t.deepEqual(list, [pubrel], 'must return the packet')
instance.destroy(t.end.bind(t))
}))
})
})

@@ -1311,3 +1330,3 @@ })

testInstance('add incoming packet, get it, and clear with messageId', function (t, instance) {
testInstance('add incoming packet, get it, and clear with messageId', (t, instance) => {
const client = {

@@ -1327,3 +1346,3 @@ id: 'abcde'

instance.incomingStorePacket(client, packet, function (err) {
instance.incomingStorePacket(client, packet, err => {
t.error(err)

@@ -1333,3 +1352,3 @@

messageId: packet.messageId
}, function (err, retrieved) {
}, (err, retrieved) => {
t.error(err)

@@ -1345,3 +1364,3 @@

instance.incomingDelPacket(client, retrieved, function (err) {
instance.incomingDelPacket(client, retrieved, err => {
t.error(err)

@@ -1351,3 +1370,3 @@

messageId: packet.messageId
}, function (err, retrieved) {
}, (err, retrieved) => {
t.ok(err, 'must error')

@@ -1361,3 +1380,3 @@ instance.destroy(t.end.bind(t))

testInstance('store, fetch and delete will message', function (t, instance) {
testInstance('store, fetch and delete will message', (t, instance) => {
const client = {

@@ -1373,6 +1392,6 @@ id: '12345'

instance.putWill(client, expected, function (err, c) {
instance.putWill(client, expected, (err, c) => {
t.error(err, 'no error')
t.equal(c, client, 'client matches')
instance.getWill(client, function (err, packet, c) {
instance.getWill(client, (err, packet, c) => {
t.error(err, 'no error')

@@ -1382,7 +1401,7 @@ t.deepEqual(packet, expected, 'will matches')

client.brokerId = packet.brokerId
instance.delWill(client, function (err, packet, c) {
instance.delWill(client, (err, packet, c) => {
t.error(err, 'no error')
t.deepEqual(packet, expected, 'will matches')
t.equal(c, client, 'client matches')
instance.getWill(client, function (err, packet, c) {
instance.getWill(client, (err, packet, c) => {
t.error(err, 'no error')

@@ -1398,3 +1417,3 @@ t.notOk(packet, 'no will after del')

testInstance('stream all will messages', function (t, instance) {
testInstance('stream all will messages', (t, instance) => {
const client = {

@@ -1410,6 +1429,6 @@ id: '12345'

instance.putWill(client, toWrite, function (err, c) {
instance.putWill(client, toWrite, (err, c) => {
t.error(err, 'no error')
t.equal(c, client, 'client matches')
instance.streamWill().pipe(through.obj(function (chunk, enc, cb) {
streamForEach(instance.streamWill(), (chunk) => {
t.deepEqual(chunk, {

@@ -1423,13 +1442,11 @@ clientId: client.id,

}, 'packet matches')
cb()
client.brokerId = chunk.brokerId
instance.delWill(client, function (err, result, client) {
instance.delWill(client, (err, result, client) => {
t.error(err, 'no error')
instance.destroy(t.end.bind(t))
})
}))
})
})
})
testInstance('stream all will message for unknown brokers', function (t, instance) {
testInstance('stream all will message for unknown brokers', (t, instance) => {
const originalId = instance.broker.id

@@ -1455,28 +1472,25 @@ const client = {

instance.putWill(client, toWrite1, function (err, c) {
instance.putWill(client, toWrite1, (err, c) => {
t.error(err, 'no error')
t.equal(c, client, 'client matches')
instance.broker.id = 'anotherBroker'
instance.putWill(anotherClient, toWrite2, function (err, c) {
instance.putWill(anotherClient, toWrite2, (err, c) => {
t.error(err, 'no error')
t.equal(c, anotherClient, 'client matches')
instance.streamWill({
streamForEach(instance.streamWill({
anotherBroker: Date.now()
}), (chunk) => {
t.deepEqual(chunk, {
clientId: client.id,
brokerId: originalId,
topic: 'hello/died42',
payload: Buffer.from('muahahha'),
qos: 0,
retain: true
}, 'packet matches')
instance.delWill(client, (err, result, client) => {
t.error(err, 'no error')
instance.destroy(t.end.bind(t))
})
})
.pipe(through.obj(function (chunk, enc, cb) {
t.deepEqual(chunk, {
clientId: client.id,
brokerId: originalId,
topic: 'hello/died42',
payload: Buffer.from('muahahha'),
qos: 0,
retain: true
}, 'packet matches')
cb()
client.brokerId = chunk.brokerId
instance.delWill(client, function (err, result, client) {
t.error(err, 'no error')
instance.destroy(t.end.bind(t))
})
}))
})

@@ -1486,3 +1500,3 @@ })

testInstance('delete wills from dead brokers', function (t, instance) {
testInstance('delete wills from dead brokers', (t, instance) => {
const client = {

@@ -1499,3 +1513,3 @@ id: '42'

instance.putWill(client, toWrite1, function (err, c) {
instance.putWill(client, toWrite1, (err, c) => {
t.error(err, 'no error')

@@ -1505,3 +1519,3 @@ t.equal(c, client, 'client matches')

client.brokerId = instance.broker.id
instance.delWill(client, function (err, result, client) {
instance.delWill(client, (err, result, client) => {
t.error(err, 'no error')

@@ -1513,3 +1527,3 @@ instance.destroy(t.end.bind(t))

testInstance('do not error if unkown messageId in outoingClearMessageId', function (t, instance) {
testInstance('do not error if unkown messageId in outoingClearMessageId', (t, instance) => {
const client = {

@@ -1519,3 +1533,3 @@ id: 'abc-123'

instance.outgoingClearMessageId(client, 42, function (err) {
instance.outgoingClearMessageId(client, 42, err => {
t.error(err)

@@ -1522,0 +1536,0 @@ instance.destroy(t.end.bind(t))

{
"name": "aedes-persistence",
"version": "8.1.3",
"version": "9.0.0",
"description": "The spec for an Aedes persistence, with abstract tests and a fast in-memory implementation.",

@@ -60,25 +60,22 @@ "main": "persistence.js",

"engines": {
"node": ">=10"
"node": ">=14"
},
"devDependencies": {
"aedes": "^0.45.0",
"concat-stream": "^2.0.0",
"@types/node": "^17.0.24",
"aedes": "^0.46.3",
"faucet": "0.0.1",
"license-checker": "^25.0.1",
"mqemitter": "^4.4.0",
"mqemitter": "^4.5.0",
"nyc": "^15.1.0",
"pre-commit": "^1.2.2",
"pump": "^3.0.0",
"release-it": "^14.2.0",
"release-it": "^14.14.2",
"snazzy": "^9.0.0",
"standard": "^15.0.1",
"tape": "^5.2.1",
"through2": "^4.0.2",
"tsd": "^0.14.0"
"standard": "^16.0.4",
"tape": "^5.5.3",
"tsd": "^0.20.0"
},
"dependencies": {
"aedes-packet": "^2.3.1",
"from2": "^2.3.0",
"qlobber": "^5.0.3"
"qlobber": "^6.0.0"
}
}

@@ -1,4 +0,2 @@

'use strict'
const from2 = require('from2')
const { Readable } = require('stream')
const QlobberSub = require('qlobber/aedes/qlobber-sub')

@@ -12,346 +10,315 @@ const { QlobberTrue } = require('qlobber')

}
const CREATE_ON_EMPTY = true
function MemoryPersistence () {
if (!(this instanceof MemoryPersistence)) {
return new MemoryPersistence()
function * multiIterables (iterables) {
for (const iter of iterables) {
yield * iter
}
}
this._retained = []
// clientId -> topic -> qos
this._subscriptions = new Map()
this._clientsCount = 0
this._trie = new QlobberSub(QlobberOpts)
this._outgoing = {}
this._incoming = {}
this._wills = {}
function * retainedMessagesByPattern (retained, pattern) {
const qlobber = new QlobberTrue(QlobberOpts)
qlobber.add(pattern)
for (const [topic, packet] of retained) {
if (qlobber.test(topic)) {
yield packet
}
}
}
function matchTopic (p) {
return p.topic !== this.topic
function * willsByBrokers (wills, brokers) {
for (const will of wills.values()) {
if (!brokers[will.brokerId]) {
yield will
}
}
}
MemoryPersistence.prototype.storeRetained = function (packet, cb) {
packet = Object.assign({}, packet)
this._retained = this._retained.filter(matchTopic, packet)
if (packet.payload.length > 0) this._retained.push(packet)
cb(null)
function * clientListbyTopic (subscriptions, topic) {
for (const [clientId, topicMap] of subscriptions) {
if (topicMap.has(topic)) {
yield clientId
}
}
}
function matchingStream (current, pattern) {
const matcher = new QlobberTrue(QlobberOpts)
class MemoryPersistence {
constructor () {
// using Maps for convenience and security (risk on prototype polution)
// Map ( topic -> packet )
this._retained = new Map()
// Map ( clientId -> Map( topic -> qos ))
this._subscriptions = new Map()
// Map ( clientId > [ packet ] }
this._outgoing = new Map()
// Map ( clientId -> { packetId -> Packet } )
this._incoming = new Map()
// Map( clientId -> will )
this._wills = new Map()
this._clientsCount = 0
this._trie = new QlobberSub(QlobberOpts)
}
if (Array.isArray(pattern)) {
for (var i = 0; i < pattern.length; i += 1) {
matcher.add(pattern[i])
storeRetained (pkt, cb) {
const packet = Object.assign({}, pkt)
if (packet.payload.length === 0) {
this._retained.delete(packet.topic)
} else {
this._retained.set(packet.topic, packet)
}
} else {
matcher.add(pattern)
cb(null)
}
return from2.obj(function match (size, next) {
var entry
createRetainedStreamCombi (patterns) {
const iterables = patterns.map((p) => {
return retainedMessagesByPattern(this._retained, p)
})
return Readable.from(multiIterables(iterables))
}
while ((entry = current.shift()) != null) {
if (matcher.test(entry.topic)) {
setImmediate(next, null, entry)
return
createRetainedStream (pattern) {
return Readable.from(retainedMessagesByPattern(this._retained, pattern))
}
addSubscriptions (client, subs, cb) {
let stored = this._subscriptions.get(client.id)
const trie = this._trie
if (!stored) {
stored = new Map()
this._subscriptions.set(client.id, stored)
this._clientsCount++
}
for (const sub of subs) {
const qos = stored.get(sub.topic)
const hasQoSGreaterThanZero = (qos !== undefined) && (qos > 0)
if (sub.qos > 0) {
trie.add(sub.topic, {
clientId: client.id,
topic: sub.topic,
qos: sub.qos
})
} else if (hasQoSGreaterThanZero) {
trie.remove(sub.topic, {
clientId: client.id,
topic: sub.topic
})
}
stored.set(sub.topic, sub.qos)
}
if (!entry) this.push(null)
})
}
cb(null, client)
}
MemoryPersistence.prototype.createRetainedStream = function (pattern) {
return matchingStream([].concat(this._retained), pattern)
}
removeSubscriptions (client, subs, cb) {
const stored = this._subscriptions.get(client.id)
const trie = this._trie
MemoryPersistence.prototype.createRetainedStreamCombi = function (patterns) {
return matchingStream([].concat(this._retained), patterns)
}
if (stored) {
for (const topic of subs) {
const qos = stored.get(topic)
if (qos !== undefined) {
if (qos > 0) {
trie.remove(topic, { clientId: client.id, topic })
}
stored.delete(topic)
}
}
MemoryPersistence.prototype.addSubscriptions = function (client, subs, cb) {
var stored = this._subscriptions.get(client.id)
const trie = this._trie
if (stored.size === 0) {
this._clientsCount--
this._subscriptions.delete(client.id)
}
}
if (!stored) {
stored = new Map()
this._subscriptions.set(client.id, stored)
this._clientsCount++
cb(null, client)
}
for (var i = 0; i < subs.length; i += 1) {
const sub = subs[i]
const qos = stored.get(sub.topic)
const hasQoSGreaterThanZero = (qos !== undefined) && (qos > 0)
if (sub.qos > 0) {
trie.add(sub.topic, {
clientId: client.id,
topic: sub.topic,
qos: sub.qos
})
} else if (hasQoSGreaterThanZero) {
trie.remove(sub.topic, {
clientId: client.id,
topic: sub.topic
})
subscriptionsByClient (client, cb) {
let subs = null
const stored = this._subscriptions.get(client.id)
if (stored) {
subs = []
for (const topicAndQos of stored) {
subs.push({ topic: topicAndQos[0], qos: topicAndQos[1] })
}
}
stored.set(sub.topic, sub.qos)
cb(null, subs, client)
}
cb(null, client)
}
countOffline (cb) {
return cb(null, this._trie.subscriptionsCount, this._clientsCount)
}
MemoryPersistence.prototype.removeSubscriptions = function (client, subs, cb) {
const stored = this._subscriptions.get(client.id)
const trie = this._trie
subscriptionsByTopic (pattern, cb) {
cb(null, this._trie.match(pattern))
}
if (stored) {
for (var i = 0; i < subs.length; i += 1) {
const topic = subs[i]
const qos = stored.get(topic)
if (qos !== undefined) {
if (qos > 0) {
cleanSubscriptions (client, cb) {
const trie = this._trie
const stored = this._subscriptions.get(client.id)
if (stored) {
for (const topicAndQos of stored) {
if (topicAndQos[1] > 0) {
const topic = topicAndQos[0]
trie.remove(topic, { clientId: client.id, topic })
}
stored.delete(topic)
}
}
if (stored.size === 0) {
this._clientsCount--
this._subscriptions.delete(client.id)
}
cb(null, client)
}
cb(null, client)
}
outgoingEnqueue (sub, packet, cb) {
_outgoingEnqueue.call(this, sub, packet)
process.nextTick(cb)
}
MemoryPersistence.prototype.subscriptionsByClient = function (client, cb) {
var subs = null
const stored = this._subscriptions.get(client.id)
if (stored) {
subs = []
for (const topicAndQos of stored) {
subs.push({ topic: topicAndQos[0], qos: topicAndQos[1] })
outgoingEnqueueCombi (subs, packet, cb) {
for (let i = 0; i < subs.length; i++) {
_outgoingEnqueue.call(this, subs[i], packet)
}
process.nextTick(cb)
}
cb(null, subs, client)
}
MemoryPersistence.prototype.countOffline = function (cb) {
return cb(null, this._trie.subscriptionsCount, this._clientsCount)
}
outgoingUpdate (client, packet, cb) {
const outgoing = getMapRef(this._outgoing, client.id, [], CREATE_ON_EMPTY)
MemoryPersistence.prototype.subscriptionsByTopic = function (pattern, cb) {
cb(null, this._trie.match(pattern))
}
MemoryPersistence.prototype.cleanSubscriptions = function (client, cb) {
const trie = this._trie
const stored = this._subscriptions.get(client.id)
if (stored) {
for (const topicAndQos of stored) {
if (topicAndQos[1] > 0) {
const topic = topicAndQos[0]
trie.remove(topic, { clientId: client.id, topic })
let temp
for (let i = 0; i < outgoing.length; i++) {
temp = outgoing[i]
if (temp.brokerId === packet.brokerId) {
if (temp.brokerCounter === packet.brokerCounter) {
temp.messageId = packet.messageId
return cb(null, client, packet)
}
/*
Maximum of messageId (packet identifier) is 65535 and will be rotated,
brokerCounter is to ensure the packet identifier be unique.
The for loop is going to search which packet messageId should be updated
in the _outgoing queue.
If there is a case that brokerCounter is different but messageId is same,
we need to let the loop keep searching
*/
} else if (temp.messageId === packet.messageId) {
outgoing[i] = packet
return cb(null, client, packet)
}
}
this._clientsCount--
this._subscriptions.delete(client.id)
cb(new Error('no such packet'), client, packet)
}
cb(null, client)
}
outgoingClearMessageId (client, packet, cb) {
const outgoing = getMapRef(this._outgoing, client.id, [], CREATE_ON_EMPTY)
MemoryPersistence.prototype.outgoingEnqueue = function (sub, packet, cb) {
_outgoingEnqueue.call(this, sub, packet)
process.nextTick(cb)
}
let temp
for (let i = 0; i < outgoing.length; i++) {
temp = outgoing[i]
if (temp.messageId === packet.messageId) {
outgoing.splice(i, 1)
return cb(null, temp)
}
}
MemoryPersistence.prototype.outgoingEnqueueCombi = function (subs, packet, cb) {
for (var i = 0; i < subs.length; i++) {
_outgoingEnqueue.call(this, subs[i], packet)
cb()
}
process.nextTick(cb)
}
function _outgoingEnqueue (sub, packet) {
const id = sub.clientId
const queue = this._outgoing[id] || []
outgoingStream (client) {
return Readable.from(getMapRef(this._outgoing, client.id, []))
}
this._outgoing[id] = queue
const p = new Packet(packet)
queue[queue.length] = p
}
incomingStorePacket (client, packet, cb) {
const id = client.id
const store = getMapRef(this._incoming, id, {}, CREATE_ON_EMPTY)
MemoryPersistence.prototype.outgoingUpdate = function (client, packet, cb) {
const clientId = client.id
const outgoing = this._outgoing[clientId] || []
var temp
store[packet.messageId] = new Packet(packet)
store[packet.messageId].messageId = packet.messageId
this._outgoing[clientId] = outgoing
for (var i = 0; i < outgoing.length; i++) {
temp = outgoing[i]
if (temp.brokerId === packet.brokerId) {
if (temp.brokerCounter === packet.brokerCounter) {
temp.messageId = packet.messageId
return cb(null, client, packet)
}
/*
Maximum of messageId (packet identifier) is 65535 and will be rotated,
brokerCounter is to ensure the packet identifier be unique.
The for loop is going to search which packet messageId should be updated
in the _outgoing queue.
If there is a case that brokerCounter is different but messageId is same,
we need to let the loop keep searching
*/
} else if (temp.messageId === packet.messageId) {
outgoing[i] = packet
return cb(null, client, packet)
}
cb(null)
}
cb(new Error('no such packet'), client, packet)
}
incomingGetPacket (client, packet, cb) {
const id = client.id
const store = getMapRef(this._incoming, id, {})
let err = null
MemoryPersistence.prototype.outgoingClearMessageId = function (client, packet, cb) {
const clientId = client.id
const outgoing = this._outgoing[clientId] || []
var temp
this._incoming.set(id, store)
this._outgoing[clientId] = outgoing
if (!store[packet.messageId]) {
err = new Error('no such packet')
}
for (var i = 0; i < outgoing.length; i++) {
temp = outgoing[i]
if (temp.messageId === packet.messageId) {
outgoing.splice(i, 1)
return cb(null, temp)
}
cb(err, store[packet.messageId])
}
cb()
}
incomingDelPacket (client, packet, cb) {
const id = client.id
const store = getMapRef(this._incoming, id, {})
const toDelete = store[packet.messageId]
let err = null
MemoryPersistence.prototype.outgoingStream = function (client) {
const queue = [].concat(this._outgoing[client.id] || [])
return from2.obj(function match (size, next) {
var entry
if ((entry = queue.shift()) != null) {
setImmediate(next, null, entry)
return
if (!toDelete) {
err = new Error('no such packet')
} else {
delete store[packet.messageId]
}
if (!entry) this.push(null)
})
}
cb(err)
}
MemoryPersistence.prototype.incomingStorePacket = function (client, packet, cb) {
const id = client.id
const store = this._incoming[id] || {}
putWill (client, packet, cb) {
packet.brokerId = this.broker.id
packet.clientId = client.id
this._wills.set(client.id, packet)
cb(null, client)
}
this._incoming[id] = store
getWill (client, cb) {
cb(null, this._wills.get(client.id), client)
}
store[packet.messageId] = new Packet(packet)
store[packet.messageId].messageId = packet.messageId
delWill (client, cb) {
const will = this._wills.get(client.id)
this._wills.delete(client.id)
cb(null, will, client)
}
cb(null)
}
MemoryPersistence.prototype.incomingGetPacket = function (client, packet, cb) {
const id = client.id
const store = this._incoming[id] || {}
var err = null
this._incoming[id] = store
if (!store[packet.messageId]) {
err = new Error('no such packet')
streamWill (brokers = {}) {
return Readable.from(willsByBrokers(this._wills, brokers))
}
cb(err, store[packet.messageId])
}
MemoryPersistence.prototype.incomingDelPacket = function (client, packet, cb) {
const id = client.id
const store = this._incoming[id] || {}
const toDelete = store[packet.messageId]
var err = null
if (!toDelete) {
err = new Error('no such packet')
} else {
delete store[packet.messageId]
getClientList (topic) {
return Readable.from(clientListbyTopic(this._subscriptions, topic))
}
cb(err)
}
MemoryPersistence.prototype.putWill = function (client, packet, cb) {
packet.brokerId = this.broker.id
packet.clientId = client.id
this._wills[client.id] = packet
cb(null, client)
}
MemoryPersistence.prototype.getWill = function (client, cb) {
cb(null, this._wills[client.id], client)
}
MemoryPersistence.prototype.delWill = function (client, cb) {
const will = this._wills[client.id]
delete this._wills[client.id]
cb(null, will, client)
}
MemoryPersistence.prototype.streamWill = function (brokers) {
const clients = Object.keys(this._wills)
const wills = this._wills
brokers = brokers || {}
return from2.obj(function match (size, next) {
var entry
while ((entry = clients.shift()) != null) {
if (!brokers[wills[entry].brokerId]) {
setImmediate(next, null, wills[entry])
return
}
destroy (cb) {
this._retained = null
if (cb) {
cb(null)
}
if (!entry) {
this.push(null)
}
})
}
}
MemoryPersistence.prototype.getClientList = function (topic) {
const clientSubs = this._subscriptions
const entries = clientSubs.entries(clientSubs)
return from2.obj(function match (size, next) {
var entry
while (!(entry = entries.next()).done) {
if (entry.value[1].has(topic)) {
setImmediate(next, null, entry.value[0])
return
}
}
next(null, null)
})
function _outgoingEnqueue (sub, packet) {
const id = sub.clientId
const queue = getMapRef(this._outgoing, id, [], CREATE_ON_EMPTY)
queue[queue.length] = new Packet(packet)
}
MemoryPersistence.prototype.destroy = function (cb) {
this._retained = null
if (cb) {
cb(null)
function getMapRef (map, key, ifEmpty, createOnEmpty = false) {
const value = map.get(key)
if (value === undefined && createOnEmpty) {
map.set(key, ifEmpty)
}
return value || ifEmpty
}
module.exports = MemoryPersistence
module.exports = () => { return new MemoryPersistence() }
module.exports.Packet = Packet

@@ -268,5 +268,5 @@ # aedes-persistence

```js
var test = require('tape').test
var myperst = require('./')
var abs = require('aedes-persistence/abstract')
const test = require('tape').test
const myperst = require('./')
const abs = require('aedes-persistence/abstract')

@@ -283,6 +283,6 @@ abs({

```js
var test = require('tape').test
var myperst = require('./')
var abs = require('aedes-persistence/abstract')
var clean = require('./clean') // invented module
const test = require('tape').test
const myperst = require('./')
const abs = require('aedes-persistence/abstract')
const clean = require('./clean') // invented module

@@ -289,0 +289,0 @@ abs({

@@ -1,11 +0,23 @@

import type { Brokers, Client, Subscription } from 'aedes';
import type { AedesPacket } from 'aedes-packet';
import type { QoS } from 'mqtt-packet';
import type { Readable } from 'stream';
import type { Brokers, Client, Subscription } from "aedes";
import type { AedesPacket } from "aedes-packet";
import type { Readable } from "stream";
export type { AedesPacket as Packet } from 'aedes-packet';
export type { AedesPacket as Packet } from "aedes-packet";
type ClientId = Subscription["clientId"];
type MessageId = AedesPacket["messageId"];
type Topic = Subscription["topic"];
type TopicPattern = Subscription["topic"];
type QoS = Subscription["qos"];
type Incoming = {
// tsc accepts:
// [messageId: MessageId]: AedesPacket;
// Workaround for tsd 0.20:
[messageId: number]: AedesPacket;
};
export interface AedesPersistenceSubscription {
clientId: string;
topic: string;
clientId: ClientId;
topic: Topic;
qos?: QoS;

@@ -18,15 +30,11 @@ }

interface Incoming {
[clientId: string]: { [messageId: string]: AedesPacket };
}
export interface AedesPersistence {
storeRetained: (
packet: AedesPacket,
cb: (error: CallbackError) => void
cb: (error: CallbackError) => void,
) => void;
createRetainedStream: (pattern: string) => Readable;
createRetainedStream: (pattern: TopicPattern) => Readable;
createRetainedStreamCombi: (patterns: string[]) => Readable;
createRetainedStreamCombi: (patterns: TopicPattern[]) => Readable;

@@ -36,3 +44,3 @@ addSubscriptions: (

subs: Subscription[],
cb: (error: CallbackError, client: Client) => void
cb: (error: CallbackError, client: Client) => void,
) => void;

@@ -43,3 +51,3 @@

subs: Subscription[],
cb: (error: CallbackError, client: Client) => void
cb: (error: CallbackError, client: Client) => void,
) => void;

@@ -51,5 +59,5 @@

error: CallbackError,
subs: { topic: string; qos: QoS }[],
client: Client
) => void
subs: { topic: Topic; qos: QoS }[],
client: Client,
) => void,
) => void;

@@ -61,9 +69,9 @@

subscriptionsCount: number,
clientsCount: number
) => void
clientsCount: number,
) => void,
) => void;
subscriptionsByTopic: (
pattern: string,
cb: (error: CallbackError, subs: AedesPersistenceSubscription[]) => void
pattern: TopicPattern,
cb: (error: CallbackError, subs: AedesPersistenceSubscription[]) => void,
) => void;

@@ -73,15 +81,15 @@

client: Client,
cb: (error: CallbackError, client: Client) => void
cb: (error: CallbackError, client: Client) => void,
) => void;
outgoingEnqueue: (
sub: { clientId: string },
sub: { clientId: ClientId },
packet: AedesPacket,
cb: (error: CallbackError) => void
cb: (error: CallbackError) => void,
) => void;
outgoingEnqueueCombi: (
subs: { clientId: string }[],
subs: { clientId: ClientId }[],
packet: AedesPacket,
cb: (error: CallbackError) => void
cb: (error: CallbackError) => void,
) => void;

@@ -92,3 +100,3 @@

packet: AedesPacket,
cb: (error: CallbackError, client: Client, packet: AedesPacket) => void
cb: (error: CallbackError, client: Client, packet: AedesPacket) => void,
) => void;

@@ -99,3 +107,3 @@

packet: AedesPacket,
cb: (error?: CallbackError, packet?: AedesPacket) => void
cb: (error?: CallbackError, packet?: AedesPacket) => void,
) => void;

@@ -108,3 +116,3 @@

packet: AedesPacket,
cb: (error: CallbackError) => void
cb: (error: CallbackError) => void,
) => void;

@@ -115,3 +123,3 @@

packet: AedesPacket,
cb: (error: CallbackError, packet: AedesPacket) => void
cb: (error: CallbackError, packet: AedesPacket) => void,
) => void;

@@ -122,3 +130,3 @@

packet: AedesPacket,
cb: (error: CallbackError) => void
cb: (error: CallbackError) => void,
) => void;

@@ -129,3 +137,3 @@

packet: AedesPacket,
cb: (error: CallbackError, client: Client) => void
cb: (error: CallbackError, client: Client) => void,
) => void;

@@ -135,3 +143,3 @@

client: Client,
cb: (error: CallbackError, will: WillPacket, client: Client) => void
cb: (error: CallbackError, will: WillPacket, client: Client) => void,
) => void;

@@ -141,3 +149,3 @@

client: Client,
cb: (error: CallbackError, will: WillPacket, client: Client) => void
cb: (error: CallbackError, will: WillPacket, client: Client) => void,
) => void;

@@ -153,8 +161,8 @@

export class AedesMemoryPersistence implements AedesPersistence {
_retained: AedesPacket[];
_retained: Map<Topic, AedesPacket>;
_subscriptions: Map<
AedesPersistenceSubscription['clientId'],
ClientId,
Map<
AedesPersistenceSubscription['topic'],
AedesPersistenceSubscription['qos']
Topic,
QoS
>

@@ -164,5 +172,5 @@ >;

_trie: any;
_outgoing: Record<string, AedesPacket[]>;
_incoming: Incoming;
_wills: Record<string, WillPacket>;
_outgoing: Map<ClientId, AedesPacket[]>;
_incoming: Map<ClientId, Incoming>;
_wills: Map<ClientId, WillPacket>;

@@ -173,8 +181,8 @@ constructor();

packet: AedesPacket,
cb: (error: CallbackError) => void
cb: (error: CallbackError) => void,
) => void;
createRetainedStream: (pattern: string) => Readable;
createRetainedStream: (pattern: TopicPattern) => Readable;
createRetainedStreamCombi: (patterns: string[]) => Readable;
createRetainedStreamCombi: (patterns: TopicPattern[]) => Readable;

@@ -184,3 +192,3 @@ addSubscriptions: (

subs: Subscription[],
cb: (error: CallbackError, client: Client) => void
cb: (error: CallbackError, client: Client) => void,
) => void;

@@ -191,3 +199,3 @@

subs: Subscription[],
cb: (error: CallbackError, client: Client) => void
cb: (error: CallbackError, client: Client) => void,
) => void;

@@ -200,4 +208,4 @@

subs: { topic: string; qos: QoS }[],
client: Client
) => void
client: Client,
) => void,
) => void;

@@ -209,4 +217,4 @@

subscriptionsCount: number,
clientsCount: number
) => void
clientsCount: number,
) => void,
) => void;

@@ -216,3 +224,3 @@

pattern: string,
cb: (error: CallbackError, subs: AedesPersistenceSubscription[]) => void
cb: (error: CallbackError, subs: AedesPersistenceSubscription[]) => void,
) => void;

@@ -222,15 +230,15 @@

client: Client,
cb: (error: CallbackError, client: Client) => void
cb: (error: CallbackError, client: Client) => void,
) => void;
outgoingEnqueue: (
sub: { clientId: string },
sub: { clientId: ClientId },
packet: AedesPacket,
cb: (error: CallbackError) => void
cb: (error: CallbackError) => void,
) => void;
outgoingEnqueueCombi: (
sub: { clientId: string }[],
sub: { clientId: ClientId }[],
packet: AedesPacket,
cb: (error: CallbackError) => void
cb: (error: CallbackError) => void,
) => void;

@@ -241,3 +249,3 @@

packet: AedesPacket,
cb: (error: CallbackError, client: Client, packet: AedesPacket) => void
cb: (error: CallbackError, client: Client, packet: AedesPacket) => void,
) => void;

@@ -248,3 +256,3 @@

packet: AedesPacket,
cb: (error?: CallbackError, packet?: AedesPacket) => void
cb: (error?: CallbackError, packet?: AedesPacket) => void,
) => void;

@@ -257,3 +265,3 @@

packet: AedesPacket,
cb: (error: CallbackError) => void
cb: (error: CallbackError) => void,
) => void;

@@ -264,3 +272,3 @@

packet: AedesPacket,
cb: (error: CallbackError, packet: AedesPacket) => void
cb: (error: CallbackError, packet: AedesPacket) => void,
) => void;

@@ -271,3 +279,3 @@

packet: AedesPacket,
cb: (error: CallbackError) => void
cb: (error: CallbackError) => void,
) => void;

@@ -278,3 +286,3 @@

packet: AedesPacket,
cb: (error: CallbackError, client: Client) => void
cb: (error: CallbackError, client: Client) => void,
) => void;

@@ -284,3 +292,3 @@

client: Client,
cb: (error: CallbackError, will: WillPacket, client: Client) => void
cb: (error: CallbackError, will: WillPacket, client: Client) => void,
) => void;

@@ -290,3 +298,3 @@

client: Client,
cb: (error: CallbackError, will: WillPacket, client: Client) => void
cb: (error: CallbackError, will: WillPacket, client: Client) => void,
) => void;

@@ -293,0 +301,0 @@

Sorry, the diff of this file is not supported yet

SocketSocket SOC 2 Logo

Product

  • Package Alerts
  • Integrations
  • Docs
  • Pricing
  • FAQ
  • Roadmap
  • Changelog

Packages

npm

Stay in touch

Get open source security insights delivered straight into your inbox.


  • Terms
  • Privacy
  • Security

Made with ⚡️ by Socket Inc