Socket
Socket
Sign inDemoInstall

aedes-persistence

Package Overview
Dependencies
Maintainers
4
Versions
44
Alerts
File Explorer

Advanced tools

Socket logo

Install Socket

Detect and block malicious and high-risk dependencies

Install

aedes-persistence - npm Package Compare versions

Comparing version 9.0.0 to 9.0.1

62

abstract.js

@@ -1151,2 +1151,64 @@ const { Readable } = require('stream')

testInstance('add many outgoing packets and clear messageIds', async (t, instance) => {
const sub = {
clientId: 'abcde', topic: 'hello', qos: 1
}
const client = {
id: sub.clientId
}
const packet = {
cmd: 'publish',
topic: 'hello',
payload: Buffer.from('world'),
qos: 1,
dup: false,
length: 14,
retain: false,
brokerId: instance.broker.id,
brokerCounter: 42
}
function outStream (instance, client) {
return iterableStream(instance.outgoingStream(client))
}
// we just need a stream to figure out the high watermark
const stream = outStream(instance, client)
const total = stream.readableHighWaterMark * 2
async function submitMessage (id) {
return new Promise((resolve, reject) => {
enqueueAndUpdate(t, instance, client, sub, packet, id, resolve)
})
}
for (let i = 0; i < total; i++) {
await submitMessage(i)
}
let queued = 0
for await (const p of outStream(instance, client)) {
if (p) {
queued++
}
}
t.equal(queued, total, `outgoing queue must hold ${total} items`)
for await (const p of outStream(instance, client)) {
instance.outgoingClearMessageId(client, p, (err, received) => {
t.error(err)
t.deepEqual(received, p, 'must return the packet')
})
}
let queued2 = 0
for await (const p of outStream(instance, client)) {
if (p) {
queued2++
}
}
t.equal(queued2, 0, 'outgoing queue is empty')
instance.destroy(t.end.bind(t))
})
testInstance('update to publish w/ same messageId', (t, instance) => {

@@ -1153,0 +1215,0 @@ const sub = {

2

package.json
{
"name": "aedes-persistence",
"version": "9.0.0",
"version": "9.0.1",
"description": "The spec for an Aedes persistence, with abstract tests and a fast in-memory implementation.",

@@ -5,0 +5,0 @@ "main": "persistence.js",

@@ -233,3 +233,5 @@ const { Readable } = require('stream')

outgoingStream (client) {
return Readable.from(getMapRef(this._outgoing, client.id, []))
// shallow clone the outgoing queue for this client to avoid race conditions
const outgoing = [].concat(getMapRef(this._outgoing, client.id, []))
return Readable.from(outgoing)
}

@@ -236,0 +238,0 @@

SocketSocket SOC 2 Logo

Product

  • Package Alerts
  • Integrations
  • Docs
  • Pricing
  • FAQ
  • Roadmap
  • Changelog

Packages

npm

Stay in touch

Get open source security insights delivered straight into your inbox.


  • Terms
  • Privacy
  • Security

Made with ⚡️ by Socket Inc