Huge News!Announcing our $40M Series B led by Abstract Ventures.Learn More
Socket
Sign inDemoInstall
Socket

mqtt

Package Overview
Dependencies
Maintainers
7
Versions
203
Alerts
File Explorer

Advanced tools

Socket logo

Install Socket

Detect and block malicious and high-risk dependencies

Install

mqtt - npm Package Compare versions

Comparing version 5.0.0-beta.2 to 5.0.0-beta.3

lib/handlers/ack.js

14

bin/mqtt.js
#!/usr/bin/env node
'use strict'

@@ -13,4 +12,4 @@ /*

const helpMe = require('help-me')({
dir: path.join(path.dirname(require.main.filename), '/../doc'),
ext: '.txt'
dir: path.join(path.dirname(require.main.filename), '/../doc'),
ext: '.txt',
})

@@ -20,4 +19,5 @@

commist.register('subscribe', require('./sub'))
commist.register('version', function () {
console.log('MQTT.js version:', require('./../package.json').version)
commist.register('version', () => {
console.log('MQTT.js version:', require('../package.json').version)
})

@@ -27,4 +27,4 @@ commist.register('help', helpMe.toStdout)

if (commist.parse(process.argv.slice(2)) !== null) {
console.log('No such command:', process.argv[2], '\n')
helpMe.toStdout()
console.log('No such command:', process.argv[2], '\n')
helpMe.toStdout()
}
#!/usr/bin/env node
'use strict'
const mqtt = require('../')
const pump = require('pump')
const mqtt = require('..')
const { pipeline, Writable } = require('readable-stream')
const path = require('path')
const fs = require('fs')
const concat = require('concat-stream')
const Writable = require('readable-stream').Writable
const helpMe = require('help-me')({
dir: path.join(__dirname, '..', 'doc')
dir: path.join(__dirname, '..', 'doc'),
})

@@ -17,125 +14,141 @@ const minimist = require('minimist')

function send (args) {
const client = mqtt.connect(args)
client.on('connect', function () {
client.publish(args.topic, args.message, args, function (err) {
if (err) {
console.warn(err)
}
client.end()
})
})
client.on('error', function (err) {
console.warn(err)
client.end()
})
function send(args) {
const client = mqtt.connect(args)
client.on('connect', () => {
client.publish(args.topic, args.message, args, (err) => {
if (err) {
console.warn(err)
}
client.end()
})
})
client.on('error', (err) => {
console.warn(err)
client.end()
})
}
function multisend (args) {
const client = mqtt.connect(args)
const sender = new Writable({
objectMode: true
})
sender._write = function (line, enc, cb) {
client.publish(args.topic, line.trim(), args, cb)
}
function multisend(args) {
const client = mqtt.connect(args)
const sender = new Writable({
objectMode: true,
})
sender._write = (line, enc, cb) => {
client.publish(args.topic, line.trim(), args, cb)
}
client.on('connect', function () {
pump(process.stdin, split2(), sender, function (err) {
client.end()
if (err) {
throw err
}
})
})
client.on('connect', () => {
pipeline(process.stdin, split2(), sender, (err) => {
client.end()
if (err) {
throw err
}
})
})
}
function start (args) {
args = minimist(args, {
string: ['hostname', 'username', 'password', 'key', 'cert', 'ca', 'message', 'clientId', 'i', 'id'],
boolean: ['stdin', 'retain', 'help', 'insecure', 'multiline'],
alias: {
port: 'p',
hostname: ['h', 'host'],
topic: 't',
message: 'm',
qos: 'q',
clientId: ['i', 'id'],
retain: 'r',
username: 'u',
password: 'P',
stdin: 's',
multiline: 'M',
protocol: ['C', 'l'],
help: 'H',
ca: 'cafile'
},
default: {
host: 'localhost',
qos: 0,
retain: false,
topic: '',
message: ''
}
})
function start(args) {
args = minimist(args, {
string: [
'hostname',
'username',
'password',
'key',
'cert',
'ca',
'message',
'clientId',
'i',
'id',
],
boolean: ['stdin', 'retain', 'help', 'insecure', 'multiline'],
alias: {
port: 'p',
hostname: ['h', 'host'],
topic: 't',
message: 'm',
qos: 'q',
clientId: ['i', 'id'],
retain: 'r',
username: 'u',
password: 'P',
stdin: 's',
multiline: 'M',
protocol: ['C', 'l'],
help: 'H',
ca: 'cafile',
},
default: {
host: 'localhost',
qos: 0,
retain: false,
topic: '',
message: '',
},
})
if (args.help) {
return helpMe.toStdout('publish')
}
if (args.help) {
return helpMe.toStdout('publish')
}
if (args.key) {
args.key = fs.readFileSync(args.key)
}
if (args.key) {
args.key = fs.readFileSync(args.key)
}
if (args.cert) {
args.cert = fs.readFileSync(args.cert)
}
if (args.cert) {
args.cert = fs.readFileSync(args.cert)
}
if (args.ca) {
args.ca = fs.readFileSync(args.ca)
}
if (args.ca) {
args.ca = fs.readFileSync(args.ca)
}
if (args.key && args.cert && !args.protocol) {
args.protocol = 'mqtts'
}
if (args.key && args.cert && !args.protocol) {
args.protocol = 'mqtts'
}
if (args.port) {
if (typeof args.port !== 'number') {
console.warn('# Port: number expected, \'%s\' was given.', typeof args.port)
return
}
}
if (args.port) {
if (typeof args.port !== 'number') {
console.warn(
"# Port: number expected, '%s' was given.",
typeof args.port,
)
return
}
}
if (args['will-topic']) {
args.will = {}
args.will.topic = args['will-topic']
args.will.payload = args['will-message']
args.will.qos = args['will-qos']
args.will.retain = args['will-retain']
}
if (args['will-topic']) {
args.will = {}
args.will.topic = args['will-topic']
args.will.payload = args['will-message']
args.will.qos = args['will-qos']
args.will.retain = args['will-retain']
}
if (args.insecure) {
args.rejectUnauthorized = false
}
if (args.insecure) {
args.rejectUnauthorized = false
}
args.topic = (args.topic || args._.shift()).toString()
args.message = (args.message || args._.shift()).toString()
args.topic = (args.topic || args._.shift()).toString()
args.message = (args.message || args._.shift()).toString()
if (!args.topic) {
console.error('missing topic\n')
return helpMe.toStdout('publish')
}
if (!args.topic) {
console.error('missing topic\n')
return helpMe.toStdout('publish')
}
if (args.stdin) {
if (args.multiline) {
multisend(args)
} else {
process.stdin.pipe(concat(function (data) {
args.message = data
send(args)
}))
}
} else {
send(args)
}
if (args.stdin) {
if (args.multiline) {
multisend(args)
} else {
process.stdin.pipe(
concat((data) => {
args.message = data
send(args)
}),
)
}
} else {
send(args)
}
}

@@ -146,3 +159,3 @@

if (require.main === module) {
start(process.argv.slice(2))
start(process.argv.slice(2))
}
#!/usr/bin/env node
const mqtt = require('../')
const mqtt = require('..')
const path = require('path')
const fs = require('fs')
const helpMe = require('help-me')({
dir: path.join(__dirname, '..', 'doc')
dir: path.join(__dirname, '..', 'doc'),
})
const minimist = require('minimist')
function start (args) {
args = minimist(args, {
string: ['hostname', 'username', 'password', 'key', 'cert', 'ca', 'clientId', 'i', 'id'],
boolean: ['stdin', 'help', 'clean', 'insecure'],
alias: {
port: 'p',
hostname: ['h', 'host'],
topic: 't',
qos: 'q',
clean: 'c',
keepalive: 'k',
clientId: ['i', 'id'],
username: 'u',
password: 'P',
protocol: ['C', 'l'],
verbose: 'v',
help: '-H',
ca: 'cafile'
},
default: {
host: 'localhost',
qos: 0,
retain: false,
clean: true,
keepAlive: 30 // 30 sec
}
})
function start(args) {
args = minimist(args, {
string: [
'hostname',
'username',
'password',
'key',
'cert',
'ca',
'clientId',
'i',
'id',
],
boolean: ['stdin', 'help', 'clean', 'insecure'],
alias: {
port: 'p',
hostname: ['h', 'host'],
topic: 't',
qos: 'q',
clean: 'c',
keepalive: 'k',
clientId: ['i', 'id'],
username: 'u',
password: 'P',
protocol: ['C', 'l'],
verbose: 'v',
help: '-H',
ca: 'cafile',
},
default: {
host: 'localhost',
qos: 0,
retain: false,
clean: true,
keepAlive: 30, // 30 sec
},
})
if (args.help) {
return helpMe.toStdout('subscribe')
}
if (args.help) {
return helpMe.toStdout('subscribe')
}
args.topic = args.topic || args._.shift()
args.topic = args.topic || args._.shift()
if (!args.topic) {
console.error('missing topic\n')
return helpMe.toStdout('subscribe')
}
if (!args.topic) {
console.error('missing topic\n')
return helpMe.toStdout('subscribe')
}
if (args.key) {
args.key = fs.readFileSync(args.key)
}
if (args.key) {
args.key = fs.readFileSync(args.key)
}
if (args.cert) {
args.cert = fs.readFileSync(args.cert)
}
if (args.cert) {
args.cert = fs.readFileSync(args.cert)
}
if (args.ca) {
args.ca = fs.readFileSync(args.ca)
}
if (args.ca) {
args.ca = fs.readFileSync(args.ca)
}
if (args.key && args.cert && !args.protocol) {
args.protocol = 'mqtts'
}
if (args.key && args.cert && !args.protocol) {
args.protocol = 'mqtts'
}
if (args.insecure) {
args.rejectUnauthorized = false
}
if (args.insecure) {
args.rejectUnauthorized = false
}
if (args.port) {
if (typeof args.port !== 'number') {
console.warn('# Port: number expected, \'%s\' was given.', typeof args.port)
return
}
}
if (args.port) {
if (typeof args.port !== 'number') {
console.warn(
"# Port: number expected, '%s' was given.",
typeof args.port,
)
return
}
}
if (args['will-topic']) {
args.will = {}
args.will.topic = args['will-topic']
args.will.payload = args['will-message']
args.will.qos = args['will-qos']
args.will.retain = args['will-retain']
}
if (args['will-topic']) {
args.will = {}
args.will.topic = args['will-topic']
args.will.payload = args['will-message']
args.will.qos = args['will-qos']
args.will.retain = args['will-retain']
}
args.keepAlive = args['keep-alive']
args.keepAlive = args['keep-alive']
const client = mqtt.connect(args)
const client = mqtt.connect(args)
client.on('connect', function () {
client.subscribe(args.topic, { qos: args.qos }, function (err, result) {
if (err) {
console.error(err)
process.exit(1)
}
client.on('connect', () => {
client.subscribe(args.topic, { qos: args.qos }, (err, result) => {
if (err) {
console.error(err)
process.exit(1)
}
result.forEach(function (sub) {
if (sub.qos > 2) {
console.error('subscription negated to', sub.topic, 'with code', sub.qos)
process.exit(1)
}
})
})
})
result.forEach((sub) => {
if (sub.qos > 2) {
console.error(
'subscription negated to',
sub.topic,
'with code',
sub.qos,
)
process.exit(1)
}
})
})
})
client.on('message', function (topic, payload) {
if (args.verbose) {
console.log(topic, payload.toString())
} else {
console.log(payload.toString())
}
})
client.on('message', (topic, payload) => {
if (args.verbose) {
console.log(topic, payload.toString())
} else {
console.log(payload.toString())
}
})
client.on('error', function (err) {
console.warn(err)
client.end()
})
client.on('error', (err) => {
console.warn(err)
client.end()
})
}

@@ -122,3 +140,3 @@

if (require.main === module) {
start(process.argv.slice(2))
start(process.argv.slice(2))
}

@@ -1,243 +0,51 @@

'use strict'
/**
* Module dependencies
*/
const EventEmitter = require('events').EventEmitter
const Store = require('./store')
const { EventEmitter } = require('events')
const TopicAliasRecv = require('./topic-alias-recv')
const TopicAliasSend = require('./topic-alias-send')
const mqttPacket = require('mqtt-packet')
const DefaultMessageIdProvider = require('./default-message-id-provider')
const Writable = require('readable-stream').Writable
const inherits = require('inherits')
const { Writable } = require('readable-stream')
const reInterval = require('reinterval')
const clone = require('rfdc/default')
const validations = require('./validations')
const xtend = require('xtend')
const debug = require('debug')('mqttjs:client')
const nextTick = process ? process.nextTick : function (callback) { setTimeout(callback, 0) }
const setImmediate = global.setImmediate || function (...args) {
const callback = args.shift()
nextTick(callback.bind(null, ...args))
}
const Store = require('./store')
const handlePacket = require('./handlers')
const nextTick = process
? process.nextTick
: (callback) => {
setTimeout(callback, 0)
}
const setImmediate =
global.setImmediate ||
((...args) => {
const callback = args.shift()
nextTick(() => {
callback(...args)
})
})
const defaultConnectOptions = {
keepalive: 60,
reschedulePings: true,
protocolId: 'MQTT',
protocolVersion: 4,
reconnectPeriod: 1000,
connectTimeout: 30 * 1000,
clean: true,
resubscribe: true,
writeCache: true
keepalive: 60,
reschedulePings: true,
protocolId: 'MQTT',
protocolVersion: 4,
reconnectPeriod: 1000,
connectTimeout: 30 * 1000,
clean: true,
resubscribe: true,
writeCache: true,
}
const socketErrors = [
'ECONNREFUSED',
'EADDRINUSE',
'ECONNRESET',
'ENOTFOUND',
'ETIMEDOUT'
'ECONNREFUSED',
'EADDRINUSE',
'ECONNRESET',
'ENOTFOUND',
'ETIMEDOUT',
]
// Other Socket Errors: EADDRINUSE, ECONNRESET, ENOTFOUND, ETIMEDOUT.
const errors = {
0: '',
1: 'Unacceptable protocol version',
2: 'Identifier rejected',
3: 'Server unavailable',
4: 'Bad username or password',
5: 'Not authorized',
16: 'No matching subscribers',
17: 'No subscription existed',
128: 'Unspecified error',
129: 'Malformed Packet',
130: 'Protocol Error',
131: 'Implementation specific error',
132: 'Unsupported Protocol Version',
133: 'Client Identifier not valid',
134: 'Bad User Name or Password',
135: 'Not authorized',
136: 'Server unavailable',
137: 'Server busy',
138: 'Banned',
139: 'Server shutting down',
140: 'Bad authentication method',
141: 'Keep Alive timeout',
142: 'Session taken over',
143: 'Topic Filter invalid',
144: 'Topic Name invalid',
145: 'Packet identifier in use',
146: 'Packet Identifier not found',
147: 'Receive Maximum exceeded',
148: 'Topic Alias invalid',
149: 'Packet too large',
150: 'Message rate too high',
151: 'Quota exceeded',
152: 'Administrative action',
153: 'Payload format invalid',
154: 'Retain not supported',
155: 'QoS not supported',
156: 'Use another server',
157: 'Server moved',
158: 'Shared Subscriptions not supported',
159: 'Connection rate exceeded',
160: 'Maximum connect time',
161: 'Subscription Identifiers not supported',
162: 'Wildcard Subscriptions not supported'
}
function defaultId () {
return 'mqttjs_' + Math.random().toString(16).substr(2, 8)
}
function applyTopicAlias (client, packet) {
if (client.options.protocolVersion === 5) {
if (packet.cmd === 'publish') {
let alias
if (packet.properties) {
alias = packet.properties.topicAlias
}
const topic = packet.topic.toString()
if (client.topicAliasSend) {
if (alias) {
if (topic.length !== 0) {
// register topic alias
debug('applyTopicAlias :: register topic: %s - alias: %d', topic, alias)
if (!client.topicAliasSend.put(topic, alias)) {
debug('applyTopicAlias :: error out of range. topic: %s - alias: %d', topic, alias)
return new Error('Sending Topic Alias out of range')
}
}
} else {
if (topic.length !== 0) {
if (client.options.autoAssignTopicAlias) {
alias = client.topicAliasSend.getAliasByTopic(topic)
if (alias) {
packet.topic = ''
packet.properties = { ...(packet.properties), topicAlias: alias }
debug('applyTopicAlias :: auto assign(use) topic: %s - alias: %d', topic, alias)
} else {
alias = client.topicAliasSend.getLruAlias()
client.topicAliasSend.put(topic, alias)
packet.properties = { ...(packet.properties), topicAlias: alias }
debug('applyTopicAlias :: auto assign topic: %s - alias: %d', topic, alias)
}
} else if (client.options.autoUseTopicAlias) {
alias = client.topicAliasSend.getAliasByTopic(topic)
if (alias) {
packet.topic = ''
packet.properties = { ...(packet.properties), topicAlias: alias }
debug('applyTopicAlias :: auto use topic: %s - alias: %d', topic, alias)
}
}
}
}
} else if (alias) {
debug('applyTopicAlias :: error out of range. topic: %s - alias: %d', topic, alias)
return new Error('Sending Topic Alias out of range')
}
}
}
}
function removeTopicAliasAndRecoverTopicName (client, packet) {
let alias
if (packet.properties) {
alias = packet.properties.topicAlias
}
let topic = packet.topic.toString()
if (topic.length === 0) {
// restore topic from alias
if (typeof alias === 'undefined') {
return new Error('Unregistered Topic Alias')
} else {
topic = client.topicAliasSend.getTopicByAlias(alias)
if (typeof topic === 'undefined') {
return new Error('Unregistered Topic Alias')
} else {
packet.topic = topic
}
}
}
if (alias) {
delete packet.properties.topicAlias
}
}
function sendPacket (client, packet, cb) {
debug('sendPacket :: packet: %O', packet)
debug('sendPacket :: emitting `packetsend`')
client.emit('packetsend', packet)
debug('sendPacket :: writing to stream')
const result = mqttPacket.writeToStream(packet, client.stream, client.options)
debug('sendPacket :: writeToStream result %s', result)
if (!result && cb && cb !== nop) {
debug('sendPacket :: handle events on `drain` once through callback.')
client.stream.once('drain', cb)
} else if (cb) {
debug('sendPacket :: invoking cb')
cb()
}
}
function flush (queue) {
if (queue) {
debug('flush: queue exists? %b', !!(queue))
Object.keys(queue).forEach(function (messageId) {
if (typeof queue[messageId].cb === 'function') {
queue[messageId].cb(new Error('Connection closed'))
// This is suspicious. Why do we only delete this if we have a callbck?
// If this is by-design, then adding no as callback would cause this to get deleted unintentionally.
delete queue[messageId]
}
})
}
}
function flushVolatile (queue) {
if (queue) {
debug('flushVolatile :: deleting volatile messages from the queue and setting their callbacks as error function')
Object.keys(queue).forEach(function (messageId) {
if (queue[messageId].volatile && typeof queue[messageId].cb === 'function') {
queue[messageId].cb(new Error('Connection closed'))
delete queue[messageId]
}
})
}
}
function storeAndSend (client, packet, cb, cbStorePut) {
debug('storeAndSend :: store packet with cmd %s to outgoingStore', packet.cmd)
let storePacket = packet
let err
if (storePacket.cmd === 'publish') {
// The original packet is for sending.
// The cloned storePacket is for storing to resend on reconnect.
// Topic Alias must not be used after disconnected.
storePacket = clone(packet)
err = removeTopicAliasAndRecoverTopicName(client, storePacket)
if (err) {
return cb && cb(err)
}
}
client.outgoingStore.put(storePacket, function storedPacket (err) {
if (err) {
return cb && cb(err)
}
cbStorePut()
sendPacket(client, packet, cb)
})
}
function nop (error) {
debug('nop ::', error)
}
/**

@@ -250,1670 +58,1612 @@ * MqttClient constructor

*/
function MqttClient (streamBuilder, options) {
let k
const that = this
class MqttClient extends EventEmitter {
static defaultId() {
return `mqttjs_${Math.random().toString(16).substr(2, 8)}`
}
if (!(this instanceof MqttClient)) {
return new MqttClient(streamBuilder, options)
}
constructor(streamBuilder, options) {
super()
this.options = options || {}
let k
// Defaults
for (k in defaultConnectOptions) {
if (typeof this.options[k] === 'undefined') {
this.options[k] = defaultConnectOptions[k]
} else {
this.options[k] = options[k]
}
}
this.options = options || {}
debug('MqttClient :: options.protocol', options.protocol)
debug('MqttClient :: options.protocolVersion', options.protocolVersion)
debug('MqttClient :: options.username', options.username)
debug('MqttClient :: options.keepalive', options.keepalive)
debug('MqttClient :: options.reconnectPeriod', options.reconnectPeriod)
debug('MqttClient :: options.rejectUnauthorized', options.rejectUnauthorized)
debug('MqttClient :: options.properties.topicAliasMaximum', options.properties ? options.properties.topicAliasMaximum : undefined)
// Defaults
for (k in defaultConnectOptions) {
if (typeof this.options[k] === 'undefined') {
this.options[k] = defaultConnectOptions[k]
} else {
this.options[k] = options[k]
}
}
this.options.clientId = (typeof options.clientId === 'string') ? options.clientId : defaultId()
this.log = this.options.log || debug
this.noop = this._noop.bind(this)
debug('MqttClient :: clientId', this.options.clientId)
this.log('MqttClient :: options.protocol', options.protocol)
this.log(
'MqttClient :: options.protocolVersion',
options.protocolVersion,
)
this.log('MqttClient :: options.username', options.username)
this.log('MqttClient :: options.keepalive', options.keepalive)
this.log(
'MqttClient :: options.reconnectPeriod',
options.reconnectPeriod,
)
this.log(
'MqttClient :: options.rejectUnauthorized',
options.rejectUnauthorized,
)
this.log(
'MqttClient :: options.properties.topicAliasMaximum',
options.properties
? options.properties.topicAliasMaximum
: undefined,
)
this.options.customHandleAcks = (options.protocolVersion === 5 && options.customHandleAcks) ? options.customHandleAcks : function () { arguments[3](0) }
this.options.clientId =
typeof options.clientId === 'string'
? options.clientId
: MqttClient.defaultId()
// Disable pre-generated write cache if requested. Will allocate buffers on-the-fly instead. WARNING: This can affect write performance
if (!this.options.writeCache) {
mqttPacket.writeToStream.cacheNumbers = false
}
this.log('MqttClient :: clientId', this.options.clientId)
this.streamBuilder = streamBuilder
this.options.customHandleAcks =
options.protocolVersion === 5 && options.customHandleAcks
? options.customHandleAcks
: (...args) => {
args[3](0)
}
this.messageIdProvider = (typeof this.options.messageIdProvider === 'undefined') ? new DefaultMessageIdProvider() : this.options.messageIdProvider
// Disable pre-generated write cache if requested. Will allocate buffers on-the-fly instead. WARNING: This can affect write performance
if (!this.options.writeCache) {
mqttPacket.writeToStream.cacheNumbers = false
}
// Inflight message storages
this.outgoingStore = options.outgoingStore || new Store()
this.incomingStore = options.incomingStore || new Store()
this.streamBuilder = streamBuilder
// Should QoS zero messages be queued when the connection is broken?
this.queueQoSZero = options.queueQoSZero === undefined ? true : options.queueQoSZero
this.messageIdProvider =
typeof this.options.messageIdProvider === 'undefined'
? new DefaultMessageIdProvider()
: this.options.messageIdProvider
// map of subscribed topics to support reconnection
this._resubscribeTopics = {}
// Inflight message storages
this.outgoingStore = options.outgoingStore || new Store()
this.incomingStore = options.incomingStore || new Store()
// map of a subscribe messageId and a topic
this.messageIdToTopic = {}
// Should QoS zero messages be queued when the connection is broken?
this.queueQoSZero =
options.queueQoSZero === undefined ? true : options.queueQoSZero
// Ping timer, setup in _setupPingTimer
this.pingTimer = null
// Is the client connected?
this.connected = false
// Are we disconnecting?
this.disconnecting = false
// Packet queue
this.queue = []
// connack timer
this.connackTimer = null
// Reconnect timer
this.reconnectTimer = null
// Is processing store?
this._storeProcessing = false
// Packet Ids are put into the store during store processing
this._packetIdsDuringStoreProcessing = {}
// Store processing queue
this._storeProcessingQueue = []
// map of subscribed topics to support reconnection
this._resubscribeTopics = {}
// Inflight callbacks
this.outgoing = {}
// map of a subscribe messageId and a topic
this.messageIdToTopic = {}
// True if connection is first time.
this._firstConnection = true
// Ping timer, setup in _setupPingTimer
this.pingTimer = null
// Is the client connected?
this.connected = false
// Are we disconnecting?
this.disconnecting = false
// Packet queue
this.queue = []
// connack timer
this.connackTimer = null
// Reconnect timer
this.reconnectTimer = null
// Is processing store?
this._storeProcessing = false
// Packet Ids are put into the store during store processing
this._packetIdsDuringStoreProcessing = {}
// Store processing queue
this._storeProcessingQueue = []
if (options.properties && options.properties.topicAliasMaximum > 0) {
if (options.properties.topicAliasMaximum > 0xffff) {
debug('MqttClient :: options.properties.topicAliasMaximum is out of range')
} else {
this.topicAliasRecv = new TopicAliasRecv(options.properties.topicAliasMaximum)
}
}
// Inflight callbacks
this.outgoing = {}
// Send queued packets
this.on('connect', function () {
const queue = that.queue
// True if connection is first time.
this._firstConnection = true
function deliver () {
const entry = queue.shift()
debug('deliver :: entry %o', entry)
let packet = null
if (options.properties && options.properties.topicAliasMaximum > 0) {
if (options.properties.topicAliasMaximum > 0xffff) {
this.log(
'MqttClient :: options.properties.topicAliasMaximum is out of range',
)
} else {
this.topicAliasRecv = new TopicAliasRecv(
options.properties.topicAliasMaximum,
)
}
}
if (!entry) {
that._resubscribe()
return
}
// Send queued packets
this.on('connect', () => {
const { queue } = this
packet = entry.packet
debug('deliver :: call _sendPacket for %o', packet)
let send = true
if (packet.messageId && packet.messageId !== 0) {
if (!that.messageIdProvider.register(packet.messageId)) {
send = false
}
}
if (send) {
that._sendPacket(
packet,
function (err) {
if (entry.cb) {
entry.cb(err)
}
deliver()
}
)
} else {
debug('messageId: %d has already used. The message is skipped and removed.', packet.messageId)
deliver()
}
}
const deliver = () => {
const entry = queue.shift()
this.log('deliver :: entry %o', entry)
let packet = null
debug('connect :: sending queued packets')
deliver()
})
if (!entry) {
this._resubscribe()
return
}
this.on('close', function () {
debug('close :: connected set to `false`')
that.connected = false
packet = entry.packet
this.log('deliver :: call _sendPacket for %o', packet)
let send = true
if (packet.messageId && packet.messageId !== 0) {
if (!this.messageIdProvider.register(packet.messageId)) {
send = false
}
}
if (send) {
this._sendPacket(packet, (err) => {
if (entry.cb) {
entry.cb(err)
}
deliver()
})
} else {
this.log(
'messageId: %d has already used. The message is skipped and removed.',
packet.messageId,
)
deliver()
}
}
debug('close :: clearing connackTimer')
clearTimeout(that.connackTimer)
this.log('connect :: sending queued packets')
deliver()
})
debug('close :: clearing ping timer')
if (that.pingTimer !== null) {
that.pingTimer.clear()
that.pingTimer = null
}
this.on('close', () => {
this.log('close :: connected set to `false`')
this.connected = false
if (that.topicAliasRecv) {
that.topicAliasRecv.clear()
}
this.log('close :: clearing connackTimer')
clearTimeout(this.connackTimer)
debug('close :: calling _setupReconnect')
that._setupReconnect()
})
EventEmitter.call(this)
this.log('close :: clearing ping timer')
if (this.pingTimer !== null) {
this.pingTimer.clear()
this.pingTimer = null
}
debug('MqttClient :: setting up stream')
this._setupStream()
}
inherits(MqttClient, EventEmitter)
if (this.topicAliasRecv) {
this.topicAliasRecv.clear()
}
/**
* setup the event handlers in the inner stream.
*
* @api private
*/
MqttClient.prototype._setupStream = function () {
const that = this
const writable = new Writable()
const parser = mqttPacket.parser(this.options)
let completeParse = null
const packets = []
this.log('close :: calling _setupReconnect')
this._setupReconnect()
})
debug('_setupStream :: calling method to clear reconnect')
this._clearReconnect()
if (!this.options.manualConnect) {
this.log('MqttClient :: setting up stream')
this.connect()
}
}
debug('_setupStream :: using streamBuilder provided to client to create stream')
this.stream = this.streamBuilder(this)
/**
* Setup the event handlers in the inner stream, sends `connect` and `auth` packets
*/
connect() {
const writable = new Writable()
const parser = mqttPacket.parser(this.options)
let completeParse = null
const packets = []
parser.on('packet', function (packet) {
debug('parser :: on packet push to packets array.')
packets.push(packet)
})
this.log('connect :: calling method to clear reconnect')
this._clearReconnect()
function nextTickWork () {
if (packets.length) {
nextTick(work)
} else {
const done = completeParse
completeParse = null
done()
}
}
this.log(
'connect :: using streamBuilder provided to client to create stream',
)
this.stream = this.streamBuilder(this)
function work () {
debug('work :: getting next packet in queue')
const packet = packets.shift()
parser.on('packet', (packet) => {
this.log('parser :: on packet push to packets array.')
packets.push(packet)
})
if (packet) {
debug('work :: packet pulled from queue')
that._handlePacket(packet, nextTickWork)
} else {
debug('work :: no packets in queue')
const done = completeParse
completeParse = null
debug('work :: done flag is %s', !!(done))
if (done) done()
}
}
const work = () => {
this.log('work :: getting next packet in queue')
const packet = packets.shift()
writable._write = function (buf, enc, done) {
completeParse = done
debug('writable stream :: parsing buffer')
parser.parse(buf)
work()
}
if (packet) {
this.log('work :: packet pulled from queue')
handlePacket(this, packet, nextTickWork)
} else {
this.log('work :: no packets in queue')
const done = completeParse
completeParse = null
this.log('work :: done flag is %s', !!done)
if (done) done()
}
}
function streamErrorHandler (error) {
debug('streamErrorHandler :: error', error.message)
if (socketErrors.includes(error.code)) {
// handle error
debug('streamErrorHandler :: emitting error')
that.emit('error', error)
} else {
nop(error)
}
}
const nextTickWork = () => {
if (packets.length) {
nextTick(work)
} else {
const done = completeParse
completeParse = null
done()
}
}
debug('_setupStream :: pipe stream to writable stream')
this.stream.pipe(writable)
writable._write = (buf, enc, done) => {
completeParse = done
this.log('writable stream :: parsing buffer')
parser.parse(buf)
work()
}
// Suppress connection errors
this.stream.on('error', streamErrorHandler)
const streamErrorHandler = (error) => {
this.log('streamErrorHandler :: error', error.message)
if (socketErrors.includes(error.code)) {
// handle error
this.log('streamErrorHandler :: emitting error')
this.emit('error', error)
} else {
this.noop(error)
}
}
// Echo stream close
this.stream.on('close', function () {
debug('(%s)stream :: on close', that.options.clientId)
flushVolatile(that.outgoing)
debug('stream: emit close to MqttClient')
that.emit('close')
})
this.log('connect :: pipe stream to writable stream')
this.stream.pipe(writable)
// Send a connect packet
debug('_setupStream: sending packet `connect`')
const connectPacket = Object.create(this.options)
connectPacket.cmd = 'connect'
if (this.topicAliasRecv) {
if (!connectPacket.properties) {
connectPacket.properties = {}
}
if (this.topicAliasRecv) {
connectPacket.properties.topicAliasMaximum = this.topicAliasRecv.max
}
}
// avoid message queue
sendPacket(this, connectPacket)
// Suppress connection errors
this.stream.on('error', streamErrorHandler)
// Echo connection errors
parser.on('error', this.emit.bind(this, 'error'))
// Echo stream close
this.stream.on('close', () => {
this.log('(%s)stream :: on close', this.options.clientId)
this._flushVolatile(this.outgoing)
this.log('stream: emit close to MqttClient')
this.emit('close')
})
// auth
if (this.options.properties) {
if (!this.options.properties.authenticationMethod && this.options.properties.authenticationData) {
that.end(() =>
this.emit('error', new Error('Packet has no Authentication Method')
))
return this
}
if (this.options.properties.authenticationMethod && this.options.authPacket && typeof this.options.authPacket === 'object') {
const authPacket = xtend({ cmd: 'auth', reasonCode: 0 }, this.options.authPacket)
sendPacket(this, authPacket)
}
}
// Send a connect packet
this.log('connect: sending packet `connect`')
const connectPacket = Object.create(this.options)
connectPacket.cmd = 'connect'
if (this.topicAliasRecv) {
if (!connectPacket.properties) {
connectPacket.properties = {}
}
if (this.topicAliasRecv) {
connectPacket.properties.topicAliasMaximum =
this.topicAliasRecv.max
}
}
// avoid message queue
this._writePacket(connectPacket)
// many drain listeners are needed for qos 1 callbacks if the connection is intermittent
this.stream.setMaxListeners(1000)
// Echo connection errors
parser.on('error', this.emit.bind(this, 'error'))
clearTimeout(this.connackTimer)
this.connackTimer = setTimeout(function () {
debug('!!connectTimeout hit!! Calling _cleanUp with force `true`')
that._cleanUp(true)
}, this.options.connectTimeout)
}
// auth
if (this.options.properties) {
if (
!this.options.properties.authenticationMethod &&
this.options.properties.authenticationData
) {
this.end(() =>
this.emit(
'error',
new Error('Packet has no Authentication Method'),
),
)
return this
}
if (
this.options.properties.authenticationMethod &&
this.options.authPacket &&
typeof this.options.authPacket === 'object'
) {
const authPacket = {
cmd: 'auth',
reasonCode: 0,
...this.options.authPacket,
}
this._writePacket(authPacket)
}
}
MqttClient.prototype._handlePacket = function (packet, done) {
const options = this.options
// many drain listeners are needed for qos 1 callbacks if the connection is intermittent
this.stream.setMaxListeners(1000)
if (options.protocolVersion === 5 && options.properties && options.properties.maximumPacketSize && options.properties.maximumPacketSize < packet.length) {
this.emit('error', new Error('exceeding packets size ' + packet.cmd))
this.end({ reasonCode: 149, properties: { reasonString: 'Maximum packet size was exceeded' } })
return this
}
debug('_handlePacket :: emitting packetreceive')
this.emit('packetreceive', packet)
clearTimeout(this.connackTimer)
this.connackTimer = setTimeout(() => {
this.log(
'!!connectTimeout hit!! Calling _cleanUp with force `true`',
)
this._cleanUp(true)
}, this.options.connectTimeout)
switch (packet.cmd) {
case 'publish':
this._handlePublish(packet, done)
break
case 'puback':
case 'pubrec':
case 'pubcomp':
case 'suback':
case 'unsuback':
this._handleAck(packet)
done()
break
case 'pubrel':
this._handlePubrel(packet, done)
break
case 'connack':
this._handleConnack(packet)
done()
break
case 'auth':
this._handleAuth(packet)
done()
break
case 'pingresp':
this._handlePingresp(packet)
done()
break
case 'disconnect':
this._handleDisconnect(packet)
done()
break
default:
// do nothing
// maybe we should do an error handling
// or just log it
break
}
}
return this
}
MqttClient.prototype._checkDisconnecting = function (callback) {
if (this.disconnecting) {
if (callback && callback !== nop) {
callback(new Error('client disconnecting'))
} else {
this.emit('error', new Error('client disconnecting'))
}
}
return this.disconnecting
}
_flushVolatile(queue) {
if (queue) {
this.log(
'_flushVolatile :: deleting volatile messages from the queue and setting their callbacks as error function',
)
Object.keys(queue).forEach((messageId) => {
if (
queue[messageId].volatile &&
typeof queue[messageId].cb === 'function'
) {
queue[messageId].cb(new Error('Connection closed'))
delete queue[messageId]
}
})
}
}
/**
* publish - publish <message> to <topic>
*
* @param {String} topic - topic to publish to
* @param {String, Buffer} message - message to publish
* @param {Object} [opts] - publish options, includes:
* {Number} qos - qos level to publish on
* {Boolean} retain - whether or not to retain the message
* {Boolean} dup - whether or not mark a message as duplicate
* {Function} cbStorePut - function(){} called when message is put into `outgoingStore`
* @param {Function} [callback] - function(err){}
* called when publish succeeds or fails
* @returns {MqttClient} this - for chaining
* @api public
*
* @example client.publish('topic', 'message');
* @example
* client.publish('topic', 'message', {qos: 1, retain: true, dup: true});
* @example client.publish('topic', 'message', console.log);
*/
MqttClient.prototype.publish = function (topic, message, opts, callback) {
debug('publish :: message `%s` to topic `%s`', message, topic)
const options = this.options
_flush(queue) {
if (queue) {
this.log('_flush: queue exists? %b', !!queue)
Object.keys(queue).forEach((messageId) => {
if (typeof queue[messageId].cb === 'function') {
queue[messageId].cb(new Error('Connection closed'))
// This is suspicious. Why do we only delete this if we have a callback?
// If this is by-design, then adding no as callback would cause this to get deleted unintentionally.
delete queue[messageId]
}
})
}
}
// .publish(topic, payload, cb);
if (typeof opts === 'function') {
callback = opts
opts = null
}
_removeTopicAliasAndRecoverTopicName(packet) {
let alias
if (packet.properties) {
alias = packet.properties.topicAlias
}
// default opts
const defaultOpts = { qos: 0, retain: false, dup: false }
opts = xtend(defaultOpts, opts)
let topic = packet.topic.toString()
if (this._checkDisconnecting(callback)) {
return this
}
this.log(
'_removeTopicAliasAndRecoverTopicName :: alias %d, topic %o',
alias,
topic,
)
const that = this
const publishProc = function () {
let messageId = 0
if (opts.qos === 1 || opts.qos === 2) {
messageId = that._nextId()
if (messageId === null) {
debug('No messageId left')
return false
}
}
const packet = {
cmd: 'publish',
topic: topic,
payload: message,
qos: opts.qos,
retain: opts.retain,
messageId: messageId,
dup: opts.dup
}
if (topic.length === 0) {
// restore topic from alias
if (typeof alias === 'undefined') {
return new Error('Unregistered Topic Alias')
}
topic = this.topicAliasSend.getTopicByAlias(alias)
if (typeof topic === 'undefined') {
return new Error('Unregistered Topic Alias')
}
packet.topic = topic
}
if (alias) {
delete packet.properties.topicAlias
}
}
if (options.protocolVersion === 5) {
packet.properties = opts.properties
}
_checkDisconnecting(callback) {
if (this.disconnecting) {
if (callback && callback !== this.noop) {
callback(new Error('client disconnecting'))
} else {
this.emit('error', new Error('client disconnecting'))
}
}
return this.disconnecting
}
debug('publish :: qos', opts.qos)
switch (opts.qos) {
case 1:
case 2:
// Add to callbacks
that.outgoing[packet.messageId] = {
volatile: false,
cb: callback || nop
}
debug('MqttClient:publish: packet cmd: %s', packet.cmd)
that._sendPacket(packet, undefined, opts.cbStorePut)
break
default:
debug('MqttClient:publish: packet cmd: %s', packet.cmd)
that._sendPacket(packet, callback, opts.cbStorePut)
break
}
return true
}
/**
* publish - publish <message> to <topic>
*
* @param {String} topic - topic to publish to
* @param {String, Buffer} message - message to publish
* @param {Object} [opts] - publish options, includes:
* {Number} qos - qos level to publish on
* {Boolean} retain - whether or not to retain the message
* {Boolean} dup - whether or not mark a message as duplicate
* {Function} cbStorePut - function(){} called when message is put into `outgoingStore`
* @param {Function} [callback] - function(err){}
* called when publish succeeds or fails
* @returns {MqttClient} this - for chaining
* @api public
*
* @example client.publish('topic', 'message');
* @example
* client.publish('topic', 'message', {qos: 1, retain: true, dup: true});
* @example client.publish('topic', 'message', console.log);
*/
publish(topic, message, opts, callback) {
this.log('publish :: message `%s` to topic `%s`', message, topic)
const { options } = this
if (this._storeProcessing || this._storeProcessingQueue.length > 0 || !publishProc()) {
this._storeProcessingQueue.push(
{
invoke: publishProc,
cbStorePut: opts.cbStorePut,
callback: callback
}
)
}
return this
}
// .publish(topic, payload, cb);
if (typeof opts === 'function') {
callback = opts
opts = null
}
/**
* subscribe - subscribe to <topic>
*
* @param {String, Array, Object} topic - topic(s) to subscribe to, supports objects in the form {'topic': qos}
* @param {Object} [opts] - optional subscription options, includes:
* {Number} qos - subscribe qos level
* @param {Function} [callback] - function(err, granted){} where:
* {Error} err - subscription error (none at the moment!)
* {Array} granted - array of {topic: 't', qos: 0}
* @returns {MqttClient} this - for chaining
* @api public
* @example client.subscribe('topic');
* @example client.subscribe('topic', {qos: 1});
* @example client.subscribe({'topic': {qos: 0}, 'topic2': {qos: 1}}, console.log);
* @example client.subscribe('topic', console.log);
*/
MqttClient.prototype.subscribe = function () {
const that = this
const args = new Array(arguments.length)
for (let i = 0; i < arguments.length; i++) {
args[i] = arguments[i]
}
const subs = []
let obj = args.shift()
const resubscribe = obj.resubscribe
let callback = args.pop() || nop
let opts = args.pop()
const version = this.options.protocolVersion
// default opts
const defaultOpts = { qos: 0, retain: false, dup: false }
opts = { ...defaultOpts, ...opts }
delete obj.resubscribe
if (this._checkDisconnecting(callback)) {
return this
}
if (typeof obj === 'string') {
obj = [obj]
}
const publishProc = () => {
let messageId = 0
if (opts.qos === 1 || opts.qos === 2) {
messageId = this._nextId()
if (messageId === null) {
this.log('No messageId left')
return false
}
}
const packet = {
cmd: 'publish',
topic,
payload: message,
qos: opts.qos,
retain: opts.retain,
messageId,
dup: opts.dup,
}
if (typeof callback !== 'function') {
opts = callback
callback = nop
}
if (options.protocolVersion === 5) {
packet.properties = opts.properties
}
const invalidTopic = validations.validateTopics(obj)
if (invalidTopic !== null) {
setImmediate(callback, new Error('Invalid topic ' + invalidTopic))
return this
}
this.log('publish :: qos', opts.qos)
switch (opts.qos) {
case 1:
case 2:
// Add to callbacks
this.outgoing[packet.messageId] = {
volatile: false,
cb: callback || this.noop,
}
this.log('MqttClient:publish: packet cmd: %s', packet.cmd)
this._sendPacket(packet, undefined, opts.cbStorePut)
break
default:
this.log('MqttClient:publish: packet cmd: %s', packet.cmd)
this._sendPacket(packet, callback, opts.cbStorePut)
break
}
return true
}
if (this._checkDisconnecting(callback)) {
debug('subscribe: discconecting true')
return this
}
if (
this._storeProcessing ||
this._storeProcessingQueue.length > 0 ||
!publishProc()
) {
this._storeProcessingQueue.push({
invoke: publishProc,
cbStorePut: opts.cbStorePut,
callback,
})
}
return this
}
const defaultOpts = {
qos: 0
}
if (version === 5) {
defaultOpts.nl = false
defaultOpts.rap = false
defaultOpts.rh = 0
}
opts = xtend(defaultOpts, opts)
/**
* subscribe - subscribe to <topic>
*
* @param {String, Array, Object} topic - topic(s) to subscribe to, supports objects in the form {'topic': qos}
* @param {Object} [opts] - optional subscription options, includes:
* {Number} qos - subscribe qos level
* @param {Function} [callback] - function(err, granted){} where:
* {Error} err - subscription error (none at the moment!)
* {Array} granted - array of {topic: 't', qos: 0}
* @returns {MqttClient} this - for chaining
* @api public
* @example client.subscribe('topic');
* @example client.subscribe('topic', {qos: 1});
* @example client.subscribe({'topic': {qos: 0}, 'topic2': {qos: 1}}, console.log);
* @example client.subscribe('topic', console.log);
*/
subscribe(...args) {
const subs = []
let obj = args.shift()
const { resubscribe } = obj
let callback = args.pop() || this.noop
let opts = args.pop()
const version = this.options.protocolVersion
if (Array.isArray(obj)) {
obj.forEach(function (topic) {
debug('subscribe: array topic %s', topic)
if (!Object.prototype.hasOwnProperty.call(that._resubscribeTopics, topic) ||
that._resubscribeTopics[topic].qos < opts.qos ||
resubscribe) {
const currentOpts = {
topic: topic,
qos: opts.qos
}
if (version === 5) {
currentOpts.nl = opts.nl
currentOpts.rap = opts.rap
currentOpts.rh = opts.rh
currentOpts.properties = opts.properties
}
debug('subscribe: pushing topic `%s` and qos `%s` to subs list', currentOpts.topic, currentOpts.qos)
subs.push(currentOpts)
}
})
} else {
Object
.keys(obj)
.forEach(function (k) {
debug('subscribe: object topic %s', k)
if (!Object.prototype.hasOwnProperty.call(that._resubscribeTopics, k) ||
that._resubscribeTopics[k].qos < obj[k].qos ||
resubscribe) {
const currentOpts = {
topic: k,
qos: obj[k].qos
}
if (version === 5) {
currentOpts.nl = obj[k].nl
currentOpts.rap = obj[k].rap
currentOpts.rh = obj[k].rh
currentOpts.properties = opts.properties
}
debug('subscribe: pushing `%s` to subs list', currentOpts)
subs.push(currentOpts)
}
})
}
delete obj.resubscribe
if (!subs.length) {
callback(null, [])
return this
}
if (typeof obj === 'string') {
obj = [obj]
}
const subscribeProc = function () {
const messageId = that._nextId()
if (messageId === null) {
debug('No messageId left')
return false
}
if (typeof callback !== 'function') {
opts = callback
callback = this.noop
}
const packet = {
cmd: 'subscribe',
subscriptions: subs,
qos: 1,
retain: false,
dup: false,
messageId: messageId
}
const invalidTopic = validations.validateTopics(obj)
if (invalidTopic !== null) {
setImmediate(callback, new Error(`Invalid topic ${invalidTopic}`))
return this
}
if (opts.properties) {
packet.properties = opts.properties
}
if (this._checkDisconnecting(callback)) {
this.log('subscribe: discconecting true')
return this
}
// subscriptions to resubscribe to in case of disconnect
if (that.options.resubscribe) {
debug('subscribe :: resubscribe true')
const topics = []
subs.forEach(function (sub) {
if (that.options.reconnectPeriod > 0) {
const topic = { qos: sub.qos }
if (version === 5) {
topic.nl = sub.nl || false
topic.rap = sub.rap || false
topic.rh = sub.rh || 0
topic.properties = sub.properties
}
that._resubscribeTopics[sub.topic] = topic
topics.push(sub.topic)
}
})
that.messageIdToTopic[packet.messageId] = topics
}
const defaultOpts = {
qos: 0,
}
if (version === 5) {
defaultOpts.nl = false
defaultOpts.rap = false
defaultOpts.rh = 0
}
opts = { ...defaultOpts, ...opts }
that.outgoing[packet.messageId] = {
volatile: true,
cb: function (err, packet) {
if (!err) {
const granted = packet.granted
for (let i = 0; i < granted.length; i += 1) {
subs[i].qos = granted[i]
}
}
const parseSub = (topic, subOptions) => {
// subOptions is defined only when providing a subs map, use opts otherwise
subOptions = subOptions || opts
if (
!Object.prototype.hasOwnProperty.call(
this._resubscribeTopics,
topic,
) ||
this._resubscribeTopics[topic].qos < subOptions.qos ||
resubscribe
) {
const currentOpts = {
topic,
qos: subOptions.qos,
}
if (version === 5) {
currentOpts.nl = subOptions.nl
currentOpts.rap = subOptions.rap
currentOpts.rh = subOptions.rh
// use opts.properties
currentOpts.properties = opts.properties
}
this.log(
'subscribe: pushing topic `%s` and qos `%s` to subs list',
currentOpts.topic,
currentOpts.qos,
)
subs.push(currentOpts)
}
}
callback(err, subs)
}
}
debug('subscribe :: call _sendPacket')
that._sendPacket(packet)
return true
}
if (Array.isArray(obj)) {
// array of topics
obj.forEach((topic) => {
this.log('subscribe: array topic %s', topic)
parseSub(topic)
})
} else {
// object topic --> subOptions (no properties)
Object.keys(obj).forEach((topic) => {
this.log('subscribe: object topic %s, %o', topic, obj[topic])
parseSub(topic, obj[topic])
})
}
if (this._storeProcessing || this._storeProcessingQueue.length > 0 || !subscribeProc()) {
this._storeProcessingQueue.push(
{
invoke: subscribeProc,
callback: callback
}
)
}
if (!subs.length) {
callback(null, [])
return this
}
return this
}
const subscribeProc = () => {
const messageId = this._nextId()
if (messageId === null) {
this.log('No messageId left')
return false
}
/**
* unsubscribe - unsubscribe from topic(s)
*
* @param {String, Array} topic - topics to unsubscribe from
* @param {Object} [opts] - optional subscription options, includes:
* {Object} properties - properties of unsubscribe packet
* @param {Function} [callback] - callback fired on unsuback
* @returns {MqttClient} this - for chaining
* @api public
* @example client.unsubscribe('topic');
* @example client.unsubscribe('topic', console.log);
*/
MqttClient.prototype.unsubscribe = function () {
const that = this
const args = new Array(arguments.length)
for (let i = 0; i < arguments.length; i++) {
args[i] = arguments[i]
}
let topic = args.shift()
let callback = args.pop() || nop
let opts = args.pop()
if (typeof topic === 'string') {
topic = [topic]
}
const packet = {
cmd: 'subscribe',
subscriptions: subs,
qos: 1,
retain: false,
dup: false,
messageId,
}
if (typeof callback !== 'function') {
opts = callback
callback = nop
}
if (opts.properties) {
packet.properties = opts.properties
}
const invalidTopic = validations.validateTopics(topic)
if (invalidTopic !== null) {
setImmediate(callback, new Error('Invalid topic ' + invalidTopic))
return this
}
// subscriptions to resubscribe to in case of disconnect
if (this.options.resubscribe) {
this.log('subscribe :: resubscribe true')
const topics = []
subs.forEach((sub) => {
if (this.options.reconnectPeriod > 0) {
const topic = { qos: sub.qos }
if (version === 5) {
topic.nl = sub.nl || false
topic.rap = sub.rap || false
topic.rh = sub.rh || 0
topic.properties = sub.properties
}
this._resubscribeTopics[sub.topic] = topic
topics.push(sub.topic)
}
})
this.messageIdToTopic[packet.messageId] = topics
}
if (that._checkDisconnecting(callback)) {
return this
}
this.outgoing[packet.messageId] = {
volatile: true,
cb(err, packet2) {
if (!err) {
const { granted } = packet2
for (let i = 0; i < granted.length; i += 1) {
subs[i].qos = granted[i]
}
}
const unsubscribeProc = function () {
const messageId = that._nextId()
if (messageId === null) {
debug('No messageId left')
return false
}
const packet = {
cmd: 'unsubscribe',
qos: 1,
messageId: messageId
}
callback(err, subs)
},
}
this.log('subscribe :: call _sendPacket')
this._sendPacket(packet)
return true
}
if (typeof topic === 'string') {
packet.unsubscriptions = [topic]
} else if (Array.isArray(topic)) {
packet.unsubscriptions = topic
}
if (
this._storeProcessing ||
this._storeProcessingQueue.length > 0 ||
!subscribeProc()
) {
this._storeProcessingQueue.push({
invoke: subscribeProc,
callback,
})
}
if (that.options.resubscribe) {
packet.unsubscriptions.forEach(function (topic) {
delete that._resubscribeTopics[topic]
})
}
return this
}
if (typeof opts === 'object' && opts.properties) {
packet.properties = opts.properties
}
/**
* unsubscribe - unsubscribe from topic(s)
*
* @param {String, Array} topic - topics to unsubscribe from
* @param {Object} [opts] - optional subscription options, includes:
* {Object} properties - properties of unsubscribe packet
* @param {Function} [callback] - callback fired on unsuback
* @returns {MqttClient} this - for chaining
* @api public
* @example client.unsubscribe('topic');
* @example client.unsubscribe('topic', console.log);
*/
unsubscribe(...args) {
let topic = args.shift()
let callback = args.pop() || this.noop
let opts = args.pop()
if (typeof topic === 'string') {
topic = [topic]
}
that.outgoing[packet.messageId] = {
volatile: true,
cb: callback
}
if (typeof callback !== 'function') {
opts = callback
callback = this.noop
}
debug('unsubscribe: call _sendPacket')
that._sendPacket(packet)
const invalidTopic = validations.validateTopics(topic)
if (invalidTopic !== null) {
setImmediate(callback, new Error(`Invalid topic ${invalidTopic}`))
return this
}
return true
}
if (this._checkDisconnecting(callback)) {
return this
}
if (this._storeProcessing || this._storeProcessingQueue.length > 0 || !unsubscribeProc()) {
this._storeProcessingQueue.push(
{
invoke: unsubscribeProc,
callback: callback
}
)
}
const unsubscribeProc = () => {
const messageId = this._nextId()
if (messageId === null) {
this.log('No messageId left')
return false
}
const packet = {
cmd: 'unsubscribe',
qos: 1,
messageId,
}
return this
}
if (typeof topic === 'string') {
packet.unsubscriptions = [topic]
} else if (Array.isArray(topic)) {
packet.unsubscriptions = topic
}
/**
* end - close connection
*
* @returns {MqttClient} this - for chaining
* @param {Boolean} force - do not wait for all in-flight messages to be acked
* @param {Object} opts - added to the disconnect packet
* @param {Function} cb - called when the client has been closed
*
* @api public
*/
MqttClient.prototype.end = function (force, opts, cb) {
const that = this
if (this.options.resubscribe) {
packet.unsubscriptions.forEach((topic2) => {
delete this._resubscribeTopics[topic2]
})
}
debug('end :: (%s)', this.options.clientId)
if (typeof opts === 'object' && opts.properties) {
packet.properties = opts.properties
}
if (force == null || typeof force !== 'boolean') {
cb = opts || nop
opts = force
force = false
if (typeof opts !== 'object') {
cb = opts
opts = null
if (typeof cb !== 'function') {
cb = nop
}
}
}
this.outgoing[packet.messageId] = {
volatile: true,
cb: callback,
}
if (typeof opts !== 'object') {
cb = opts
opts = null
}
this.log('unsubscribe: call _sendPacket')
this._sendPacket(packet)
debug('end :: cb? %s', !!cb)
cb = cb || nop
return true
}
function closeStores () {
debug('end :: closeStores: closing incoming and outgoing stores')
that.disconnected = true
that.incomingStore.close(function (e1) {
that.outgoingStore.close(function (e2) {
debug('end :: closeStores: emitting end')
that.emit('end')
if (cb) {
const err = e1 || e2
debug('end :: closeStores: invoking callback with args')
cb(err)
}
})
})
if (that._deferredReconnect) {
that._deferredReconnect()
}
}
if (
this._storeProcessing ||
this._storeProcessingQueue.length > 0 ||
!unsubscribeProc()
) {
this._storeProcessingQueue.push({
invoke: unsubscribeProc,
callback,
})
}
function finish () {
// defer closesStores of an I/O cycle,
// just to make sure things are
// ok for websockets
debug('end :: (%s) :: finish :: calling _cleanUp with force %s', that.options.clientId, force)
that._cleanUp(force, () => {
debug('end :: finish :: calling process.nextTick on closeStores')
// const boundProcess = nextTick.bind(null, closeStores)
nextTick(closeStores.bind(that))
}, opts)
}
return this
}
if (this.disconnecting) {
cb()
return this
}
/**
* end - close connection
*
* @returns {MqttClient} this - for chaining
* @param {Boolean} force - do not wait for all in-flight messages to be acked
* @param {Object} opts - added to the disconnect packet
* @param {Function} cb - called when the client has been closed
*
* @api public
*/
end(force, opts, cb) {
this.log('end :: (%s)', this.options.clientId)
this._clearReconnect()
if (force == null || typeof force !== 'boolean') {
cb = opts || this.noop
opts = force
force = false
if (typeof opts !== 'object') {
cb = opts
opts = null
if (typeof cb !== 'function') {
cb = this.noop
}
}
}
this.disconnecting = true
if (typeof opts !== 'object') {
cb = opts
opts = null
}
if (!force && Object.keys(this.outgoing).length > 0) {
// wait 10ms, just to be sure we received all of it
debug('end :: (%s) :: calling finish in 10ms once outgoing is empty', that.options.clientId)
this.once('outgoingEmpty', setTimeout.bind(null, finish, 10))
} else {
debug('end :: (%s) :: immediately calling finish', that.options.clientId)
finish()
}
this.log('end :: cb? %s', !!cb)
cb = cb || this.noop
return this
}
const closeStores = () => {
this.log('end :: closeStores: closing incoming and outgoing stores')
this.disconnected = true
this.incomingStore.close((e1) => {
this.outgoingStore.close((e2) => {
this.log('end :: closeStores: emitting end')
this.emit('end')
if (cb) {
const err = e1 || e2
this.log(
'end :: closeStores: invoking callback with args',
)
cb(err)
}
})
})
if (this._deferredReconnect) {
this._deferredReconnect()
}
}
/**
* removeOutgoingMessage - remove a message in outgoing store
* the outgoing callback will be called withe Error('Message removed') if the message is removed
*
* @param {Number} messageId - messageId to remove message
* @returns {MqttClient} this - for chaining
* @api public
*
* @example client.removeOutgoingMessage(client.getLastAllocated());
*/
MqttClient.prototype.removeOutgoingMessage = function (messageId) {
const cb = this.outgoing[messageId] ? this.outgoing[messageId].cb : null
delete this.outgoing[messageId]
this.outgoingStore.del({ messageId: messageId }, function () {
cb(new Error('Message removed'))
})
return this
}
const finish = () => {
// defer closesStores of an I/O cycle,
// just to make sure things are
// ok for websockets
this.log(
'end :: (%s) :: finish :: calling _cleanUp with force %s',
this.options.clientId,
force,
)
this._cleanUp(
force,
() => {
this.log(
'end :: finish :: calling process.nextTick on closeStores',
)
// const boundProcess = nextTick.bind(null, closeStores)
nextTick(closeStores)
},
opts,
)
}
/**
* reconnect - connect again using the same options as connect()
*
* @param {Object} [opts] - optional reconnect options, includes:
* {Store} incomingStore - a store for the incoming packets
* {Store} outgoingStore - a store for the outgoing packets
* if opts is not given, current stores are used
* @returns {MqttClient} this - for chaining
*
* @api public
*/
MqttClient.prototype.reconnect = function (opts) {
debug('client reconnect')
const that = this
const f = function () {
if (opts) {
that.options.incomingStore = opts.incomingStore
that.options.outgoingStore = opts.outgoingStore
} else {
that.options.incomingStore = null
that.options.outgoingStore = null
}
that.incomingStore = that.options.incomingStore || new Store()
that.outgoingStore = that.options.outgoingStore || new Store()
that.disconnecting = false
that.disconnected = false
that._deferredReconnect = null
that._reconnect()
}
if (this.disconnecting) {
cb()
return this
}
if (this.disconnecting && !this.disconnected) {
this._deferredReconnect = f
} else {
f()
}
return this
}
this._clearReconnect()
/**
* _reconnect - implement reconnection
* @api privateish
*/
MqttClient.prototype._reconnect = function () {
debug('_reconnect: emitting reconnect to client')
this.emit('reconnect')
if (this.connected) {
this.end(() => { this._setupStream() })
debug('client already connected. disconnecting first.')
} else {
debug('_reconnect: calling _setupStream')
this._setupStream()
}
}
this.disconnecting = true
/**
* _setupReconnect - setup reconnect timer
*/
MqttClient.prototype._setupReconnect = function () {
const that = this
if (!force && Object.keys(this.outgoing).length > 0) {
// wait 10ms, just to be sure we received all of it
this.log(
'end :: (%s) :: calling finish in 10ms once outgoing is empty',
this.options.clientId,
)
this.once('outgoingEmpty', setTimeout.bind(null, finish, 10))
} else {
this.log(
'end :: (%s) :: immediately calling finish',
this.options.clientId,
)
finish()
}
if (!that.disconnecting && !that.reconnectTimer && (that.options.reconnectPeriod > 0)) {
if (!this.reconnecting) {
debug('_setupReconnect :: emit `offline` state')
this.emit('offline')
debug('_setupReconnect :: set `reconnecting` to `true`')
this.reconnecting = true
}
debug('_setupReconnect :: setting reconnectTimer for %d ms', that.options.reconnectPeriod)
that.reconnectTimer = setInterval(function () {
debug('reconnectTimer :: reconnect triggered!')
that._reconnect()
}, that.options.reconnectPeriod)
} else {
debug('_setupReconnect :: doing nothing...')
}
}
return this
}
/**
* _clearReconnect - clear the reconnect timer
*/
MqttClient.prototype._clearReconnect = function () {
debug('_clearReconnect : clearing reconnect timer')
if (this.reconnectTimer) {
clearInterval(this.reconnectTimer)
this.reconnectTimer = null
}
}
/**
* removeOutgoingMessage - remove a message in outgoing store
* the outgoing callback will be called withe Error('Message removed') if the message is removed
*
* @param {Number} messageId - messageId to remove message
* @returns {MqttClient} this - for chaining
* @api public
*
* @example client.removeOutgoingMessage(client.getLastAllocated());
*/
removeOutgoingMessage(messageId) {
if (this.outgoing[messageId]) {
const { cb } = this.outgoing[messageId]
this._removeOutgoingAndStoreMessage(messageId, () => {
cb(new Error('Message removed'))
})
}
return this
}
/**
* _cleanUp - clean up on connection end
* @api private
*/
MqttClient.prototype._cleanUp = function (forced, done) {
const opts = arguments[2]
if (done) {
debug('_cleanUp :: done callback provided for on stream close')
this.stream.on('close', done)
}
/**
* reconnect - connect again using the same options as connect()
*
* @param {Object} [opts] - optional reconnect options, includes:
* {Store} incomingStore - a store for the incoming packets
* {Store} outgoingStore - a store for the outgoing packets
* if opts is not given, current stores are used
* @returns {MqttClient} this - for chaining
*
* @api public
*/
reconnect(opts) {
this.log('client reconnect')
const f = () => {
if (opts) {
this.options.incomingStore = opts.incomingStore
this.options.outgoingStore = opts.outgoingStore
} else {
this.options.incomingStore = null
this.options.outgoingStore = null
}
this.incomingStore = this.options.incomingStore || new Store()
this.outgoingStore = this.options.outgoingStore || new Store()
this.disconnecting = false
this.disconnected = false
this._deferredReconnect = null
this._reconnect()
}
debug('_cleanUp :: forced? %s', forced)
if (forced) {
if ((this.options.reconnectPeriod === 0) && this.options.clean) {
flush(this.outgoing)
}
debug('_cleanUp :: (%s) :: destroying stream', this.options.clientId)
this.stream.destroy()
} else {
const packet = xtend({ cmd: 'disconnect' }, opts)
debug('_cleanUp :: (%s) :: call _sendPacket with disconnect packet', this.options.clientId)
this._sendPacket(
packet,
setImmediate.bind(
null,
this.stream.end.bind(this.stream)
)
)
}
if (this.disconnecting && !this.disconnected) {
this._deferredReconnect = f
} else {
f()
}
return this
}
if (!this.disconnecting) {
debug('_cleanUp :: client not disconnecting. Clearing and resetting reconnect.')
this._clearReconnect()
this._setupReconnect()
}
/**
* _reconnect - implement reconnection
* @api privateish
*/
_reconnect() {
this.log('_reconnect: emitting reconnect to client')
this.emit('reconnect')
if (this.connected) {
this.end(() => {
this.connect()
})
this.log('client already connected. disconnecting first.')
} else {
this.log('_reconnect: calling connect')
this.connect()
}
}
if (this.pingTimer !== null) {
debug('_cleanUp :: clearing pingTimer')
this.pingTimer.clear()
this.pingTimer = null
}
/**
* _setupReconnect - setup reconnect timer
*/
_setupReconnect() {
if (
!this.disconnecting &&
!this.reconnectTimer &&
this.options.reconnectPeriod > 0
) {
if (!this.reconnecting) {
this.log('_setupReconnect :: emit `offline` state')
this.emit('offline')
this.log('_setupReconnect :: set `reconnecting` to `true`')
this.reconnecting = true
}
this.log(
'_setupReconnect :: setting reconnectTimer for %d ms',
this.options.reconnectPeriod,
)
this.reconnectTimer = setInterval(() => {
this.log('reconnectTimer :: reconnect triggered!')
this._reconnect()
}, this.options.reconnectPeriod)
} else {
this.log('_setupReconnect :: doing nothing...')
}
}
if (done && !this.connected) {
debug('_cleanUp :: (%s) :: removing stream `done` callback `close` listener', this.options.clientId)
this.stream.removeListener('close', done)
done()
}
}
/**
* _clearReconnect - clear the reconnect timer
*/
_clearReconnect() {
this.log('_clearReconnect : clearing reconnect timer')
if (this.reconnectTimer) {
clearInterval(this.reconnectTimer)
this.reconnectTimer = null
}
}
/**
* _sendPacket - send or queue a packet
* @param {Object} packet - packet options
* @param {Function} cb - callback when the packet is sent
* @param {Function} cbStorePut - called when message is put into outgoingStore
* @param {Boolean} noStore - send without put to the store
* @api private
*/
MqttClient.prototype._sendPacket = function (packet, cb, cbStorePut, noStore) {
debug('_sendPacket :: (%s) :: start', this.options.clientId)
cbStorePut = cbStorePut || nop
cb = cb || nop
/**
* _cleanUp - clean up on connection end
* @api private
*/
_cleanUp(forced, done, opts = {}) {
if (done) {
this.log('_cleanUp :: done callback provided for on stream close')
this.stream.on('close', done)
}
const err = applyTopicAlias(this, packet)
if (err) {
cb(err)
return
}
this.log('_cleanUp :: forced? %s', forced)
if (forced) {
if (this.options.reconnectPeriod === 0 && this.options.clean) {
this._flush(this.outgoing)
}
this.log(
'_cleanUp :: (%s) :: destroying stream',
this.options.clientId,
)
this.stream.destroy()
} else {
const packet = { cmd: 'disconnect', ...opts }
this.log(
'_cleanUp :: (%s) :: call _sendPacket with disconnect packet',
this.options.clientId,
)
this._sendPacket(packet, () => {
this.log(
'_cleanUp :: (%s) :: destroying stream',
this.options.clientId,
)
setImmediate(() => {
this.stream.end(() => {
this.log(
'_cleanUp :: (%s) :: stream destroyed',
this.options.clientId,
)
// once stream is closed the 'close' event will fire and that will
// emit client `close` event and call `done` callback if done is provided
})
})
})
}
if (!this.connected) {
// allow auth packets to be sent while authenticating with the broker (mqtt5 enhanced auth)
if (packet.cmd === 'auth') {
this._shiftPingInterval()
sendPacket(this, packet, cb)
return
}
if (!this.disconnecting) {
this.log(
'_cleanUp :: client not disconnecting. Clearing and resetting reconnect.',
)
this._clearReconnect()
this._setupReconnect()
}
debug('_sendPacket :: client not connected. Storing packet offline.')
this._storePacket(packet, cb, cbStorePut)
return
}
if (this.pingTimer !== null) {
this.log('_cleanUp :: clearing pingTimer')
this.pingTimer.clear()
this.pingTimer = null
}
// When sending a packet, reschedule the ping timer
this._shiftPingInterval()
if (done && !this.connected) {
this.log(
'_cleanUp :: (%s) :: removing stream `done` callback `close` listener',
this.options.clientId,
)
this.stream.removeListener('close', done)
done()
}
}
// If "noStore" is true, the message is sent without being recorded in the store.
// Messages that have not received puback or pubcomp remain in the store after disconnection
// and are resent from the store upon reconnection.
// For resend upon reconnection, "noStore" is set to true. This is because the message is already stored in the store.
// This is to avoid interrupting other processes while recording to the store.
if (noStore) {
sendPacket(this, packet, cb)
return
}
_storeAndSend(packet, cb, cbStorePut) {
this.log(
'storeAndSend :: store packet with cmd %s to outgoingStore',
packet.cmd,
)
let storePacket = packet
let err
if (storePacket.cmd === 'publish') {
// The original packet is for sending.
// The cloned storePacket is for storing to resend on reconnect.
// Topic Alias must not be used after disconnected.
storePacket = clone(packet)
err = this._removeTopicAliasAndRecoverTopicName(storePacket)
if (err) {
return cb && cb(err)
}
}
this.outgoingStore.put(storePacket, (err2) => {
if (err2) {
return cb && cb(err2)
}
cbStorePut()
this._writePacket(packet, cb)
})
}
switch (packet.cmd) {
case 'publish':
break
case 'pubrel':
storeAndSend(this, packet, cb, cbStorePut)
return
default:
sendPacket(this, packet, cb)
return
}
_applyTopicAlias(packet) {
if (this.options.protocolVersion === 5) {
if (packet.cmd === 'publish') {
let alias
if (packet.properties) {
alias = packet.properties.topicAlias
}
const topic = packet.topic.toString()
if (this.topicAliasSend) {
if (alias) {
if (topic.length !== 0) {
// register topic alias
this.log(
'applyTopicAlias :: register topic: %s - alias: %d',
topic,
alias,
)
if (!this.topicAliasSend.put(topic, alias)) {
this.log(
'applyTopicAlias :: error out of range. topic: %s - alias: %d',
topic,
alias,
)
return new Error(
'Sending Topic Alias out of range',
)
}
}
} else if (topic.length !== 0) {
if (this.options.autoAssignTopicAlias) {
alias = this.topicAliasSend.getAliasByTopic(topic)
if (alias) {
packet.topic = ''
packet.properties = {
...packet.properties,
topicAlias: alias,
}
this.log(
'applyTopicAlias :: auto assign(use) topic: %s - alias: %d',
topic,
alias,
)
} else {
alias = this.topicAliasSend.getLruAlias()
this.topicAliasSend.put(topic, alias)
packet.properties = {
...packet.properties,
topicAlias: alias,
}
this.log(
'applyTopicAlias :: auto assign topic: %s - alias: %d',
topic,
alias,
)
}
} else if (this.options.autoUseTopicAlias) {
alias = this.topicAliasSend.getAliasByTopic(topic)
if (alias) {
packet.topic = ''
packet.properties = {
...packet.properties,
topicAlias: alias,
}
this.log(
'applyTopicAlias :: auto use topic: %s - alias: %d',
topic,
alias,
)
}
}
}
} else if (alias) {
this.log(
'applyTopicAlias :: error out of range. topic: %s - alias: %d',
topic,
alias,
)
return new Error('Sending Topic Alias out of range')
}
}
}
}
switch (packet.qos) {
case 2:
case 1:
storeAndSend(this, packet, cb, cbStorePut)
break
/**
* no need of case here since it will be caught by default
* and jshint comply that before default it must be a break
* anyway it will result in -1 evaluation
*/
case 0:
/* falls through */
default:
sendPacket(this, packet, cb)
break
}
debug('_sendPacket :: (%s) :: end', this.options.clientId)
}
_noop(err) {
this.log('noop ::', err)
}
/**
* _storePacket - queue a packet
* @param {Object} packet - packet options
* @param {Function} cb - callback when the packet is sent
* @param {Function} cbStorePut - called when message is put into outgoingStore
* @api private
*/
MqttClient.prototype._storePacket = function (packet, cb, cbStorePut) {
debug('_storePacket :: packet: %o', packet)
debug('_storePacket :: cb? %s', !!cb)
cbStorePut = cbStorePut || nop
/** Writes the packet to stream and emits events */
_writePacket(packet, cb) {
this.log('_writePacket :: packet: %O', packet)
this.log('_writePacket :: emitting `packetsend`')
let storePacket = packet
if (storePacket.cmd === 'publish') {
// The original packet is for sending.
// The cloned storePacket is for storing to resend on reconnect.
// Topic Alias must not be used after disconnected.
storePacket = clone(packet)
const err = removeTopicAliasAndRecoverTopicName(this, storePacket)
if (err) {
return cb && cb(err)
}
}
// check that the packet is not a qos of 0, or that the command is not a publish
if (((storePacket.qos || 0) === 0 && this.queueQoSZero) || storePacket.cmd !== 'publish') {
this.queue.push({ packet: storePacket, cb: cb })
} else if (storePacket.qos > 0) {
cb = this.outgoing[storePacket.messageId] ? this.outgoing[storePacket.messageId].cb : null
this.outgoingStore.put(storePacket, function (err) {
if (err) {
return cb && cb(err)
}
cbStorePut()
})
} else if (cb) {
cb(new Error('No connection to broker'))
}
}
this.emit('packetsend', packet)
/**
* _setupPingTimer - setup the ping timer
*
* @api private
*/
MqttClient.prototype._setupPingTimer = function () {
debug('_setupPingTimer :: keepalive %d (seconds)', this.options.keepalive)
const that = this
// When writing a packet, reschedule the ping timer
this._shiftPingInterval()
if (!this.pingTimer && this.options.keepalive) {
this.pingResp = true
this.pingTimer = reInterval(function () {
that._checkPing()
}, this.options.keepalive * 1000)
}
}
this.log('_writePacket :: writing to stream')
const result = mqttPacket.writeToStream(
packet,
this.stream,
this.options,
)
this.log('_writePacket :: writeToStream result %s', result)
if (!result && cb && cb !== this.noop) {
this.log(
'_writePacket :: handle events on `drain` once through callback.',
)
this.stream.once('drain', cb)
} else if (cb) {
this.log('_writePacket :: invoking cb')
cb()
}
}
/**
* _shiftPingInterval - reschedule the ping interval
*
* @api private
*/
MqttClient.prototype._shiftPingInterval = function () {
if (this.pingTimer && this.options.keepalive && this.options.reschedulePings) {
this.pingTimer.reschedule(this.options.keepalive * 1000)
}
}
/**
* _checkPing - check if a pingresp has come back, and ping the server again
*
* @api private
*/
MqttClient.prototype._checkPing = function () {
debug('_checkPing :: checking ping...')
if (this.pingResp) {
debug('_checkPing :: ping response received. Clearing flag and sending `pingreq`')
this.pingResp = false
this._sendPacket({ cmd: 'pingreq' })
} else {
// do a forced cleanup since socket will be in bad shape
debug('_checkPing :: calling _cleanUp with force true')
this._cleanUp(true)
}
}
/**
* _sendPacket - send or queue a packet
* @param {Object} packet - packet options
* @param {Function} cb - callback when the packet is sent
* @param {Function} cbStorePut - called when message is put into outgoingStore
* @param {Boolean} noStore - send without put to the store
* @api private
*/
_sendPacket(packet, cb, cbStorePut, noStore) {
this.log('_sendPacket :: (%s) :: start', this.options.clientId)
cbStorePut = cbStorePut || this.noop
cb = cb || this.noop
/**
* _handlePingresp - handle a pingresp
*
* @api private
*/
MqttClient.prototype._handlePingresp = function () {
this.pingResp = true
}
const err = this._applyTopicAlias(packet)
if (err) {
cb(err)
return
}
/**
* _handleConnack
*
* @param {Object} packet
* @api private
*/
MqttClient.prototype._handleConnack = function (packet) {
debug('_handleConnack')
const options = this.options
const version = options.protocolVersion
const rc = version === 5 ? packet.reasonCode : packet.returnCode
if (!this.connected) {
// allow auth packets to be sent while authenticating with the broker (mqtt5 enhanced auth)
if (packet.cmd === 'auth') {
this._writePacket(this, packet, cb)
return
}
clearTimeout(this.connackTimer)
delete this.topicAliasSend
this.log(
'_sendPacket :: client not connected. Storing packet offline.',
)
this._storePacket(packet, cb, cbStorePut)
return
}
if (packet.properties) {
if (packet.properties.topicAliasMaximum) {
if (packet.properties.topicAliasMaximum > 0xffff) {
this.emit('error', new Error('topicAliasMaximum from broker is out of range'))
return
}
if (packet.properties.topicAliasMaximum > 0) {
this.topicAliasSend = new TopicAliasSend(packet.properties.topicAliasMaximum)
}
}
if (packet.properties.serverKeepAlive && options.keepalive) {
options.keepalive = packet.properties.serverKeepAlive
this._shiftPingInterval()
}
if (packet.properties.maximumPacketSize) {
if (!options.properties) { options.properties = {} }
options.properties.maximumPacketSize = packet.properties.maximumPacketSize
}
}
// If "noStore" is true, the message is sent without being recorded in the store.
// Messages that have not received puback or pubcomp remain in the store after disconnection
// and are resent from the store upon reconnection.
// For resend upon reconnection, "noStore" is set to true. This is because the message is already stored in the store.
// This is to avoid interrupting other processes while recording to the store.
if (noStore) {
this._writePacket(packet, cb)
return
}
if (rc === 0) {
this.reconnecting = false
this._onConnect(packet)
} else if (rc > 0) {
const err = new Error('Connection refused: ' + errors[rc])
err.code = rc
this.emit('error', err)
}
}
switch (packet.cmd) {
case 'publish':
break
case 'pubrel':
this._storeAndSend(packet, cb, cbStorePut)
return
default:
this._writePacket(packet, cb)
return
}
MqttClient.prototype._handleAuth = function (packet) {
const options = this.options
const version = options.protocolVersion
const rc = version === 5 ? packet.reasonCode : packet.returnCode
switch (packet.qos) {
case 2:
case 1:
this._storeAndSend(packet, cb, cbStorePut)
break
/**
* no need of case here since it will be caught by default
* and jshint comply that before default it must be a break
* anyway it will result in -1 evaluation
*/
case 0:
/* falls through */
default:
this._writePacket(packet, cb)
break
}
this.log('_sendPacket :: (%s) :: end', this.options.clientId)
}
if (version !== 5) {
const err = new Error('Protocol error: Auth packets are only supported in MQTT 5. Your version:' + version)
err.code = rc
this.emit('error', err)
return
}
/**
* _storePacket - queue a packet
* @param {Object} packet - packet options
* @param {Function} cb - callback when the packet is sent
* @param {Function} cbStorePut - called when message is put into outgoingStore
* @api private
*/
_storePacket(packet, cb, cbStorePut) {
this.log('_storePacket :: packet: %o', packet)
this.log('_storePacket :: cb? %s', !!cb)
cbStorePut = cbStorePut || this.noop
const that = this
this.handleAuth(packet, function (err, packet) {
if (err) {
that.emit('error', err)
return
}
let storePacket = packet
if (storePacket.cmd === 'publish') {
// The original packet is for sending.
// The cloned storePacket is for storing to resend on reconnect.
// Topic Alias must not be used after disconnected.
storePacket = clone(packet)
const err = this._removeTopicAliasAndRecoverTopicName(storePacket)
if (err) {
return cb && cb(err)
}
}
// check that the packet is not a qos of 0, or that the command is not a publish
if (
((storePacket.qos || 0) === 0 && this.queueQoSZero) ||
storePacket.cmd !== 'publish'
) {
this.queue.push({ packet: storePacket, cb })
} else if (storePacket.qos > 0) {
cb = this.outgoing[storePacket.messageId]
? this.outgoing[storePacket.messageId].cb
: null
this.outgoingStore.put(storePacket, (err) => {
if (err) {
return cb && cb(err)
}
cbStorePut()
})
} else if (cb) {
cb(new Error('No connection to broker'))
}
}
if (rc === 24) {
that.reconnecting = false
that._sendPacket(packet)
} else {
const error = new Error('Connection refused: ' + errors[rc])
err.code = rc
that.emit('error', error)
}
})
}
/**
* _setupPingTimer - setup the ping timer
*
* @api private
*/
_setupPingTimer() {
this.log(
'_setupPingTimer :: keepalive %d (seconds)',
this.options.keepalive,
)
/**
* @param packet the packet received by the broker
* @return the auth packet to be returned to the broker
* @api public
*/
MqttClient.prototype.handleAuth = function (packet, callback) {
callback()
}
if (!this.pingTimer && this.options.keepalive) {
this.pingResp = true
this.pingTimer = reInterval(() => {
this._checkPing()
}, this.options.keepalive * 1000)
}
}
/**
* _handlePublish
*
* @param {Object} packet
* @api private
*/
/*
those late 2 case should be rewrite to comply with coding style:
/**
* _shiftPingInterval - reschedule the ping interval
*
* @api private
*/
_shiftPingInterval() {
if (
this.pingTimer &&
this.options.keepalive &&
this.options.reschedulePings
) {
this.pingTimer.reschedule(this.options.keepalive * 1000)
}
}
case 1:
case 0:
// do not wait sending a puback
// no callback passed
if (1 === qos) {
this._sendPacket({
cmd: 'puback',
messageId: messageId
});
}
// emit the message event for both qos 1 and 0
this.emit('message', topic, message, packet);
this.handleMessage(packet, done);
break;
default:
// do nothing but every switch mus have a default
// log or throw an error about unknown qos
break;
/**
* _checkPing - check if a pingresp has come back, and ping the server again
*
* @api private
*/
_checkPing() {
this.log('_checkPing :: checking ping...')
if (this.pingResp) {
this.log(
'_checkPing :: ping response received. Clearing flag and sending `pingreq`',
)
this.pingResp = false
this._sendPacket({ cmd: 'pingreq' })
} else {
// do a forced cleanup since socket will be in bad shape
this.log('_checkPing :: calling _cleanUp with force true')
this._cleanUp(true)
}
}
for now i just suppressed the warnings
*/
MqttClient.prototype._handlePublish = function (packet, done) {
debug('_handlePublish: packet %o', packet)
done = typeof done !== 'undefined' ? done : nop
let topic = packet.topic.toString()
const message = packet.payload
const qos = packet.qos
const messageId = packet.messageId
const that = this
const options = this.options
const validReasonCodes = [0, 16, 128, 131, 135, 144, 145, 151, 153]
if (this.options.protocolVersion === 5) {
let alias
if (packet.properties) {
alias = packet.properties.topicAlias
}
if (typeof alias !== 'undefined') {
if (topic.length === 0) {
if (alias > 0 && alias <= 0xffff) {
const gotTopic = this.topicAliasRecv.getTopicByAlias(alias)
if (gotTopic) {
topic = gotTopic
debug('_handlePublish :: topic complemented by alias. topic: %s - alias: %d', topic, alias)
} else {
debug('_handlePublish :: unregistered topic alias. alias: %d', alias)
this.emit('error', new Error('Received unregistered Topic Alias'))
return
}
} else {
debug('_handlePublish :: topic alias out of range. alias: %d', alias)
this.emit('error', new Error('Received Topic Alias is out of range'))
return
}
} else {
if (this.topicAliasRecv.put(topic, alias)) {
debug('_handlePublish :: registered topic: %s - alias: %d', topic, alias)
} else {
debug('_handlePublish :: topic alias out of range. alias: %d', alias)
this.emit('error', new Error('Received Topic Alias is out of range'))
return
}
}
}
}
debug('_handlePublish: qos %d', qos)
switch (qos) {
case 2: {
options.customHandleAcks(topic, message, packet, function (error, code) {
if (!(error instanceof Error)) {
code = error
error = null
}
if (error) { return that.emit('error', error) }
if (validReasonCodes.indexOf(code) === -1) { return that.emit('error', new Error('Wrong reason code for pubrec')) }
if (code) {
that._sendPacket({ cmd: 'pubrec', messageId: messageId, reasonCode: code }, done)
} else {
that.incomingStore.put(packet, function () {
that._sendPacket({ cmd: 'pubrec', messageId: messageId }, done)
})
}
})
break
}
case 1: {
// emit the message event
options.customHandleAcks(topic, message, packet, function (error, code) {
if (!(error instanceof Error)) {
code = error
error = null
}
if (error) { return that.emit('error', error) }
if (validReasonCodes.indexOf(code) === -1) { return that.emit('error', new Error('Wrong reason code for puback')) }
if (!code) { that.emit('message', topic, message, packet) }
that.handleMessage(packet, function (err) {
if (err) {
return done && done(err)
}
that._sendPacket({ cmd: 'puback', messageId: messageId, reasonCode: code }, done)
})
})
break
}
case 0:
// emit the message event
this.emit('message', topic, message, packet)
this.handleMessage(packet, done)
break
default:
// do nothing
debug('_handlePublish: unknown QoS. Doing nothing.')
// log or throw an error about unknown qos
break
}
}
/**
* @param packet the packet received by the broker
* @return the auth packet to be returned to the broker
* @api public
*/
handleAuth(packet, callback) {
callback()
}
/**
* Handle messages with backpressure support, one at a time.
* Override at will.
*
* @param Packet packet the packet
* @param Function callback call when finished
* @api public
*/
MqttClient.prototype.handleMessage = function (packet, callback) {
callback()
}
/**
* Handle messages with backpressure support, one at a time.
* Override at will.
*
* @param Packet packet the packet
* @param Function callback call when finished
* @api public
*/
handleMessage(packet, callback) {
callback()
}
/**
* _handleAck
*
* @param {Object} packet
* @api private
*/
/**
* _nextId
* @return unsigned int
*/
_nextId() {
return this.messageIdProvider.allocate()
}
MqttClient.prototype._handleAck = function (packet) {
/* eslint no-fallthrough: "off" */
const messageId = packet.messageId
const type = packet.cmd
let response = null
const cb = this.outgoing[messageId] ? this.outgoing[messageId].cb : null
const that = this
let err
/**
* getLastMessageId
* @return unsigned int
*/
getLastMessageId() {
return this.messageIdProvider.getLastAllocated()
}
// Checking `!cb` happens to work, but it's not technically "correct".
//
// Why? This code assumes that "no callback" is the same as that "we're not
// waiting for responses" (puback, pubrec, pubcomp, suback, or unsuback).
//
// It would be better to check `if (!this.outgoing[messageId])` here, but
// there's no reason to change it and risk (another) regression.
//
// The only reason this code works is becaues code in MqttClient.publish,
// MqttClinet.subscribe, and MqttClient.unsubscribe ensures that we will
// have a callback even if the user doesn't pass one in.)
if (!cb) {
debug('_handleAck :: Server sent an ack in error. Ignoring.')
// Server sent an ack in error, ignore it.
return
}
/**
* _resubscribe
* @api private
*/
_resubscribe() {
this.log('_resubscribe')
const _resubscribeTopicsKeys = Object.keys(this._resubscribeTopics)
if (
!this._firstConnection &&
(this.options.clean ||
(this.options.protocolVersion === 5 &&
!this.connackPacket.sessionPresent)) &&
_resubscribeTopicsKeys.length > 0
) {
if (this.options.resubscribe) {
if (this.options.protocolVersion === 5) {
this.log('_resubscribe: protocolVersion 5')
for (
let topicI = 0;
topicI < _resubscribeTopicsKeys.length;
topicI++
) {
const resubscribeTopic = {}
resubscribeTopic[_resubscribeTopicsKeys[topicI]] =
this._resubscribeTopics[
_resubscribeTopicsKeys[topicI]
]
resubscribeTopic.resubscribe = true
this.subscribe(resubscribeTopic, {
properties:
resubscribeTopic[_resubscribeTopicsKeys[topicI]]
.properties,
})
}
} else {
this._resubscribeTopics.resubscribe = true
this.subscribe(this._resubscribeTopics)
}
} else {
this._resubscribeTopics = {}
}
}
// Process
debug('_handleAck :: packet type', type)
switch (type) {
case 'pubcomp':
// same thing as puback for QoS 2
case 'puback': {
const pubackRC = packet.reasonCode
// Callback - we're done
if (pubackRC && pubackRC > 0 && pubackRC !== 16) {
err = new Error('Publish error: ' + errors[pubackRC])
err.code = pubackRC
cb(err, packet)
}
delete this.outgoing[messageId]
this.outgoingStore.del(packet, cb)
this.messageIdProvider.deallocate(messageId)
this._invokeStoreProcessingQueue()
break
}
case 'pubrec': {
response = {
cmd: 'pubrel',
qos: 2,
messageId: messageId
}
const pubrecRC = packet.reasonCode
this._firstConnection = false
}
if (pubrecRC && pubrecRC > 0 && pubrecRC !== 16) {
err = new Error('Publish error: ' + errors[pubrecRC])
err.code = pubrecRC
cb(err, packet)
} else {
this._sendPacket(response)
}
break
}
case 'suback': {
delete this.outgoing[messageId]
this.messageIdProvider.deallocate(messageId)
for (let grantedI = 0; grantedI < packet.granted.length; grantedI++) {
if ((packet.granted[grantedI] & 0x80) !== 0) {
// suback with Failure status
const topics = this.messageIdToTopic[messageId]
if (topics) {
topics.forEach(function (topic) {
delete that._resubscribeTopics[topic]
})
}
}
}
delete this.messageIdToTopic[messageId]
this._invokeStoreProcessingQueue()
cb(null, packet)
break
}
case 'unsuback': {
delete this.outgoing[messageId]
this.messageIdProvider.deallocate(messageId)
this._invokeStoreProcessingQueue()
cb(null)
break
}
default:
that.emit('error', new Error('unrecognized packet type'))
}
/**
* _onConnect
*
* @api private
*/
_onConnect(packet) {
if (this.disconnected) {
this.emit('connect', packet)
return
}
if (this.disconnecting &&
Object.keys(this.outgoing).length === 0) {
this.emit('outgoingEmpty')
}
}
this.connackPacket = packet
this.messageIdProvider.clear()
this._setupPingTimer()
/**
* _handlePubrel
*
* @param {Object} packet
* @api private
*/
MqttClient.prototype._handlePubrel = function (packet, callback) {
debug('handling pubrel packet')
callback = typeof callback !== 'undefined' ? callback : nop
const messageId = packet.messageId
const that = this
this.connected = true
const comp = { cmd: 'pubcomp', messageId: messageId }
const startStreamProcess = () => {
let outStore = this.outgoingStore.createStream()
that.incomingStore.get(packet, function (err, pub) {
if (!err) {
that.emit('message', pub.topic, pub.payload, pub)
that.handleMessage(pub, function (err) {
if (err) {
return callback(err)
}
that.incomingStore.del(pub, nop)
that._sendPacket(comp, callback)
})
} else {
that._sendPacket(comp, callback)
}
})
}
const remove = () => {
outStore.destroy()
outStore = null
this._flushStoreProcessingQueue()
clearStoreProcessing()
}
/**
* _handleDisconnect
*
* @param {Object} packet
* @api private
*/
MqttClient.prototype._handleDisconnect = function (packet) {
this.emit('disconnect', packet)
}
const clearStoreProcessing = () => {
this._storeProcessing = false
this._packetIdsDuringStoreProcessing = {}
}
/**
* _nextId
* @return unsigned int
*/
MqttClient.prototype._nextId = function () {
return this.messageIdProvider.allocate()
}
this.once('close', remove)
outStore.on('error', (err) => {
clearStoreProcessing()
this._flushStoreProcessingQueue()
this.removeListener('close', remove)
this.emit('error', err)
})
/**
* getLastMessageId
* @return unsigned int
*/
MqttClient.prototype.getLastMessageId = function () {
return this.messageIdProvider.getLastAllocated()
}
const storeDeliver = () => {
// edge case, we wrapped this twice
if (!outStore) {
return
}
/**
* _resubscribe
* @api private
*/
MqttClient.prototype._resubscribe = function () {
debug('_resubscribe')
const _resubscribeTopicsKeys = Object.keys(this._resubscribeTopics)
if (!this._firstConnection &&
(this.options.clean || (this.options.protocolVersion === 5 && !this.connackPacket.sessionPresent)) &&
_resubscribeTopicsKeys.length > 0) {
if (this.options.resubscribe) {
if (this.options.protocolVersion === 5) {
debug('_resubscribe: protocolVersion 5')
for (let topicI = 0; topicI < _resubscribeTopicsKeys.length; topicI++) {
const resubscribeTopic = {}
resubscribeTopic[_resubscribeTopicsKeys[topicI]] = this._resubscribeTopics[_resubscribeTopicsKeys[topicI]]
resubscribeTopic.resubscribe = true
this.subscribe(resubscribeTopic, { properties: resubscribeTopic[_resubscribeTopicsKeys[topicI]].properties })
}
} else {
this._resubscribeTopics.resubscribe = true
this.subscribe(this._resubscribeTopics)
}
} else {
this._resubscribeTopics = {}
}
}
const packet2 = outStore.read(1)
this._firstConnection = false
}
let cb
/**
* _onConnect
*
* @api private
*/
MqttClient.prototype._onConnect = function (packet) {
if (this.disconnected) {
this.emit('connect', packet)
return
}
if (!packet2) {
// read when data is available in the future
outStore.once('readable', storeDeliver)
return
}
const that = this
this._storeProcessing = true
this.connackPacket = packet
this.messageIdProvider.clear()
this._setupPingTimer()
// Skip already processed store packets
if (this._packetIdsDuringStoreProcessing[packet2.messageId]) {
storeDeliver()
return
}
this.connected = true
// Avoid unnecessary stream read operations when disconnected
if (!this.disconnecting && !this.reconnectTimer) {
cb = this.outgoing[packet2.messageId]
? this.outgoing[packet2.messageId].cb
: null
this.outgoing[packet2.messageId] = {
volatile: false,
cb(err, status) {
// Ensure that the original callback passed in to publish gets invoked
if (cb) {
cb(err, status)
}
function startStreamProcess () {
let outStore = that.outgoingStore.createStream()
storeDeliver()
},
}
this._packetIdsDuringStoreProcessing[
packet2.messageId
] = true
if (this.messageIdProvider.register(packet2.messageId)) {
this._sendPacket(packet2, undefined, undefined, true)
} else {
this.log(
'messageId: %d has already used.',
packet2.messageId,
)
}
} else if (outStore.destroy) {
outStore.destroy()
}
}
function clearStoreProcessing () {
that._storeProcessing = false
that._packetIdsDuringStoreProcessing = {}
}
outStore.on('end', () => {
let allProcessed = true
for (const id in this._packetIdsDuringStoreProcessing) {
if (!this._packetIdsDuringStoreProcessing[id]) {
allProcessed = false
break
}
}
if (allProcessed) {
clearStoreProcessing()
this.removeListener('close', remove)
this._invokeAllStoreProcessingQueue()
this.emit('connect', packet)
} else {
startStreamProcess()
}
})
storeDeliver()
}
// start flowing
startStreamProcess()
}
that.once('close', remove)
outStore.on('error', function (err) {
clearStoreProcessing()
that._flushStoreProcessingQueue()
that.removeListener('close', remove)
that.emit('error', err)
})
_invokeStoreProcessingQueue() {
// If _storeProcessing is true, the message is resending.
// During resend, processing is skipped to prevent new messages from interrupting. #1635
if (!this._storeProcessing && this._storeProcessingQueue.length > 0) {
const f = this._storeProcessingQueue[0]
if (f && f.invoke()) {
this._storeProcessingQueue.shift()
return true
}
}
return false
}
function remove () {
outStore.destroy()
outStore = null
that._flushStoreProcessingQueue()
clearStoreProcessing()
}
_invokeAllStoreProcessingQueue() {
while (this._invokeStoreProcessingQueue()) {
/* empty */
}
}
function storeDeliver () {
// edge case, we wrapped this twice
if (!outStore) {
return
}
_flushStoreProcessingQueue() {
for (const f of this._storeProcessingQueue) {
if (f.cbStorePut) f.cbStorePut(new Error('Connection closed'))
if (f.callback) f.callback(new Error('Connection closed'))
}
this._storeProcessingQueue.splice(0)
}
const packet = outStore.read(1)
let cb
if (!packet) {
// read when data is available in the future
outStore.once('readable', storeDeliver)
return
}
that._storeProcessing = true
// Skip already processed store packets
if (that._packetIdsDuringStoreProcessing[packet.messageId]) {
storeDeliver()
return
}
// Avoid unnecessary stream read operations when disconnected
if (!that.disconnecting && !that.reconnectTimer) {
cb = that.outgoing[packet.messageId] ? that.outgoing[packet.messageId].cb : null
that.outgoing[packet.messageId] = {
volatile: false,
cb: function (err, status) {
// Ensure that the original callback passed in to publish gets invoked
if (cb) {
cb(err, status)
}
storeDeliver()
}
}
that._packetIdsDuringStoreProcessing[packet.messageId] = true
if (that.messageIdProvider.register(packet.messageId)) {
that._sendPacket(packet, undefined, undefined, true)
} else {
debug('messageId: %d has already used.', packet.messageId)
}
} else if (outStore.destroy) {
outStore.destroy()
}
}
outStore.on('end', function () {
let allProcessed = true
for (const id in that._packetIdsDuringStoreProcessing) {
if (!that._packetIdsDuringStoreProcessing[id]) {
allProcessed = false
break
}
}
if (allProcessed) {
clearStoreProcessing()
that.removeListener('close', remove)
that._invokeAllStoreProcessingQueue()
that.emit('connect', packet)
} else {
startStreamProcess()
}
})
storeDeliver()
}
// start flowing
startStreamProcess()
/**
* _removeOutgoingAndStoreMessage
* @param {Number} messageId - messageId to remove message
* @param {Function} cb - called when the message removed
* @api private
*/
_removeOutgoingAndStoreMessage(messageId, cb) {
const self = this
delete this.outgoing[messageId]
self.outgoingStore.del({ messageId }, (err, packet) => {
cb(err, packet)
self.messageIdProvider.deallocate(messageId)
self._invokeStoreProcessingQueue()
})
}
}
MqttClient.prototype._invokeStoreProcessingQueue = function () {
if (this._storeProcessingQueue.length > 0) {
const f = this._storeProcessingQueue[0]
if (f && f.invoke()) {
this._storeProcessingQueue.shift()
return true
}
}
return false
}
MqttClient.prototype._invokeAllStoreProcessingQueue = function () {
while (this._invokeStoreProcessingQueue()) { /* empty */ }
}
MqttClient.prototype._flushStoreProcessingQueue = function () {
for (const f of this._storeProcessingQueue) {
if (f.cbStorePut) f.cbStorePut(new Error('Connection closed'))
if (f.callback) f.callback(new Error('Connection closed'))
}
this._storeProcessingQueue.splice(0)
}
module.exports = MqttClient

@@ -1,8 +0,5 @@

'use strict'
const { Buffer } = require('buffer')
const Transform = require('readable-stream').Transform
const { Transform } = require('readable-stream')
const duplexify = require('duplexify')
/* global FileReader */
let my

@@ -13,118 +10,118 @@ let proxy

function buildProxy () {
const proxy = new Transform()
proxy._write = function (chunk, encoding, next) {
my.sendSocketMessage({
data: chunk.buffer,
success: function () {
next()
},
fail: function () {
next(new Error())
}
})
}
proxy._flush = function socketEnd (done) {
my.closeSocket({
success: function () {
done()
}
})
}
function buildProxy() {
const _proxy = new Transform()
_proxy._write = (chunk, encoding, next) => {
my.sendSocketMessage({
data: chunk.buffer,
success() {
next()
},
fail() {
next(new Error())
},
})
}
_proxy._flush = (done) => {
my.closeSocket({
success() {
done()
},
})
}
return proxy
return _proxy
}
function setDefaultOpts (opts) {
if (!opts.hostname) {
opts.hostname = 'localhost'
}
if (!opts.path) {
opts.path = '/'
}
function setDefaultOpts(opts) {
if (!opts.hostname) {
opts.hostname = 'localhost'
}
if (!opts.path) {
opts.path = '/'
}
if (!opts.wsOptions) {
opts.wsOptions = {}
}
if (!opts.wsOptions) {
opts.wsOptions = {}
}
}
function buildUrl (opts, client) {
const protocol = opts.protocol === 'alis' ? 'wss' : 'ws'
let url = protocol + '://' + opts.hostname + opts.path
if (opts.port && opts.port !== 80 && opts.port !== 443) {
url = protocol + '://' + opts.hostname + ':' + opts.port + opts.path
}
if (typeof (opts.transformWsUrl) === 'function') {
url = opts.transformWsUrl(url, opts, client)
}
return url
function buildUrl(opts, client) {
const protocol = opts.protocol === 'alis' ? 'wss' : 'ws'
let url = `${protocol}://${opts.hostname}${opts.path}`
if (opts.port && opts.port !== 80 && opts.port !== 443) {
url = `${protocol}://${opts.hostname}:${opts.port}${opts.path}`
}
if (typeof opts.transformWsUrl === 'function') {
url = opts.transformWsUrl(url, opts, client)
}
return url
}
function bindEventHandler () {
if (isInitialized) return
function bindEventHandler() {
if (isInitialized) return
isInitialized = true
isInitialized = true
my.onSocketOpen(function () {
stream.setReadable(proxy)
stream.setWritable(proxy)
stream.emit('connect')
})
my.onSocketOpen(() => {
stream.setReadable(proxy)
stream.setWritable(proxy)
stream.emit('connect')
})
my.onSocketMessage(function (res) {
if (typeof res.data === 'string') {
const buffer = Buffer.from(res.data, 'base64')
proxy.push(buffer)
} else {
const reader = new FileReader()
reader.addEventListener('load', function () {
let data = reader.result
my.onSocketMessage((res) => {
if (typeof res.data === 'string') {
const buffer = Buffer.from(res.data, 'base64')
proxy.push(buffer)
} else {
const reader = new FileReader()
reader.addEventListener('load', () => {
let data = reader.result
if (data instanceof ArrayBuffer) data = Buffer.from(data)
else data = Buffer.from(data, 'utf8')
proxy.push(data)
})
reader.readAsArrayBuffer(res.data)
}
})
if (data instanceof ArrayBuffer) data = Buffer.from(data)
else data = Buffer.from(data, 'utf8')
proxy.push(data)
})
reader.readAsArrayBuffer(res.data)
}
})
my.onSocketClose(function () {
stream.end()
stream.destroy()
})
my.onSocketClose(() => {
stream.end()
stream.destroy()
})
my.onSocketError(function (res) {
stream.destroy(res)
})
my.onSocketError((res) => {
stream.destroy(res)
})
}
function buildStream (client, opts) {
opts.hostname = opts.hostname || opts.host
function buildStream(client, opts) {
opts.hostname = opts.hostname || opts.host
if (!opts.hostname) {
throw new Error('Could not determine host. Specify host manually.')
}
if (!opts.hostname) {
throw new Error('Could not determine host. Specify host manually.')
}
const websocketSubProtocol =
(opts.protocolId === 'MQIsdp') && (opts.protocolVersion === 3)
? 'mqttv3.1'
: 'mqtt'
const websocketSubProtocol =
opts.protocolId === 'MQIsdp' && opts.protocolVersion === 3
? 'mqttv3.1'
: 'mqtt'
setDefaultOpts(opts)
setDefaultOpts(opts)
const url = buildUrl(opts, client)
my = opts.my
my.connectSocket({
url: url,
protocols: websocketSubProtocol
})
const url = buildUrl(opts, client)
my = opts.my
my.connectSocket({
url,
protocols: websocketSubProtocol,
})
proxy = buildProxy()
stream = duplexify.obj()
proxy = buildProxy()
stream = duplexify.obj()
bindEventHandler()
bindEventHandler()
return stream
return stream
}
module.exports = buildStream

@@ -1,3 +0,2 @@

'use strict'
const url = require('url')
const MqttClient = require('../client')

@@ -7,5 +6,3 @@ const Store = require('../store')

const UniqueMessageIdProvider = require('../unique-message-id-provider')
const IS_BROWSER = require('../is-browser').IS_BROWSER
const url = require('url')
const xtend = require('xtend')
const { IS_BROWSER } = require('../is-browser')
const debug = require('debug')('mqttjs')

@@ -16,13 +13,13 @@

if (!IS_BROWSER) {
protocols.mqtt = require('./tcp')
protocols.tcp = require('./tcp')
protocols.ssl = require('./tls')
protocols.tls = require('./tls')
protocols.mqtts = require('./tls')
protocols.mqtt = require('./tcp')
protocols.tcp = require('./tcp')
protocols.ssl = require('./tls')
protocols.tls = require('./tls')
protocols.mqtts = require('./tls')
} else {
protocols.wx = require('./wx')
protocols.wxs = require('./wx')
protocols.wx = require('./wx')
protocols.wxs = require('./wx')
protocols.ali = require('./ali')
protocols.alis = require('./ali')
protocols.ali = require('./ali')
protocols.alis = require('./ali')
}

@@ -38,13 +35,13 @@

*/
function parseAuthOptions (opts) {
let matches
if (opts.auth) {
matches = opts.auth.match(/^(.+):(.+)$/)
if (matches) {
opts.username = matches[1]
opts.password = matches[2]
} else {
opts.username = opts.auth
}
}
function parseAuthOptions(opts) {
let matches
if (opts.auth) {
matches = opts.auth.match(/^(.+):(.+)$/)
if (matches) {
opts.username = matches[1]
opts.password = matches[2]
} else {
opts.username = opts.auth
}
}
}

@@ -58,109 +55,118 @@

*/
function connect (brokerUrl, opts) {
debug('connecting to an MQTT broker...')
if ((typeof brokerUrl === 'object') && !opts) {
opts = brokerUrl
brokerUrl = null
}
function connect(brokerUrl, opts) {
debug('connecting to an MQTT broker...')
if (typeof brokerUrl === 'object' && !opts) {
opts = brokerUrl
brokerUrl = null
}
opts = opts || {}
opts = opts || {}
if (brokerUrl) {
// eslint-disable-next-line
if (brokerUrl) {
// eslint-disable-next-line
const parsed = url.parse(brokerUrl, true)
if (parsed.port != null) {
parsed.port = Number(parsed.port)
}
if (parsed.port != null) {
parsed.port = Number(parsed.port)
}
opts = xtend(parsed, opts)
opts = { ...parsed, ...opts }
if (opts.protocol === null) {
throw new Error('Missing protocol')
}
if (opts.protocol === null) {
throw new Error('Missing protocol')
}
opts.protocol = opts.protocol.replace(/:$/, '')
}
opts.protocol = opts.protocol.replace(/:$/, '')
}
// merge in the auth options if supplied
parseAuthOptions(opts)
// merge in the auth options if supplied
parseAuthOptions(opts)
// support clientId passed in the query string of the url
if (opts.query && typeof opts.query.clientId === 'string') {
opts.clientId = opts.query.clientId
}
// support clientId passed in the query string of the url
if (opts.query && typeof opts.query.clientId === 'string') {
opts.clientId = opts.query.clientId
}
if (opts.cert && opts.key) {
if (opts.protocol) {
if (['mqtts', 'wss', 'wxs', 'alis'].indexOf(opts.protocol) === -1) {
switch (opts.protocol) {
case 'mqtt':
opts.protocol = 'mqtts'
break
case 'ws':
opts.protocol = 'wss'
break
case 'wx':
opts.protocol = 'wxs'
break
case 'ali':
opts.protocol = 'alis'
break
default:
throw new Error('Unknown protocol for secure connection: "' + opts.protocol + '"!')
}
}
} else {
// A cert and key was provided, however no protocol was specified, so we will throw an error.
throw new Error('Missing secure protocol key')
}
}
if (opts.cert && opts.key) {
if (opts.protocol) {
if (['mqtts', 'wss', 'wxs', 'alis'].indexOf(opts.protocol) === -1) {
switch (opts.protocol) {
case 'mqtt':
opts.protocol = 'mqtts'
break
case 'ws':
opts.protocol = 'wss'
break
case 'wx':
opts.protocol = 'wxs'
break
case 'ali':
opts.protocol = 'alis'
break
default:
throw new Error(
`Unknown protocol for secure connection: "${opts.protocol}"!`,
)
}
}
} else {
// A cert and key was provided, however no protocol was specified, so we will throw an error.
throw new Error('Missing secure protocol key')
}
}
if (!protocols[opts.protocol]) {
const isSecure = ['mqtts', 'wss'].indexOf(opts.protocol) !== -1
opts.protocol = [
'mqtt',
'mqtts',
'ws',
'wss',
'wx',
'wxs',
'ali',
'alis'
].filter(function (key, index) {
if (isSecure && index % 2 === 0) {
// Skip insecure protocols when requesting a secure one.
return false
}
return (typeof protocols[key] === 'function')
})[0]
}
if (!protocols[opts.protocol]) {
const isSecure = ['mqtts', 'wss'].indexOf(opts.protocol) !== -1
opts.protocol = [
'mqtt',
'mqtts',
'ws',
'wss',
'wx',
'wxs',
'ali',
'alis',
].filter((key, index) => {
if (isSecure && index % 2 === 0) {
// Skip insecure protocols when requesting a secure one.
return false
}
return typeof protocols[key] === 'function'
})[0]
}
if (opts.clean === false && !opts.clientId) {
throw new Error('Missing clientId for unclean clients')
}
if (opts.clean === false && !opts.clientId) {
throw new Error('Missing clientId for unclean clients')
}
if (opts.protocol) {
opts.defaultProtocol = opts.protocol
}
if (opts.protocol) {
opts.defaultProtocol = opts.protocol
}
function wrapper (client) {
if (opts.servers) {
if (!client._reconnectCount || client._reconnectCount === opts.servers.length) {
client._reconnectCount = 0
}
function wrapper(client) {
if (opts.servers) {
if (
!client._reconnectCount ||
client._reconnectCount === opts.servers.length
) {
client._reconnectCount = 0
}
opts.host = opts.servers[client._reconnectCount].host
opts.port = opts.servers[client._reconnectCount].port
opts.protocol = (!opts.servers[client._reconnectCount].protocol ? opts.defaultProtocol : opts.servers[client._reconnectCount].protocol)
opts.hostname = opts.host
opts.host = opts.servers[client._reconnectCount].host
opts.port = opts.servers[client._reconnectCount].port
opts.protocol = !opts.servers[client._reconnectCount].protocol
? opts.defaultProtocol
: opts.servers[client._reconnectCount].protocol
opts.hostname = opts.host
client._reconnectCount++
}
client._reconnectCount++
}
debug('calling streambuilder for', opts.protocol)
return protocols[opts.protocol](client, opts)
}
const client = new MqttClient(wrapper, opts)
client.on('error', function () { /* Automatically set up client error handling */ })
return client
debug('calling streambuilder for', opts.protocol)
return protocols[opts.protocol](client, opts)
}
const client = new MqttClient(wrapper, opts)
client.on('error', () => {
/* Automatically set up client error handling */
})
return client
}

@@ -167,0 +173,0 @@

@@ -1,2 +0,1 @@

'use strict'
const net = require('net')

@@ -9,13 +8,13 @@ const debug = require('debug')('mqttjs:tcp')

*/
function streamBuilder (client, opts) {
opts.port = opts.port || 1883
opts.hostname = opts.hostname || opts.host || 'localhost'
function streamBuilder(client, opts) {
opts.port = opts.port || 1883
opts.hostname = opts.hostname || opts.host || 'localhost'
const port = opts.port
const host = opts.hostname
const { port } = opts
const host = opts.hostname
debug('port %d and host %s', port, host)
return net.createConnection(port, host)
debug('port %d and host %s', port, host)
return net.createConnection(port, host)
}
module.exports = streamBuilder

@@ -1,2 +0,1 @@

'use strict'
const tls = require('tls')

@@ -6,44 +5,49 @@ const net = require('net')

function buildBuilder (mqttClient, opts) {
opts.port = opts.port || 8883
opts.host = opts.hostname || opts.host || 'localhost'
function buildBuilder(mqttClient, opts) {
opts.port = opts.port || 8883
opts.host = opts.hostname || opts.host || 'localhost'
if (net.isIP(opts.host) === 0) {
opts.servername = opts.host
}
if (net.isIP(opts.host) === 0) {
opts.servername = opts.host
}
opts.rejectUnauthorized = opts.rejectUnauthorized !== false
opts.rejectUnauthorized = opts.rejectUnauthorized !== false
delete opts.path
delete opts.path
debug('port %d host %s rejectUnauthorized %b', opts.port, opts.host, opts.rejectUnauthorized)
debug(
'port %d host %s rejectUnauthorized %b',
opts.port,
opts.host,
opts.rejectUnauthorized,
)
const connection = tls.connect(opts)
/* eslint no-use-before-define: [2, "nofunc"] */
connection.on('secureConnect', function () {
if (opts.rejectUnauthorized && !connection.authorized) {
connection.emit('error', new Error('TLS not authorized'))
} else {
connection.removeListener('error', handleTLSerrors)
}
})
const connection = tls.connect(opts)
/* eslint no-use-before-define: [2, "nofunc"] */
connection.on('secureConnect', () => {
if (opts.rejectUnauthorized && !connection.authorized) {
connection.emit('error', new Error('TLS not authorized'))
} else {
connection.removeListener('error', handleTLSerrors)
}
})
function handleTLSerrors (err) {
// How can I get verify this error is a tls error?
if (opts.rejectUnauthorized) {
mqttClient.emit('error', err)
}
function handleTLSerrors(err) {
// How can I get verify this error is a tls error?
if (opts.rejectUnauthorized) {
mqttClient.emit('error', err)
}
// close this connection to match the behaviour of net
// otherwise all we get is an error from the connection
// and close event doesn't fire. This is a work around
// to enable the reconnect code to work the same as with
// net.createConnection
connection.end()
}
// close this connection to match the behaviour of net
// otherwise all we get is an error from the connection
// and close event doesn't fire. This is a work around
// to enable the reconnect code to work the same as with
// net.createConnection
connection.end()
}
connection.on('error', handleTLSerrors)
return connection
connection.on('error', handleTLSerrors)
return connection
}
module.exports = buildBuilder

@@ -1,3 +0,1 @@

'use strict'
const { Buffer } = require('buffer')

@@ -7,252 +5,262 @@ const WS = require('ws')

const duplexify = require('duplexify')
const Transform = require('readable-stream').Transform
const IS_BROWSER = require('../is-browser').IS_BROWSER
const { Transform } = require('readable-stream')
const { IS_BROWSER } = require('../is-browser')
const WSS_OPTIONS = [
'rejectUnauthorized',
'ca',
'cert',
'key',
'pfx',
'passphrase'
'rejectUnauthorized',
'ca',
'cert',
'key',
'pfx',
'passphrase',
]
function buildUrl (opts, client) {
let url = opts.protocol + '://' + opts.hostname + ':' + opts.port + opts.path
if (typeof (opts.transformWsUrl) === 'function') {
url = opts.transformWsUrl(url, opts, client)
}
return url
function buildUrl(opts, client) {
let url = `${opts.protocol}://${opts.hostname}:${opts.port}${opts.path}`
if (typeof opts.transformWsUrl === 'function') {
url = opts.transformWsUrl(url, opts, client)
}
return url
}
function setDefaultOpts (opts) {
const options = opts
if (!opts.hostname) {
options.hostname = 'localhost'
}
if (!opts.port) {
if (opts.protocol === 'wss') {
options.port = 443
} else {
options.port = 80
}
}
if (!opts.path) {
options.path = '/'
}
function setDefaultOpts(opts) {
const options = opts
if (!opts.hostname) {
options.hostname = 'localhost'
}
if (!opts.port) {
if (opts.protocol === 'wss') {
options.port = 443
} else {
options.port = 80
}
}
if (!opts.path) {
options.path = '/'
}
if (!opts.wsOptions) {
options.wsOptions = {}
}
if (!IS_BROWSER && opts.protocol === 'wss') {
// Add cert/key/ca etc options
WSS_OPTIONS.forEach(function (prop) {
if (Object.prototype.hasOwnProperty.call(opts, prop) && !Object.prototype.hasOwnProperty.call(opts.wsOptions, prop)) {
options.wsOptions[prop] = opts[prop]
}
})
}
if (!opts.wsOptions) {
options.wsOptions = {}
}
if (!IS_BROWSER && opts.protocol === 'wss') {
// Add cert/key/ca etc options
WSS_OPTIONS.forEach((prop) => {
if (
Object.prototype.hasOwnProperty.call(opts, prop) &&
!Object.prototype.hasOwnProperty.call(opts.wsOptions, prop)
) {
options.wsOptions[prop] = opts[prop]
}
})
}
return options
return options
}
function setDefaultBrowserOpts (opts) {
const options = setDefaultOpts(opts)
function setDefaultBrowserOpts(opts) {
const options = setDefaultOpts(opts)
if (!options.hostname) {
options.hostname = options.host
}
if (!options.hostname) {
options.hostname = options.host
}
if (!options.hostname) {
// Throwing an error in a Web Worker if no `hostname` is given, because we
// can not determine the `hostname` automatically. If connecting to
// localhost, please supply the `hostname` as an argument.
if (typeof (document) === 'undefined') {
throw new Error('Could not determine host. Specify host manually.')
}
const parsed = new URL(document.URL)
options.hostname = parsed.hostname
if (!options.hostname) {
// Throwing an error in a Web Worker if no `hostname` is given, because we
// can not determine the `hostname` automatically. If connecting to
// localhost, please supply the `hostname` as an argument.
if (typeof document === 'undefined') {
throw new Error('Could not determine host. Specify host manually.')
}
const parsed = new URL(document.URL)
options.hostname = parsed.hostname
if (!options.port) {
options.port = parsed.port
}
}
if (!options.port) {
options.port = parsed.port
}
}
// objectMode should be defined for logic
if (options.objectMode === undefined) {
options.objectMode = !(options.binary === true || options.binary === undefined)
}
// objectMode should be defined for logic
if (options.objectMode === undefined) {
options.objectMode = !(
options.binary === true || options.binary === undefined
)
}
return options
return options
}
function createWebSocket (client, url, opts) {
debug('createWebSocket')
debug('protocol: ' + opts.protocolId + ' ' + opts.protocolVersion)
const websocketSubProtocol =
(opts.protocolId === 'MQIsdp') && (opts.protocolVersion === 3)
? 'mqttv3.1'
: 'mqtt'
function createWebSocket(client, url, opts) {
debug('createWebSocket')
debug(`protocol: ${opts.protocolId} ${opts.protocolVersion}`)
const websocketSubProtocol =
opts.protocolId === 'MQIsdp' && opts.protocolVersion === 3
? 'mqttv3.1'
: 'mqtt'
debug('creating new Websocket for url: ' + url + ' and protocol: ' + websocketSubProtocol)
const socket = new WS(url, [websocketSubProtocol], opts.wsOptions)
return socket
debug(
`creating new Websocket for url: ${url} and protocol: ${websocketSubProtocol}`,
)
const socket = new WS(url, [websocketSubProtocol], opts.wsOptions)
return socket
}
function createBrowserWebSocket (client, opts) {
const websocketSubProtocol =
(opts.protocolId === 'MQIsdp') && (opts.protocolVersion === 3)
? 'mqttv3.1'
: 'mqtt'
function createBrowserWebSocket(client, opts) {
const websocketSubProtocol =
opts.protocolId === 'MQIsdp' && opts.protocolVersion === 3
? 'mqttv3.1'
: 'mqtt'
const url = buildUrl(opts, client)
/* global WebSocket */
const socket = new WebSocket(url, [websocketSubProtocol])
socket.binaryType = 'arraybuffer'
return socket
const url = buildUrl(opts, client)
const socket = new WebSocket(url, [websocketSubProtocol])
socket.binaryType = 'arraybuffer'
return socket
}
function streamBuilder (client, opts) {
debug('streamBuilder')
const options = setDefaultOpts(opts)
const url = buildUrl(options, client)
const socket = createWebSocket(client, url, options)
const webSocketStream = WS.createWebSocketStream(socket, options.wsOptions)
webSocketStream.url = url
socket.on('close', () => { webSocketStream.destroy() })
return webSocketStream
function streamBuilder(client, opts) {
debug('streamBuilder')
const options = setDefaultOpts(opts)
const url = buildUrl(options, client)
const socket = createWebSocket(client, url, options)
const webSocketStream = WS.createWebSocketStream(socket, options.wsOptions)
webSocketStream.url = url
socket.on('close', () => {
webSocketStream.destroy()
})
return webSocketStream
}
function browserStreamBuilder (client, opts) {
debug('browserStreamBuilder')
let stream
const options = setDefaultBrowserOpts(opts)
// sets the maximum socket buffer size before throttling
const bufferSize = options.browserBufferSize || 1024 * 512
function browserStreamBuilder(client, opts) {
debug('browserStreamBuilder')
let stream
const options = setDefaultBrowserOpts(opts)
// sets the maximum socket buffer size before throttling
const bufferSize = options.browserBufferSize || 1024 * 512
const bufferTimeout = opts.browserBufferTimeout || 1000
const bufferTimeout = opts.browserBufferTimeout || 1000
const coerceToBuffer = !opts.objectMode
const coerceToBuffer = !opts.objectMode
const socket = createBrowserWebSocket(client, opts)
const socket = createBrowserWebSocket(client, opts)
const proxy = buildProxy(opts, socketWriteBrowser, socketEndBrowser)
const proxy = buildProxy(opts, socketWriteBrowser, socketEndBrowser)
if (!opts.objectMode) {
proxy._writev = writev
}
proxy.on('close', () => { socket.close() })
if (!opts.objectMode) {
proxy._writev = writev
}
proxy.on('close', () => {
socket.close()
})
const eventListenerSupport = (typeof socket.addEventListener !== 'undefined')
const eventListenerSupport = typeof socket.addEventListener !== 'undefined'
// was already open when passed in
if (socket.readyState === socket.OPEN) {
stream = proxy
} else {
stream = stream = duplexify(undefined, undefined, opts)
if (!opts.objectMode) {
stream._writev = writev
}
// was already open when passed in
if (socket.readyState === socket.OPEN) {
stream = proxy
} else {
stream = duplexify(undefined, undefined, opts)
if (!opts.objectMode) {
stream._writev = writev
}
if (eventListenerSupport) {
socket.addEventListener('open', onopen)
} else {
socket.onopen = onopen
}
}
if (eventListenerSupport) {
socket.addEventListener('open', onOpen)
} else {
socket.onopen = onOpen
}
}
stream.socket = socket
stream.socket = socket
if (eventListenerSupport) {
socket.addEventListener('close', onclose)
socket.addEventListener('error', onerror)
socket.addEventListener('message', onmessage)
} else {
socket.onclose = onclose
socket.onerror = onerror
socket.onmessage = onmessage
}
if (eventListenerSupport) {
socket.addEventListener('close', onclose)
socket.addEventListener('error', onerror)
socket.addEventListener('message', onmessage)
} else {
socket.onclose = onclose
socket.onerror = onerror
socket.onmessage = onmessage
}
// methods for browserStreamBuilder
// methods for browserStreamBuilder
function buildProxy (options, socketWrite, socketEnd) {
const proxy = new Transform({
objectModeMode: options.objectMode
})
function buildProxy(pOptions, socketWrite, socketEnd) {
const _proxy = new Transform({
objectModeMode: pOptions.objectMode,
})
proxy._write = socketWrite
proxy._flush = socketEnd
_proxy._write = socketWrite
_proxy._flush = socketEnd
return proxy
}
return _proxy
}
function onopen () {
stream.setReadable(proxy)
stream.setWritable(proxy)
stream.emit('connect')
}
function onOpen() {
stream.setReadable(proxy)
stream.setWritable(proxy)
stream.emit('connect')
}
function onclose () {
stream.end()
stream.destroy()
}
function onclose() {
stream.end()
stream.destroy()
}
function onerror (err) {
stream.destroy(err)
}
function onerror(err) {
stream.destroy(err)
}
function onmessage (event) {
let data = event.data
if (data instanceof ArrayBuffer) data = Buffer.from(data)
else data = Buffer.from(data, 'utf8')
proxy.push(data)
}
function onmessage(event) {
let { data } = event
if (data instanceof ArrayBuffer) data = Buffer.from(data)
else data = Buffer.from(data, 'utf8')
proxy.push(data)
}
// this is to be enabled only if objectMode is false
function writev (chunks, cb) {
const buffers = new Array(chunks.length)
for (let i = 0; i < chunks.length; i++) {
if (typeof chunks[i].chunk === 'string') {
buffers[i] = Buffer.from(chunks[i], 'utf8')
} else {
buffers[i] = chunks[i].chunk
}
}
// this is to be enabled only if objectMode is false
function writev(chunks, cb) {
const buffers = new Array(chunks.length)
for (let i = 0; i < chunks.length; i++) {
if (typeof chunks[i].chunk === 'string') {
buffers[i] = Buffer.from(chunks[i], 'utf8')
} else {
buffers[i] = chunks[i].chunk
}
}
this._write(Buffer.concat(buffers), 'binary', cb)
}
this._write(Buffer.concat(buffers), 'binary', cb)
}
function socketWriteBrowser (chunk, enc, next) {
if (socket.bufferedAmount > bufferSize) {
// throttle data until buffered amount is reduced.
setTimeout(socketWriteBrowser, bufferTimeout, chunk, enc, next)
}
function socketWriteBrowser(chunk, enc, next) {
if (socket.bufferedAmount > bufferSize) {
// throttle data until buffered amount is reduced.
setTimeout(socketWriteBrowser, bufferTimeout, chunk, enc, next)
}
if (coerceToBuffer && typeof chunk === 'string') {
chunk = Buffer.from(chunk, 'utf8')
}
if (coerceToBuffer && typeof chunk === 'string') {
chunk = Buffer.from(chunk, 'utf8')
}
try {
socket.send(chunk)
} catch (err) {
return next(err)
}
try {
socket.send(chunk)
} catch (err) {
return next(err)
}
next()
}
next()
}
function socketEndBrowser (done) {
socket.close()
done()
}
function socketEndBrowser(done) {
socket.close()
done()
}
// end methods for browserStreamBuilder
// end methods for browserStreamBuilder
return stream
return stream
}
if (IS_BROWSER) {
module.exports = browserStreamBuilder
module.exports = browserStreamBuilder
} else {
module.exports = streamBuilder
module.exports = streamBuilder
}

@@ -1,133 +0,132 @@

'use strict'
const { Buffer } = require('buffer')
const Transform = require('readable-stream').Transform
const { Transform } = require('readable-stream')
const duplexify = require('duplexify')
/* global wx */
let socketTask, proxy, stream
let socketTask
let proxy
let stream
function buildProxy () {
const proxy = new Transform()
proxy._write = function (chunk, encoding, next) {
socketTask.send({
data: chunk.buffer,
success: function () {
next()
},
fail: function (errMsg) {
next(new Error(errMsg))
}
})
}
proxy._flush = function socketEnd (done) {
socketTask.close({
success: function () {
done()
}
})
}
function buildProxy() {
const _proxy = new Transform()
_proxy._write = (chunk, encoding, next) => {
socketTask.send({
data: chunk.buffer,
success() {
next()
},
fail(errMsg) {
next(new Error(errMsg))
},
})
}
_proxy._flush = (done) => {
socketTask.close({
success() {
done()
},
})
}
return proxy
return _proxy
}
function setDefaultOpts (opts) {
if (!opts.hostname) {
opts.hostname = 'localhost'
}
if (!opts.path) {
opts.path = '/'
}
function setDefaultOpts(opts) {
if (!opts.hostname) {
opts.hostname = 'localhost'
}
if (!opts.path) {
opts.path = '/'
}
if (!opts.wsOptions) {
opts.wsOptions = {}
}
if (!opts.wsOptions) {
opts.wsOptions = {}
}
}
function buildUrl (opts, client) {
const protocol = opts.protocol === 'wxs' ? 'wss' : 'ws'
let url = protocol + '://' + opts.hostname + opts.path
if (opts.port && opts.port !== 80 && opts.port !== 443) {
url = protocol + '://' + opts.hostname + ':' + opts.port + opts.path
}
if (typeof (opts.transformWsUrl) === 'function') {
url = opts.transformWsUrl(url, opts, client)
}
return url
function buildUrl(opts, client) {
const protocol = opts.protocol === 'wxs' ? 'wss' : 'ws'
let url = `${protocol}://${opts.hostname}${opts.path}`
if (opts.port && opts.port !== 80 && opts.port !== 443) {
url = `${protocol}://${opts.hostname}:${opts.port}${opts.path}`
}
if (typeof opts.transformWsUrl === 'function') {
url = opts.transformWsUrl(url, opts, client)
}
return url
}
function bindEventHandler () {
socketTask.onOpen(function () {
stream.setReadable(proxy)
stream.setWritable(proxy)
stream.emit('connect')
})
function bindEventHandler() {
socketTask.onOpen(() => {
stream.setReadable(proxy)
stream.setWritable(proxy)
stream.emit('connect')
})
socketTask.onMessage(function (res) {
let data = res.data
socketTask.onMessage((res) => {
let { data } = res
if (data instanceof ArrayBuffer) data = Buffer.from(data)
else data = Buffer.from(data, 'utf8')
proxy.push(data)
})
if (data instanceof ArrayBuffer) data = Buffer.from(data)
else data = Buffer.from(data, 'utf8')
proxy.push(data)
})
socketTask.onClose(function () {
stream.end()
stream.destroy()
})
socketTask.onClose(() => {
stream.end()
stream.destroy()
})
socketTask.onError(function (res) {
stream.destroy(new Error(res.errMsg))
})
socketTask.onError((res) => {
stream.destroy(new Error(res.errMsg))
})
}
function buildStream (client, opts) {
opts.hostname = opts.hostname || opts.host
function buildStream(client, opts) {
opts.hostname = opts.hostname || opts.host
if (!opts.hostname) {
throw new Error('Could not determine host. Specify host manually.')
}
if (!opts.hostname) {
throw new Error('Could not determine host. Specify host manually.')
}
const websocketSubProtocol =
(opts.protocolId === 'MQIsdp') && (opts.protocolVersion === 3)
? 'mqttv3.1'
: 'mqtt'
const websocketSubProtocol =
opts.protocolId === 'MQIsdp' && opts.protocolVersion === 3
? 'mqttv3.1'
: 'mqtt'
setDefaultOpts(opts)
setDefaultOpts(opts)
const url = buildUrl(opts, client)
socketTask = wx.connectSocket({
url: url,
protocols: [websocketSubProtocol]
})
const url = buildUrl(opts, client)
socketTask = wx.connectSocket({
url,
protocols: [websocketSubProtocol],
})
proxy = buildProxy()
stream = duplexify.obj()
stream._destroy = function (err, cb) {
socketTask.close({
success: function () {
cb && cb(err)
}
})
}
proxy = buildProxy()
stream = duplexify.obj()
stream._destroy = (err, cb) => {
socketTask.close({
success() {
if (cb) cb(err)
},
})
}
const destroyRef = stream.destroy
stream.destroy = function () {
stream.destroy = destroyRef
const destroyRef = stream.destroy
stream.destroy = () => {
stream.destroy = destroyRef
const self = this
setTimeout(function () {
socketTask.close({
fail: function () {
self._destroy(new Error())
}
})
}, 0)
}.bind(stream)
setTimeout(() => {
socketTask.close({
fail() {
stream._destroy(new Error())
},
})
}, 0)
}
bindEventHandler()
bindEventHandler()
return stream
return stream
}
module.exports = buildStream

@@ -1,3 +0,1 @@

'use strict'
/**

@@ -7,64 +5,60 @@ * DefaultMessageAllocator constructor

*/
function DefaultMessageIdProvider () {
if (!(this instanceof DefaultMessageIdProvider)) {
return new DefaultMessageIdProvider()
}
class DefaultMessageIdProvider {
constructor() {
/**
* MessageIDs starting with 1
* ensure that nextId is min. 1, see https://github.com/mqttjs/MQTT.js/issues/810
*/
this.nextId = Math.max(1, Math.floor(Math.random() * 65535))
}
/**
* MessageIDs starting with 1
* ensure that nextId is min. 1, see https://github.com/mqttjs/MQTT.js/issues/810
*/
this.nextId = Math.max(1, Math.floor(Math.random() * 65535))
}
/**
* allocate
*
* Get the next messageId.
* @return unsigned int
*/
allocate() {
// id becomes current state of this.nextId and increments afterwards
const id = this.nextId++
// Ensure 16 bit unsigned int (max 65535, nextId got one higher)
if (this.nextId === 65536) {
this.nextId = 1
}
return id
}
/**
* allocate
*
* Get the next messageId.
* @return unsigned int
*/
DefaultMessageIdProvider.prototype.allocate = function () {
// id becomes current state of this.nextId and increments afterwards
const id = this.nextId++
// Ensure 16 bit unsigned int (max 65535, nextId got one higher)
if (this.nextId === 65536) {
this.nextId = 1
}
return id
}
/**
* getLastAllocated
* Get the last allocated messageId.
* @return unsigned int
*/
getLastAllocated() {
return this.nextId === 1 ? 65535 : this.nextId - 1
}
/**
* getLastAllocated
* Get the last allocated messageId.
* @return unsigned int
*/
DefaultMessageIdProvider.prototype.getLastAllocated = function () {
return (this.nextId === 1) ? 65535 : (this.nextId - 1)
}
/**
* register
* Register messageId. If success return true, otherwise return false.
* @param { unsigned int } - messageId to register,
* @return boolean
*/
register(messageId) {
return true
}
/**
* register
* Register messageId. If success return true, otherwise return false.
* @param { unsigned int } - messageId to register,
* @return boolean
*/
DefaultMessageIdProvider.prototype.register = function (messageId) {
return true
}
/**
* deallocate
* Deallocate messageId.
* @param { unsigned int } - messageId to deallocate,
*/
deallocate(messageId) {}
/**
* deallocate
* Deallocate messageId.
* @param { unsigned int } - messageId to deallocate,
*/
DefaultMessageIdProvider.prototype.deallocate = function (messageId) {
/**
* clear
* Deallocate all messageIds.
*/
clear() {}
}
/**
* clear
* Deallocate all messageIds.
*/
DefaultMessageIdProvider.prototype.clear = function () {
}
module.exports = DefaultMessageIdProvider
const legacyIsBrowser =
(typeof process !== 'undefined' && process.title === 'browser') ||
// eslint-disable-next-line camelcase
typeof __webpack_require__ === 'function'
(typeof process !== 'undefined' && process.title === 'browser') ||
// eslint-disable-next-line camelcase
typeof __webpack_require__ === 'function'
const isBrowser =
typeof window !== 'undefined' && typeof document !== 'undefined'
typeof window !== 'undefined' && typeof document !== 'undefined'
module.exports = {
IS_BROWSER: isBrowser || legacyIsBrowser
IS_BROWSER: isBrowser || legacyIsBrowser,
}

@@ -1,12 +0,9 @@

'use strict'
/**
* Module dependencies
*/
const xtend = require('xtend')
const { Readable } = require('readable-stream')
const Readable = require('readable-stream').Readable
const streamsOpts = { objectMode: true }
const defaultStoreOptions = {
clean: true
clean: true,
}

@@ -20,110 +17,106 @@

*/
function Store (options) {
if (!(this instanceof Store)) {
return new Store(options)
}
class Store {
constructor(options) {
this.options = options || {}
this.options = options || {}
// Defaults
this.options = { ...defaultStoreOptions, ...options }
// Defaults
this.options = xtend(defaultStoreOptions, options)
this._inflights = new Map()
}
this._inflights = new Map()
}
/**
* Adds a packet to the store, a packet is
* anything that has a messageId property.
*
*/
put(packet, cb) {
this._inflights.set(packet.messageId, packet)
/**
* Adds a packet to the store, a packet is
* anything that has a messageId property.
*
*/
Store.prototype.put = function (packet, cb) {
this._inflights.set(packet.messageId, packet)
if (cb) {
cb()
}
if (cb) {
cb()
}
return this
}
return this
}
/**
* Creates a stream with all the packets in the store
*
*/
createStream() {
const stream = new Readable(streamsOpts)
const values = []
let destroyed = false
let i = 0
/**
* Creates a stream with all the packets in the store
*
*/
Store.prototype.createStream = function () {
const stream = new Readable(streamsOpts)
const values = []
let destroyed = false
let i = 0
this._inflights.forEach((value, key) => {
values.push(value)
})
this._inflights.forEach(function (value, key) {
values.push(value)
})
stream._read = () => {
if (!destroyed && i < values.length) {
stream.push(values[i++])
} else {
stream.push(null)
}
}
stream._read = function () {
if (!destroyed && i < values.length) {
this.push(values[i++])
} else {
this.push(null)
}
}
stream.destroy = () => {
if (destroyed) {
return
}
stream.destroy = function () {
if (destroyed) {
return
}
destroyed = true
const self = this
setTimeout(() => {
stream.emit('close')
}, 0)
}
destroyed = true
return stream
}
setTimeout(function () {
self.emit('close')
}, 0)
}
/**
* deletes a packet from the store.
*/
del(packet, cb) {
packet = this._inflights.get(packet.messageId)
if (packet) {
this._inflights.delete(packet.messageId)
cb(null, packet)
} else if (cb) {
cb(new Error('missing packet'))
}
return stream
}
return this
}
/**
* deletes a packet from the store.
*/
Store.prototype.del = function (packet, cb) {
packet = this._inflights.get(packet.messageId)
if (packet) {
this._inflights.delete(packet.messageId)
cb(null, packet)
} else if (cb) {
cb(new Error('missing packet'))
}
/**
* get a packet from the store.
*/
get(packet, cb) {
packet = this._inflights.get(packet.messageId)
if (packet) {
cb(null, packet)
} else if (cb) {
cb(new Error('missing packet'))
}
return this
}
return this
}
/**
* get a packet from the store.
*/
Store.prototype.get = function (packet, cb) {
packet = this._inflights.get(packet.messageId)
if (packet) {
cb(null, packet)
} else if (cb) {
cb(new Error('missing packet'))
}
return this
/**
* Close the store
*/
close(cb) {
if (this.options.clean) {
this._inflights = null
}
if (cb) {
cb()
}
}
}
/**
* Close the store
*/
Store.prototype.close = function (cb) {
if (this.options.clean) {
this._inflights = null
}
if (cb) {
cb()
}
}
module.exports = Store

@@ -1,3 +0,1 @@

'use strict'
/**

@@ -8,41 +6,40 @@ * Topic Alias receiving manager

*/
function TopicAliasRecv (max) {
if (!(this instanceof TopicAliasRecv)) {
return new TopicAliasRecv(max)
}
this.aliasToTopic = {}
this.max = max
}
class TopicAliasRecv {
constructor(max) {
this.aliasToTopic = {}
this.max = max
}
/**
* Insert or update topic - alias entry.
* @param {String} [topic] - topic
* @param {Number} [alias] - topic alias
* @returns {Boolean} - if success return true otherwise false
*/
TopicAliasRecv.prototype.put = function (topic, alias) {
if (alias === 0 || alias > this.max) {
return false
}
this.aliasToTopic[alias] = topic
this.length = Object.keys(this.aliasToTopic).length
return true
}
/**
* Insert or update topic - alias entry.
* @param {String} [topic] - topic
* @param {Number} [alias] - topic alias
* @returns {Boolean} - if success return true otherwise false
*/
put(topic, alias) {
if (alias === 0 || alias > this.max) {
return false
}
this.aliasToTopic[alias] = topic
this.length = Object.keys(this.aliasToTopic).length
return true
}
/**
* Get topic by alias
* @param {String} [topic] - topic
* @returns {Number} - if mapped topic exists return topic alias, otherwise return undefined
*/
TopicAliasRecv.prototype.getTopicByAlias = function (alias) {
return this.aliasToTopic[alias]
}
/**
* Get topic by alias
* @param {String} [topic] - topic
* @returns {Number} - if mapped topic exists return topic alias, otherwise return undefined
*/
getTopicByAlias(alias) {
return this.aliasToTopic[alias]
}
/**
* Clear all entries
*/
TopicAliasRecv.prototype.clear = function () {
this.aliasToTopic = {}
/**
* Clear all entries
*/
clear() {
this.aliasToTopic = {}
}
}
module.exports = TopicAliasRecv

@@ -1,8 +0,6 @@

'use strict'
/**
* Module dependencies
*/
const LruMap = require('lru-cache')
const NumberAllocator = require('number-allocator').NumberAllocator
const LRUCache = require('lru-cache')
const { NumberAllocator } = require('number-allocator')

@@ -14,79 +12,78 @@ /**

*/
function TopicAliasSend (max) {
if (!(this instanceof TopicAliasSend)) {
return new TopicAliasSend(max)
}
class TopicAliasSend {
constructor(max) {
if (max > 0) {
this.aliasToTopic = new LRUCache({ max })
this.topicToAlias = {}
this.numberAllocator = new NumberAllocator(1, max)
this.max = max
this.length = 0
}
}
if (max > 0) {
this.aliasToTopic = new LruMap({ max: max })
this.topicToAlias = {}
this.numberAllocator = new NumberAllocator(1, max)
this.max = max
this.length = 0
}
}
/**
* Insert or update topic - alias entry.
* @param {String} [topic] - topic
* @param {Number} [alias] - topic alias
* @returns {Boolean} - if success return true otherwise false
*/
put(topic, alias) {
if (alias === 0 || alias > this.max) {
return false
}
const entry = this.aliasToTopic.get(alias)
if (entry) {
delete this.topicToAlias[entry]
}
this.aliasToTopic.set(alias, topic)
this.topicToAlias[topic] = alias
this.numberAllocator.use(alias)
this.length = this.aliasToTopic.size
return true
}
/**
* Insert or update topic - alias entry.
* @param {String} [topic] - topic
* @param {Number} [alias] - topic alias
* @returns {Boolean} - if success return true otherwise false
*/
TopicAliasSend.prototype.put = function (topic, alias) {
if (alias === 0 || alias > this.max) {
return false
}
const entry = this.aliasToTopic.get(alias)
if (entry) {
delete this.topicToAlias[entry]
}
this.aliasToTopic.set(alias, topic)
this.topicToAlias[topic] = alias
this.numberAllocator.use(alias)
this.length = this.aliasToTopic.length
return true
}
/**
* Get topic by alias
* @param {Number} [alias] - topic alias
* @returns {String} - if mapped topic exists return topic, otherwise return undefined
*/
getTopicByAlias(alias) {
return this.aliasToTopic.get(alias)
}
/**
* Get topic by alias
* @param {Number} [alias] - topic alias
* @returns {String} - if mapped topic exists return topic, otherwise return undefined
*/
TopicAliasSend.prototype.getTopicByAlias = function (alias) {
return this.aliasToTopic.get(alias)
}
/**
* Get topic by alias
* @param {String} [topic] - topic
* @returns {Number} - if mapped topic exists return topic alias, otherwise return undefined
*/
getAliasByTopic(topic) {
const alias = this.topicToAlias[topic]
if (typeof alias !== 'undefined') {
this.aliasToTopic.get(alias) // LRU update
}
return alias
}
/**
* Get topic by alias
* @param {String} [topic] - topic
* @returns {Number} - if mapped topic exists return topic alias, otherwise return undefined
*/
TopicAliasSend.prototype.getAliasByTopic = function (topic) {
const alias = this.topicToAlias[topic]
if (typeof alias !== 'undefined') {
this.aliasToTopic.get(alias) // LRU update
}
return alias
}
/**
* Clear all entries
*/
clear() {
this.aliasToTopic.clear()
this.topicToAlias = {}
this.numberAllocator.clear()
this.length = 0
}
/**
* Clear all entries
*/
TopicAliasSend.prototype.clear = function () {
this.aliasToTopic.reset()
this.topicToAlias = {}
this.numberAllocator.clear()
this.length = 0
/**
* Get Least Recently Used (LRU) topic alias
* @returns {Number} - if vacant alias exists then return it, otherwise then return LRU alias
*/
getLruAlias() {
const alias = this.numberAllocator.firstVacant()
if (alias) return alias
// get last alias (key) from LRU cache
return [...this.aliasToTopic.keys()][this.aliasToTopic.size - 1]
}
}
/**
* Get Least Recently Used (LRU) topic alias
* @returns {Number} - if vacant alias exists then return it, otherwise then return LRU alias
*/
TopicAliasSend.prototype.getLruAlias = function () {
const alias = this.numberAllocator.firstVacant()
if (alias) return alias
return this.aliasToTopic.keys()[this.aliasToTopic.length - 1]
}
module.exports = TopicAliasSend

@@ -1,5 +0,3 @@

'use strict'
const { NumberAllocator } = require('number-allocator')
const NumberAllocator = require('number-allocator').NumberAllocator
/**

@@ -9,58 +7,56 @@ * UniqueMessageAllocator constructor

*/
function UniqueMessageIdProvider () {
if (!(this instanceof UniqueMessageIdProvider)) {
return new UniqueMessageIdProvider()
}
class UniqueMessageIdProvider {
constructor() {
this.numberAllocator = new NumberAllocator(1, 65535)
}
this.numberAllocator = new NumberAllocator(1, 65535)
}
/**
* allocate
*
* Get the next messageId.
* @return if messageId is fully allocated then return null,
* otherwise return the smallest usable unsigned int messageId.
*/
allocate() {
this.lastId = this.numberAllocator.alloc()
return this.lastId
}
/**
* allocate
*
* Get the next messageId.
* @return if messageId is fully allocated then return null,
* otherwise return the smallest usable unsigned int messageId.
*/
UniqueMessageIdProvider.prototype.allocate = function () {
this.lastId = this.numberAllocator.alloc()
return this.lastId
}
/**
* getLastAllocated
* Get the last allocated messageId.
* @return unsigned int
*/
getLastAllocated() {
return this.lastId
}
/**
* getLastAllocated
* Get the last allocated messageId.
* @return unsigned int
*/
UniqueMessageIdProvider.prototype.getLastAllocated = function () {
return this.lastId
}
/**
* register
* Register messageId. If success return true, otherwise return false.
* @param { unsigned int } - messageId to register,
* @return boolean
*/
register(messageId) {
return this.numberAllocator.use(messageId)
}
/**
* register
* Register messageId. If success return true, otherwise return false.
* @param { unsigned int } - messageId to register,
* @return boolean
*/
UniqueMessageIdProvider.prototype.register = function (messageId) {
return this.numberAllocator.use(messageId)
}
/**
* deallocate
* Deallocate messageId.
* @param { unsigned int } - messageId to deallocate,
*/
deallocate(messageId) {
this.numberAllocator.free(messageId)
}
/**
* deallocate
* Deallocate messageId.
* @param { unsigned int } - messageId to deallocate,
*/
UniqueMessageIdProvider.prototype.deallocate = function (messageId) {
this.numberAllocator.free(messageId)
/**
* clear
* Deallocate all messageIds.
*/
clear() {
this.numberAllocator.clear()
}
}
/**
* clear
* Deallocate all messageIds.
*/
UniqueMessageIdProvider.prototype.clear = function () {
this.numberAllocator.clear()
}
module.exports = UniqueMessageIdProvider

@@ -1,3 +0,1 @@

'use strict'
/**

@@ -12,21 +10,21 @@ * Validate a topic to see if it's valid or not.

*/
function validateTopic (topic) {
const parts = topic.split('/')
function validateTopic(topic) {
const parts = topic.split('/')
for (let i = 0; i < parts.length; i++) {
if (parts[i] === '+') {
continue
}
for (let i = 0; i < parts.length; i++) {
if (parts[i] === '+') {
continue
}
if (parts[i] === '#') {
// for Rule #2
return i === parts.length - 1
}
if (parts[i] === '#') {
// for Rule #2
return i === parts.length - 1
}
if (parts[i].indexOf('+') !== -1 || parts[i].indexOf('#') !== -1) {
return false
}
}
if (parts[i].indexOf('+') !== -1 || parts[i].indexOf('#') !== -1) {
return false
}
}
return true
return true
}

@@ -36,19 +34,19 @@

* Validate an array of topics to see if any of them is valid or not
* @param {Array} topics - Array of topics
* @param {Array} topics - Array of topics
* @returns {String} If the topics is valid, returns null. Otherwise, returns the invalid one
*/
function validateTopics (topics) {
if (topics.length === 0) {
return 'empty_topic_list'
}
for (let i = 0; i < topics.length; i++) {
if (!validateTopic(topics[i])) {
return topics[i]
}
}
return null
function validateTopics(topics) {
if (topics.length === 0) {
return 'empty_topic_list'
}
for (let i = 0; i < topics.length; i++) {
if (!validateTopic(topics[i])) {
return topics[i]
}
}
return null
}
module.exports = {
validateTopics: validateTopics
validateTopics,
}
{
"name": "mqtt",
"description": "A library for the MQTT protocol",
"version": "5.0.0-beta.2",
"version": "5.0.0-beta.3",
"contributors": [

@@ -26,3 +26,4 @@ "Adam Rudd <adamvrr@gmail.com>",

"scripts": {
"pretest": "standard | snazzy",
"lint": "eslint --ext .js .",
"lint-fix": "eslint --fix --ext .js .",
"typescript-compile-test": "tsc -p test/typescript/tsconfig.json",

@@ -66,3 +67,3 @@ "typescript-compile-execute": "node test/typescript/broker-connect-subscribe-and-publish.js",

"pre-commit": [
"pretest"
"lint"
],

@@ -93,55 +94,52 @@ "bin": {

"dependencies": {
"commist": "^1.0.0",
"commist": "^3.2.0",
"concat-stream": "^2.0.0",
"debug": "^4.1.1",
"duplexify": "^4.1.1",
"help-me": "^3.0.0",
"inherits": "^2.0.3",
"lru-cache": "^6.0.0",
"minimist": "^1.2.6",
"mqtt-packet": "^8.1.2",
"debug": "^4.3.4",
"duplexify": "^4.1.2",
"help-me": "^4.2.0",
"lru-cache": "^7.18.3",
"minimist": "^1.2.8",
"mqtt-packet": "^8.2.0",
"number-allocator": "^1.0.14",
"pump": "^3.0.0",
"readable-stream": "^4.1.0",
"readable-stream": "^4.4.2",
"reinterval": "^1.1.0",
"rfdc": "^1.3.0",
"split2": "^3.1.0",
"ws": "^7.5.5",
"xtend": "^4.0.2"
"split2": "^4.2.0",
"ws": "^8.13.0"
},
"devDependencies": {
"@release-it/conventional-changelog": "^5.1.1",
"@types/node": "^12.20.55",
"@types/tape": "^4.13.2",
"@types/ws": "^7.4.7",
"aedes": "^0.46.2",
"@release-it/conventional-changelog": "^6.0.0",
"@types/node": "^20.4.0",
"@types/tape": "^5.6.0",
"@types/ws": "^8.5.5",
"airtap": "^4.0.4",
"airtap-playwright": "^1.0.1",
"browserify": "^17.0.0",
"chai": "^4.2.0",
"chai": "^4.3.7",
"chokidar": "^3.5.3",
"codecov": "^3.0.4",
"codecov": "^3.8.2",
"conventional-changelog-cli": "^3.0.0",
"end-of-stream": "^1.4.1",
"global": "^4.3.2",
"mkdirp": "^0.5.1",
"mocha": "^9.2.0",
"mqtt-connection": "^4.0.0",
"end-of-stream": "^1.4.4",
"eslint": "^8.45.0",
"eslint-config-airbnb-base": "^15.0.0",
"eslint-config-prettier": "^8.8.0",
"eslint-plugin-import": "^2.27.5",
"eslint-plugin-prettier": "^5.0.0",
"global": "^4.4.0",
"mkdirp": "^3.0.1",
"mocha": "^10.2.0",
"mqtt-connection": "^4.1.0",
"mqtt-level-store": "^3.1.0",
"nyc": "^15.0.1",
"nyc": "^15.1.0",
"pre-commit": "^1.2.2",
"prettier": "^3.0.0",
"release-it": "^15.11.0",
"rimraf": "^3.0.2",
"should": "^13.2.1",
"sinon": "^9.0.0",
"rimraf": "^5.0.1",
"should": "^13.2.3",
"sinon": "^15.2.0",
"snazzy": "^9.0.0",
"standard": "^16.0.4",
"tape": "^5.5.2",
"terser": "^5.14.2",
"typescript": "^4.5.5"
},
"standard": {
"env": [
"mocha"
]
"tape": "^5.6.4",
"terser": "^5.18.2",
"typescript": "^5.1.6"
}
}

@@ -5,6 +5,11 @@ # ![mqtt.js](https://raw.githubusercontent.com/mqttjs/MQTT.js/137ee0e3940c1f01049a30248c70f24dc6e6f829/MQTT.js.png)

[![js-standard-style](https://img.shields.io/badge/code%20style-standard-brightgreen.svg?style=flat)](http://standardjs.com/) [![Maintenance](https://img.shields.io/badge/Maintained%3F-yes-green.svg)](https://github.com/mqttjs/MQTT.js/graphs/commit-activity)
[![PRs Welcome](https://img.shields.io/badge/PRs-welcome-brightgreen.svg)](https://github.com/mqttjs/MQTT.js/pulls)\
![NPM Version](https://img.shields.io/npm/v/mqtt?logo=npm) ![NPM Downloads](https://img.shields.io/npm/dm/mqtt.svg)
MQTT.js is a client library for the [MQTT](http://mqtt.org/) protocol, written
in JavaScript for node.js and the browser.
> MQTT [5.0.0-beta.0](https://github.com/mqttjs/MQTT.js/releases/tag/v5.0.0-beta.0) is now available! Try it out and give us feedback! `npm i mqtt@5.0.0-beta.0`
> MQTT [5.0.0 BETA](https://www.npmjs.com/package/mqtt/v/beta) is now available! Try it out and give us [feedback](https://github.com/mqttjs/MQTT.js/issues/1639): `npm i mqtt@beta`

@@ -100,4 +105,4 @@ ## Table of Contents

client.on("connect", function () {
client.subscribe("presence", function (err) {
client.on("connect", () => {
client.subscribe("presence", (err) => {
if (!err) {

@@ -109,3 +114,3 @@ client.publish("presence", "Hello mqtt");

client.on("message", function (topic, message) {
client.on("message", (topic, message) => {
// message is Buffer

@@ -306,19 +311,20 @@ console.log(message.toString());

- <a href="#connect"><code>mqtt.<b>connect()</b></code></a>
- <a href="#client"><code>mqtt.<b>Client()</b></code></a>
- <a href="#publish"><code>mqtt.Client#<b>publish()</b></code></a>
- <a href="#subscribe"><code>mqtt.Client#<b>subscribe()</b></code></a>
- <a href="#unsubscribe"><code>mqtt.Client#<b>unsubscribe()</b></code></a>
- <a href="#end"><code>mqtt.Client#<b>end()</b></code></a>
- <a href="#removeOutgoingMessage"><code>mqtt.Client#<b>removeOutgoingMessage()</b></code></a>
- <a href="#reconnect"><code>mqtt.Client#<b>reconnect()</b></code></a>
- <a href="#handleMessage"><code>mqtt.Client#<b>handleMessage()</b></code></a>
- <a href="#connected"><code>mqtt.Client#<b>connected</b></code></a>
- <a href="#reconnecting"><code>mqtt.Client#<b>reconnecting</b></code></a>
- <a href="#getLastMessageId"><code>mqtt.Client#<b>getLastMessageId()</b></code></a>
- <a href="#store"><code>mqtt.<b>Store()</b></code></a>
- <a href="#put"><code>mqtt.Store#<b>put()</b></code></a>
- <a href="#del"><code>mqtt.Store#<b>del()</b></code></a>
- <a href="#createStream"><code>mqtt.Store#<b>createStream()</b></code></a>
- <a href="#close"><code>mqtt.Store#<b>close()</b></code></a>
- [`mqtt.connect()`](#connect)
- [`mqtt.Client()`](#client)
- [`mqtt.Client#connect()`](#client-connect)
- [`mqtt.Client#publish()`](#publish)
- [`mqtt.Client#subscribe()`](#subscribe)
- [`mqtt.Client#unsubscribe()`](#unsubscribe)
- [`mqtt.Client#end()`](#end)
- [`mqtt.Client#removeOutgoingMessage()`](#removeOutgoingMessage)
- [`mqtt.Client#reconnect()`](#reconnect)
- [`mqtt.Client#handleMessage()`](#handleMessage)
- [`mqtt.Client#connected`](#connected)
- [`mqtt.Client#reconnecting`](#reconnecting)
- [`mqtt.Client#getLastMessageId()`](#getLastMessageId)
- [`mqtt.Store()`](#store)
- [`mqtt.Store#put()`](#put)
- [`mqtt.Store#del()`](#del)
- [`mqtt.Store#createStream()`](#createStream)
- [`mqtt.Store#close()`](#close)

@@ -370,3 +376,3 @@ ---

- `wsOptions`: is the WebSocket connection options. Default is `{}`.
It's specific for WebSockets. For possible options have a look at: https://github.com/websockets/ws/blob/master/doc/ws.md.
It's specific for WebSockets. For possible options have a look at: <https://github.com/websockets/ws/blob/master/doc/ws.md>.
- `keepalive`: `60` seconds, set to `0` to disable

@@ -389,5 +395,7 @@ - `reschedulePings`: reschedule ping messages after sending packets (default `true`)

- `customHandleAcks`: MQTT 5 feature of custom handling puback and pubrec packets. Its callback:
```js
customHandleAcks: function(topic, message, packet, done) {/*some logic wit colling done(error, reasonCode)*/}
```
- `autoUseTopicAlias`: enabling automatic Topic Alias using functionality

@@ -427,2 +435,4 @@ - `autoAssignTopicAlias`: enabling automatic Topic Alias assign functionality

- `messageIdProvider`: custom messageId provider. when `new UniqueMessageIdProvider()` is set, then non conflict messageId is provided.
- `log`: custom log function. Default uses [debug](https://www.npmjs.com/package/debug) package.
- `manualConnect`: prevents the constructor to call `connect`. In this case after the `mqtt.connect` is called you should call `client.connect` manually.

@@ -501,3 +511,3 @@ In case mqtts (mqtt over tls) is required, the `options` object is

Emitted when <a href="#end"><code>mqtt.Client#<b>end()</b></code></a> is called.
Emitted when [`mqtt.Client#end()`](#end) is called.
If a callback was passed to `mqtt.Client#end()`, this event is emitted once the

@@ -540,2 +550,8 @@ callback returns.

<a name="client-connect"></a>
### mqtt.Client#connect()
By default client connects when constructor is called. To prevent this you can set `manualConnect` option to `true` and call `client.connect()` manually.
<a name="publish"></a>

@@ -752,5 +768,5 @@

The MQTT.js bundle is available through http://unpkg.com, specifically
at https://unpkg.com/mqtt/dist/mqtt.min.js.
See http://unpkg.com for the full documentation on version ranges.
The MQTT.js bundle is available through <http://unpkg.com>, specifically
at <https://unpkg.com/mqtt/dist/mqtt.min.js>.
See <http://unpkg.com> for the full documentation on version ranges.

@@ -757,0 +773,0 @@ <a name="browserify"></a>

@@ -64,2 +64,4 @@ import { MqttClient } from './client'

log?: (...args: any[]) => void
autoUseTopicAlias?: boolean

@@ -109,3 +111,8 @@ autoAssignTopicAlias?: boolean

userProperties?: UserProperties
}
},
authPacket?: any
/** Prevent to call `connect` in constructor */
manualConnect?: boolean
}

@@ -201,2 +208,11 @@ transformWsUrl?: (url: string, options: IClientOptions, client: MqttClient) => string,

}
export interface IClientSubscribeProperties {
/*
* MQTT 5.0 properies object of subscribe
* */
properties?: {
subscriptionIdentifier?: number,
userProperties?: UserProperties
}
}
export interface IClientReconnectOptions {

@@ -203,0 +219,0 @@ /**

@@ -8,2 +8,3 @@ /// <reference types="node" />

IClientSubscribeOptions,
IClientSubscribeProperties,
IClientReconnectOptions

@@ -102,2 +103,4 @@ } from './client-options'

static defaultId (): string
constructor (streamBuilder: (client: MqttClient) => IStream, options: IClientOptions)

@@ -124,2 +127,7 @@

/**
* Setup the event handlers in the inner stream, sends `connect` and `auth` packets
*/
public connect(): this
/**
* publish - publish <message> to <topic>

@@ -170,3 +178,7 @@ *

string
| string[], opts: IClientSubscribeOptions, callback?: ClientSubscribeCallback): this
| string[]
| ISubscriptionMap,
opts:
IClientSubscribeOptions
| IClientSubscribeProperties, callback?: ClientSubscribeCallback): this
public subscribe (topic:

@@ -173,0 +185,0 @@ string

Sorry, the diff of this file is too big to display

Sorry, the diff of this file is too big to display

SocketSocket SOC 2 Logo

Product

  • Package Alerts
  • Integrations
  • Docs
  • Pricing
  • FAQ
  • Roadmap
  • Changelog

Packages

npm

Stay in touch

Get open source security insights delivered straight into your inbox.


  • Terms
  • Privacy
  • Security

Made with ⚡️ by Socket Inc