aedes-persistence
Advanced tools
Comparing version 8.1.3 to 9.0.0
490
abstract.js
@@ -1,6 +0,2 @@ | ||
'use strict' | ||
const concat = require('concat-stream') | ||
const pump = require('pump') | ||
const through = require('through2') | ||
const { Readable } = require('stream') | ||
const Packet = require('aedes-packet') | ||
@@ -10,3 +6,3 @@ | ||
const test = opts.test | ||
var _persistence = opts.persistence | ||
let _persistence = opts.persistence | ||
const waitForReady = opts.waitForReady | ||
@@ -35,3 +31,3 @@ | ||
_persistence(function (err, instance) { | ||
_persistence((err, instance) => { | ||
if (instance) { | ||
@@ -44,3 +40,3 @@ // Wait for ready event, if applicable, to ensure the persistence isn't | ||
// can result in 'ready' being emitted. | ||
instance.on('ready', function () { | ||
instance.on('ready', () => { | ||
instance.removeListener('error', cb) | ||
@@ -61,2 +57,25 @@ cb(null, instance) | ||
// legacy third party streams are typically not iterable | ||
function iterableStream (stream) { | ||
if (typeof stream[Symbol.iterator] !== 'function') { | ||
return new Readable({ objectMode: true }).wrap(stream) | ||
} | ||
return stream | ||
} | ||
// end of legacy third party streams support | ||
async function getArrayFromStream (stream) { | ||
const list = [] | ||
for await (const item of iterableStream(stream)) { | ||
list.push(item) | ||
} | ||
return list | ||
} | ||
async function streamForEach (stream, fn) { | ||
for await (const item of iterableStream(stream)) { | ||
fn(item) | ||
} | ||
} | ||
function storeRetained (instance, opts, cb) { | ||
@@ -74,3 +93,3 @@ opts = opts || {} | ||
instance.storeRetained(packet, function (err) { | ||
instance.storeRetained(packet, err => { | ||
cb(err, packet) | ||
@@ -81,8 +100,8 @@ }) | ||
function matchRetainedWithPattern (t, pattern, opts) { | ||
persistence(function (err, instance) { | ||
persistence((err, instance) => { | ||
if (err) { throw err } | ||
storeRetained(instance, opts, function (err, packet) { | ||
storeRetained(instance, opts, (err, packet) => { | ||
t.notOk(err, 'no error') | ||
var stream | ||
let stream | ||
if (Array.isArray(pattern)) { | ||
@@ -94,6 +113,6 @@ stream = instance.createRetainedStreamCombi(pattern) | ||
stream.pipe(concat(function (list) { | ||
getArrayFromStream(stream).then(list => { | ||
t.deepEqual(list, [packet], 'must return the packet') | ||
instance.destroy(t.end.bind(t)) | ||
})) | ||
}) | ||
}) | ||
@@ -104,4 +123,4 @@ }) | ||
function testInstance (title, cb) { | ||
test(title, function (t) { | ||
persistence(function (err, instance) { | ||
test(title, t => { | ||
persistence((err, instance) => { | ||
if (err) { throw err } | ||
@@ -119,25 +138,25 @@ cb(t, instance) | ||
test('store and look up retained messages', function (t) { | ||
test('store and look up retained messages', t => { | ||
matchRetainedWithPattern(t, 'hello/world') | ||
}) | ||
test('look up retained messages with a # pattern', function (t) { | ||
test('look up retained messages with a # pattern', t => { | ||
matchRetainedWithPattern(t, '#') | ||
}) | ||
test('look up retained messages with a hello/world/# pattern', function (t) { | ||
test('look up retained messages with a hello/world/# pattern', t => { | ||
matchRetainedWithPattern(t, 'hello/world/#') | ||
}) | ||
test('look up retained messages with a + pattern', function (t) { | ||
test('look up retained messages with a + pattern', t => { | ||
matchRetainedWithPattern(t, 'hello/+') | ||
}) | ||
test('look up retained messages with multiple patterns', function (t) { | ||
test('look up retained messages with multiple patterns', t => { | ||
matchRetainedWithPattern(t, ['hello/+', 'other/hello']) | ||
}) | ||
testInstance('store multiple retained messages in order', function (t, instance) { | ||
testInstance('store multiple retained messages in order', (t, instance) => { | ||
const totalMessages = 1000 | ||
var done = 0 | ||
let done = 0 | ||
@@ -155,3 +174,3 @@ const retained = { | ||
instance.storeRetained(packet, function (err) { | ||
instance.storeRetained(packet, err => { | ||
t.notOk(err, 'no error') | ||
@@ -170,16 +189,15 @@ t.equal(packet.brokerCounter, index + 1, 'packet stored in order') | ||
testInstance('remove retained message', function (t, instance) { | ||
storeRetained(instance, {}, function (err, packet) { | ||
testInstance('remove retained message', (t, instance) => { | ||
storeRetained(instance, {}, (err, packet) => { | ||
t.notOk(err, 'no error') | ||
storeRetained(instance, { | ||
payload: Buffer.alloc(0) | ||
}, function (err) { | ||
}, err => { | ||
t.notOk(err, 'no error') | ||
const stream = instance.createRetainedStream('#') | ||
stream.pipe(concat(function (list) { | ||
getArrayFromStream(stream).then(list => { | ||
t.deepEqual(list, [], 'must return an empty list') | ||
instance.destroy(t.end.bind(t)) | ||
})) | ||
}) | ||
}) | ||
@@ -189,8 +207,8 @@ }) | ||
testInstance('storing twice a retained message should keep only the last', function (t, instance) { | ||
storeRetained(instance, {}, function (err, packet) { | ||
testInstance('storing twice a retained message should keep only the last', (t, instance) => { | ||
storeRetained(instance, {}, (err, packet) => { | ||
t.notOk(err, 'no error') | ||
storeRetained(instance, { | ||
payload: Buffer.from('ahah') | ||
}, function (err, packet) { | ||
}, (err, packet) => { | ||
t.notOk(err, 'no error') | ||
@@ -200,6 +218,6 @@ | ||
stream.pipe(concat(function (list) { | ||
getArrayFromStream(stream).then(list => { | ||
t.deepEqual(list, [packet], 'must return the last packet') | ||
instance.destroy(t.end.bind(t)) | ||
})) | ||
}) | ||
}) | ||
@@ -209,3 +227,3 @@ }) | ||
testInstance('Create a new packet while storing a retained message', function (t, instance) { | ||
testInstance('Create a new packet while storing a retained message', (t, instance) => { | ||
const packet = { | ||
@@ -221,3 +239,3 @@ cmd: 'publish', | ||
instance.storeRetained(packet, function (err) { | ||
instance.storeRetained(packet, err => { | ||
t.notOk(err, 'no error') | ||
@@ -228,10 +246,10 @@ // packet reference change to check if a new packet is stored always | ||
stream.pipe(concat(function (list) { | ||
getArrayFromStream(stream).then(list => { | ||
t.deepEqual(list, [newPacket], 'must return the last packet') | ||
instance.destroy(t.end.bind(t)) | ||
})) | ||
}) | ||
}) | ||
}) | ||
testInstance('store and look up subscriptions by client', function (t, instance) { | ||
testInstance('store and look up subscriptions by client', (t, instance) => { | ||
const client = { id: 'abcde' } | ||
@@ -249,6 +267,6 @@ const subs = [{ | ||
instance.addSubscriptions(client, subs, function (err, reClient) { | ||
instance.addSubscriptions(client, subs, (err, reClient) => { | ||
t.equal(reClient, client, 'client must be the same') | ||
t.notOk(err, 'no error') | ||
instance.subscriptionsByClient(client, function (err, resubs, reReClient) { | ||
instance.subscriptionsByClient(client, (err, resubs, reReClient) => { | ||
t.equal(reReClient, client, 'client must be the same') | ||
@@ -262,3 +280,3 @@ t.notOk(err, 'no error') | ||
testInstance('remove subscriptions by client', function (t, instance) { | ||
testInstance('remove subscriptions by client', (t, instance) => { | ||
const client = { id: 'abcde' } | ||
@@ -273,8 +291,8 @@ const subs = [{ | ||
instance.addSubscriptions(client, subs, function (err, reClient) { | ||
instance.addSubscriptions(client, subs, (err, reClient) => { | ||
t.notOk(err, 'no error') | ||
instance.removeSubscriptions(client, ['hello'], function (err, reClient) { | ||
instance.removeSubscriptions(client, ['hello'], (err, reClient) => { | ||
t.notOk(err, 'no error') | ||
t.equal(reClient, client, 'client must be the same') | ||
instance.subscriptionsByClient(client, function (err, resubs, reClient) { | ||
instance.subscriptionsByClient(client, (err, resubs, reClient) => { | ||
t.equal(reClient, client, 'client must be the same') | ||
@@ -292,3 +310,3 @@ t.notOk(err, 'no error') | ||
testInstance('store and look up subscriptions by topic', function (t, instance) { | ||
testInstance('store and look up subscriptions by topic', (t, instance) => { | ||
const client = { id: 'abcde' } | ||
@@ -306,5 +324,5 @@ const subs = [{ | ||
instance.addSubscriptions(client, subs, function (err) { | ||
instance.addSubscriptions(client, subs, err => { | ||
t.notOk(err, 'no error') | ||
instance.subscriptionsByTopic('hello', function (err, resubs) { | ||
instance.subscriptionsByTopic('hello', (err, resubs) => { | ||
t.notOk(err, 'no error') | ||
@@ -325,3 +343,3 @@ t.deepEqual(resubs, [{ | ||
testInstance('get client list after subscriptions', function (t, instance) { | ||
testInstance('get client list after subscriptions', (t, instance) => { | ||
const client1 = { id: 'abcde' } | ||
@@ -334,11 +352,11 @@ const client2 = { id: 'efghi' } | ||
instance.addSubscriptions(client1, subs, function (err) { | ||
instance.addSubscriptions(client1, subs, err => { | ||
t.notOk(err, 'no error for client 1') | ||
instance.addSubscriptions(client2, subs, function (err) { | ||
instance.addSubscriptions(client2, subs, err => { | ||
t.notOk(err, 'no error for client 2') | ||
const stream = instance.getClientList(subs[0].topic) | ||
stream.pipe(concat({ encoding: 'object' }, function (out) { | ||
getArrayFromStream(stream).then(out => { | ||
t.deepEqual(out, [client1.id, client2.id]) | ||
instance.destroy(t.end.bind(t)) | ||
})) | ||
}) | ||
}) | ||
@@ -348,3 +366,3 @@ }) | ||
testInstance('get client list after an unsubscribe', function (t, instance) { | ||
testInstance('get client list after an unsubscribe', (t, instance) => { | ||
const client1 = { id: 'abcde' } | ||
@@ -357,13 +375,13 @@ const client2 = { id: 'efghi' } | ||
instance.addSubscriptions(client1, subs, function (err) { | ||
instance.addSubscriptions(client1, subs, err => { | ||
t.notOk(err, 'no error for client 1') | ||
instance.addSubscriptions(client2, subs, function (err) { | ||
instance.addSubscriptions(client2, subs, err => { | ||
t.notOk(err, 'no error for client 2') | ||
instance.removeSubscriptions(client2, [subs[0].topic], function (err, reClient) { | ||
instance.removeSubscriptions(client2, [subs[0].topic], (err, reClient) => { | ||
t.notOk(err, 'no error for removeSubscriptions') | ||
const stream = instance.getClientList(subs[0].topic) | ||
stream.pipe(concat({ encoding: 'object' }, function (out) { | ||
getArrayFromStream(stream).then(out => { | ||
t.deepEqual(out, [client1.id]) | ||
instance.destroy(t.end.bind(t)) | ||
})) | ||
}) | ||
}) | ||
@@ -374,3 +392,3 @@ }) | ||
testInstance('get subscriptions list after an unsubscribe', function (t, instance) { | ||
testInstance('get subscriptions list after an unsubscribe', (t, instance) => { | ||
const client1 = { id: 'abcde' } | ||
@@ -383,9 +401,9 @@ const client2 = { id: 'efghi' } | ||
instance.addSubscriptions(client1, subs, function (err) { | ||
instance.addSubscriptions(client1, subs, err => { | ||
t.notOk(err, 'no error for client 1') | ||
instance.addSubscriptions(client2, subs, function (err) { | ||
instance.addSubscriptions(client2, subs, err => { | ||
t.notOk(err, 'no error for client 2') | ||
instance.removeSubscriptions(client2, [subs[0].topic], function (err, reClient) { | ||
instance.removeSubscriptions(client2, [subs[0].topic], (err, reClient) => { | ||
t.notOk(err, 'no error for removeSubscriptions') | ||
instance.subscriptionsByTopic(subs[0].topic, function (err, clients) { | ||
instance.subscriptionsByTopic(subs[0].topic, (err, clients) => { | ||
t.notOk(err, 'no error getting subscriptions by topic') | ||
@@ -400,3 +418,3 @@ t.deepEqual(clients[0].clientId, client1.id) | ||
testInstance('QoS 0 subscriptions, restored but not matched', function (t, instance) { | ||
testInstance('QoS 0 subscriptions, restored but not matched', (t, instance) => { | ||
const client = { id: 'abcde' } | ||
@@ -414,8 +432,8 @@ const subs = [{ | ||
instance.addSubscriptions(client, subs, function (err) { | ||
instance.addSubscriptions(client, subs, err => { | ||
t.notOk(err, 'no error') | ||
instance.subscriptionsByClient(client, function (err, resubs) { | ||
instance.subscriptionsByClient(client, (err, resubs) => { | ||
t.notOk(err, 'no error') | ||
t.deepEqual(resubs, subs) | ||
instance.subscriptionsByTopic('hello', function (err, resubs2) { | ||
instance.subscriptionsByTopic('hello', (err, resubs2) => { | ||
t.notOk(err, 'no error') | ||
@@ -433,3 +451,3 @@ t.deepEqual(resubs2, [{ | ||
testInstance('clean subscriptions', function (t, instance) { | ||
testInstance('clean subscriptions', (t, instance) => { | ||
const client = { id: 'abcde' } | ||
@@ -444,15 +462,15 @@ const subs = [{ | ||
instance.addSubscriptions(client, subs, function (err) { | ||
instance.addSubscriptions(client, subs, err => { | ||
t.notOk(err, 'no error') | ||
instance.cleanSubscriptions(client, function (err) { | ||
instance.cleanSubscriptions(client, err => { | ||
t.notOk(err, 'no error') | ||
instance.subscriptionsByTopic('hello', function (err, resubs) { | ||
instance.subscriptionsByTopic('hello', (err, resubs) => { | ||
t.notOk(err, 'no error') | ||
t.deepEqual(resubs, [], 'no subscriptions') | ||
instance.subscriptionsByClient(client, function (err, resubs) { | ||
instance.subscriptionsByClient(client, (err, resubs) => { | ||
t.error(err) | ||
t.deepEqual(resubs, null, 'no subscriptions') | ||
instance.countOffline(function (err, subsCount, clientsCount) { | ||
instance.countOffline((err, subsCount, clientsCount) => { | ||
t.error(err, 'no error') | ||
@@ -470,16 +488,16 @@ t.equal(subsCount, 0, 'no subscriptions added') | ||
testInstance('clean subscriptions with no active subscriptions', function (t, instance) { | ||
testInstance('clean subscriptions with no active subscriptions', (t, instance) => { | ||
const client = { id: 'abcde' } | ||
instance.cleanSubscriptions(client, function (err) { | ||
instance.cleanSubscriptions(client, err => { | ||
t.notOk(err, 'no error') | ||
instance.subscriptionsByTopic('hello', function (err, resubs) { | ||
instance.subscriptionsByTopic('hello', (err, resubs) => { | ||
t.notOk(err, 'no error') | ||
t.deepEqual(resubs, [], 'no subscriptions') | ||
instance.subscriptionsByClient(client, function (err, resubs) { | ||
instance.subscriptionsByClient(client, (err, resubs) => { | ||
t.error(err) | ||
t.deepEqual(resubs, null, 'no subscriptions') | ||
instance.countOffline(function (err, subsCount, clientsCount) { | ||
instance.countOffline((err, subsCount, clientsCount) => { | ||
t.error(err, 'no error') | ||
@@ -496,3 +514,3 @@ t.equal(subsCount, 0, 'no subscriptions added') | ||
testInstance('same topic, different QoS', function (t, instance) { | ||
testInstance('same topic, different QoS', (t, instance) => { | ||
const client = { id: 'abcde' } | ||
@@ -507,7 +525,7 @@ const subs = [{ | ||
instance.addSubscriptions(client, subs, function (err, reClient) { | ||
instance.addSubscriptions(client, subs, (err, reClient) => { | ||
t.equal(reClient, client, 'client must be the same') | ||
t.error(err, 'no error') | ||
instance.subscriptionsByClient(client, function (err, subsForClient, client) { | ||
instance.subscriptionsByClient(client, (err, subsForClient, client) => { | ||
t.error(err, 'no error') | ||
@@ -519,3 +537,3 @@ t.deepEqual(subsForClient, [{ | ||
instance.subscriptionsByTopic('hello', function (err, subsForTopic) { | ||
instance.subscriptionsByTopic('hello', (err, subsForTopic) => { | ||
t.error(err, 'no error') | ||
@@ -528,3 +546,3 @@ t.deepEqual(subsForTopic, [{ | ||
instance.countOffline(function (err, subsCount, clientsCount) { | ||
instance.countOffline((err, subsCount, clientsCount) => { | ||
t.error(err, 'no error') | ||
@@ -541,3 +559,3 @@ t.equal(subsCount, 1, 'one subscription added') | ||
testInstance('replace subscriptions', function (t, instance) { | ||
testInstance('replace subscriptions', (t, instance) => { | ||
const client = { id: 'abcde' } | ||
@@ -550,12 +568,12 @@ const topic = 'hello' | ||
sub.qos = subByTopic.qos = qos | ||
instance.addSubscriptions(client, [sub], function (err, reClient) { | ||
instance.addSubscriptions(client, [sub], (err, reClient) => { | ||
t.equal(reClient, client, 'client must be the same') | ||
t.error(err, 'no error') | ||
instance.subscriptionsByClient(client, function (err, subsForClient, client) { | ||
instance.subscriptionsByClient(client, (err, subsForClient, client) => { | ||
t.error(err, 'no error') | ||
t.deepEqual(subsForClient, [sub]) | ||
instance.subscriptionsByTopic(topic, function (err, subsForTopic) { | ||
instance.subscriptionsByTopic(topic, (err, subsForTopic) => { | ||
t.error(err, 'no error') | ||
t.deepEqual(subsForTopic, qos === 0 ? [] : [subByTopic]) | ||
instance.countOffline(function (err, subsCount, clientsCount) { | ||
instance.countOffline((err, subsCount, clientsCount) => { | ||
t.error(err, 'no error') | ||
@@ -575,7 +593,7 @@ if (qos === 0) { | ||
check(0, function () { | ||
check(1, function () { | ||
check(2, function () { | ||
check(1, function () { | ||
check(0, function () { | ||
check(0, () => { | ||
check(1, () => { | ||
check(2, () => { | ||
check(1, () => { | ||
check(0, () => { | ||
instance.destroy(t.end.bind(t)) | ||
@@ -589,3 +607,3 @@ }) | ||
testInstance('replace subscriptions in same call', function (t, instance) { | ||
testInstance('replace subscriptions in same call', (t, instance) => { | ||
const client = { id: 'abcde' } | ||
@@ -600,12 +618,12 @@ const topic = 'hello' | ||
] | ||
instance.addSubscriptions(client, subs, function (err, reClient) { | ||
instance.addSubscriptions(client, subs, (err, reClient) => { | ||
t.equal(reClient, client, 'client must be the same') | ||
t.error(err, 'no error') | ||
instance.subscriptionsByClient(client, function (err, subsForClient, client) { | ||
instance.subscriptionsByClient(client, (err, subsForClient, client) => { | ||
t.error(err, 'no error') | ||
t.deepEqual(subsForClient, [{ topic, qos: 0 }]) | ||
instance.subscriptionsByTopic(topic, function (err, subsForTopic) { | ||
instance.subscriptionsByTopic(topic, (err, subsForTopic) => { | ||
t.error(err, 'no error') | ||
t.deepEqual(subsForTopic, []) | ||
instance.countOffline(function (err, subsCount, clientsCount) { | ||
instance.countOffline((err, subsCount, clientsCount) => { | ||
t.error(err, 'no error') | ||
@@ -621,3 +639,3 @@ t.equal(subsCount, 0, 'no subscriptions added') | ||
testInstance('store and count subscriptions', function (t, instance) { | ||
testInstance('store and count subscriptions', (t, instance) => { | ||
const client = { id: 'abcde' } | ||
@@ -635,7 +653,7 @@ const subs = [{ | ||
instance.addSubscriptions(client, subs, function (err, reClient) { | ||
instance.addSubscriptions(client, subs, (err, reClient) => { | ||
t.equal(reClient, client, 'client must be the same') | ||
t.error(err, 'no error') | ||
instance.countOffline(function (err, subsCount, clientsCount) { | ||
instance.countOffline((err, subsCount, clientsCount) => { | ||
t.error(err, 'no error') | ||
@@ -645,6 +663,6 @@ t.equal(subsCount, 2, 'two subscriptions added') | ||
instance.removeSubscriptions(client, ['hello'], function (err, reClient) { | ||
instance.removeSubscriptions(client, ['hello'], (err, reClient) => { | ||
t.error(err, 'no error') | ||
instance.countOffline(function (err, subsCount, clientsCount) { | ||
instance.countOffline((err, subsCount, clientsCount) => { | ||
t.error(err, 'no error') | ||
@@ -654,6 +672,6 @@ t.equal(subsCount, 1, 'one subscription added') | ||
instance.removeSubscriptions(client, ['matteo'], function (err, reClient) { | ||
instance.removeSubscriptions(client, ['matteo'], (err, reClient) => { | ||
t.error(err, 'no error') | ||
instance.countOffline(function (err, subsCount, clientsCount) { | ||
instance.countOffline((err, subsCount, clientsCount) => { | ||
t.error(err, 'no error') | ||
@@ -663,6 +681,6 @@ t.equal(subsCount, 0, 'zero subscriptions added') | ||
instance.removeSubscriptions(client, ['noqos'], function (err, reClient) { | ||
instance.removeSubscriptions(client, ['noqos'], (err, reClient) => { | ||
t.error(err, 'no error') | ||
instance.countOffline(function (err, subsCount, clientsCount) { | ||
instance.countOffline((err, subsCount, clientsCount) => { | ||
t.error(err, 'no error') | ||
@@ -672,6 +690,6 @@ t.equal(subsCount, 0, 'zero subscriptions added') | ||
instance.removeSubscriptions(client, ['noqos'], function (err, reClient) { | ||
instance.removeSubscriptions(client, ['noqos'], (err, reClient) => { | ||
t.error(err, 'no error') | ||
instance.countOffline(function (err, subsCount, clientsCount) { | ||
instance.countOffline((err, subsCount, clientsCount) => { | ||
t.error(err, 'no error') | ||
@@ -694,3 +712,3 @@ t.equal(subsCount, 0, 'zero subscriptions added') | ||
testInstance('count subscriptions with two clients', function (t, instance) { | ||
testInstance('count subscriptions with two clients', (t, instance) => { | ||
const client1 = { id: 'abcde' } | ||
@@ -710,7 +728,7 @@ const client2 = { id: 'fghij' } | ||
function remove (client, subs, expectedSubs, expectedClients, cb) { | ||
instance.removeSubscriptions(client, subs, function (err, reClient) { | ||
instance.removeSubscriptions(client, subs, (err, reClient) => { | ||
t.error(err, 'no error') | ||
t.equal(reClient, client, 'client must be the same') | ||
instance.countOffline(function (err, subsCount, clientsCount) { | ||
instance.countOffline((err, subsCount, clientsCount) => { | ||
t.error(err, 'no error') | ||
@@ -725,18 +743,18 @@ t.equal(subsCount, expectedSubs, 'subscriptions added') | ||
instance.addSubscriptions(client1, subs, function (err, reClient) { | ||
instance.addSubscriptions(client1, subs, (err, reClient) => { | ||
t.equal(reClient, client1, 'client must be the same') | ||
t.error(err, 'no error') | ||
instance.addSubscriptions(client2, subs, function (err, reClient) { | ||
instance.addSubscriptions(client2, subs, (err, reClient) => { | ||
t.equal(reClient, client2, 'client must be the same') | ||
t.error(err, 'no error') | ||
remove(client1, ['foobar'], 4, 2, function () { | ||
remove(client1, ['hello'], 3, 2, function () { | ||
remove(client1, ['hello'], 3, 2, function () { | ||
remove(client1, ['matteo'], 2, 2, function () { | ||
remove(client1, ['noqos'], 2, 1, function () { | ||
remove(client2, ['hello'], 1, 1, function () { | ||
remove(client2, ['matteo'], 0, 1, function () { | ||
remove(client2, ['noqos'], 0, 0, function () { | ||
remove(client1, ['foobar'], 4, 2, () => { | ||
remove(client1, ['hello'], 3, 2, () => { | ||
remove(client1, ['hello'], 3, 2, () => { | ||
remove(client1, ['matteo'], 2, 2, () => { | ||
remove(client1, ['noqos'], 2, 1, () => { | ||
remove(client2, ['hello'], 1, 1, () => { | ||
remove(client2, ['matteo'], 0, 1, () => { | ||
remove(client2, ['noqos'], 0, 0, () => { | ||
instance.destroy(t.end.bind(t)) | ||
@@ -755,3 +773,3 @@ }) | ||
testInstance('add duplicate subs to persistence for qos > 0', function (t, instance) { | ||
testInstance('add duplicate subs to persistence for qos > 0', (t, instance) => { | ||
const client = { id: 'abcde' } | ||
@@ -764,11 +782,11 @@ const topic = 'hello' | ||
instance.addSubscriptions(client, subs, function (err, reClient) { | ||
instance.addSubscriptions(client, subs, (err, reClient) => { | ||
t.equal(reClient, client, 'client must be the same') | ||
t.error(err, 'no error') | ||
instance.addSubscriptions(client, subs, function (err, resCLient) { | ||
instance.addSubscriptions(client, subs, (err, resCLient) => { | ||
t.equal(resCLient, client, 'client must be the same') | ||
t.error(err, 'no error') | ||
subs[0].clientId = client.id | ||
instance.subscriptionsByTopic(topic, function (err, subsForTopic) { | ||
instance.subscriptionsByTopic(topic, (err, subsForTopic) => { | ||
t.error(err, 'no error') | ||
@@ -782,3 +800,3 @@ t.deepEqual(subsForTopic, subs) | ||
testInstance('add duplicate subs to persistence for qos 0', function (t, instance) { | ||
testInstance('add duplicate subs to persistence for qos 0', (t, instance) => { | ||
const client = { id: 'abcde' } | ||
@@ -791,10 +809,10 @@ const topic = 'hello' | ||
instance.addSubscriptions(client, subs, function (err, reClient) { | ||
instance.addSubscriptions(client, subs, (err, reClient) => { | ||
t.equal(reClient, client, 'client must be the same') | ||
t.error(err, 'no error') | ||
instance.addSubscriptions(client, subs, function (err, resCLient) { | ||
instance.addSubscriptions(client, subs, (err, resCLient) => { | ||
t.equal(resCLient, client, 'client must be the same') | ||
t.error(err, 'no error') | ||
instance.subscriptionsByClient(client, function (err, subsForClient, client) { | ||
instance.subscriptionsByClient(client, (err, subsForClient, client) => { | ||
t.error(err, 'no error') | ||
@@ -808,3 +826,3 @@ t.deepEqual(subsForClient, subs) | ||
testInstance('get topic list after concurrent subscriptions of a client', function (t, instance) { | ||
testInstance('get topic list after concurrent subscriptions of a client', (t, instance) => { | ||
const client = { id: 'abcde' } | ||
@@ -819,7 +837,7 @@ const subs1 = [{ | ||
}] | ||
var calls = 2 | ||
let calls = 2 | ||
function done () { | ||
if (!--calls) { | ||
instance.subscriptionsByClient(client, function (err, resubs) { | ||
instance.subscriptionsByClient(client, (err, resubs) => { | ||
t.notOk(err, 'no error') | ||
@@ -833,7 +851,7 @@ resubs.sort((a, b) => b.topic.localeCompare(b.topic, 'en')) | ||
instance.addSubscriptions(client, subs1, function (err) { | ||
instance.addSubscriptions(client, subs1, err => { | ||
t.notOk(err, 'no error for hello1') | ||
done() | ||
}) | ||
instance.addSubscriptions(client, subs2, function (err) { | ||
instance.addSubscriptions(client, subs2, err => { | ||
t.notOk(err, 'no error for hello2') | ||
@@ -844,3 +862,3 @@ done() | ||
testInstance('add outgoing packet and stream it', function (t, instance) { | ||
testInstance('add outgoing packet and stream it', (t, instance) => { | ||
const sub = { | ||
@@ -877,15 +895,15 @@ clientId: 'abcde', | ||
instance.outgoingEnqueue(sub, packet, function (err) { | ||
instance.outgoingEnqueue(sub, packet, err => { | ||
t.error(err) | ||
const stream = instance.outgoingStream(client) | ||
stream.pipe(concat(function (list) { | ||
var packet = list[0] | ||
getArrayFromStream(stream).then(list => { | ||
const packet = list[0] | ||
testPacket(t, packet, expected) | ||
instance.destroy(t.end.bind(t)) | ||
})) | ||
}) | ||
}) | ||
}) | ||
testInstance('add outgoing packet for multiple subs and stream to all', function (t, instance) { | ||
testInstance('add outgoing packet for multiple subs and stream to all', (t, instance) => { | ||
const sub = { | ||
@@ -931,20 +949,20 @@ clientId: 'abcde', | ||
instance.outgoingEnqueueCombi(subs, packet, function (err) { | ||
instance.outgoingEnqueueCombi(subs, packet, err => { | ||
t.error(err) | ||
const stream = instance.outgoingStream(client) | ||
stream.pipe(concat(function (list) { | ||
var packet = list[0] | ||
getArrayFromStream(stream).then(list => { | ||
const packet = list[0] | ||
testPacket(t, packet, expected) | ||
const stream2 = instance.outgoingStream(client2) | ||
stream2.pipe(concat(function (list) { | ||
var packet = list[0] | ||
getArrayFromStream(stream2).then(list2 => { | ||
const packet = list2[0] | ||
testPacket(t, packet, expected) | ||
instance.destroy(t.end.bind(t)) | ||
})) | ||
})) | ||
}) | ||
}) | ||
}) | ||
}) | ||
testInstance('add outgoing packet as a string and pump', function (t, instance) { | ||
testInstance('add outgoing packet as a string and pump', (t, instance) => { | ||
const sub = { | ||
@@ -977,13 +995,14 @@ clientId: 'abcde', | ||
const queue = [] | ||
enqueueAndUpdate(t, instance, client, sub, packet1, 42, function (updated1) { | ||
enqueueAndUpdate(t, instance, client, sub, packet2, 43, function (updated2) { | ||
enqueueAndUpdate(t, instance, client, sub, packet1, 42, updated1 => { | ||
enqueueAndUpdate(t, instance, client, sub, packet2, 43, updated2 => { | ||
const stream = instance.outgoingStream(client) | ||
pump(stream, through.obj(function clearQueue (data, enc, next) { | ||
function clearQueue (data) { | ||
instance.outgoingUpdate(client, data, | ||
function (err, client, packet) { | ||
(err, client, packet) => { | ||
t.notOk(err, 'no error') | ||
queue.push(packet) | ||
next() | ||
}) | ||
}), function done () { | ||
} | ||
streamForEach(stream, clearQueue).then(function done () { | ||
t.equal(queue.length, 2) | ||
@@ -1000,3 +1019,3 @@ if (queue.length === 2) { | ||
testInstance('add outgoing packet as a string and stream', function (t, instance) { | ||
testInstance('add outgoing packet as a string and stream', (t, instance) => { | ||
const sub = { | ||
@@ -1033,15 +1052,15 @@ clientId: 'abcde', | ||
instance.outgoingEnqueueCombi([sub], packet, function (err) { | ||
instance.outgoingEnqueueCombi([sub], packet, err => { | ||
t.error(err) | ||
const stream = instance.outgoingStream(client) | ||
stream.pipe(concat(function (list) { | ||
var packet = list[0] | ||
getArrayFromStream(stream).then(list => { | ||
const packet = list[0] | ||
testPacket(t, packet, expected) | ||
instance.destroy(t.end.bind(t)) | ||
})) | ||
}) | ||
}) | ||
}) | ||
testInstance('add outgoing packet and stream it twice', function (t, instance) { | ||
testInstance('add outgoing packet and stream it twice', (t, instance) => { | ||
const sub = { | ||
@@ -1079,19 +1098,19 @@ clientId: 'abcde', | ||
instance.outgoingEnqueueCombi([sub], packet, function (err) { | ||
instance.outgoingEnqueueCombi([sub], packet, err => { | ||
t.error(err) | ||
const stream = instance.outgoingStream(client) | ||
stream.pipe(concat(function (list) { | ||
var packet = list[0] | ||
getArrayFromStream(stream).then(list => { | ||
const packet = list[0] | ||
testPacket(t, packet, expected) | ||
const stream = instance.outgoingStream(client) | ||
const stream2 = instance.outgoingStream(client) | ||
stream.pipe(concat(function (list) { | ||
var packet = list[0] | ||
getArrayFromStream(stream2).then(list2 => { | ||
const packet = list2[0] | ||
testPacket(t, packet, expected) | ||
t.notEqual(packet, expected, 'packet must be a different object') | ||
instance.destroy(t.end.bind(t)) | ||
})) | ||
})) | ||
}) | ||
}) | ||
}) | ||
@@ -1101,3 +1120,3 @@ }) | ||
function enqueueAndUpdate (t, instance, client, sub, packet, messageId, callback) { | ||
instance.outgoingEnqueueCombi([sub], packet, function (err) { | ||
instance.outgoingEnqueueCombi([sub], packet, err => { | ||
t.error(err) | ||
@@ -1107,3 +1126,3 @@ const updated = new Packet(packet) | ||
instance.outgoingUpdate(client, updated, function (err, reclient, repacket) { | ||
instance.outgoingUpdate(client, updated, (err, reclient, repacket) => { | ||
t.error(err) | ||
@@ -1117,3 +1136,3 @@ t.equal(reclient, client, 'client matches') | ||
testInstance('add outgoing packet and update messageId', function (t, instance) { | ||
testInstance('add outgoing packet and update messageId', (t, instance) => { | ||
const sub = { | ||
@@ -1137,6 +1156,6 @@ clientId: 'abcde', topic: 'hello', qos: 1 | ||
enqueueAndUpdate(t, instance, client, sub, packet, 42, function (updated) { | ||
enqueueAndUpdate(t, instance, client, sub, packet, 42, updated => { | ||
const stream = instance.outgoingStream(client) | ||
delete updated.messageId | ||
stream.pipe(concat(function (list) { | ||
getArrayFromStream(stream).then(list => { | ||
delete list[0].messageId | ||
@@ -1146,7 +1165,7 @@ t.notEqual(list[0], updated, 'must not be the same object') | ||
instance.destroy(t.end.bind(t)) | ||
})) | ||
}) | ||
}) | ||
}) | ||
testInstance('add 2 outgoing packet and clear messageId', function (t, instance) { | ||
testInstance('add 2 outgoing packet and clear messageId', (t, instance) => { | ||
const sub = { | ||
@@ -1181,5 +1200,5 @@ clientId: 'abcde', topic: 'hello', qos: 1 | ||
enqueueAndUpdate(t, instance, client, sub, packet1, 42, function (updated1) { | ||
enqueueAndUpdate(t, instance, client, sub, packet2, 43, function (updated2) { | ||
instance.outgoingClearMessageId(client, updated1, function (err, packet) { | ||
enqueueAndUpdate(t, instance, client, sub, packet1, 42, updated1 => { | ||
enqueueAndUpdate(t, instance, client, sub, packet2, 43, updated2 => { | ||
instance.outgoingClearMessageId(client, updated1, (err, packet) => { | ||
t.error(err) | ||
@@ -1191,3 +1210,3 @@ t.deepEqual(packet.messageId, 42, 'must have the same messageId') | ||
delete updated2.messageId | ||
stream.pipe(concat(function (list) { | ||
getArrayFromStream(stream).then(list => { | ||
delete list[0].messageId | ||
@@ -1197,3 +1216,3 @@ t.notEqual(list[0], updated2, 'must not be the same object') | ||
instance.destroy(t.end.bind(t)) | ||
})) | ||
}) | ||
}) | ||
@@ -1204,3 +1223,3 @@ }) | ||
testInstance('update to publish w/ same messageId', function (t, instance) { | ||
testInstance('update to publish w/ same messageId', (t, instance) => { | ||
const sub = { | ||
@@ -1237,8 +1256,8 @@ clientId: 'abcde', topic: 'hello', qos: 1 | ||
instance.outgoingEnqueue(sub, packet1, function () { | ||
instance.outgoingEnqueue(sub, packet2, function () { | ||
instance.outgoingUpdate(client, packet1, function () { | ||
instance.outgoingUpdate(client, packet2, function () { | ||
instance.outgoingEnqueue(sub, packet1, () => { | ||
instance.outgoingEnqueue(sub, packet2, () => { | ||
instance.outgoingUpdate(client, packet1, () => { | ||
instance.outgoingUpdate(client, packet2, () => { | ||
const stream = instance.outgoingStream(client) | ||
stream.pipe(concat(function (list) { | ||
getArrayFromStream(stream).then(list => { | ||
t.equal(list.length, 2, 'must have two items in queue') | ||
@@ -1250,3 +1269,3 @@ t.equal(list[0].brokerCounter, packet1.brokerCounter, 'brokerCounter must match') | ||
instance.destroy(t.end.bind(t)) | ||
})) | ||
}) | ||
}) | ||
@@ -1258,3 +1277,3 @@ }) | ||
testInstance('update to pubrel', function (t, instance) { | ||
testInstance('update to pubrel', (t, instance) => { | ||
const sub = { | ||
@@ -1278,3 +1297,3 @@ clientId: 'abcde', topic: 'hello', qos: 1 | ||
instance.outgoingEnqueueCombi([sub], packet, function (err) { | ||
instance.outgoingEnqueueCombi([sub], packet, err => { | ||
t.error(err) | ||
@@ -1284,3 +1303,3 @@ const updated = new Packet(packet) | ||
instance.outgoingUpdate(client, updated, function (err, reclient, repacket) { | ||
instance.outgoingUpdate(client, updated, (err, reclient, repacket) => { | ||
t.error(err) | ||
@@ -1295,3 +1314,3 @@ t.equal(reclient, client, 'client matches') | ||
instance.outgoingUpdate(client, pubrel, function (err) { | ||
instance.outgoingUpdate(client, pubrel, err => { | ||
t.error(err) | ||
@@ -1301,6 +1320,6 @@ | ||
stream.pipe(concat(function (list) { | ||
getArrayFromStream(stream).then(list => { | ||
t.deepEqual(list, [pubrel], 'must return the packet') | ||
instance.destroy(t.end.bind(t)) | ||
})) | ||
}) | ||
}) | ||
@@ -1311,3 +1330,3 @@ }) | ||
testInstance('add incoming packet, get it, and clear with messageId', function (t, instance) { | ||
testInstance('add incoming packet, get it, and clear with messageId', (t, instance) => { | ||
const client = { | ||
@@ -1327,3 +1346,3 @@ id: 'abcde' | ||
instance.incomingStorePacket(client, packet, function (err) { | ||
instance.incomingStorePacket(client, packet, err => { | ||
t.error(err) | ||
@@ -1333,3 +1352,3 @@ | ||
messageId: packet.messageId | ||
}, function (err, retrieved) { | ||
}, (err, retrieved) => { | ||
t.error(err) | ||
@@ -1345,3 +1364,3 @@ | ||
instance.incomingDelPacket(client, retrieved, function (err) { | ||
instance.incomingDelPacket(client, retrieved, err => { | ||
t.error(err) | ||
@@ -1351,3 +1370,3 @@ | ||
messageId: packet.messageId | ||
}, function (err, retrieved) { | ||
}, (err, retrieved) => { | ||
t.ok(err, 'must error') | ||
@@ -1361,3 +1380,3 @@ instance.destroy(t.end.bind(t)) | ||
testInstance('store, fetch and delete will message', function (t, instance) { | ||
testInstance('store, fetch and delete will message', (t, instance) => { | ||
const client = { | ||
@@ -1373,6 +1392,6 @@ id: '12345' | ||
instance.putWill(client, expected, function (err, c) { | ||
instance.putWill(client, expected, (err, c) => { | ||
t.error(err, 'no error') | ||
t.equal(c, client, 'client matches') | ||
instance.getWill(client, function (err, packet, c) { | ||
instance.getWill(client, (err, packet, c) => { | ||
t.error(err, 'no error') | ||
@@ -1382,7 +1401,7 @@ t.deepEqual(packet, expected, 'will matches') | ||
client.brokerId = packet.brokerId | ||
instance.delWill(client, function (err, packet, c) { | ||
instance.delWill(client, (err, packet, c) => { | ||
t.error(err, 'no error') | ||
t.deepEqual(packet, expected, 'will matches') | ||
t.equal(c, client, 'client matches') | ||
instance.getWill(client, function (err, packet, c) { | ||
instance.getWill(client, (err, packet, c) => { | ||
t.error(err, 'no error') | ||
@@ -1398,3 +1417,3 @@ t.notOk(packet, 'no will after del') | ||
testInstance('stream all will messages', function (t, instance) { | ||
testInstance('stream all will messages', (t, instance) => { | ||
const client = { | ||
@@ -1410,6 +1429,6 @@ id: '12345' | ||
instance.putWill(client, toWrite, function (err, c) { | ||
instance.putWill(client, toWrite, (err, c) => { | ||
t.error(err, 'no error') | ||
t.equal(c, client, 'client matches') | ||
instance.streamWill().pipe(through.obj(function (chunk, enc, cb) { | ||
streamForEach(instance.streamWill(), (chunk) => { | ||
t.deepEqual(chunk, { | ||
@@ -1423,13 +1442,11 @@ clientId: client.id, | ||
}, 'packet matches') | ||
cb() | ||
client.brokerId = chunk.brokerId | ||
instance.delWill(client, function (err, result, client) { | ||
instance.delWill(client, (err, result, client) => { | ||
t.error(err, 'no error') | ||
instance.destroy(t.end.bind(t)) | ||
}) | ||
})) | ||
}) | ||
}) | ||
}) | ||
testInstance('stream all will message for unknown brokers', function (t, instance) { | ||
testInstance('stream all will message for unknown brokers', (t, instance) => { | ||
const originalId = instance.broker.id | ||
@@ -1455,28 +1472,25 @@ const client = { | ||
instance.putWill(client, toWrite1, function (err, c) { | ||
instance.putWill(client, toWrite1, (err, c) => { | ||
t.error(err, 'no error') | ||
t.equal(c, client, 'client matches') | ||
instance.broker.id = 'anotherBroker' | ||
instance.putWill(anotherClient, toWrite2, function (err, c) { | ||
instance.putWill(anotherClient, toWrite2, (err, c) => { | ||
t.error(err, 'no error') | ||
t.equal(c, anotherClient, 'client matches') | ||
instance.streamWill({ | ||
streamForEach(instance.streamWill({ | ||
anotherBroker: Date.now() | ||
}), (chunk) => { | ||
t.deepEqual(chunk, { | ||
clientId: client.id, | ||
brokerId: originalId, | ||
topic: 'hello/died42', | ||
payload: Buffer.from('muahahha'), | ||
qos: 0, | ||
retain: true | ||
}, 'packet matches') | ||
instance.delWill(client, (err, result, client) => { | ||
t.error(err, 'no error') | ||
instance.destroy(t.end.bind(t)) | ||
}) | ||
}) | ||
.pipe(through.obj(function (chunk, enc, cb) { | ||
t.deepEqual(chunk, { | ||
clientId: client.id, | ||
brokerId: originalId, | ||
topic: 'hello/died42', | ||
payload: Buffer.from('muahahha'), | ||
qos: 0, | ||
retain: true | ||
}, 'packet matches') | ||
cb() | ||
client.brokerId = chunk.brokerId | ||
instance.delWill(client, function (err, result, client) { | ||
t.error(err, 'no error') | ||
instance.destroy(t.end.bind(t)) | ||
}) | ||
})) | ||
}) | ||
@@ -1486,3 +1500,3 @@ }) | ||
testInstance('delete wills from dead brokers', function (t, instance) { | ||
testInstance('delete wills from dead brokers', (t, instance) => { | ||
const client = { | ||
@@ -1499,3 +1513,3 @@ id: '42' | ||
instance.putWill(client, toWrite1, function (err, c) { | ||
instance.putWill(client, toWrite1, (err, c) => { | ||
t.error(err, 'no error') | ||
@@ -1505,3 +1519,3 @@ t.equal(c, client, 'client matches') | ||
client.brokerId = instance.broker.id | ||
instance.delWill(client, function (err, result, client) { | ||
instance.delWill(client, (err, result, client) => { | ||
t.error(err, 'no error') | ||
@@ -1513,3 +1527,3 @@ instance.destroy(t.end.bind(t)) | ||
testInstance('do not error if unkown messageId in outoingClearMessageId', function (t, instance) { | ||
testInstance('do not error if unkown messageId in outoingClearMessageId', (t, instance) => { | ||
const client = { | ||
@@ -1519,3 +1533,3 @@ id: 'abc-123' | ||
instance.outgoingClearMessageId(client, 42, function (err) { | ||
instance.outgoingClearMessageId(client, 42, err => { | ||
t.error(err) | ||
@@ -1522,0 +1536,0 @@ instance.destroy(t.end.bind(t)) |
{ | ||
"name": "aedes-persistence", | ||
"version": "8.1.3", | ||
"version": "9.0.0", | ||
"description": "The spec for an Aedes persistence, with abstract tests and a fast in-memory implementation.", | ||
@@ -60,25 +60,22 @@ "main": "persistence.js", | ||
"engines": { | ||
"node": ">=10" | ||
"node": ">=14" | ||
}, | ||
"devDependencies": { | ||
"aedes": "^0.45.0", | ||
"concat-stream": "^2.0.0", | ||
"@types/node": "^17.0.24", | ||
"aedes": "^0.46.3", | ||
"faucet": "0.0.1", | ||
"license-checker": "^25.0.1", | ||
"mqemitter": "^4.4.0", | ||
"mqemitter": "^4.5.0", | ||
"nyc": "^15.1.0", | ||
"pre-commit": "^1.2.2", | ||
"pump": "^3.0.0", | ||
"release-it": "^14.2.0", | ||
"release-it": "^14.14.2", | ||
"snazzy": "^9.0.0", | ||
"standard": "^15.0.1", | ||
"tape": "^5.2.1", | ||
"through2": "^4.0.2", | ||
"tsd": "^0.14.0" | ||
"standard": "^16.0.4", | ||
"tape": "^5.5.3", | ||
"tsd": "^0.20.0" | ||
}, | ||
"dependencies": { | ||
"aedes-packet": "^2.3.1", | ||
"from2": "^2.3.0", | ||
"qlobber": "^5.0.3" | ||
"qlobber": "^6.0.0" | ||
} | ||
} |
@@ -1,4 +0,2 @@ | ||
'use strict' | ||
const from2 = require('from2') | ||
const { Readable } = require('stream') | ||
const QlobberSub = require('qlobber/aedes/qlobber-sub') | ||
@@ -12,346 +10,315 @@ const { QlobberTrue } = require('qlobber') | ||
} | ||
const CREATE_ON_EMPTY = true | ||
function MemoryPersistence () { | ||
if (!(this instanceof MemoryPersistence)) { | ||
return new MemoryPersistence() | ||
function * multiIterables (iterables) { | ||
for (const iter of iterables) { | ||
yield * iter | ||
} | ||
} | ||
this._retained = [] | ||
// clientId -> topic -> qos | ||
this._subscriptions = new Map() | ||
this._clientsCount = 0 | ||
this._trie = new QlobberSub(QlobberOpts) | ||
this._outgoing = {} | ||
this._incoming = {} | ||
this._wills = {} | ||
function * retainedMessagesByPattern (retained, pattern) { | ||
const qlobber = new QlobberTrue(QlobberOpts) | ||
qlobber.add(pattern) | ||
for (const [topic, packet] of retained) { | ||
if (qlobber.test(topic)) { | ||
yield packet | ||
} | ||
} | ||
} | ||
function matchTopic (p) { | ||
return p.topic !== this.topic | ||
function * willsByBrokers (wills, brokers) { | ||
for (const will of wills.values()) { | ||
if (!brokers[will.brokerId]) { | ||
yield will | ||
} | ||
} | ||
} | ||
MemoryPersistence.prototype.storeRetained = function (packet, cb) { | ||
packet = Object.assign({}, packet) | ||
this._retained = this._retained.filter(matchTopic, packet) | ||
if (packet.payload.length > 0) this._retained.push(packet) | ||
cb(null) | ||
function * clientListbyTopic (subscriptions, topic) { | ||
for (const [clientId, topicMap] of subscriptions) { | ||
if (topicMap.has(topic)) { | ||
yield clientId | ||
} | ||
} | ||
} | ||
function matchingStream (current, pattern) { | ||
const matcher = new QlobberTrue(QlobberOpts) | ||
class MemoryPersistence { | ||
constructor () { | ||
// using Maps for convenience and security (risk on prototype polution) | ||
// Map ( topic -> packet ) | ||
this._retained = new Map() | ||
// Map ( clientId -> Map( topic -> qos )) | ||
this._subscriptions = new Map() | ||
// Map ( clientId > [ packet ] } | ||
this._outgoing = new Map() | ||
// Map ( clientId -> { packetId -> Packet } ) | ||
this._incoming = new Map() | ||
// Map( clientId -> will ) | ||
this._wills = new Map() | ||
this._clientsCount = 0 | ||
this._trie = new QlobberSub(QlobberOpts) | ||
} | ||
if (Array.isArray(pattern)) { | ||
for (var i = 0; i < pattern.length; i += 1) { | ||
matcher.add(pattern[i]) | ||
storeRetained (pkt, cb) { | ||
const packet = Object.assign({}, pkt) | ||
if (packet.payload.length === 0) { | ||
this._retained.delete(packet.topic) | ||
} else { | ||
this._retained.set(packet.topic, packet) | ||
} | ||
} else { | ||
matcher.add(pattern) | ||
cb(null) | ||
} | ||
return from2.obj(function match (size, next) { | ||
var entry | ||
createRetainedStreamCombi (patterns) { | ||
const iterables = patterns.map((p) => { | ||
return retainedMessagesByPattern(this._retained, p) | ||
}) | ||
return Readable.from(multiIterables(iterables)) | ||
} | ||
while ((entry = current.shift()) != null) { | ||
if (matcher.test(entry.topic)) { | ||
setImmediate(next, null, entry) | ||
return | ||
createRetainedStream (pattern) { | ||
return Readable.from(retainedMessagesByPattern(this._retained, pattern)) | ||
} | ||
addSubscriptions (client, subs, cb) { | ||
let stored = this._subscriptions.get(client.id) | ||
const trie = this._trie | ||
if (!stored) { | ||
stored = new Map() | ||
this._subscriptions.set(client.id, stored) | ||
this._clientsCount++ | ||
} | ||
for (const sub of subs) { | ||
const qos = stored.get(sub.topic) | ||
const hasQoSGreaterThanZero = (qos !== undefined) && (qos > 0) | ||
if (sub.qos > 0) { | ||
trie.add(sub.topic, { | ||
clientId: client.id, | ||
topic: sub.topic, | ||
qos: sub.qos | ||
}) | ||
} else if (hasQoSGreaterThanZero) { | ||
trie.remove(sub.topic, { | ||
clientId: client.id, | ||
topic: sub.topic | ||
}) | ||
} | ||
stored.set(sub.topic, sub.qos) | ||
} | ||
if (!entry) this.push(null) | ||
}) | ||
} | ||
cb(null, client) | ||
} | ||
MemoryPersistence.prototype.createRetainedStream = function (pattern) { | ||
return matchingStream([].concat(this._retained), pattern) | ||
} | ||
removeSubscriptions (client, subs, cb) { | ||
const stored = this._subscriptions.get(client.id) | ||
const trie = this._trie | ||
MemoryPersistence.prototype.createRetainedStreamCombi = function (patterns) { | ||
return matchingStream([].concat(this._retained), patterns) | ||
} | ||
if (stored) { | ||
for (const topic of subs) { | ||
const qos = stored.get(topic) | ||
if (qos !== undefined) { | ||
if (qos > 0) { | ||
trie.remove(topic, { clientId: client.id, topic }) | ||
} | ||
stored.delete(topic) | ||
} | ||
} | ||
MemoryPersistence.prototype.addSubscriptions = function (client, subs, cb) { | ||
var stored = this._subscriptions.get(client.id) | ||
const trie = this._trie | ||
if (stored.size === 0) { | ||
this._clientsCount-- | ||
this._subscriptions.delete(client.id) | ||
} | ||
} | ||
if (!stored) { | ||
stored = new Map() | ||
this._subscriptions.set(client.id, stored) | ||
this._clientsCount++ | ||
cb(null, client) | ||
} | ||
for (var i = 0; i < subs.length; i += 1) { | ||
const sub = subs[i] | ||
const qos = stored.get(sub.topic) | ||
const hasQoSGreaterThanZero = (qos !== undefined) && (qos > 0) | ||
if (sub.qos > 0) { | ||
trie.add(sub.topic, { | ||
clientId: client.id, | ||
topic: sub.topic, | ||
qos: sub.qos | ||
}) | ||
} else if (hasQoSGreaterThanZero) { | ||
trie.remove(sub.topic, { | ||
clientId: client.id, | ||
topic: sub.topic | ||
}) | ||
subscriptionsByClient (client, cb) { | ||
let subs = null | ||
const stored = this._subscriptions.get(client.id) | ||
if (stored) { | ||
subs = [] | ||
for (const topicAndQos of stored) { | ||
subs.push({ topic: topicAndQos[0], qos: topicAndQos[1] }) | ||
} | ||
} | ||
stored.set(sub.topic, sub.qos) | ||
cb(null, subs, client) | ||
} | ||
cb(null, client) | ||
} | ||
countOffline (cb) { | ||
return cb(null, this._trie.subscriptionsCount, this._clientsCount) | ||
} | ||
MemoryPersistence.prototype.removeSubscriptions = function (client, subs, cb) { | ||
const stored = this._subscriptions.get(client.id) | ||
const trie = this._trie | ||
subscriptionsByTopic (pattern, cb) { | ||
cb(null, this._trie.match(pattern)) | ||
} | ||
if (stored) { | ||
for (var i = 0; i < subs.length; i += 1) { | ||
const topic = subs[i] | ||
const qos = stored.get(topic) | ||
if (qos !== undefined) { | ||
if (qos > 0) { | ||
cleanSubscriptions (client, cb) { | ||
const trie = this._trie | ||
const stored = this._subscriptions.get(client.id) | ||
if (stored) { | ||
for (const topicAndQos of stored) { | ||
if (topicAndQos[1] > 0) { | ||
const topic = topicAndQos[0] | ||
trie.remove(topic, { clientId: client.id, topic }) | ||
} | ||
stored.delete(topic) | ||
} | ||
} | ||
if (stored.size === 0) { | ||
this._clientsCount-- | ||
this._subscriptions.delete(client.id) | ||
} | ||
cb(null, client) | ||
} | ||
cb(null, client) | ||
} | ||
outgoingEnqueue (sub, packet, cb) { | ||
_outgoingEnqueue.call(this, sub, packet) | ||
process.nextTick(cb) | ||
} | ||
MemoryPersistence.prototype.subscriptionsByClient = function (client, cb) { | ||
var subs = null | ||
const stored = this._subscriptions.get(client.id) | ||
if (stored) { | ||
subs = [] | ||
for (const topicAndQos of stored) { | ||
subs.push({ topic: topicAndQos[0], qos: topicAndQos[1] }) | ||
outgoingEnqueueCombi (subs, packet, cb) { | ||
for (let i = 0; i < subs.length; i++) { | ||
_outgoingEnqueue.call(this, subs[i], packet) | ||
} | ||
process.nextTick(cb) | ||
} | ||
cb(null, subs, client) | ||
} | ||
MemoryPersistence.prototype.countOffline = function (cb) { | ||
return cb(null, this._trie.subscriptionsCount, this._clientsCount) | ||
} | ||
outgoingUpdate (client, packet, cb) { | ||
const outgoing = getMapRef(this._outgoing, client.id, [], CREATE_ON_EMPTY) | ||
MemoryPersistence.prototype.subscriptionsByTopic = function (pattern, cb) { | ||
cb(null, this._trie.match(pattern)) | ||
} | ||
MemoryPersistence.prototype.cleanSubscriptions = function (client, cb) { | ||
const trie = this._trie | ||
const stored = this._subscriptions.get(client.id) | ||
if (stored) { | ||
for (const topicAndQos of stored) { | ||
if (topicAndQos[1] > 0) { | ||
const topic = topicAndQos[0] | ||
trie.remove(topic, { clientId: client.id, topic }) | ||
let temp | ||
for (let i = 0; i < outgoing.length; i++) { | ||
temp = outgoing[i] | ||
if (temp.brokerId === packet.brokerId) { | ||
if (temp.brokerCounter === packet.brokerCounter) { | ||
temp.messageId = packet.messageId | ||
return cb(null, client, packet) | ||
} | ||
/* | ||
Maximum of messageId (packet identifier) is 65535 and will be rotated, | ||
brokerCounter is to ensure the packet identifier be unique. | ||
The for loop is going to search which packet messageId should be updated | ||
in the _outgoing queue. | ||
If there is a case that brokerCounter is different but messageId is same, | ||
we need to let the loop keep searching | ||
*/ | ||
} else if (temp.messageId === packet.messageId) { | ||
outgoing[i] = packet | ||
return cb(null, client, packet) | ||
} | ||
} | ||
this._clientsCount-- | ||
this._subscriptions.delete(client.id) | ||
cb(new Error('no such packet'), client, packet) | ||
} | ||
cb(null, client) | ||
} | ||
outgoingClearMessageId (client, packet, cb) { | ||
const outgoing = getMapRef(this._outgoing, client.id, [], CREATE_ON_EMPTY) | ||
MemoryPersistence.prototype.outgoingEnqueue = function (sub, packet, cb) { | ||
_outgoingEnqueue.call(this, sub, packet) | ||
process.nextTick(cb) | ||
} | ||
let temp | ||
for (let i = 0; i < outgoing.length; i++) { | ||
temp = outgoing[i] | ||
if (temp.messageId === packet.messageId) { | ||
outgoing.splice(i, 1) | ||
return cb(null, temp) | ||
} | ||
} | ||
MemoryPersistence.prototype.outgoingEnqueueCombi = function (subs, packet, cb) { | ||
for (var i = 0; i < subs.length; i++) { | ||
_outgoingEnqueue.call(this, subs[i], packet) | ||
cb() | ||
} | ||
process.nextTick(cb) | ||
} | ||
function _outgoingEnqueue (sub, packet) { | ||
const id = sub.clientId | ||
const queue = this._outgoing[id] || [] | ||
outgoingStream (client) { | ||
return Readable.from(getMapRef(this._outgoing, client.id, [])) | ||
} | ||
this._outgoing[id] = queue | ||
const p = new Packet(packet) | ||
queue[queue.length] = p | ||
} | ||
incomingStorePacket (client, packet, cb) { | ||
const id = client.id | ||
const store = getMapRef(this._incoming, id, {}, CREATE_ON_EMPTY) | ||
MemoryPersistence.prototype.outgoingUpdate = function (client, packet, cb) { | ||
const clientId = client.id | ||
const outgoing = this._outgoing[clientId] || [] | ||
var temp | ||
store[packet.messageId] = new Packet(packet) | ||
store[packet.messageId].messageId = packet.messageId | ||
this._outgoing[clientId] = outgoing | ||
for (var i = 0; i < outgoing.length; i++) { | ||
temp = outgoing[i] | ||
if (temp.brokerId === packet.brokerId) { | ||
if (temp.brokerCounter === packet.brokerCounter) { | ||
temp.messageId = packet.messageId | ||
return cb(null, client, packet) | ||
} | ||
/* | ||
Maximum of messageId (packet identifier) is 65535 and will be rotated, | ||
brokerCounter is to ensure the packet identifier be unique. | ||
The for loop is going to search which packet messageId should be updated | ||
in the _outgoing queue. | ||
If there is a case that brokerCounter is different but messageId is same, | ||
we need to let the loop keep searching | ||
*/ | ||
} else if (temp.messageId === packet.messageId) { | ||
outgoing[i] = packet | ||
return cb(null, client, packet) | ||
} | ||
cb(null) | ||
} | ||
cb(new Error('no such packet'), client, packet) | ||
} | ||
incomingGetPacket (client, packet, cb) { | ||
const id = client.id | ||
const store = getMapRef(this._incoming, id, {}) | ||
let err = null | ||
MemoryPersistence.prototype.outgoingClearMessageId = function (client, packet, cb) { | ||
const clientId = client.id | ||
const outgoing = this._outgoing[clientId] || [] | ||
var temp | ||
this._incoming.set(id, store) | ||
this._outgoing[clientId] = outgoing | ||
if (!store[packet.messageId]) { | ||
err = new Error('no such packet') | ||
} | ||
for (var i = 0; i < outgoing.length; i++) { | ||
temp = outgoing[i] | ||
if (temp.messageId === packet.messageId) { | ||
outgoing.splice(i, 1) | ||
return cb(null, temp) | ||
} | ||
cb(err, store[packet.messageId]) | ||
} | ||
cb() | ||
} | ||
incomingDelPacket (client, packet, cb) { | ||
const id = client.id | ||
const store = getMapRef(this._incoming, id, {}) | ||
const toDelete = store[packet.messageId] | ||
let err = null | ||
MemoryPersistence.prototype.outgoingStream = function (client) { | ||
const queue = [].concat(this._outgoing[client.id] || []) | ||
return from2.obj(function match (size, next) { | ||
var entry | ||
if ((entry = queue.shift()) != null) { | ||
setImmediate(next, null, entry) | ||
return | ||
if (!toDelete) { | ||
err = new Error('no such packet') | ||
} else { | ||
delete store[packet.messageId] | ||
} | ||
if (!entry) this.push(null) | ||
}) | ||
} | ||
cb(err) | ||
} | ||
MemoryPersistence.prototype.incomingStorePacket = function (client, packet, cb) { | ||
const id = client.id | ||
const store = this._incoming[id] || {} | ||
putWill (client, packet, cb) { | ||
packet.brokerId = this.broker.id | ||
packet.clientId = client.id | ||
this._wills.set(client.id, packet) | ||
cb(null, client) | ||
} | ||
this._incoming[id] = store | ||
getWill (client, cb) { | ||
cb(null, this._wills.get(client.id), client) | ||
} | ||
store[packet.messageId] = new Packet(packet) | ||
store[packet.messageId].messageId = packet.messageId | ||
delWill (client, cb) { | ||
const will = this._wills.get(client.id) | ||
this._wills.delete(client.id) | ||
cb(null, will, client) | ||
} | ||
cb(null) | ||
} | ||
MemoryPersistence.prototype.incomingGetPacket = function (client, packet, cb) { | ||
const id = client.id | ||
const store = this._incoming[id] || {} | ||
var err = null | ||
this._incoming[id] = store | ||
if (!store[packet.messageId]) { | ||
err = new Error('no such packet') | ||
streamWill (brokers = {}) { | ||
return Readable.from(willsByBrokers(this._wills, brokers)) | ||
} | ||
cb(err, store[packet.messageId]) | ||
} | ||
MemoryPersistence.prototype.incomingDelPacket = function (client, packet, cb) { | ||
const id = client.id | ||
const store = this._incoming[id] || {} | ||
const toDelete = store[packet.messageId] | ||
var err = null | ||
if (!toDelete) { | ||
err = new Error('no such packet') | ||
} else { | ||
delete store[packet.messageId] | ||
getClientList (topic) { | ||
return Readable.from(clientListbyTopic(this._subscriptions, topic)) | ||
} | ||
cb(err) | ||
} | ||
MemoryPersistence.prototype.putWill = function (client, packet, cb) { | ||
packet.brokerId = this.broker.id | ||
packet.clientId = client.id | ||
this._wills[client.id] = packet | ||
cb(null, client) | ||
} | ||
MemoryPersistence.prototype.getWill = function (client, cb) { | ||
cb(null, this._wills[client.id], client) | ||
} | ||
MemoryPersistence.prototype.delWill = function (client, cb) { | ||
const will = this._wills[client.id] | ||
delete this._wills[client.id] | ||
cb(null, will, client) | ||
} | ||
MemoryPersistence.prototype.streamWill = function (brokers) { | ||
const clients = Object.keys(this._wills) | ||
const wills = this._wills | ||
brokers = brokers || {} | ||
return from2.obj(function match (size, next) { | ||
var entry | ||
while ((entry = clients.shift()) != null) { | ||
if (!brokers[wills[entry].brokerId]) { | ||
setImmediate(next, null, wills[entry]) | ||
return | ||
} | ||
destroy (cb) { | ||
this._retained = null | ||
if (cb) { | ||
cb(null) | ||
} | ||
if (!entry) { | ||
this.push(null) | ||
} | ||
}) | ||
} | ||
} | ||
MemoryPersistence.prototype.getClientList = function (topic) { | ||
const clientSubs = this._subscriptions | ||
const entries = clientSubs.entries(clientSubs) | ||
return from2.obj(function match (size, next) { | ||
var entry | ||
while (!(entry = entries.next()).done) { | ||
if (entry.value[1].has(topic)) { | ||
setImmediate(next, null, entry.value[0]) | ||
return | ||
} | ||
} | ||
next(null, null) | ||
}) | ||
function _outgoingEnqueue (sub, packet) { | ||
const id = sub.clientId | ||
const queue = getMapRef(this._outgoing, id, [], CREATE_ON_EMPTY) | ||
queue[queue.length] = new Packet(packet) | ||
} | ||
MemoryPersistence.prototype.destroy = function (cb) { | ||
this._retained = null | ||
if (cb) { | ||
cb(null) | ||
function getMapRef (map, key, ifEmpty, createOnEmpty = false) { | ||
const value = map.get(key) | ||
if (value === undefined && createOnEmpty) { | ||
map.set(key, ifEmpty) | ||
} | ||
return value || ifEmpty | ||
} | ||
module.exports = MemoryPersistence | ||
module.exports = () => { return new MemoryPersistence() } | ||
module.exports.Packet = Packet |
@@ -268,5 +268,5 @@ # aedes-persistence | ||
```js | ||
var test = require('tape').test | ||
var myperst = require('./') | ||
var abs = require('aedes-persistence/abstract') | ||
const test = require('tape').test | ||
const myperst = require('./') | ||
const abs = require('aedes-persistence/abstract') | ||
@@ -283,6 +283,6 @@ abs({ | ||
```js | ||
var test = require('tape').test | ||
var myperst = require('./') | ||
var abs = require('aedes-persistence/abstract') | ||
var clean = require('./clean') // invented module | ||
const test = require('tape').test | ||
const myperst = require('./') | ||
const abs = require('aedes-persistence/abstract') | ||
const clean = require('./clean') // invented module | ||
@@ -289,0 +289,0 @@ abs({ |
@@ -1,11 +0,23 @@ | ||
import type { Brokers, Client, Subscription } from 'aedes'; | ||
import type { AedesPacket } from 'aedes-packet'; | ||
import type { QoS } from 'mqtt-packet'; | ||
import type { Readable } from 'stream'; | ||
import type { Brokers, Client, Subscription } from "aedes"; | ||
import type { AedesPacket } from "aedes-packet"; | ||
import type { Readable } from "stream"; | ||
export type { AedesPacket as Packet } from 'aedes-packet'; | ||
export type { AedesPacket as Packet } from "aedes-packet"; | ||
type ClientId = Subscription["clientId"]; | ||
type MessageId = AedesPacket["messageId"]; | ||
type Topic = Subscription["topic"]; | ||
type TopicPattern = Subscription["topic"]; | ||
type QoS = Subscription["qos"]; | ||
type Incoming = { | ||
// tsc accepts: | ||
// [messageId: MessageId]: AedesPacket; | ||
// Workaround for tsd 0.20: | ||
[messageId: number]: AedesPacket; | ||
}; | ||
export interface AedesPersistenceSubscription { | ||
clientId: string; | ||
topic: string; | ||
clientId: ClientId; | ||
topic: Topic; | ||
qos?: QoS; | ||
@@ -18,15 +30,11 @@ } | ||
interface Incoming { | ||
[clientId: string]: { [messageId: string]: AedesPacket }; | ||
} | ||
export interface AedesPersistence { | ||
storeRetained: ( | ||
packet: AedesPacket, | ||
cb: (error: CallbackError) => void | ||
cb: (error: CallbackError) => void, | ||
) => void; | ||
createRetainedStream: (pattern: string) => Readable; | ||
createRetainedStream: (pattern: TopicPattern) => Readable; | ||
createRetainedStreamCombi: (patterns: string[]) => Readable; | ||
createRetainedStreamCombi: (patterns: TopicPattern[]) => Readable; | ||
@@ -36,3 +44,3 @@ addSubscriptions: ( | ||
subs: Subscription[], | ||
cb: (error: CallbackError, client: Client) => void | ||
cb: (error: CallbackError, client: Client) => void, | ||
) => void; | ||
@@ -43,3 +51,3 @@ | ||
subs: Subscription[], | ||
cb: (error: CallbackError, client: Client) => void | ||
cb: (error: CallbackError, client: Client) => void, | ||
) => void; | ||
@@ -51,5 +59,5 @@ | ||
error: CallbackError, | ||
subs: { topic: string; qos: QoS }[], | ||
client: Client | ||
) => void | ||
subs: { topic: Topic; qos: QoS }[], | ||
client: Client, | ||
) => void, | ||
) => void; | ||
@@ -61,9 +69,9 @@ | ||
subscriptionsCount: number, | ||
clientsCount: number | ||
) => void | ||
clientsCount: number, | ||
) => void, | ||
) => void; | ||
subscriptionsByTopic: ( | ||
pattern: string, | ||
cb: (error: CallbackError, subs: AedesPersistenceSubscription[]) => void | ||
pattern: TopicPattern, | ||
cb: (error: CallbackError, subs: AedesPersistenceSubscription[]) => void, | ||
) => void; | ||
@@ -73,15 +81,15 @@ | ||
client: Client, | ||
cb: (error: CallbackError, client: Client) => void | ||
cb: (error: CallbackError, client: Client) => void, | ||
) => void; | ||
outgoingEnqueue: ( | ||
sub: { clientId: string }, | ||
sub: { clientId: ClientId }, | ||
packet: AedesPacket, | ||
cb: (error: CallbackError) => void | ||
cb: (error: CallbackError) => void, | ||
) => void; | ||
outgoingEnqueueCombi: ( | ||
subs: { clientId: string }[], | ||
subs: { clientId: ClientId }[], | ||
packet: AedesPacket, | ||
cb: (error: CallbackError) => void | ||
cb: (error: CallbackError) => void, | ||
) => void; | ||
@@ -92,3 +100,3 @@ | ||
packet: AedesPacket, | ||
cb: (error: CallbackError, client: Client, packet: AedesPacket) => void | ||
cb: (error: CallbackError, client: Client, packet: AedesPacket) => void, | ||
) => void; | ||
@@ -99,3 +107,3 @@ | ||
packet: AedesPacket, | ||
cb: (error?: CallbackError, packet?: AedesPacket) => void | ||
cb: (error?: CallbackError, packet?: AedesPacket) => void, | ||
) => void; | ||
@@ -108,3 +116,3 @@ | ||
packet: AedesPacket, | ||
cb: (error: CallbackError) => void | ||
cb: (error: CallbackError) => void, | ||
) => void; | ||
@@ -115,3 +123,3 @@ | ||
packet: AedesPacket, | ||
cb: (error: CallbackError, packet: AedesPacket) => void | ||
cb: (error: CallbackError, packet: AedesPacket) => void, | ||
) => void; | ||
@@ -122,3 +130,3 @@ | ||
packet: AedesPacket, | ||
cb: (error: CallbackError) => void | ||
cb: (error: CallbackError) => void, | ||
) => void; | ||
@@ -129,3 +137,3 @@ | ||
packet: AedesPacket, | ||
cb: (error: CallbackError, client: Client) => void | ||
cb: (error: CallbackError, client: Client) => void, | ||
) => void; | ||
@@ -135,3 +143,3 @@ | ||
client: Client, | ||
cb: (error: CallbackError, will: WillPacket, client: Client) => void | ||
cb: (error: CallbackError, will: WillPacket, client: Client) => void, | ||
) => void; | ||
@@ -141,3 +149,3 @@ | ||
client: Client, | ||
cb: (error: CallbackError, will: WillPacket, client: Client) => void | ||
cb: (error: CallbackError, will: WillPacket, client: Client) => void, | ||
) => void; | ||
@@ -153,8 +161,8 @@ | ||
export class AedesMemoryPersistence implements AedesPersistence { | ||
_retained: AedesPacket[]; | ||
_retained: Map<Topic, AedesPacket>; | ||
_subscriptions: Map< | ||
AedesPersistenceSubscription['clientId'], | ||
ClientId, | ||
Map< | ||
AedesPersistenceSubscription['topic'], | ||
AedesPersistenceSubscription['qos'] | ||
Topic, | ||
QoS | ||
> | ||
@@ -164,5 +172,5 @@ >; | ||
_trie: any; | ||
_outgoing: Record<string, AedesPacket[]>; | ||
_incoming: Incoming; | ||
_wills: Record<string, WillPacket>; | ||
_outgoing: Map<ClientId, AedesPacket[]>; | ||
_incoming: Map<ClientId, Incoming>; | ||
_wills: Map<ClientId, WillPacket>; | ||
@@ -173,8 +181,8 @@ constructor(); | ||
packet: AedesPacket, | ||
cb: (error: CallbackError) => void | ||
cb: (error: CallbackError) => void, | ||
) => void; | ||
createRetainedStream: (pattern: string) => Readable; | ||
createRetainedStream: (pattern: TopicPattern) => Readable; | ||
createRetainedStreamCombi: (patterns: string[]) => Readable; | ||
createRetainedStreamCombi: (patterns: TopicPattern[]) => Readable; | ||
@@ -184,3 +192,3 @@ addSubscriptions: ( | ||
subs: Subscription[], | ||
cb: (error: CallbackError, client: Client) => void | ||
cb: (error: CallbackError, client: Client) => void, | ||
) => void; | ||
@@ -191,3 +199,3 @@ | ||
subs: Subscription[], | ||
cb: (error: CallbackError, client: Client) => void | ||
cb: (error: CallbackError, client: Client) => void, | ||
) => void; | ||
@@ -200,4 +208,4 @@ | ||
subs: { topic: string; qos: QoS }[], | ||
client: Client | ||
) => void | ||
client: Client, | ||
) => void, | ||
) => void; | ||
@@ -209,4 +217,4 @@ | ||
subscriptionsCount: number, | ||
clientsCount: number | ||
) => void | ||
clientsCount: number, | ||
) => void, | ||
) => void; | ||
@@ -216,3 +224,3 @@ | ||
pattern: string, | ||
cb: (error: CallbackError, subs: AedesPersistenceSubscription[]) => void | ||
cb: (error: CallbackError, subs: AedesPersistenceSubscription[]) => void, | ||
) => void; | ||
@@ -222,15 +230,15 @@ | ||
client: Client, | ||
cb: (error: CallbackError, client: Client) => void | ||
cb: (error: CallbackError, client: Client) => void, | ||
) => void; | ||
outgoingEnqueue: ( | ||
sub: { clientId: string }, | ||
sub: { clientId: ClientId }, | ||
packet: AedesPacket, | ||
cb: (error: CallbackError) => void | ||
cb: (error: CallbackError) => void, | ||
) => void; | ||
outgoingEnqueueCombi: ( | ||
sub: { clientId: string }[], | ||
sub: { clientId: ClientId }[], | ||
packet: AedesPacket, | ||
cb: (error: CallbackError) => void | ||
cb: (error: CallbackError) => void, | ||
) => void; | ||
@@ -241,3 +249,3 @@ | ||
packet: AedesPacket, | ||
cb: (error: CallbackError, client: Client, packet: AedesPacket) => void | ||
cb: (error: CallbackError, client: Client, packet: AedesPacket) => void, | ||
) => void; | ||
@@ -248,3 +256,3 @@ | ||
packet: AedesPacket, | ||
cb: (error?: CallbackError, packet?: AedesPacket) => void | ||
cb: (error?: CallbackError, packet?: AedesPacket) => void, | ||
) => void; | ||
@@ -257,3 +265,3 @@ | ||
packet: AedesPacket, | ||
cb: (error: CallbackError) => void | ||
cb: (error: CallbackError) => void, | ||
) => void; | ||
@@ -264,3 +272,3 @@ | ||
packet: AedesPacket, | ||
cb: (error: CallbackError, packet: AedesPacket) => void | ||
cb: (error: CallbackError, packet: AedesPacket) => void, | ||
) => void; | ||
@@ -271,3 +279,3 @@ | ||
packet: AedesPacket, | ||
cb: (error: CallbackError) => void | ||
cb: (error: CallbackError) => void, | ||
) => void; | ||
@@ -278,3 +286,3 @@ | ||
packet: AedesPacket, | ||
cb: (error: CallbackError, client: Client) => void | ||
cb: (error: CallbackError, client: Client) => void, | ||
) => void; | ||
@@ -284,3 +292,3 @@ | ||
client: Client, | ||
cb: (error: CallbackError, will: WillPacket, client: Client) => void | ||
cb: (error: CallbackError, will: WillPacket, client: Client) => void, | ||
) => void; | ||
@@ -290,3 +298,3 @@ | ||
client: Client, | ||
cb: (error: CallbackError, will: WillPacket, client: Client) => void | ||
cb: (error: CallbackError, will: WillPacket, client: Client) => void, | ||
) => void; | ||
@@ -293,0 +301,0 @@ |
Sorry, the diff of this file is not supported yet
License Policy Violation
LicenseThis package is not allowed per your license policy. Review the package's license to ensure compliance.
Found 1 instance in 1 package
License Policy Violation
LicenseThis package is not allowed per your license policy. Review the package's license to ensure compliance.
Found 1 instance in 1 package
2
12
74544
1945
+ Addedqlobber@6.0.0(transitive)
- Removedfrom2@^2.3.0
- Removedcore-util-is@1.0.3(transitive)
- Removedfrom2@2.3.0(transitive)
- Removedisarray@1.0.0(transitive)
- Removedqlobber@5.0.3(transitive)
- Removedreadable-stream@2.3.8(transitive)
- Removedsafe-buffer@5.1.2(transitive)
- Removedstring_decoder@1.1.1(transitive)
Updatedqlobber@^6.0.0