aedes-persistence
Advanced tools
Comparing version 9.0.0 to 9.0.1
@@ -1151,2 +1151,64 @@ const { Readable } = require('stream') | ||
testInstance('add many outgoing packets and clear messageIds', async (t, instance) => { | ||
const sub = { | ||
clientId: 'abcde', topic: 'hello', qos: 1 | ||
} | ||
const client = { | ||
id: sub.clientId | ||
} | ||
const packet = { | ||
cmd: 'publish', | ||
topic: 'hello', | ||
payload: Buffer.from('world'), | ||
qos: 1, | ||
dup: false, | ||
length: 14, | ||
retain: false, | ||
brokerId: instance.broker.id, | ||
brokerCounter: 42 | ||
} | ||
function outStream (instance, client) { | ||
return iterableStream(instance.outgoingStream(client)) | ||
} | ||
// we just need a stream to figure out the high watermark | ||
const stream = outStream(instance, client) | ||
const total = stream.readableHighWaterMark * 2 | ||
async function submitMessage (id) { | ||
return new Promise((resolve, reject) => { | ||
enqueueAndUpdate(t, instance, client, sub, packet, id, resolve) | ||
}) | ||
} | ||
for (let i = 0; i < total; i++) { | ||
await submitMessage(i) | ||
} | ||
let queued = 0 | ||
for await (const p of outStream(instance, client)) { | ||
if (p) { | ||
queued++ | ||
} | ||
} | ||
t.equal(queued, total, `outgoing queue must hold ${total} items`) | ||
for await (const p of outStream(instance, client)) { | ||
instance.outgoingClearMessageId(client, p, (err, received) => { | ||
t.error(err) | ||
t.deepEqual(received, p, 'must return the packet') | ||
}) | ||
} | ||
let queued2 = 0 | ||
for await (const p of outStream(instance, client)) { | ||
if (p) { | ||
queued2++ | ||
} | ||
} | ||
t.equal(queued2, 0, 'outgoing queue is empty') | ||
instance.destroy(t.end.bind(t)) | ||
}) | ||
testInstance('update to publish w/ same messageId', (t, instance) => { | ||
@@ -1153,0 +1215,0 @@ const sub = { |
{ | ||
"name": "aedes-persistence", | ||
"version": "9.0.0", | ||
"version": "9.0.1", | ||
"description": "The spec for an Aedes persistence, with abstract tests and a fast in-memory implementation.", | ||
@@ -5,0 +5,0 @@ "main": "persistence.js", |
@@ -233,3 +233,5 @@ const { Readable } = require('stream') | ||
outgoingStream (client) { | ||
return Readable.from(getMapRef(this._outgoing, client.id, [])) | ||
// shallow clone the outgoing queue for this client to avoid race conditions | ||
const outgoing = [].concat(getMapRef(this._outgoing, client.id, [])) | ||
return Readable.from(outgoing) | ||
} | ||
@@ -236,0 +238,0 @@ |
License Policy Violation
LicenseThis package is not allowed per your license policy. Review the package's license to ensure compliance.
Found 1 instance in 1 package
License Policy Violation
LicenseThis package is not allowed per your license policy. Review the package's license to ensure compliance.
Found 1 instance in 1 package
76274
2001