aedes
Advanced tools
Comparing version 0.46.3 to 0.47.0
@@ -8,5 +8,4 @@ 'use strict' | ||
const { v4: uuidv4 } = require('uuid') | ||
const bulk = require('bulk-write-stream') | ||
const reusify = require('reusify') | ||
const { pipeline } = require('readable-stream') | ||
const { pipeline } = require('stream') | ||
const Packet = require('aedes-packet') | ||
@@ -16,3 +15,3 @@ const memory = require('aedes-persistence') | ||
const Client = require('./lib/client') | ||
const { $SYS_PREFIX } = require('./lib/utils') | ||
const { $SYS_PREFIX, bulk } = require('./lib/utils') | ||
@@ -107,3 +106,3 @@ module.exports = Aedes.Server = Aedes | ||
that.persistence.streamWill(that.brokers), | ||
bulk.obj(receiveWills), | ||
bulk(receiveWills), | ||
function done (err) { | ||
@@ -177,2 +176,3 @@ if (err) { | ||
function emitPacket (packet, done) { | ||
if (this.client) packet.clientId = this.client.id | ||
this.broker.mq.emit(packet, done) | ||
@@ -179,0 +179,0 @@ } |
@@ -13,3 +13,3 @@ 'use strict' | ||
const handle = require('./handlers') | ||
const { pipeline } = require('readable-stream') | ||
const { pipeline } = require('stream') | ||
const { through } = require('./utils') | ||
@@ -93,5 +93,16 @@ | ||
this.deliver0 = function deliverQoS0 (_packet, cb) { | ||
const getToForwardPacket = (_packet) => { | ||
// Mqttv5 3.8.3.1: https://docs.oasis-open.org/mqtt/mqtt/v5.0/mqtt-v5.0.html#_Toc3901169 | ||
// prevent to forward messages sent by the same client when no-local flag is set | ||
if (_packet.clientId === that.id && _packet.nl) return | ||
const toForward = dedupe(that, _packet) && | ||
that.broker.authorizeForward(that, _packet) | ||
return toForward | ||
} | ||
this.deliver0 = function deliverQoS0 (_packet, cb) { | ||
const toForward = getToForwardPacket(_packet) | ||
if (toForward) { | ||
@@ -119,4 +130,4 @@ // Give nodejs some time to clear stacks, or we will see | ||
} | ||
const toForward = dedupe(that, _packet) && | ||
that.broker.authorizeForward(that, _packet) | ||
const toForward = getToForwardPacket(_packet) | ||
if (toForward) { | ||
@@ -123,0 +134,0 @@ setImmediate(() => { |
'use strict' | ||
const retimer = require('retimer') | ||
const { pipeline } = require('readable-stream') | ||
const { pipeline } = require('stream') | ||
const write = require('../write') | ||
@@ -6,0 +6,0 @@ const QoSPacket = require('../qos-packet') |
@@ -145,9 +145,8 @@ 'use strict' | ||
if (!rap) { | ||
const deliverFunc = func | ||
func = function handlePacketSubscription (_packet, cb) { | ||
_packet = new Packet(_packet, broker) | ||
_packet.retain = false | ||
deliverFunc(_packet, cb) | ||
} | ||
const deliverFunc = func | ||
func = function handlePacketSubscription (_packet, cb) { | ||
_packet = new Packet(_packet, broker) | ||
_packet.nl = nl | ||
if (!rap) _packet.retain = false | ||
deliverFunc(_packet, cb) | ||
} | ||
@@ -154,0 +153,0 @@ |
'use strict' | ||
const { Transform } = require('readable-stream') | ||
const { Transform, Writable } = require('stream') | ||
@@ -40,6 +40,16 @@ function validateTopic (topic, message) { | ||
function bulk (fn) { | ||
return new Writable({ | ||
objectMode: true, | ||
writev: function (chunks, cb) { | ||
fn(chunks.map(chunk => chunk.chunk), cb) | ||
} | ||
}) | ||
} | ||
module.exports = { | ||
validateTopic, | ||
through, | ||
bulk, | ||
$SYS_PREFIX: '$SYS/' | ||
} |
{ | ||
"name": "aedes", | ||
"version": "0.46.3", | ||
"version": "0.47.0", | ||
"description": "Stream-based MQTT broker", | ||
@@ -97,29 +97,28 @@ "main": "aedes.js", | ||
"engines": { | ||
"node": ">=12" | ||
"node": ">=14" | ||
}, | ||
"devDependencies": { | ||
"@sinonjs/fake-timers": "^9.1.0", | ||
"@types/node": "^17.0.15", | ||
"@typescript-eslint/eslint-plugin": "^5.10.2", | ||
"@typescript-eslint/parser": "^5.10.2", | ||
"@sinonjs/fake-timers": "^9.1.2", | ||
"@types/node": "^17.0.24", | ||
"@typescript-eslint/eslint-plugin": "^5.19.0", | ||
"@typescript-eslint/parser": "^5.19.0", | ||
"concat-stream": "^2.0.0", | ||
"duplexify": "^4.1.2", | ||
"license-checker": "^25.0.1", | ||
"markdownlint-cli": "^0.31.0", | ||
"mqtt": "^4.3.4", | ||
"markdownlint-cli": "^0.31.1", | ||
"mqtt": "^4.3.7", | ||
"mqtt-connection": "^4.1.0", | ||
"pre-commit": "^1.2.2", | ||
"proxyquire": "^2.1.3", | ||
"release-it": "^14.12.4", | ||
"release-it": "^14.14.2", | ||
"snazzy": "^9.0.0", | ||
"standard": "^16.0.4", | ||
"tap": "^15.1.6", | ||
"tsd": "^0.19.1", | ||
"typescript": "^4.5.5", | ||
"tap": "^16.0.1", | ||
"tsd": "^0.20.0", | ||
"typescript": "^4.6.3", | ||
"websocket-stream": "^5.5.2" | ||
}, | ||
"dependencies": { | ||
"aedes-packet": "^2.3.1", | ||
"aedes-persistence": "^8.1.3", | ||
"bulk-write-stream": "^2.0.1", | ||
"aedes-packet": "^3.0.0", | ||
"aedes-persistence": "^9.1.1", | ||
"end-of-stream": "^1.4.4", | ||
@@ -129,6 +128,5 @@ "fastfall": "^1.5.1", | ||
"fastseries": "^2.0.0", | ||
"hyperid": "^3.0.0", | ||
"hyperid": "^3.0.1", | ||
"mqemitter": "^4.5.0", | ||
"mqtt-packet": "^7.1.2", | ||
"readable-stream": "^3.6.0", | ||
"retimer": "^3.0.0", | ||
@@ -135,0 +133,0 @@ "reusify": "^1.0.4", |
@@ -27,3 +27,3 @@ <!-- markdownlint-disable MD013 MD024 --> | ||
- [Clusters](#clusters) | ||
- [Exensions](#exensions) | ||
- [Extensions](#extensions) | ||
- [Middleware Plugins](#middleware-plugins) | ||
@@ -30,0 +30,0 @@ - [Persistence](#persistence) |
@@ -425,3 +425,3 @@ 'use strict' | ||
const s = connect(setup()) | ||
const s = connect(setup(), { clientId: 'my-client-xyz' }) | ||
t.teardown(s.broker.close.bind(s.broker)) | ||
@@ -449,2 +449,3 @@ | ||
expected.brokerCounter = s.broker.counter | ||
expected.clientId = 'my-client-xyz' | ||
delete expected.length | ||
@@ -465,3 +466,3 @@ t.same(packet, expected, 'packet matches') | ||
const s = setup() | ||
const s = setup(aedes({ clientId: 'my-client-xyz-2' })) | ||
t.teardown(s.broker.close.bind(s.broker)) | ||
@@ -491,3 +492,4 @@ | ||
length: 12, | ||
dup: false | ||
dup: false, | ||
clientId: 'my-client' | ||
} | ||
@@ -526,2 +528,3 @@ | ||
const s = connect(setup(aedes({ | ||
clientId: 'my-client-xyz-3', | ||
authorizePublish: function (client, packet, cb) { | ||
@@ -532,3 +535,3 @@ t.ok(client, 'client exists') | ||
} | ||
}))) | ||
})), { clientId: 'my-client-xyz-3' }) | ||
t.teardown(s.broker.close.bind(s.broker)) | ||
@@ -550,2 +553,3 @@ | ||
expected.brokerCounter = s.broker.counter | ||
expected.clientId = 'my-client-xyz-3' | ||
delete expected.length | ||
@@ -599,3 +603,3 @@ t.same(packet, expected, 'packet matches') | ||
const s = connect(setup()) | ||
const s = connect(setup(), { clientId: 'my-client-xyz-4' }) | ||
t.teardown(s.broker.close.bind(s.broker)) | ||
@@ -610,3 +614,4 @@ | ||
length: 12, | ||
dup: false | ||
dup: false, | ||
clientId: 'my-client-xyz-4' | ||
} | ||
@@ -817,8 +822,15 @@ | ||
// rh, rap, nl are undefined because mqtt.parser is set to MQTT 3.1.1 and will thus erase these props from s.inStream.write | ||
const expected = [{ | ||
topic: 'hello', | ||
qos: 0 | ||
qos: 0, | ||
rh: undefined, | ||
rap: undefined, | ||
nl: undefined | ||
}, { | ||
topic: 'world', | ||
qos: 0 | ||
qos: 0, | ||
rh: undefined, | ||
rap: undefined, | ||
nl: undefined | ||
}] | ||
@@ -852,6 +864,12 @@ | ||
topic: 'hello', | ||
qos: 0 | ||
qos: 0, | ||
rh: 0, | ||
rap: true, | ||
nl: false | ||
}, { | ||
topic: 'world', | ||
qos: 0 | ||
qos: 0, | ||
rh: 0, | ||
rap: true, | ||
nl: false | ||
}] | ||
@@ -858,0 +876,0 @@ }) |
@@ -23,3 +23,3 @@ 'use strict' | ||
const s = connect(setup()) | ||
const s = connect(setup(), { clientId: 'my-client-xyz-5' }) | ||
t.teardown(s.broker.close.bind(s.broker)) | ||
@@ -33,3 +33,4 @@ | ||
retain: false, | ||
dup: false | ||
dup: false, | ||
clientId: 'my-client-xyz-5' | ||
} | ||
@@ -133,3 +134,3 @@ | ||
} | ||
const expectedSubs = ele.clean ? null : [{ topic: 'hello', qos: ele.qos }] | ||
const expectedSubs = ele.clean ? null : [{ topic: 'hello', qos: ele.qos, rh: undefined, rap: undefined, nl: undefined }] | ||
@@ -194,3 +195,6 @@ subscribe(t, s, 'hello', ele.qos, function () { | ||
} | ||
const subs = [{ topic: 'hello', qos: ele.qos }, { topic: 'world', qos: ele.qos }] | ||
const subs = [ | ||
{ topic: 'hello', qos: ele.qos, rh: undefined, rap: undefined, nl: undefined }, | ||
{ topic: 'world', qos: ele.qos, rh: undefined, rap: undefined, nl: undefined } | ||
] | ||
const expectedSubs = ele.clean ? null : subs | ||
@@ -197,0 +201,0 @@ |
@@ -908,6 +908,6 @@ 'use strict' | ||
const broker = aedes() | ||
const broker = aedes({ clientId: 'my-client-xyz-7' }) | ||
t.teardown(broker.close.bind(broker)) | ||
const s = connect(setup(broker)) | ||
const s = connect(setup(broker), { clientId: 'my-client-xyz-7' }) | ||
const expected = { | ||
@@ -928,3 +928,4 @@ cmd: 'publish', | ||
retain: false, | ||
dup: false | ||
dup: false, | ||
clientId: 'my-client-xyz-7' | ||
} | ||
@@ -968,5 +969,6 @@ subscribe(t, s, 'hello', 0, function () { | ||
dup: false, | ||
messageId: undefined | ||
messageId: undefined, | ||
clientId: 'my-client-xyz-6' | ||
} | ||
connect(s, {}, function () { | ||
connect(s, { clientId: 'my-client-xyz-6' }, function () { | ||
broker.subscribe('hello', deliver, function () { | ||
@@ -973,0 +975,0 @@ t.pass('subscribed') |
@@ -242,3 +242,3 @@ 'use strict' | ||
const opts = { clean: cleanSession } | ||
const publisher = connect(setup(broker)) | ||
const publisher = connect(setup(broker), { clientId: 'my-client-xyz-8' }) | ||
const subscriber = connect(setup(broker), { ...opts, clientId: 'abcde' }) | ||
@@ -252,3 +252,4 @@ const forwarded = { | ||
dup: false, | ||
messageId: undefined | ||
messageId: undefined, | ||
clientId: 'my-client-xyz-8' | ||
} | ||
@@ -267,2 +268,3 @@ const expected = { | ||
forwarded.brokerCounter = broker.counter | ||
delete packet.nl | ||
t.same(packet, forwarded, 'forwarded packet must match') | ||
@@ -269,0 +271,0 @@ return packet |
import { AedesPacket } from 'aedes-packet' | ||
import { IConnackPacket, IConnectPacket, IPingreqPacket, IPublishPacket, IPubrelPacket, ISubscribePacket, ISubscription, IUnsubscribePacket } from 'mqtt-packet' | ||
import { Client } from './client' | ||
export type SubscribePacket = ISubscribePacket & { cmd: 'subscribe' } | ||
export type UnsubscribePacket = IUnsubscribePacket & { cmd: 'unsubscribe' } | ||
export type Subscription = ISubscription & { clientId?: string } | ||
export type Subscription = ISubscription & { clientId?: Client['id'] } | ||
export type Subscriptions = { subscriptions: Subscription[] } | ||
@@ -8,0 +9,0 @@ |
Sorry, the diff of this file is not supported yet
302324
12
62
8868
+ Addedaedes-packet@3.0.0(transitive)
+ Addedaedes-persistence@9.1.2(transitive)
+ Addedqlobber@7.0.1(transitive)
+ Addedsafe-buffer@5.2.1(transitive)
+ Addedstring_decoder@1.3.0(transitive)
- Removedbulk-write-stream@^2.0.1
- Removedreadable-stream@^3.6.0
- Removedaedes-packet@2.3.1(transitive)
- Removedaedes-persistence@8.1.3(transitive)
- Removedbulk-write-stream@2.0.1(transitive)
- Removedcore-util-is@1.0.3(transitive)
- Removedfrom2@2.3.0(transitive)
- Removedisarray@1.0.0(transitive)
- Removedmqtt-packet@6.10.0(transitive)
- Removedreadable-stream@2.3.8(transitive)
- Removedsafe-buffer@5.1.2(transitive)
- Removedstring_decoder@1.1.1(transitive)
Updatedaedes-packet@^3.0.0
Updatedaedes-persistence@^9.1.1
Updatedhyperid@^3.0.1