Comparing version 5.0.0-beta.2 to 5.0.0-beta.3
#!/usr/bin/env node | ||
'use strict' | ||
@@ -13,4 +12,4 @@ /* | ||
const helpMe = require('help-me')({ | ||
dir: path.join(path.dirname(require.main.filename), '/../doc'), | ||
ext: '.txt' | ||
dir: path.join(path.dirname(require.main.filename), '/../doc'), | ||
ext: '.txt', | ||
}) | ||
@@ -20,4 +19,5 @@ | ||
commist.register('subscribe', require('./sub')) | ||
commist.register('version', function () { | ||
console.log('MQTT.js version:', require('./../package.json').version) | ||
commist.register('version', () => { | ||
console.log('MQTT.js version:', require('../package.json').version) | ||
}) | ||
@@ -27,4 +27,4 @@ commist.register('help', helpMe.toStdout) | ||
if (commist.parse(process.argv.slice(2)) !== null) { | ||
console.log('No such command:', process.argv[2], '\n') | ||
helpMe.toStdout() | ||
console.log('No such command:', process.argv[2], '\n') | ||
helpMe.toStdout() | ||
} |
241
bin/pub.js
#!/usr/bin/env node | ||
'use strict' | ||
const mqtt = require('../') | ||
const pump = require('pump') | ||
const mqtt = require('..') | ||
const { pipeline, Writable } = require('readable-stream') | ||
const path = require('path') | ||
const fs = require('fs') | ||
const concat = require('concat-stream') | ||
const Writable = require('readable-stream').Writable | ||
const helpMe = require('help-me')({ | ||
dir: path.join(__dirname, '..', 'doc') | ||
dir: path.join(__dirname, '..', 'doc'), | ||
}) | ||
@@ -17,125 +14,141 @@ const minimist = require('minimist') | ||
function send (args) { | ||
const client = mqtt.connect(args) | ||
client.on('connect', function () { | ||
client.publish(args.topic, args.message, args, function (err) { | ||
if (err) { | ||
console.warn(err) | ||
} | ||
client.end() | ||
}) | ||
}) | ||
client.on('error', function (err) { | ||
console.warn(err) | ||
client.end() | ||
}) | ||
function send(args) { | ||
const client = mqtt.connect(args) | ||
client.on('connect', () => { | ||
client.publish(args.topic, args.message, args, (err) => { | ||
if (err) { | ||
console.warn(err) | ||
} | ||
client.end() | ||
}) | ||
}) | ||
client.on('error', (err) => { | ||
console.warn(err) | ||
client.end() | ||
}) | ||
} | ||
function multisend (args) { | ||
const client = mqtt.connect(args) | ||
const sender = new Writable({ | ||
objectMode: true | ||
}) | ||
sender._write = function (line, enc, cb) { | ||
client.publish(args.topic, line.trim(), args, cb) | ||
} | ||
function multisend(args) { | ||
const client = mqtt.connect(args) | ||
const sender = new Writable({ | ||
objectMode: true, | ||
}) | ||
sender._write = (line, enc, cb) => { | ||
client.publish(args.topic, line.trim(), args, cb) | ||
} | ||
client.on('connect', function () { | ||
pump(process.stdin, split2(), sender, function (err) { | ||
client.end() | ||
if (err) { | ||
throw err | ||
} | ||
}) | ||
}) | ||
client.on('connect', () => { | ||
pipeline(process.stdin, split2(), sender, (err) => { | ||
client.end() | ||
if (err) { | ||
throw err | ||
} | ||
}) | ||
}) | ||
} | ||
function start (args) { | ||
args = minimist(args, { | ||
string: ['hostname', 'username', 'password', 'key', 'cert', 'ca', 'message', 'clientId', 'i', 'id'], | ||
boolean: ['stdin', 'retain', 'help', 'insecure', 'multiline'], | ||
alias: { | ||
port: 'p', | ||
hostname: ['h', 'host'], | ||
topic: 't', | ||
message: 'm', | ||
qos: 'q', | ||
clientId: ['i', 'id'], | ||
retain: 'r', | ||
username: 'u', | ||
password: 'P', | ||
stdin: 's', | ||
multiline: 'M', | ||
protocol: ['C', 'l'], | ||
help: 'H', | ||
ca: 'cafile' | ||
}, | ||
default: { | ||
host: 'localhost', | ||
qos: 0, | ||
retain: false, | ||
topic: '', | ||
message: '' | ||
} | ||
}) | ||
function start(args) { | ||
args = minimist(args, { | ||
string: [ | ||
'hostname', | ||
'username', | ||
'password', | ||
'key', | ||
'cert', | ||
'ca', | ||
'message', | ||
'clientId', | ||
'i', | ||
'id', | ||
], | ||
boolean: ['stdin', 'retain', 'help', 'insecure', 'multiline'], | ||
alias: { | ||
port: 'p', | ||
hostname: ['h', 'host'], | ||
topic: 't', | ||
message: 'm', | ||
qos: 'q', | ||
clientId: ['i', 'id'], | ||
retain: 'r', | ||
username: 'u', | ||
password: 'P', | ||
stdin: 's', | ||
multiline: 'M', | ||
protocol: ['C', 'l'], | ||
help: 'H', | ||
ca: 'cafile', | ||
}, | ||
default: { | ||
host: 'localhost', | ||
qos: 0, | ||
retain: false, | ||
topic: '', | ||
message: '', | ||
}, | ||
}) | ||
if (args.help) { | ||
return helpMe.toStdout('publish') | ||
} | ||
if (args.help) { | ||
return helpMe.toStdout('publish') | ||
} | ||
if (args.key) { | ||
args.key = fs.readFileSync(args.key) | ||
} | ||
if (args.key) { | ||
args.key = fs.readFileSync(args.key) | ||
} | ||
if (args.cert) { | ||
args.cert = fs.readFileSync(args.cert) | ||
} | ||
if (args.cert) { | ||
args.cert = fs.readFileSync(args.cert) | ||
} | ||
if (args.ca) { | ||
args.ca = fs.readFileSync(args.ca) | ||
} | ||
if (args.ca) { | ||
args.ca = fs.readFileSync(args.ca) | ||
} | ||
if (args.key && args.cert && !args.protocol) { | ||
args.protocol = 'mqtts' | ||
} | ||
if (args.key && args.cert && !args.protocol) { | ||
args.protocol = 'mqtts' | ||
} | ||
if (args.port) { | ||
if (typeof args.port !== 'number') { | ||
console.warn('# Port: number expected, \'%s\' was given.', typeof args.port) | ||
return | ||
} | ||
} | ||
if (args.port) { | ||
if (typeof args.port !== 'number') { | ||
console.warn( | ||
"# Port: number expected, '%s' was given.", | ||
typeof args.port, | ||
) | ||
return | ||
} | ||
} | ||
if (args['will-topic']) { | ||
args.will = {} | ||
args.will.topic = args['will-topic'] | ||
args.will.payload = args['will-message'] | ||
args.will.qos = args['will-qos'] | ||
args.will.retain = args['will-retain'] | ||
} | ||
if (args['will-topic']) { | ||
args.will = {} | ||
args.will.topic = args['will-topic'] | ||
args.will.payload = args['will-message'] | ||
args.will.qos = args['will-qos'] | ||
args.will.retain = args['will-retain'] | ||
} | ||
if (args.insecure) { | ||
args.rejectUnauthorized = false | ||
} | ||
if (args.insecure) { | ||
args.rejectUnauthorized = false | ||
} | ||
args.topic = (args.topic || args._.shift()).toString() | ||
args.message = (args.message || args._.shift()).toString() | ||
args.topic = (args.topic || args._.shift()).toString() | ||
args.message = (args.message || args._.shift()).toString() | ||
if (!args.topic) { | ||
console.error('missing topic\n') | ||
return helpMe.toStdout('publish') | ||
} | ||
if (!args.topic) { | ||
console.error('missing topic\n') | ||
return helpMe.toStdout('publish') | ||
} | ||
if (args.stdin) { | ||
if (args.multiline) { | ||
multisend(args) | ||
} else { | ||
process.stdin.pipe(concat(function (data) { | ||
args.message = data | ||
send(args) | ||
})) | ||
} | ||
} else { | ||
send(args) | ||
} | ||
if (args.stdin) { | ||
if (args.multiline) { | ||
multisend(args) | ||
} else { | ||
process.stdin.pipe( | ||
concat((data) => { | ||
args.message = data | ||
send(args) | ||
}), | ||
) | ||
} | ||
} else { | ||
send(args) | ||
} | ||
} | ||
@@ -146,3 +159,3 @@ | ||
if (require.main === module) { | ||
start(process.argv.slice(2)) | ||
start(process.argv.slice(2)) | ||
} |
204
bin/sub.js
#!/usr/bin/env node | ||
const mqtt = require('../') | ||
const mqtt = require('..') | ||
const path = require('path') | ||
const fs = require('fs') | ||
const helpMe = require('help-me')({ | ||
dir: path.join(__dirname, '..', 'doc') | ||
dir: path.join(__dirname, '..', 'doc'), | ||
}) | ||
const minimist = require('minimist') | ||
function start (args) { | ||
args = minimist(args, { | ||
string: ['hostname', 'username', 'password', 'key', 'cert', 'ca', 'clientId', 'i', 'id'], | ||
boolean: ['stdin', 'help', 'clean', 'insecure'], | ||
alias: { | ||
port: 'p', | ||
hostname: ['h', 'host'], | ||
topic: 't', | ||
qos: 'q', | ||
clean: 'c', | ||
keepalive: 'k', | ||
clientId: ['i', 'id'], | ||
username: 'u', | ||
password: 'P', | ||
protocol: ['C', 'l'], | ||
verbose: 'v', | ||
help: '-H', | ||
ca: 'cafile' | ||
}, | ||
default: { | ||
host: 'localhost', | ||
qos: 0, | ||
retain: false, | ||
clean: true, | ||
keepAlive: 30 // 30 sec | ||
} | ||
}) | ||
function start(args) { | ||
args = minimist(args, { | ||
string: [ | ||
'hostname', | ||
'username', | ||
'password', | ||
'key', | ||
'cert', | ||
'ca', | ||
'clientId', | ||
'i', | ||
'id', | ||
], | ||
boolean: ['stdin', 'help', 'clean', 'insecure'], | ||
alias: { | ||
port: 'p', | ||
hostname: ['h', 'host'], | ||
topic: 't', | ||
qos: 'q', | ||
clean: 'c', | ||
keepalive: 'k', | ||
clientId: ['i', 'id'], | ||
username: 'u', | ||
password: 'P', | ||
protocol: ['C', 'l'], | ||
verbose: 'v', | ||
help: '-H', | ||
ca: 'cafile', | ||
}, | ||
default: { | ||
host: 'localhost', | ||
qos: 0, | ||
retain: false, | ||
clean: true, | ||
keepAlive: 30, // 30 sec | ||
}, | ||
}) | ||
if (args.help) { | ||
return helpMe.toStdout('subscribe') | ||
} | ||
if (args.help) { | ||
return helpMe.toStdout('subscribe') | ||
} | ||
args.topic = args.topic || args._.shift() | ||
args.topic = args.topic || args._.shift() | ||
if (!args.topic) { | ||
console.error('missing topic\n') | ||
return helpMe.toStdout('subscribe') | ||
} | ||
if (!args.topic) { | ||
console.error('missing topic\n') | ||
return helpMe.toStdout('subscribe') | ||
} | ||
if (args.key) { | ||
args.key = fs.readFileSync(args.key) | ||
} | ||
if (args.key) { | ||
args.key = fs.readFileSync(args.key) | ||
} | ||
if (args.cert) { | ||
args.cert = fs.readFileSync(args.cert) | ||
} | ||
if (args.cert) { | ||
args.cert = fs.readFileSync(args.cert) | ||
} | ||
if (args.ca) { | ||
args.ca = fs.readFileSync(args.ca) | ||
} | ||
if (args.ca) { | ||
args.ca = fs.readFileSync(args.ca) | ||
} | ||
if (args.key && args.cert && !args.protocol) { | ||
args.protocol = 'mqtts' | ||
} | ||
if (args.key && args.cert && !args.protocol) { | ||
args.protocol = 'mqtts' | ||
} | ||
if (args.insecure) { | ||
args.rejectUnauthorized = false | ||
} | ||
if (args.insecure) { | ||
args.rejectUnauthorized = false | ||
} | ||
if (args.port) { | ||
if (typeof args.port !== 'number') { | ||
console.warn('# Port: number expected, \'%s\' was given.', typeof args.port) | ||
return | ||
} | ||
} | ||
if (args.port) { | ||
if (typeof args.port !== 'number') { | ||
console.warn( | ||
"# Port: number expected, '%s' was given.", | ||
typeof args.port, | ||
) | ||
return | ||
} | ||
} | ||
if (args['will-topic']) { | ||
args.will = {} | ||
args.will.topic = args['will-topic'] | ||
args.will.payload = args['will-message'] | ||
args.will.qos = args['will-qos'] | ||
args.will.retain = args['will-retain'] | ||
} | ||
if (args['will-topic']) { | ||
args.will = {} | ||
args.will.topic = args['will-topic'] | ||
args.will.payload = args['will-message'] | ||
args.will.qos = args['will-qos'] | ||
args.will.retain = args['will-retain'] | ||
} | ||
args.keepAlive = args['keep-alive'] | ||
args.keepAlive = args['keep-alive'] | ||
const client = mqtt.connect(args) | ||
const client = mqtt.connect(args) | ||
client.on('connect', function () { | ||
client.subscribe(args.topic, { qos: args.qos }, function (err, result) { | ||
if (err) { | ||
console.error(err) | ||
process.exit(1) | ||
} | ||
client.on('connect', () => { | ||
client.subscribe(args.topic, { qos: args.qos }, (err, result) => { | ||
if (err) { | ||
console.error(err) | ||
process.exit(1) | ||
} | ||
result.forEach(function (sub) { | ||
if (sub.qos > 2) { | ||
console.error('subscription negated to', sub.topic, 'with code', sub.qos) | ||
process.exit(1) | ||
} | ||
}) | ||
}) | ||
}) | ||
result.forEach((sub) => { | ||
if (sub.qos > 2) { | ||
console.error( | ||
'subscription negated to', | ||
sub.topic, | ||
'with code', | ||
sub.qos, | ||
) | ||
process.exit(1) | ||
} | ||
}) | ||
}) | ||
}) | ||
client.on('message', function (topic, payload) { | ||
if (args.verbose) { | ||
console.log(topic, payload.toString()) | ||
} else { | ||
console.log(payload.toString()) | ||
} | ||
}) | ||
client.on('message', (topic, payload) => { | ||
if (args.verbose) { | ||
console.log(topic, payload.toString()) | ||
} else { | ||
console.log(payload.toString()) | ||
} | ||
}) | ||
client.on('error', function (err) { | ||
console.warn(err) | ||
client.end() | ||
}) | ||
client.on('error', (err) => { | ||
console.warn(err) | ||
client.end() | ||
}) | ||
} | ||
@@ -122,3 +140,3 @@ | ||
if (require.main === module) { | ||
start(process.argv.slice(2)) | ||
start(process.argv.slice(2)) | ||
} |
3206
lib/client.js
@@ -1,243 +0,51 @@ | ||
'use strict' | ||
/** | ||
* Module dependencies | ||
*/ | ||
const EventEmitter = require('events').EventEmitter | ||
const Store = require('./store') | ||
const { EventEmitter } = require('events') | ||
const TopicAliasRecv = require('./topic-alias-recv') | ||
const TopicAliasSend = require('./topic-alias-send') | ||
const mqttPacket = require('mqtt-packet') | ||
const DefaultMessageIdProvider = require('./default-message-id-provider') | ||
const Writable = require('readable-stream').Writable | ||
const inherits = require('inherits') | ||
const { Writable } = require('readable-stream') | ||
const reInterval = require('reinterval') | ||
const clone = require('rfdc/default') | ||
const validations = require('./validations') | ||
const xtend = require('xtend') | ||
const debug = require('debug')('mqttjs:client') | ||
const nextTick = process ? process.nextTick : function (callback) { setTimeout(callback, 0) } | ||
const setImmediate = global.setImmediate || function (...args) { | ||
const callback = args.shift() | ||
nextTick(callback.bind(null, ...args)) | ||
} | ||
const Store = require('./store') | ||
const handlePacket = require('./handlers') | ||
const nextTick = process | ||
? process.nextTick | ||
: (callback) => { | ||
setTimeout(callback, 0) | ||
} | ||
const setImmediate = | ||
global.setImmediate || | ||
((...args) => { | ||
const callback = args.shift() | ||
nextTick(() => { | ||
callback(...args) | ||
}) | ||
}) | ||
const defaultConnectOptions = { | ||
keepalive: 60, | ||
reschedulePings: true, | ||
protocolId: 'MQTT', | ||
protocolVersion: 4, | ||
reconnectPeriod: 1000, | ||
connectTimeout: 30 * 1000, | ||
clean: true, | ||
resubscribe: true, | ||
writeCache: true | ||
keepalive: 60, | ||
reschedulePings: true, | ||
protocolId: 'MQTT', | ||
protocolVersion: 4, | ||
reconnectPeriod: 1000, | ||
connectTimeout: 30 * 1000, | ||
clean: true, | ||
resubscribe: true, | ||
writeCache: true, | ||
} | ||
const socketErrors = [ | ||
'ECONNREFUSED', | ||
'EADDRINUSE', | ||
'ECONNRESET', | ||
'ENOTFOUND', | ||
'ETIMEDOUT' | ||
'ECONNREFUSED', | ||
'EADDRINUSE', | ||
'ECONNRESET', | ||
'ENOTFOUND', | ||
'ETIMEDOUT', | ||
] | ||
// Other Socket Errors: EADDRINUSE, ECONNRESET, ENOTFOUND, ETIMEDOUT. | ||
const errors = { | ||
0: '', | ||
1: 'Unacceptable protocol version', | ||
2: 'Identifier rejected', | ||
3: 'Server unavailable', | ||
4: 'Bad username or password', | ||
5: 'Not authorized', | ||
16: 'No matching subscribers', | ||
17: 'No subscription existed', | ||
128: 'Unspecified error', | ||
129: 'Malformed Packet', | ||
130: 'Protocol Error', | ||
131: 'Implementation specific error', | ||
132: 'Unsupported Protocol Version', | ||
133: 'Client Identifier not valid', | ||
134: 'Bad User Name or Password', | ||
135: 'Not authorized', | ||
136: 'Server unavailable', | ||
137: 'Server busy', | ||
138: 'Banned', | ||
139: 'Server shutting down', | ||
140: 'Bad authentication method', | ||
141: 'Keep Alive timeout', | ||
142: 'Session taken over', | ||
143: 'Topic Filter invalid', | ||
144: 'Topic Name invalid', | ||
145: 'Packet identifier in use', | ||
146: 'Packet Identifier not found', | ||
147: 'Receive Maximum exceeded', | ||
148: 'Topic Alias invalid', | ||
149: 'Packet too large', | ||
150: 'Message rate too high', | ||
151: 'Quota exceeded', | ||
152: 'Administrative action', | ||
153: 'Payload format invalid', | ||
154: 'Retain not supported', | ||
155: 'QoS not supported', | ||
156: 'Use another server', | ||
157: 'Server moved', | ||
158: 'Shared Subscriptions not supported', | ||
159: 'Connection rate exceeded', | ||
160: 'Maximum connect time', | ||
161: 'Subscription Identifiers not supported', | ||
162: 'Wildcard Subscriptions not supported' | ||
} | ||
function defaultId () { | ||
return 'mqttjs_' + Math.random().toString(16).substr(2, 8) | ||
} | ||
function applyTopicAlias (client, packet) { | ||
if (client.options.protocolVersion === 5) { | ||
if (packet.cmd === 'publish') { | ||
let alias | ||
if (packet.properties) { | ||
alias = packet.properties.topicAlias | ||
} | ||
const topic = packet.topic.toString() | ||
if (client.topicAliasSend) { | ||
if (alias) { | ||
if (topic.length !== 0) { | ||
// register topic alias | ||
debug('applyTopicAlias :: register topic: %s - alias: %d', topic, alias) | ||
if (!client.topicAliasSend.put(topic, alias)) { | ||
debug('applyTopicAlias :: error out of range. topic: %s - alias: %d', topic, alias) | ||
return new Error('Sending Topic Alias out of range') | ||
} | ||
} | ||
} else { | ||
if (topic.length !== 0) { | ||
if (client.options.autoAssignTopicAlias) { | ||
alias = client.topicAliasSend.getAliasByTopic(topic) | ||
if (alias) { | ||
packet.topic = '' | ||
packet.properties = { ...(packet.properties), topicAlias: alias } | ||
debug('applyTopicAlias :: auto assign(use) topic: %s - alias: %d', topic, alias) | ||
} else { | ||
alias = client.topicAliasSend.getLruAlias() | ||
client.topicAliasSend.put(topic, alias) | ||
packet.properties = { ...(packet.properties), topicAlias: alias } | ||
debug('applyTopicAlias :: auto assign topic: %s - alias: %d', topic, alias) | ||
} | ||
} else if (client.options.autoUseTopicAlias) { | ||
alias = client.topicAliasSend.getAliasByTopic(topic) | ||
if (alias) { | ||
packet.topic = '' | ||
packet.properties = { ...(packet.properties), topicAlias: alias } | ||
debug('applyTopicAlias :: auto use topic: %s - alias: %d', topic, alias) | ||
} | ||
} | ||
} | ||
} | ||
} else if (alias) { | ||
debug('applyTopicAlias :: error out of range. topic: %s - alias: %d', topic, alias) | ||
return new Error('Sending Topic Alias out of range') | ||
} | ||
} | ||
} | ||
} | ||
function removeTopicAliasAndRecoverTopicName (client, packet) { | ||
let alias | ||
if (packet.properties) { | ||
alias = packet.properties.topicAlias | ||
} | ||
let topic = packet.topic.toString() | ||
if (topic.length === 0) { | ||
// restore topic from alias | ||
if (typeof alias === 'undefined') { | ||
return new Error('Unregistered Topic Alias') | ||
} else { | ||
topic = client.topicAliasSend.getTopicByAlias(alias) | ||
if (typeof topic === 'undefined') { | ||
return new Error('Unregistered Topic Alias') | ||
} else { | ||
packet.topic = topic | ||
} | ||
} | ||
} | ||
if (alias) { | ||
delete packet.properties.topicAlias | ||
} | ||
} | ||
function sendPacket (client, packet, cb) { | ||
debug('sendPacket :: packet: %O', packet) | ||
debug('sendPacket :: emitting `packetsend`') | ||
client.emit('packetsend', packet) | ||
debug('sendPacket :: writing to stream') | ||
const result = mqttPacket.writeToStream(packet, client.stream, client.options) | ||
debug('sendPacket :: writeToStream result %s', result) | ||
if (!result && cb && cb !== nop) { | ||
debug('sendPacket :: handle events on `drain` once through callback.') | ||
client.stream.once('drain', cb) | ||
} else if (cb) { | ||
debug('sendPacket :: invoking cb') | ||
cb() | ||
} | ||
} | ||
function flush (queue) { | ||
if (queue) { | ||
debug('flush: queue exists? %b', !!(queue)) | ||
Object.keys(queue).forEach(function (messageId) { | ||
if (typeof queue[messageId].cb === 'function') { | ||
queue[messageId].cb(new Error('Connection closed')) | ||
// This is suspicious. Why do we only delete this if we have a callbck? | ||
// If this is by-design, then adding no as callback would cause this to get deleted unintentionally. | ||
delete queue[messageId] | ||
} | ||
}) | ||
} | ||
} | ||
function flushVolatile (queue) { | ||
if (queue) { | ||
debug('flushVolatile :: deleting volatile messages from the queue and setting their callbacks as error function') | ||
Object.keys(queue).forEach(function (messageId) { | ||
if (queue[messageId].volatile && typeof queue[messageId].cb === 'function') { | ||
queue[messageId].cb(new Error('Connection closed')) | ||
delete queue[messageId] | ||
} | ||
}) | ||
} | ||
} | ||
function storeAndSend (client, packet, cb, cbStorePut) { | ||
debug('storeAndSend :: store packet with cmd %s to outgoingStore', packet.cmd) | ||
let storePacket = packet | ||
let err | ||
if (storePacket.cmd === 'publish') { | ||
// The original packet is for sending. | ||
// The cloned storePacket is for storing to resend on reconnect. | ||
// Topic Alias must not be used after disconnected. | ||
storePacket = clone(packet) | ||
err = removeTopicAliasAndRecoverTopicName(client, storePacket) | ||
if (err) { | ||
return cb && cb(err) | ||
} | ||
} | ||
client.outgoingStore.put(storePacket, function storedPacket (err) { | ||
if (err) { | ||
return cb && cb(err) | ||
} | ||
cbStorePut() | ||
sendPacket(client, packet, cb) | ||
}) | ||
} | ||
function nop (error) { | ||
debug('nop ::', error) | ||
} | ||
/** | ||
@@ -250,1670 +58,1612 @@ * MqttClient constructor | ||
*/ | ||
function MqttClient (streamBuilder, options) { | ||
let k | ||
const that = this | ||
class MqttClient extends EventEmitter { | ||
static defaultId() { | ||
return `mqttjs_${Math.random().toString(16).substr(2, 8)}` | ||
} | ||
if (!(this instanceof MqttClient)) { | ||
return new MqttClient(streamBuilder, options) | ||
} | ||
constructor(streamBuilder, options) { | ||
super() | ||
this.options = options || {} | ||
let k | ||
// Defaults | ||
for (k in defaultConnectOptions) { | ||
if (typeof this.options[k] === 'undefined') { | ||
this.options[k] = defaultConnectOptions[k] | ||
} else { | ||
this.options[k] = options[k] | ||
} | ||
} | ||
this.options = options || {} | ||
debug('MqttClient :: options.protocol', options.protocol) | ||
debug('MqttClient :: options.protocolVersion', options.protocolVersion) | ||
debug('MqttClient :: options.username', options.username) | ||
debug('MqttClient :: options.keepalive', options.keepalive) | ||
debug('MqttClient :: options.reconnectPeriod', options.reconnectPeriod) | ||
debug('MqttClient :: options.rejectUnauthorized', options.rejectUnauthorized) | ||
debug('MqttClient :: options.properties.topicAliasMaximum', options.properties ? options.properties.topicAliasMaximum : undefined) | ||
// Defaults | ||
for (k in defaultConnectOptions) { | ||
if (typeof this.options[k] === 'undefined') { | ||
this.options[k] = defaultConnectOptions[k] | ||
} else { | ||
this.options[k] = options[k] | ||
} | ||
} | ||
this.options.clientId = (typeof options.clientId === 'string') ? options.clientId : defaultId() | ||
this.log = this.options.log || debug | ||
this.noop = this._noop.bind(this) | ||
debug('MqttClient :: clientId', this.options.clientId) | ||
this.log('MqttClient :: options.protocol', options.protocol) | ||
this.log( | ||
'MqttClient :: options.protocolVersion', | ||
options.protocolVersion, | ||
) | ||
this.log('MqttClient :: options.username', options.username) | ||
this.log('MqttClient :: options.keepalive', options.keepalive) | ||
this.log( | ||
'MqttClient :: options.reconnectPeriod', | ||
options.reconnectPeriod, | ||
) | ||
this.log( | ||
'MqttClient :: options.rejectUnauthorized', | ||
options.rejectUnauthorized, | ||
) | ||
this.log( | ||
'MqttClient :: options.properties.topicAliasMaximum', | ||
options.properties | ||
? options.properties.topicAliasMaximum | ||
: undefined, | ||
) | ||
this.options.customHandleAcks = (options.protocolVersion === 5 && options.customHandleAcks) ? options.customHandleAcks : function () { arguments[3](0) } | ||
this.options.clientId = | ||
typeof options.clientId === 'string' | ||
? options.clientId | ||
: MqttClient.defaultId() | ||
// Disable pre-generated write cache if requested. Will allocate buffers on-the-fly instead. WARNING: This can affect write performance | ||
if (!this.options.writeCache) { | ||
mqttPacket.writeToStream.cacheNumbers = false | ||
} | ||
this.log('MqttClient :: clientId', this.options.clientId) | ||
this.streamBuilder = streamBuilder | ||
this.options.customHandleAcks = | ||
options.protocolVersion === 5 && options.customHandleAcks | ||
? options.customHandleAcks | ||
: (...args) => { | ||
args[3](0) | ||
} | ||
this.messageIdProvider = (typeof this.options.messageIdProvider === 'undefined') ? new DefaultMessageIdProvider() : this.options.messageIdProvider | ||
// Disable pre-generated write cache if requested. Will allocate buffers on-the-fly instead. WARNING: This can affect write performance | ||
if (!this.options.writeCache) { | ||
mqttPacket.writeToStream.cacheNumbers = false | ||
} | ||
// Inflight message storages | ||
this.outgoingStore = options.outgoingStore || new Store() | ||
this.incomingStore = options.incomingStore || new Store() | ||
this.streamBuilder = streamBuilder | ||
// Should QoS zero messages be queued when the connection is broken? | ||
this.queueQoSZero = options.queueQoSZero === undefined ? true : options.queueQoSZero | ||
this.messageIdProvider = | ||
typeof this.options.messageIdProvider === 'undefined' | ||
? new DefaultMessageIdProvider() | ||
: this.options.messageIdProvider | ||
// map of subscribed topics to support reconnection | ||
this._resubscribeTopics = {} | ||
// Inflight message storages | ||
this.outgoingStore = options.outgoingStore || new Store() | ||
this.incomingStore = options.incomingStore || new Store() | ||
// map of a subscribe messageId and a topic | ||
this.messageIdToTopic = {} | ||
// Should QoS zero messages be queued when the connection is broken? | ||
this.queueQoSZero = | ||
options.queueQoSZero === undefined ? true : options.queueQoSZero | ||
// Ping timer, setup in _setupPingTimer | ||
this.pingTimer = null | ||
// Is the client connected? | ||
this.connected = false | ||
// Are we disconnecting? | ||
this.disconnecting = false | ||
// Packet queue | ||
this.queue = [] | ||
// connack timer | ||
this.connackTimer = null | ||
// Reconnect timer | ||
this.reconnectTimer = null | ||
// Is processing store? | ||
this._storeProcessing = false | ||
// Packet Ids are put into the store during store processing | ||
this._packetIdsDuringStoreProcessing = {} | ||
// Store processing queue | ||
this._storeProcessingQueue = [] | ||
// map of subscribed topics to support reconnection | ||
this._resubscribeTopics = {} | ||
// Inflight callbacks | ||
this.outgoing = {} | ||
// map of a subscribe messageId and a topic | ||
this.messageIdToTopic = {} | ||
// True if connection is first time. | ||
this._firstConnection = true | ||
// Ping timer, setup in _setupPingTimer | ||
this.pingTimer = null | ||
// Is the client connected? | ||
this.connected = false | ||
// Are we disconnecting? | ||
this.disconnecting = false | ||
// Packet queue | ||
this.queue = [] | ||
// connack timer | ||
this.connackTimer = null | ||
// Reconnect timer | ||
this.reconnectTimer = null | ||
// Is processing store? | ||
this._storeProcessing = false | ||
// Packet Ids are put into the store during store processing | ||
this._packetIdsDuringStoreProcessing = {} | ||
// Store processing queue | ||
this._storeProcessingQueue = [] | ||
if (options.properties && options.properties.topicAliasMaximum > 0) { | ||
if (options.properties.topicAliasMaximum > 0xffff) { | ||
debug('MqttClient :: options.properties.topicAliasMaximum is out of range') | ||
} else { | ||
this.topicAliasRecv = new TopicAliasRecv(options.properties.topicAliasMaximum) | ||
} | ||
} | ||
// Inflight callbacks | ||
this.outgoing = {} | ||
// Send queued packets | ||
this.on('connect', function () { | ||
const queue = that.queue | ||
// True if connection is first time. | ||
this._firstConnection = true | ||
function deliver () { | ||
const entry = queue.shift() | ||
debug('deliver :: entry %o', entry) | ||
let packet = null | ||
if (options.properties && options.properties.topicAliasMaximum > 0) { | ||
if (options.properties.topicAliasMaximum > 0xffff) { | ||
this.log( | ||
'MqttClient :: options.properties.topicAliasMaximum is out of range', | ||
) | ||
} else { | ||
this.topicAliasRecv = new TopicAliasRecv( | ||
options.properties.topicAliasMaximum, | ||
) | ||
} | ||
} | ||
if (!entry) { | ||
that._resubscribe() | ||
return | ||
} | ||
// Send queued packets | ||
this.on('connect', () => { | ||
const { queue } = this | ||
packet = entry.packet | ||
debug('deliver :: call _sendPacket for %o', packet) | ||
let send = true | ||
if (packet.messageId && packet.messageId !== 0) { | ||
if (!that.messageIdProvider.register(packet.messageId)) { | ||
send = false | ||
} | ||
} | ||
if (send) { | ||
that._sendPacket( | ||
packet, | ||
function (err) { | ||
if (entry.cb) { | ||
entry.cb(err) | ||
} | ||
deliver() | ||
} | ||
) | ||
} else { | ||
debug('messageId: %d has already used. The message is skipped and removed.', packet.messageId) | ||
deliver() | ||
} | ||
} | ||
const deliver = () => { | ||
const entry = queue.shift() | ||
this.log('deliver :: entry %o', entry) | ||
let packet = null | ||
debug('connect :: sending queued packets') | ||
deliver() | ||
}) | ||
if (!entry) { | ||
this._resubscribe() | ||
return | ||
} | ||
this.on('close', function () { | ||
debug('close :: connected set to `false`') | ||
that.connected = false | ||
packet = entry.packet | ||
this.log('deliver :: call _sendPacket for %o', packet) | ||
let send = true | ||
if (packet.messageId && packet.messageId !== 0) { | ||
if (!this.messageIdProvider.register(packet.messageId)) { | ||
send = false | ||
} | ||
} | ||
if (send) { | ||
this._sendPacket(packet, (err) => { | ||
if (entry.cb) { | ||
entry.cb(err) | ||
} | ||
deliver() | ||
}) | ||
} else { | ||
this.log( | ||
'messageId: %d has already used. The message is skipped and removed.', | ||
packet.messageId, | ||
) | ||
deliver() | ||
} | ||
} | ||
debug('close :: clearing connackTimer') | ||
clearTimeout(that.connackTimer) | ||
this.log('connect :: sending queued packets') | ||
deliver() | ||
}) | ||
debug('close :: clearing ping timer') | ||
if (that.pingTimer !== null) { | ||
that.pingTimer.clear() | ||
that.pingTimer = null | ||
} | ||
this.on('close', () => { | ||
this.log('close :: connected set to `false`') | ||
this.connected = false | ||
if (that.topicAliasRecv) { | ||
that.topicAliasRecv.clear() | ||
} | ||
this.log('close :: clearing connackTimer') | ||
clearTimeout(this.connackTimer) | ||
debug('close :: calling _setupReconnect') | ||
that._setupReconnect() | ||
}) | ||
EventEmitter.call(this) | ||
this.log('close :: clearing ping timer') | ||
if (this.pingTimer !== null) { | ||
this.pingTimer.clear() | ||
this.pingTimer = null | ||
} | ||
debug('MqttClient :: setting up stream') | ||
this._setupStream() | ||
} | ||
inherits(MqttClient, EventEmitter) | ||
if (this.topicAliasRecv) { | ||
this.topicAliasRecv.clear() | ||
} | ||
/** | ||
* setup the event handlers in the inner stream. | ||
* | ||
* @api private | ||
*/ | ||
MqttClient.prototype._setupStream = function () { | ||
const that = this | ||
const writable = new Writable() | ||
const parser = mqttPacket.parser(this.options) | ||
let completeParse = null | ||
const packets = [] | ||
this.log('close :: calling _setupReconnect') | ||
this._setupReconnect() | ||
}) | ||
debug('_setupStream :: calling method to clear reconnect') | ||
this._clearReconnect() | ||
if (!this.options.manualConnect) { | ||
this.log('MqttClient :: setting up stream') | ||
this.connect() | ||
} | ||
} | ||
debug('_setupStream :: using streamBuilder provided to client to create stream') | ||
this.stream = this.streamBuilder(this) | ||
/** | ||
* Setup the event handlers in the inner stream, sends `connect` and `auth` packets | ||
*/ | ||
connect() { | ||
const writable = new Writable() | ||
const parser = mqttPacket.parser(this.options) | ||
let completeParse = null | ||
const packets = [] | ||
parser.on('packet', function (packet) { | ||
debug('parser :: on packet push to packets array.') | ||
packets.push(packet) | ||
}) | ||
this.log('connect :: calling method to clear reconnect') | ||
this._clearReconnect() | ||
function nextTickWork () { | ||
if (packets.length) { | ||
nextTick(work) | ||
} else { | ||
const done = completeParse | ||
completeParse = null | ||
done() | ||
} | ||
} | ||
this.log( | ||
'connect :: using streamBuilder provided to client to create stream', | ||
) | ||
this.stream = this.streamBuilder(this) | ||
function work () { | ||
debug('work :: getting next packet in queue') | ||
const packet = packets.shift() | ||
parser.on('packet', (packet) => { | ||
this.log('parser :: on packet push to packets array.') | ||
packets.push(packet) | ||
}) | ||
if (packet) { | ||
debug('work :: packet pulled from queue') | ||
that._handlePacket(packet, nextTickWork) | ||
} else { | ||
debug('work :: no packets in queue') | ||
const done = completeParse | ||
completeParse = null | ||
debug('work :: done flag is %s', !!(done)) | ||
if (done) done() | ||
} | ||
} | ||
const work = () => { | ||
this.log('work :: getting next packet in queue') | ||
const packet = packets.shift() | ||
writable._write = function (buf, enc, done) { | ||
completeParse = done | ||
debug('writable stream :: parsing buffer') | ||
parser.parse(buf) | ||
work() | ||
} | ||
if (packet) { | ||
this.log('work :: packet pulled from queue') | ||
handlePacket(this, packet, nextTickWork) | ||
} else { | ||
this.log('work :: no packets in queue') | ||
const done = completeParse | ||
completeParse = null | ||
this.log('work :: done flag is %s', !!done) | ||
if (done) done() | ||
} | ||
} | ||
function streamErrorHandler (error) { | ||
debug('streamErrorHandler :: error', error.message) | ||
if (socketErrors.includes(error.code)) { | ||
// handle error | ||
debug('streamErrorHandler :: emitting error') | ||
that.emit('error', error) | ||
} else { | ||
nop(error) | ||
} | ||
} | ||
const nextTickWork = () => { | ||
if (packets.length) { | ||
nextTick(work) | ||
} else { | ||
const done = completeParse | ||
completeParse = null | ||
done() | ||
} | ||
} | ||
debug('_setupStream :: pipe stream to writable stream') | ||
this.stream.pipe(writable) | ||
writable._write = (buf, enc, done) => { | ||
completeParse = done | ||
this.log('writable stream :: parsing buffer') | ||
parser.parse(buf) | ||
work() | ||
} | ||
// Suppress connection errors | ||
this.stream.on('error', streamErrorHandler) | ||
const streamErrorHandler = (error) => { | ||
this.log('streamErrorHandler :: error', error.message) | ||
if (socketErrors.includes(error.code)) { | ||
// handle error | ||
this.log('streamErrorHandler :: emitting error') | ||
this.emit('error', error) | ||
} else { | ||
this.noop(error) | ||
} | ||
} | ||
// Echo stream close | ||
this.stream.on('close', function () { | ||
debug('(%s)stream :: on close', that.options.clientId) | ||
flushVolatile(that.outgoing) | ||
debug('stream: emit close to MqttClient') | ||
that.emit('close') | ||
}) | ||
this.log('connect :: pipe stream to writable stream') | ||
this.stream.pipe(writable) | ||
// Send a connect packet | ||
debug('_setupStream: sending packet `connect`') | ||
const connectPacket = Object.create(this.options) | ||
connectPacket.cmd = 'connect' | ||
if (this.topicAliasRecv) { | ||
if (!connectPacket.properties) { | ||
connectPacket.properties = {} | ||
} | ||
if (this.topicAliasRecv) { | ||
connectPacket.properties.topicAliasMaximum = this.topicAliasRecv.max | ||
} | ||
} | ||
// avoid message queue | ||
sendPacket(this, connectPacket) | ||
// Suppress connection errors | ||
this.stream.on('error', streamErrorHandler) | ||
// Echo connection errors | ||
parser.on('error', this.emit.bind(this, 'error')) | ||
// Echo stream close | ||
this.stream.on('close', () => { | ||
this.log('(%s)stream :: on close', this.options.clientId) | ||
this._flushVolatile(this.outgoing) | ||
this.log('stream: emit close to MqttClient') | ||
this.emit('close') | ||
}) | ||
// auth | ||
if (this.options.properties) { | ||
if (!this.options.properties.authenticationMethod && this.options.properties.authenticationData) { | ||
that.end(() => | ||
this.emit('error', new Error('Packet has no Authentication Method') | ||
)) | ||
return this | ||
} | ||
if (this.options.properties.authenticationMethod && this.options.authPacket && typeof this.options.authPacket === 'object') { | ||
const authPacket = xtend({ cmd: 'auth', reasonCode: 0 }, this.options.authPacket) | ||
sendPacket(this, authPacket) | ||
} | ||
} | ||
// Send a connect packet | ||
this.log('connect: sending packet `connect`') | ||
const connectPacket = Object.create(this.options) | ||
connectPacket.cmd = 'connect' | ||
if (this.topicAliasRecv) { | ||
if (!connectPacket.properties) { | ||
connectPacket.properties = {} | ||
} | ||
if (this.topicAliasRecv) { | ||
connectPacket.properties.topicAliasMaximum = | ||
this.topicAliasRecv.max | ||
} | ||
} | ||
// avoid message queue | ||
this._writePacket(connectPacket) | ||
// many drain listeners are needed for qos 1 callbacks if the connection is intermittent | ||
this.stream.setMaxListeners(1000) | ||
// Echo connection errors | ||
parser.on('error', this.emit.bind(this, 'error')) | ||
clearTimeout(this.connackTimer) | ||
this.connackTimer = setTimeout(function () { | ||
debug('!!connectTimeout hit!! Calling _cleanUp with force `true`') | ||
that._cleanUp(true) | ||
}, this.options.connectTimeout) | ||
} | ||
// auth | ||
if (this.options.properties) { | ||
if ( | ||
!this.options.properties.authenticationMethod && | ||
this.options.properties.authenticationData | ||
) { | ||
this.end(() => | ||
this.emit( | ||
'error', | ||
new Error('Packet has no Authentication Method'), | ||
), | ||
) | ||
return this | ||
} | ||
if ( | ||
this.options.properties.authenticationMethod && | ||
this.options.authPacket && | ||
typeof this.options.authPacket === 'object' | ||
) { | ||
const authPacket = { | ||
cmd: 'auth', | ||
reasonCode: 0, | ||
...this.options.authPacket, | ||
} | ||
this._writePacket(authPacket) | ||
} | ||
} | ||
MqttClient.prototype._handlePacket = function (packet, done) { | ||
const options = this.options | ||
// many drain listeners are needed for qos 1 callbacks if the connection is intermittent | ||
this.stream.setMaxListeners(1000) | ||
if (options.protocolVersion === 5 && options.properties && options.properties.maximumPacketSize && options.properties.maximumPacketSize < packet.length) { | ||
this.emit('error', new Error('exceeding packets size ' + packet.cmd)) | ||
this.end({ reasonCode: 149, properties: { reasonString: 'Maximum packet size was exceeded' } }) | ||
return this | ||
} | ||
debug('_handlePacket :: emitting packetreceive') | ||
this.emit('packetreceive', packet) | ||
clearTimeout(this.connackTimer) | ||
this.connackTimer = setTimeout(() => { | ||
this.log( | ||
'!!connectTimeout hit!! Calling _cleanUp with force `true`', | ||
) | ||
this._cleanUp(true) | ||
}, this.options.connectTimeout) | ||
switch (packet.cmd) { | ||
case 'publish': | ||
this._handlePublish(packet, done) | ||
break | ||
case 'puback': | ||
case 'pubrec': | ||
case 'pubcomp': | ||
case 'suback': | ||
case 'unsuback': | ||
this._handleAck(packet) | ||
done() | ||
break | ||
case 'pubrel': | ||
this._handlePubrel(packet, done) | ||
break | ||
case 'connack': | ||
this._handleConnack(packet) | ||
done() | ||
break | ||
case 'auth': | ||
this._handleAuth(packet) | ||
done() | ||
break | ||
case 'pingresp': | ||
this._handlePingresp(packet) | ||
done() | ||
break | ||
case 'disconnect': | ||
this._handleDisconnect(packet) | ||
done() | ||
break | ||
default: | ||
// do nothing | ||
// maybe we should do an error handling | ||
// or just log it | ||
break | ||
} | ||
} | ||
return this | ||
} | ||
MqttClient.prototype._checkDisconnecting = function (callback) { | ||
if (this.disconnecting) { | ||
if (callback && callback !== nop) { | ||
callback(new Error('client disconnecting')) | ||
} else { | ||
this.emit('error', new Error('client disconnecting')) | ||
} | ||
} | ||
return this.disconnecting | ||
} | ||
_flushVolatile(queue) { | ||
if (queue) { | ||
this.log( | ||
'_flushVolatile :: deleting volatile messages from the queue and setting their callbacks as error function', | ||
) | ||
Object.keys(queue).forEach((messageId) => { | ||
if ( | ||
queue[messageId].volatile && | ||
typeof queue[messageId].cb === 'function' | ||
) { | ||
queue[messageId].cb(new Error('Connection closed')) | ||
delete queue[messageId] | ||
} | ||
}) | ||
} | ||
} | ||
/** | ||
* publish - publish <message> to <topic> | ||
* | ||
* @param {String} topic - topic to publish to | ||
* @param {String, Buffer} message - message to publish | ||
* @param {Object} [opts] - publish options, includes: | ||
* {Number} qos - qos level to publish on | ||
* {Boolean} retain - whether or not to retain the message | ||
* {Boolean} dup - whether or not mark a message as duplicate | ||
* {Function} cbStorePut - function(){} called when message is put into `outgoingStore` | ||
* @param {Function} [callback] - function(err){} | ||
* called when publish succeeds or fails | ||
* @returns {MqttClient} this - for chaining | ||
* @api public | ||
* | ||
* @example client.publish('topic', 'message'); | ||
* @example | ||
* client.publish('topic', 'message', {qos: 1, retain: true, dup: true}); | ||
* @example client.publish('topic', 'message', console.log); | ||
*/ | ||
MqttClient.prototype.publish = function (topic, message, opts, callback) { | ||
debug('publish :: message `%s` to topic `%s`', message, topic) | ||
const options = this.options | ||
_flush(queue) { | ||
if (queue) { | ||
this.log('_flush: queue exists? %b', !!queue) | ||
Object.keys(queue).forEach((messageId) => { | ||
if (typeof queue[messageId].cb === 'function') { | ||
queue[messageId].cb(new Error('Connection closed')) | ||
// This is suspicious. Why do we only delete this if we have a callback? | ||
// If this is by-design, then adding no as callback would cause this to get deleted unintentionally. | ||
delete queue[messageId] | ||
} | ||
}) | ||
} | ||
} | ||
// .publish(topic, payload, cb); | ||
if (typeof opts === 'function') { | ||
callback = opts | ||
opts = null | ||
} | ||
_removeTopicAliasAndRecoverTopicName(packet) { | ||
let alias | ||
if (packet.properties) { | ||
alias = packet.properties.topicAlias | ||
} | ||
// default opts | ||
const defaultOpts = { qos: 0, retain: false, dup: false } | ||
opts = xtend(defaultOpts, opts) | ||
let topic = packet.topic.toString() | ||
if (this._checkDisconnecting(callback)) { | ||
return this | ||
} | ||
this.log( | ||
'_removeTopicAliasAndRecoverTopicName :: alias %d, topic %o', | ||
alias, | ||
topic, | ||
) | ||
const that = this | ||
const publishProc = function () { | ||
let messageId = 0 | ||
if (opts.qos === 1 || opts.qos === 2) { | ||
messageId = that._nextId() | ||
if (messageId === null) { | ||
debug('No messageId left') | ||
return false | ||
} | ||
} | ||
const packet = { | ||
cmd: 'publish', | ||
topic: topic, | ||
payload: message, | ||
qos: opts.qos, | ||
retain: opts.retain, | ||
messageId: messageId, | ||
dup: opts.dup | ||
} | ||
if (topic.length === 0) { | ||
// restore topic from alias | ||
if (typeof alias === 'undefined') { | ||
return new Error('Unregistered Topic Alias') | ||
} | ||
topic = this.topicAliasSend.getTopicByAlias(alias) | ||
if (typeof topic === 'undefined') { | ||
return new Error('Unregistered Topic Alias') | ||
} | ||
packet.topic = topic | ||
} | ||
if (alias) { | ||
delete packet.properties.topicAlias | ||
} | ||
} | ||
if (options.protocolVersion === 5) { | ||
packet.properties = opts.properties | ||
} | ||
_checkDisconnecting(callback) { | ||
if (this.disconnecting) { | ||
if (callback && callback !== this.noop) { | ||
callback(new Error('client disconnecting')) | ||
} else { | ||
this.emit('error', new Error('client disconnecting')) | ||
} | ||
} | ||
return this.disconnecting | ||
} | ||
debug('publish :: qos', opts.qos) | ||
switch (opts.qos) { | ||
case 1: | ||
case 2: | ||
// Add to callbacks | ||
that.outgoing[packet.messageId] = { | ||
volatile: false, | ||
cb: callback || nop | ||
} | ||
debug('MqttClient:publish: packet cmd: %s', packet.cmd) | ||
that._sendPacket(packet, undefined, opts.cbStorePut) | ||
break | ||
default: | ||
debug('MqttClient:publish: packet cmd: %s', packet.cmd) | ||
that._sendPacket(packet, callback, opts.cbStorePut) | ||
break | ||
} | ||
return true | ||
} | ||
/** | ||
* publish - publish <message> to <topic> | ||
* | ||
* @param {String} topic - topic to publish to | ||
* @param {String, Buffer} message - message to publish | ||
* @param {Object} [opts] - publish options, includes: | ||
* {Number} qos - qos level to publish on | ||
* {Boolean} retain - whether or not to retain the message | ||
* {Boolean} dup - whether or not mark a message as duplicate | ||
* {Function} cbStorePut - function(){} called when message is put into `outgoingStore` | ||
* @param {Function} [callback] - function(err){} | ||
* called when publish succeeds or fails | ||
* @returns {MqttClient} this - for chaining | ||
* @api public | ||
* | ||
* @example client.publish('topic', 'message'); | ||
* @example | ||
* client.publish('topic', 'message', {qos: 1, retain: true, dup: true}); | ||
* @example client.publish('topic', 'message', console.log); | ||
*/ | ||
publish(topic, message, opts, callback) { | ||
this.log('publish :: message `%s` to topic `%s`', message, topic) | ||
const { options } = this | ||
if (this._storeProcessing || this._storeProcessingQueue.length > 0 || !publishProc()) { | ||
this._storeProcessingQueue.push( | ||
{ | ||
invoke: publishProc, | ||
cbStorePut: opts.cbStorePut, | ||
callback: callback | ||
} | ||
) | ||
} | ||
return this | ||
} | ||
// .publish(topic, payload, cb); | ||
if (typeof opts === 'function') { | ||
callback = opts | ||
opts = null | ||
} | ||
/** | ||
* subscribe - subscribe to <topic> | ||
* | ||
* @param {String, Array, Object} topic - topic(s) to subscribe to, supports objects in the form {'topic': qos} | ||
* @param {Object} [opts] - optional subscription options, includes: | ||
* {Number} qos - subscribe qos level | ||
* @param {Function} [callback] - function(err, granted){} where: | ||
* {Error} err - subscription error (none at the moment!) | ||
* {Array} granted - array of {topic: 't', qos: 0} | ||
* @returns {MqttClient} this - for chaining | ||
* @api public | ||
* @example client.subscribe('topic'); | ||
* @example client.subscribe('topic', {qos: 1}); | ||
* @example client.subscribe({'topic': {qos: 0}, 'topic2': {qos: 1}}, console.log); | ||
* @example client.subscribe('topic', console.log); | ||
*/ | ||
MqttClient.prototype.subscribe = function () { | ||
const that = this | ||
const args = new Array(arguments.length) | ||
for (let i = 0; i < arguments.length; i++) { | ||
args[i] = arguments[i] | ||
} | ||
const subs = [] | ||
let obj = args.shift() | ||
const resubscribe = obj.resubscribe | ||
let callback = args.pop() || nop | ||
let opts = args.pop() | ||
const version = this.options.protocolVersion | ||
// default opts | ||
const defaultOpts = { qos: 0, retain: false, dup: false } | ||
opts = { ...defaultOpts, ...opts } | ||
delete obj.resubscribe | ||
if (this._checkDisconnecting(callback)) { | ||
return this | ||
} | ||
if (typeof obj === 'string') { | ||
obj = [obj] | ||
} | ||
const publishProc = () => { | ||
let messageId = 0 | ||
if (opts.qos === 1 || opts.qos === 2) { | ||
messageId = this._nextId() | ||
if (messageId === null) { | ||
this.log('No messageId left') | ||
return false | ||
} | ||
} | ||
const packet = { | ||
cmd: 'publish', | ||
topic, | ||
payload: message, | ||
qos: opts.qos, | ||
retain: opts.retain, | ||
messageId, | ||
dup: opts.dup, | ||
} | ||
if (typeof callback !== 'function') { | ||
opts = callback | ||
callback = nop | ||
} | ||
if (options.protocolVersion === 5) { | ||
packet.properties = opts.properties | ||
} | ||
const invalidTopic = validations.validateTopics(obj) | ||
if (invalidTopic !== null) { | ||
setImmediate(callback, new Error('Invalid topic ' + invalidTopic)) | ||
return this | ||
} | ||
this.log('publish :: qos', opts.qos) | ||
switch (opts.qos) { | ||
case 1: | ||
case 2: | ||
// Add to callbacks | ||
this.outgoing[packet.messageId] = { | ||
volatile: false, | ||
cb: callback || this.noop, | ||
} | ||
this.log('MqttClient:publish: packet cmd: %s', packet.cmd) | ||
this._sendPacket(packet, undefined, opts.cbStorePut) | ||
break | ||
default: | ||
this.log('MqttClient:publish: packet cmd: %s', packet.cmd) | ||
this._sendPacket(packet, callback, opts.cbStorePut) | ||
break | ||
} | ||
return true | ||
} | ||
if (this._checkDisconnecting(callback)) { | ||
debug('subscribe: discconecting true') | ||
return this | ||
} | ||
if ( | ||
this._storeProcessing || | ||
this._storeProcessingQueue.length > 0 || | ||
!publishProc() | ||
) { | ||
this._storeProcessingQueue.push({ | ||
invoke: publishProc, | ||
cbStorePut: opts.cbStorePut, | ||
callback, | ||
}) | ||
} | ||
return this | ||
} | ||
const defaultOpts = { | ||
qos: 0 | ||
} | ||
if (version === 5) { | ||
defaultOpts.nl = false | ||
defaultOpts.rap = false | ||
defaultOpts.rh = 0 | ||
} | ||
opts = xtend(defaultOpts, opts) | ||
/** | ||
* subscribe - subscribe to <topic> | ||
* | ||
* @param {String, Array, Object} topic - topic(s) to subscribe to, supports objects in the form {'topic': qos} | ||
* @param {Object} [opts] - optional subscription options, includes: | ||
* {Number} qos - subscribe qos level | ||
* @param {Function} [callback] - function(err, granted){} where: | ||
* {Error} err - subscription error (none at the moment!) | ||
* {Array} granted - array of {topic: 't', qos: 0} | ||
* @returns {MqttClient} this - for chaining | ||
* @api public | ||
* @example client.subscribe('topic'); | ||
* @example client.subscribe('topic', {qos: 1}); | ||
* @example client.subscribe({'topic': {qos: 0}, 'topic2': {qos: 1}}, console.log); | ||
* @example client.subscribe('topic', console.log); | ||
*/ | ||
subscribe(...args) { | ||
const subs = [] | ||
let obj = args.shift() | ||
const { resubscribe } = obj | ||
let callback = args.pop() || this.noop | ||
let opts = args.pop() | ||
const version = this.options.protocolVersion | ||
if (Array.isArray(obj)) { | ||
obj.forEach(function (topic) { | ||
debug('subscribe: array topic %s', topic) | ||
if (!Object.prototype.hasOwnProperty.call(that._resubscribeTopics, topic) || | ||
that._resubscribeTopics[topic].qos < opts.qos || | ||
resubscribe) { | ||
const currentOpts = { | ||
topic: topic, | ||
qos: opts.qos | ||
} | ||
if (version === 5) { | ||
currentOpts.nl = opts.nl | ||
currentOpts.rap = opts.rap | ||
currentOpts.rh = opts.rh | ||
currentOpts.properties = opts.properties | ||
} | ||
debug('subscribe: pushing topic `%s` and qos `%s` to subs list', currentOpts.topic, currentOpts.qos) | ||
subs.push(currentOpts) | ||
} | ||
}) | ||
} else { | ||
Object | ||
.keys(obj) | ||
.forEach(function (k) { | ||
debug('subscribe: object topic %s', k) | ||
if (!Object.prototype.hasOwnProperty.call(that._resubscribeTopics, k) || | ||
that._resubscribeTopics[k].qos < obj[k].qos || | ||
resubscribe) { | ||
const currentOpts = { | ||
topic: k, | ||
qos: obj[k].qos | ||
} | ||
if (version === 5) { | ||
currentOpts.nl = obj[k].nl | ||
currentOpts.rap = obj[k].rap | ||
currentOpts.rh = obj[k].rh | ||
currentOpts.properties = opts.properties | ||
} | ||
debug('subscribe: pushing `%s` to subs list', currentOpts) | ||
subs.push(currentOpts) | ||
} | ||
}) | ||
} | ||
delete obj.resubscribe | ||
if (!subs.length) { | ||
callback(null, []) | ||
return this | ||
} | ||
if (typeof obj === 'string') { | ||
obj = [obj] | ||
} | ||
const subscribeProc = function () { | ||
const messageId = that._nextId() | ||
if (messageId === null) { | ||
debug('No messageId left') | ||
return false | ||
} | ||
if (typeof callback !== 'function') { | ||
opts = callback | ||
callback = this.noop | ||
} | ||
const packet = { | ||
cmd: 'subscribe', | ||
subscriptions: subs, | ||
qos: 1, | ||
retain: false, | ||
dup: false, | ||
messageId: messageId | ||
} | ||
const invalidTopic = validations.validateTopics(obj) | ||
if (invalidTopic !== null) { | ||
setImmediate(callback, new Error(`Invalid topic ${invalidTopic}`)) | ||
return this | ||
} | ||
if (opts.properties) { | ||
packet.properties = opts.properties | ||
} | ||
if (this._checkDisconnecting(callback)) { | ||
this.log('subscribe: discconecting true') | ||
return this | ||
} | ||
// subscriptions to resubscribe to in case of disconnect | ||
if (that.options.resubscribe) { | ||
debug('subscribe :: resubscribe true') | ||
const topics = [] | ||
subs.forEach(function (sub) { | ||
if (that.options.reconnectPeriod > 0) { | ||
const topic = { qos: sub.qos } | ||
if (version === 5) { | ||
topic.nl = sub.nl || false | ||
topic.rap = sub.rap || false | ||
topic.rh = sub.rh || 0 | ||
topic.properties = sub.properties | ||
} | ||
that._resubscribeTopics[sub.topic] = topic | ||
topics.push(sub.topic) | ||
} | ||
}) | ||
that.messageIdToTopic[packet.messageId] = topics | ||
} | ||
const defaultOpts = { | ||
qos: 0, | ||
} | ||
if (version === 5) { | ||
defaultOpts.nl = false | ||
defaultOpts.rap = false | ||
defaultOpts.rh = 0 | ||
} | ||
opts = { ...defaultOpts, ...opts } | ||
that.outgoing[packet.messageId] = { | ||
volatile: true, | ||
cb: function (err, packet) { | ||
if (!err) { | ||
const granted = packet.granted | ||
for (let i = 0; i < granted.length; i += 1) { | ||
subs[i].qos = granted[i] | ||
} | ||
} | ||
const parseSub = (topic, subOptions) => { | ||
// subOptions is defined only when providing a subs map, use opts otherwise | ||
subOptions = subOptions || opts | ||
if ( | ||
!Object.prototype.hasOwnProperty.call( | ||
this._resubscribeTopics, | ||
topic, | ||
) || | ||
this._resubscribeTopics[topic].qos < subOptions.qos || | ||
resubscribe | ||
) { | ||
const currentOpts = { | ||
topic, | ||
qos: subOptions.qos, | ||
} | ||
if (version === 5) { | ||
currentOpts.nl = subOptions.nl | ||
currentOpts.rap = subOptions.rap | ||
currentOpts.rh = subOptions.rh | ||
// use opts.properties | ||
currentOpts.properties = opts.properties | ||
} | ||
this.log( | ||
'subscribe: pushing topic `%s` and qos `%s` to subs list', | ||
currentOpts.topic, | ||
currentOpts.qos, | ||
) | ||
subs.push(currentOpts) | ||
} | ||
} | ||
callback(err, subs) | ||
} | ||
} | ||
debug('subscribe :: call _sendPacket') | ||
that._sendPacket(packet) | ||
return true | ||
} | ||
if (Array.isArray(obj)) { | ||
// array of topics | ||
obj.forEach((topic) => { | ||
this.log('subscribe: array topic %s', topic) | ||
parseSub(topic) | ||
}) | ||
} else { | ||
// object topic --> subOptions (no properties) | ||
Object.keys(obj).forEach((topic) => { | ||
this.log('subscribe: object topic %s, %o', topic, obj[topic]) | ||
parseSub(topic, obj[topic]) | ||
}) | ||
} | ||
if (this._storeProcessing || this._storeProcessingQueue.length > 0 || !subscribeProc()) { | ||
this._storeProcessingQueue.push( | ||
{ | ||
invoke: subscribeProc, | ||
callback: callback | ||
} | ||
) | ||
} | ||
if (!subs.length) { | ||
callback(null, []) | ||
return this | ||
} | ||
return this | ||
} | ||
const subscribeProc = () => { | ||
const messageId = this._nextId() | ||
if (messageId === null) { | ||
this.log('No messageId left') | ||
return false | ||
} | ||
/** | ||
* unsubscribe - unsubscribe from topic(s) | ||
* | ||
* @param {String, Array} topic - topics to unsubscribe from | ||
* @param {Object} [opts] - optional subscription options, includes: | ||
* {Object} properties - properties of unsubscribe packet | ||
* @param {Function} [callback] - callback fired on unsuback | ||
* @returns {MqttClient} this - for chaining | ||
* @api public | ||
* @example client.unsubscribe('topic'); | ||
* @example client.unsubscribe('topic', console.log); | ||
*/ | ||
MqttClient.prototype.unsubscribe = function () { | ||
const that = this | ||
const args = new Array(arguments.length) | ||
for (let i = 0; i < arguments.length; i++) { | ||
args[i] = arguments[i] | ||
} | ||
let topic = args.shift() | ||
let callback = args.pop() || nop | ||
let opts = args.pop() | ||
if (typeof topic === 'string') { | ||
topic = [topic] | ||
} | ||
const packet = { | ||
cmd: 'subscribe', | ||
subscriptions: subs, | ||
qos: 1, | ||
retain: false, | ||
dup: false, | ||
messageId, | ||
} | ||
if (typeof callback !== 'function') { | ||
opts = callback | ||
callback = nop | ||
} | ||
if (opts.properties) { | ||
packet.properties = opts.properties | ||
} | ||
const invalidTopic = validations.validateTopics(topic) | ||
if (invalidTopic !== null) { | ||
setImmediate(callback, new Error('Invalid topic ' + invalidTopic)) | ||
return this | ||
} | ||
// subscriptions to resubscribe to in case of disconnect | ||
if (this.options.resubscribe) { | ||
this.log('subscribe :: resubscribe true') | ||
const topics = [] | ||
subs.forEach((sub) => { | ||
if (this.options.reconnectPeriod > 0) { | ||
const topic = { qos: sub.qos } | ||
if (version === 5) { | ||
topic.nl = sub.nl || false | ||
topic.rap = sub.rap || false | ||
topic.rh = sub.rh || 0 | ||
topic.properties = sub.properties | ||
} | ||
this._resubscribeTopics[sub.topic] = topic | ||
topics.push(sub.topic) | ||
} | ||
}) | ||
this.messageIdToTopic[packet.messageId] = topics | ||
} | ||
if (that._checkDisconnecting(callback)) { | ||
return this | ||
} | ||
this.outgoing[packet.messageId] = { | ||
volatile: true, | ||
cb(err, packet2) { | ||
if (!err) { | ||
const { granted } = packet2 | ||
for (let i = 0; i < granted.length; i += 1) { | ||
subs[i].qos = granted[i] | ||
} | ||
} | ||
const unsubscribeProc = function () { | ||
const messageId = that._nextId() | ||
if (messageId === null) { | ||
debug('No messageId left') | ||
return false | ||
} | ||
const packet = { | ||
cmd: 'unsubscribe', | ||
qos: 1, | ||
messageId: messageId | ||
} | ||
callback(err, subs) | ||
}, | ||
} | ||
this.log('subscribe :: call _sendPacket') | ||
this._sendPacket(packet) | ||
return true | ||
} | ||
if (typeof topic === 'string') { | ||
packet.unsubscriptions = [topic] | ||
} else if (Array.isArray(topic)) { | ||
packet.unsubscriptions = topic | ||
} | ||
if ( | ||
this._storeProcessing || | ||
this._storeProcessingQueue.length > 0 || | ||
!subscribeProc() | ||
) { | ||
this._storeProcessingQueue.push({ | ||
invoke: subscribeProc, | ||
callback, | ||
}) | ||
} | ||
if (that.options.resubscribe) { | ||
packet.unsubscriptions.forEach(function (topic) { | ||
delete that._resubscribeTopics[topic] | ||
}) | ||
} | ||
return this | ||
} | ||
if (typeof opts === 'object' && opts.properties) { | ||
packet.properties = opts.properties | ||
} | ||
/** | ||
* unsubscribe - unsubscribe from topic(s) | ||
* | ||
* @param {String, Array} topic - topics to unsubscribe from | ||
* @param {Object} [opts] - optional subscription options, includes: | ||
* {Object} properties - properties of unsubscribe packet | ||
* @param {Function} [callback] - callback fired on unsuback | ||
* @returns {MqttClient} this - for chaining | ||
* @api public | ||
* @example client.unsubscribe('topic'); | ||
* @example client.unsubscribe('topic', console.log); | ||
*/ | ||
unsubscribe(...args) { | ||
let topic = args.shift() | ||
let callback = args.pop() || this.noop | ||
let opts = args.pop() | ||
if (typeof topic === 'string') { | ||
topic = [topic] | ||
} | ||
that.outgoing[packet.messageId] = { | ||
volatile: true, | ||
cb: callback | ||
} | ||
if (typeof callback !== 'function') { | ||
opts = callback | ||
callback = this.noop | ||
} | ||
debug('unsubscribe: call _sendPacket') | ||
that._sendPacket(packet) | ||
const invalidTopic = validations.validateTopics(topic) | ||
if (invalidTopic !== null) { | ||
setImmediate(callback, new Error(`Invalid topic ${invalidTopic}`)) | ||
return this | ||
} | ||
return true | ||
} | ||
if (this._checkDisconnecting(callback)) { | ||
return this | ||
} | ||
if (this._storeProcessing || this._storeProcessingQueue.length > 0 || !unsubscribeProc()) { | ||
this._storeProcessingQueue.push( | ||
{ | ||
invoke: unsubscribeProc, | ||
callback: callback | ||
} | ||
) | ||
} | ||
const unsubscribeProc = () => { | ||
const messageId = this._nextId() | ||
if (messageId === null) { | ||
this.log('No messageId left') | ||
return false | ||
} | ||
const packet = { | ||
cmd: 'unsubscribe', | ||
qos: 1, | ||
messageId, | ||
} | ||
return this | ||
} | ||
if (typeof topic === 'string') { | ||
packet.unsubscriptions = [topic] | ||
} else if (Array.isArray(topic)) { | ||
packet.unsubscriptions = topic | ||
} | ||
/** | ||
* end - close connection | ||
* | ||
* @returns {MqttClient} this - for chaining | ||
* @param {Boolean} force - do not wait for all in-flight messages to be acked | ||
* @param {Object} opts - added to the disconnect packet | ||
* @param {Function} cb - called when the client has been closed | ||
* | ||
* @api public | ||
*/ | ||
MqttClient.prototype.end = function (force, opts, cb) { | ||
const that = this | ||
if (this.options.resubscribe) { | ||
packet.unsubscriptions.forEach((topic2) => { | ||
delete this._resubscribeTopics[topic2] | ||
}) | ||
} | ||
debug('end :: (%s)', this.options.clientId) | ||
if (typeof opts === 'object' && opts.properties) { | ||
packet.properties = opts.properties | ||
} | ||
if (force == null || typeof force !== 'boolean') { | ||
cb = opts || nop | ||
opts = force | ||
force = false | ||
if (typeof opts !== 'object') { | ||
cb = opts | ||
opts = null | ||
if (typeof cb !== 'function') { | ||
cb = nop | ||
} | ||
} | ||
} | ||
this.outgoing[packet.messageId] = { | ||
volatile: true, | ||
cb: callback, | ||
} | ||
if (typeof opts !== 'object') { | ||
cb = opts | ||
opts = null | ||
} | ||
this.log('unsubscribe: call _sendPacket') | ||
this._sendPacket(packet) | ||
debug('end :: cb? %s', !!cb) | ||
cb = cb || nop | ||
return true | ||
} | ||
function closeStores () { | ||
debug('end :: closeStores: closing incoming and outgoing stores') | ||
that.disconnected = true | ||
that.incomingStore.close(function (e1) { | ||
that.outgoingStore.close(function (e2) { | ||
debug('end :: closeStores: emitting end') | ||
that.emit('end') | ||
if (cb) { | ||
const err = e1 || e2 | ||
debug('end :: closeStores: invoking callback with args') | ||
cb(err) | ||
} | ||
}) | ||
}) | ||
if (that._deferredReconnect) { | ||
that._deferredReconnect() | ||
} | ||
} | ||
if ( | ||
this._storeProcessing || | ||
this._storeProcessingQueue.length > 0 || | ||
!unsubscribeProc() | ||
) { | ||
this._storeProcessingQueue.push({ | ||
invoke: unsubscribeProc, | ||
callback, | ||
}) | ||
} | ||
function finish () { | ||
// defer closesStores of an I/O cycle, | ||
// just to make sure things are | ||
// ok for websockets | ||
debug('end :: (%s) :: finish :: calling _cleanUp with force %s', that.options.clientId, force) | ||
that._cleanUp(force, () => { | ||
debug('end :: finish :: calling process.nextTick on closeStores') | ||
// const boundProcess = nextTick.bind(null, closeStores) | ||
nextTick(closeStores.bind(that)) | ||
}, opts) | ||
} | ||
return this | ||
} | ||
if (this.disconnecting) { | ||
cb() | ||
return this | ||
} | ||
/** | ||
* end - close connection | ||
* | ||
* @returns {MqttClient} this - for chaining | ||
* @param {Boolean} force - do not wait for all in-flight messages to be acked | ||
* @param {Object} opts - added to the disconnect packet | ||
* @param {Function} cb - called when the client has been closed | ||
* | ||
* @api public | ||
*/ | ||
end(force, opts, cb) { | ||
this.log('end :: (%s)', this.options.clientId) | ||
this._clearReconnect() | ||
if (force == null || typeof force !== 'boolean') { | ||
cb = opts || this.noop | ||
opts = force | ||
force = false | ||
if (typeof opts !== 'object') { | ||
cb = opts | ||
opts = null | ||
if (typeof cb !== 'function') { | ||
cb = this.noop | ||
} | ||
} | ||
} | ||
this.disconnecting = true | ||
if (typeof opts !== 'object') { | ||
cb = opts | ||
opts = null | ||
} | ||
if (!force && Object.keys(this.outgoing).length > 0) { | ||
// wait 10ms, just to be sure we received all of it | ||
debug('end :: (%s) :: calling finish in 10ms once outgoing is empty', that.options.clientId) | ||
this.once('outgoingEmpty', setTimeout.bind(null, finish, 10)) | ||
} else { | ||
debug('end :: (%s) :: immediately calling finish', that.options.clientId) | ||
finish() | ||
} | ||
this.log('end :: cb? %s', !!cb) | ||
cb = cb || this.noop | ||
return this | ||
} | ||
const closeStores = () => { | ||
this.log('end :: closeStores: closing incoming and outgoing stores') | ||
this.disconnected = true | ||
this.incomingStore.close((e1) => { | ||
this.outgoingStore.close((e2) => { | ||
this.log('end :: closeStores: emitting end') | ||
this.emit('end') | ||
if (cb) { | ||
const err = e1 || e2 | ||
this.log( | ||
'end :: closeStores: invoking callback with args', | ||
) | ||
cb(err) | ||
} | ||
}) | ||
}) | ||
if (this._deferredReconnect) { | ||
this._deferredReconnect() | ||
} | ||
} | ||
/** | ||
* removeOutgoingMessage - remove a message in outgoing store | ||
* the outgoing callback will be called withe Error('Message removed') if the message is removed | ||
* | ||
* @param {Number} messageId - messageId to remove message | ||
* @returns {MqttClient} this - for chaining | ||
* @api public | ||
* | ||
* @example client.removeOutgoingMessage(client.getLastAllocated()); | ||
*/ | ||
MqttClient.prototype.removeOutgoingMessage = function (messageId) { | ||
const cb = this.outgoing[messageId] ? this.outgoing[messageId].cb : null | ||
delete this.outgoing[messageId] | ||
this.outgoingStore.del({ messageId: messageId }, function () { | ||
cb(new Error('Message removed')) | ||
}) | ||
return this | ||
} | ||
const finish = () => { | ||
// defer closesStores of an I/O cycle, | ||
// just to make sure things are | ||
// ok for websockets | ||
this.log( | ||
'end :: (%s) :: finish :: calling _cleanUp with force %s', | ||
this.options.clientId, | ||
force, | ||
) | ||
this._cleanUp( | ||
force, | ||
() => { | ||
this.log( | ||
'end :: finish :: calling process.nextTick on closeStores', | ||
) | ||
// const boundProcess = nextTick.bind(null, closeStores) | ||
nextTick(closeStores) | ||
}, | ||
opts, | ||
) | ||
} | ||
/** | ||
* reconnect - connect again using the same options as connect() | ||
* | ||
* @param {Object} [opts] - optional reconnect options, includes: | ||
* {Store} incomingStore - a store for the incoming packets | ||
* {Store} outgoingStore - a store for the outgoing packets | ||
* if opts is not given, current stores are used | ||
* @returns {MqttClient} this - for chaining | ||
* | ||
* @api public | ||
*/ | ||
MqttClient.prototype.reconnect = function (opts) { | ||
debug('client reconnect') | ||
const that = this | ||
const f = function () { | ||
if (opts) { | ||
that.options.incomingStore = opts.incomingStore | ||
that.options.outgoingStore = opts.outgoingStore | ||
} else { | ||
that.options.incomingStore = null | ||
that.options.outgoingStore = null | ||
} | ||
that.incomingStore = that.options.incomingStore || new Store() | ||
that.outgoingStore = that.options.outgoingStore || new Store() | ||
that.disconnecting = false | ||
that.disconnected = false | ||
that._deferredReconnect = null | ||
that._reconnect() | ||
} | ||
if (this.disconnecting) { | ||
cb() | ||
return this | ||
} | ||
if (this.disconnecting && !this.disconnected) { | ||
this._deferredReconnect = f | ||
} else { | ||
f() | ||
} | ||
return this | ||
} | ||
this._clearReconnect() | ||
/** | ||
* _reconnect - implement reconnection | ||
* @api privateish | ||
*/ | ||
MqttClient.prototype._reconnect = function () { | ||
debug('_reconnect: emitting reconnect to client') | ||
this.emit('reconnect') | ||
if (this.connected) { | ||
this.end(() => { this._setupStream() }) | ||
debug('client already connected. disconnecting first.') | ||
} else { | ||
debug('_reconnect: calling _setupStream') | ||
this._setupStream() | ||
} | ||
} | ||
this.disconnecting = true | ||
/** | ||
* _setupReconnect - setup reconnect timer | ||
*/ | ||
MqttClient.prototype._setupReconnect = function () { | ||
const that = this | ||
if (!force && Object.keys(this.outgoing).length > 0) { | ||
// wait 10ms, just to be sure we received all of it | ||
this.log( | ||
'end :: (%s) :: calling finish in 10ms once outgoing is empty', | ||
this.options.clientId, | ||
) | ||
this.once('outgoingEmpty', setTimeout.bind(null, finish, 10)) | ||
} else { | ||
this.log( | ||
'end :: (%s) :: immediately calling finish', | ||
this.options.clientId, | ||
) | ||
finish() | ||
} | ||
if (!that.disconnecting && !that.reconnectTimer && (that.options.reconnectPeriod > 0)) { | ||
if (!this.reconnecting) { | ||
debug('_setupReconnect :: emit `offline` state') | ||
this.emit('offline') | ||
debug('_setupReconnect :: set `reconnecting` to `true`') | ||
this.reconnecting = true | ||
} | ||
debug('_setupReconnect :: setting reconnectTimer for %d ms', that.options.reconnectPeriod) | ||
that.reconnectTimer = setInterval(function () { | ||
debug('reconnectTimer :: reconnect triggered!') | ||
that._reconnect() | ||
}, that.options.reconnectPeriod) | ||
} else { | ||
debug('_setupReconnect :: doing nothing...') | ||
} | ||
} | ||
return this | ||
} | ||
/** | ||
* _clearReconnect - clear the reconnect timer | ||
*/ | ||
MqttClient.prototype._clearReconnect = function () { | ||
debug('_clearReconnect : clearing reconnect timer') | ||
if (this.reconnectTimer) { | ||
clearInterval(this.reconnectTimer) | ||
this.reconnectTimer = null | ||
} | ||
} | ||
/** | ||
* removeOutgoingMessage - remove a message in outgoing store | ||
* the outgoing callback will be called withe Error('Message removed') if the message is removed | ||
* | ||
* @param {Number} messageId - messageId to remove message | ||
* @returns {MqttClient} this - for chaining | ||
* @api public | ||
* | ||
* @example client.removeOutgoingMessage(client.getLastAllocated()); | ||
*/ | ||
removeOutgoingMessage(messageId) { | ||
if (this.outgoing[messageId]) { | ||
const { cb } = this.outgoing[messageId] | ||
this._removeOutgoingAndStoreMessage(messageId, () => { | ||
cb(new Error('Message removed')) | ||
}) | ||
} | ||
return this | ||
} | ||
/** | ||
* _cleanUp - clean up on connection end | ||
* @api private | ||
*/ | ||
MqttClient.prototype._cleanUp = function (forced, done) { | ||
const opts = arguments[2] | ||
if (done) { | ||
debug('_cleanUp :: done callback provided for on stream close') | ||
this.stream.on('close', done) | ||
} | ||
/** | ||
* reconnect - connect again using the same options as connect() | ||
* | ||
* @param {Object} [opts] - optional reconnect options, includes: | ||
* {Store} incomingStore - a store for the incoming packets | ||
* {Store} outgoingStore - a store for the outgoing packets | ||
* if opts is not given, current stores are used | ||
* @returns {MqttClient} this - for chaining | ||
* | ||
* @api public | ||
*/ | ||
reconnect(opts) { | ||
this.log('client reconnect') | ||
const f = () => { | ||
if (opts) { | ||
this.options.incomingStore = opts.incomingStore | ||
this.options.outgoingStore = opts.outgoingStore | ||
} else { | ||
this.options.incomingStore = null | ||
this.options.outgoingStore = null | ||
} | ||
this.incomingStore = this.options.incomingStore || new Store() | ||
this.outgoingStore = this.options.outgoingStore || new Store() | ||
this.disconnecting = false | ||
this.disconnected = false | ||
this._deferredReconnect = null | ||
this._reconnect() | ||
} | ||
debug('_cleanUp :: forced? %s', forced) | ||
if (forced) { | ||
if ((this.options.reconnectPeriod === 0) && this.options.clean) { | ||
flush(this.outgoing) | ||
} | ||
debug('_cleanUp :: (%s) :: destroying stream', this.options.clientId) | ||
this.stream.destroy() | ||
} else { | ||
const packet = xtend({ cmd: 'disconnect' }, opts) | ||
debug('_cleanUp :: (%s) :: call _sendPacket with disconnect packet', this.options.clientId) | ||
this._sendPacket( | ||
packet, | ||
setImmediate.bind( | ||
null, | ||
this.stream.end.bind(this.stream) | ||
) | ||
) | ||
} | ||
if (this.disconnecting && !this.disconnected) { | ||
this._deferredReconnect = f | ||
} else { | ||
f() | ||
} | ||
return this | ||
} | ||
if (!this.disconnecting) { | ||
debug('_cleanUp :: client not disconnecting. Clearing and resetting reconnect.') | ||
this._clearReconnect() | ||
this._setupReconnect() | ||
} | ||
/** | ||
* _reconnect - implement reconnection | ||
* @api privateish | ||
*/ | ||
_reconnect() { | ||
this.log('_reconnect: emitting reconnect to client') | ||
this.emit('reconnect') | ||
if (this.connected) { | ||
this.end(() => { | ||
this.connect() | ||
}) | ||
this.log('client already connected. disconnecting first.') | ||
} else { | ||
this.log('_reconnect: calling connect') | ||
this.connect() | ||
} | ||
} | ||
if (this.pingTimer !== null) { | ||
debug('_cleanUp :: clearing pingTimer') | ||
this.pingTimer.clear() | ||
this.pingTimer = null | ||
} | ||
/** | ||
* _setupReconnect - setup reconnect timer | ||
*/ | ||
_setupReconnect() { | ||
if ( | ||
!this.disconnecting && | ||
!this.reconnectTimer && | ||
this.options.reconnectPeriod > 0 | ||
) { | ||
if (!this.reconnecting) { | ||
this.log('_setupReconnect :: emit `offline` state') | ||
this.emit('offline') | ||
this.log('_setupReconnect :: set `reconnecting` to `true`') | ||
this.reconnecting = true | ||
} | ||
this.log( | ||
'_setupReconnect :: setting reconnectTimer for %d ms', | ||
this.options.reconnectPeriod, | ||
) | ||
this.reconnectTimer = setInterval(() => { | ||
this.log('reconnectTimer :: reconnect triggered!') | ||
this._reconnect() | ||
}, this.options.reconnectPeriod) | ||
} else { | ||
this.log('_setupReconnect :: doing nothing...') | ||
} | ||
} | ||
if (done && !this.connected) { | ||
debug('_cleanUp :: (%s) :: removing stream `done` callback `close` listener', this.options.clientId) | ||
this.stream.removeListener('close', done) | ||
done() | ||
} | ||
} | ||
/** | ||
* _clearReconnect - clear the reconnect timer | ||
*/ | ||
_clearReconnect() { | ||
this.log('_clearReconnect : clearing reconnect timer') | ||
if (this.reconnectTimer) { | ||
clearInterval(this.reconnectTimer) | ||
this.reconnectTimer = null | ||
} | ||
} | ||
/** | ||
* _sendPacket - send or queue a packet | ||
* @param {Object} packet - packet options | ||
* @param {Function} cb - callback when the packet is sent | ||
* @param {Function} cbStorePut - called when message is put into outgoingStore | ||
* @param {Boolean} noStore - send without put to the store | ||
* @api private | ||
*/ | ||
MqttClient.prototype._sendPacket = function (packet, cb, cbStorePut, noStore) { | ||
debug('_sendPacket :: (%s) :: start', this.options.clientId) | ||
cbStorePut = cbStorePut || nop | ||
cb = cb || nop | ||
/** | ||
* _cleanUp - clean up on connection end | ||
* @api private | ||
*/ | ||
_cleanUp(forced, done, opts = {}) { | ||
if (done) { | ||
this.log('_cleanUp :: done callback provided for on stream close') | ||
this.stream.on('close', done) | ||
} | ||
const err = applyTopicAlias(this, packet) | ||
if (err) { | ||
cb(err) | ||
return | ||
} | ||
this.log('_cleanUp :: forced? %s', forced) | ||
if (forced) { | ||
if (this.options.reconnectPeriod === 0 && this.options.clean) { | ||
this._flush(this.outgoing) | ||
} | ||
this.log( | ||
'_cleanUp :: (%s) :: destroying stream', | ||
this.options.clientId, | ||
) | ||
this.stream.destroy() | ||
} else { | ||
const packet = { cmd: 'disconnect', ...opts } | ||
this.log( | ||
'_cleanUp :: (%s) :: call _sendPacket with disconnect packet', | ||
this.options.clientId, | ||
) | ||
this._sendPacket(packet, () => { | ||
this.log( | ||
'_cleanUp :: (%s) :: destroying stream', | ||
this.options.clientId, | ||
) | ||
setImmediate(() => { | ||
this.stream.end(() => { | ||
this.log( | ||
'_cleanUp :: (%s) :: stream destroyed', | ||
this.options.clientId, | ||
) | ||
// once stream is closed the 'close' event will fire and that will | ||
// emit client `close` event and call `done` callback if done is provided | ||
}) | ||
}) | ||
}) | ||
} | ||
if (!this.connected) { | ||
// allow auth packets to be sent while authenticating with the broker (mqtt5 enhanced auth) | ||
if (packet.cmd === 'auth') { | ||
this._shiftPingInterval() | ||
sendPacket(this, packet, cb) | ||
return | ||
} | ||
if (!this.disconnecting) { | ||
this.log( | ||
'_cleanUp :: client not disconnecting. Clearing and resetting reconnect.', | ||
) | ||
this._clearReconnect() | ||
this._setupReconnect() | ||
} | ||
debug('_sendPacket :: client not connected. Storing packet offline.') | ||
this._storePacket(packet, cb, cbStorePut) | ||
return | ||
} | ||
if (this.pingTimer !== null) { | ||
this.log('_cleanUp :: clearing pingTimer') | ||
this.pingTimer.clear() | ||
this.pingTimer = null | ||
} | ||
// When sending a packet, reschedule the ping timer | ||
this._shiftPingInterval() | ||
if (done && !this.connected) { | ||
this.log( | ||
'_cleanUp :: (%s) :: removing stream `done` callback `close` listener', | ||
this.options.clientId, | ||
) | ||
this.stream.removeListener('close', done) | ||
done() | ||
} | ||
} | ||
// If "noStore" is true, the message is sent without being recorded in the store. | ||
// Messages that have not received puback or pubcomp remain in the store after disconnection | ||
// and are resent from the store upon reconnection. | ||
// For resend upon reconnection, "noStore" is set to true. This is because the message is already stored in the store. | ||
// This is to avoid interrupting other processes while recording to the store. | ||
if (noStore) { | ||
sendPacket(this, packet, cb) | ||
return | ||
} | ||
_storeAndSend(packet, cb, cbStorePut) { | ||
this.log( | ||
'storeAndSend :: store packet with cmd %s to outgoingStore', | ||
packet.cmd, | ||
) | ||
let storePacket = packet | ||
let err | ||
if (storePacket.cmd === 'publish') { | ||
// The original packet is for sending. | ||
// The cloned storePacket is for storing to resend on reconnect. | ||
// Topic Alias must not be used after disconnected. | ||
storePacket = clone(packet) | ||
err = this._removeTopicAliasAndRecoverTopicName(storePacket) | ||
if (err) { | ||
return cb && cb(err) | ||
} | ||
} | ||
this.outgoingStore.put(storePacket, (err2) => { | ||
if (err2) { | ||
return cb && cb(err2) | ||
} | ||
cbStorePut() | ||
this._writePacket(packet, cb) | ||
}) | ||
} | ||
switch (packet.cmd) { | ||
case 'publish': | ||
break | ||
case 'pubrel': | ||
storeAndSend(this, packet, cb, cbStorePut) | ||
return | ||
default: | ||
sendPacket(this, packet, cb) | ||
return | ||
} | ||
_applyTopicAlias(packet) { | ||
if (this.options.protocolVersion === 5) { | ||
if (packet.cmd === 'publish') { | ||
let alias | ||
if (packet.properties) { | ||
alias = packet.properties.topicAlias | ||
} | ||
const topic = packet.topic.toString() | ||
if (this.topicAliasSend) { | ||
if (alias) { | ||
if (topic.length !== 0) { | ||
// register topic alias | ||
this.log( | ||
'applyTopicAlias :: register topic: %s - alias: %d', | ||
topic, | ||
alias, | ||
) | ||
if (!this.topicAliasSend.put(topic, alias)) { | ||
this.log( | ||
'applyTopicAlias :: error out of range. topic: %s - alias: %d', | ||
topic, | ||
alias, | ||
) | ||
return new Error( | ||
'Sending Topic Alias out of range', | ||
) | ||
} | ||
} | ||
} else if (topic.length !== 0) { | ||
if (this.options.autoAssignTopicAlias) { | ||
alias = this.topicAliasSend.getAliasByTopic(topic) | ||
if (alias) { | ||
packet.topic = '' | ||
packet.properties = { | ||
...packet.properties, | ||
topicAlias: alias, | ||
} | ||
this.log( | ||
'applyTopicAlias :: auto assign(use) topic: %s - alias: %d', | ||
topic, | ||
alias, | ||
) | ||
} else { | ||
alias = this.topicAliasSend.getLruAlias() | ||
this.topicAliasSend.put(topic, alias) | ||
packet.properties = { | ||
...packet.properties, | ||
topicAlias: alias, | ||
} | ||
this.log( | ||
'applyTopicAlias :: auto assign topic: %s - alias: %d', | ||
topic, | ||
alias, | ||
) | ||
} | ||
} else if (this.options.autoUseTopicAlias) { | ||
alias = this.topicAliasSend.getAliasByTopic(topic) | ||
if (alias) { | ||
packet.topic = '' | ||
packet.properties = { | ||
...packet.properties, | ||
topicAlias: alias, | ||
} | ||
this.log( | ||
'applyTopicAlias :: auto use topic: %s - alias: %d', | ||
topic, | ||
alias, | ||
) | ||
} | ||
} | ||
} | ||
} else if (alias) { | ||
this.log( | ||
'applyTopicAlias :: error out of range. topic: %s - alias: %d', | ||
topic, | ||
alias, | ||
) | ||
return new Error('Sending Topic Alias out of range') | ||
} | ||
} | ||
} | ||
} | ||
switch (packet.qos) { | ||
case 2: | ||
case 1: | ||
storeAndSend(this, packet, cb, cbStorePut) | ||
break | ||
/** | ||
* no need of case here since it will be caught by default | ||
* and jshint comply that before default it must be a break | ||
* anyway it will result in -1 evaluation | ||
*/ | ||
case 0: | ||
/* falls through */ | ||
default: | ||
sendPacket(this, packet, cb) | ||
break | ||
} | ||
debug('_sendPacket :: (%s) :: end', this.options.clientId) | ||
} | ||
_noop(err) { | ||
this.log('noop ::', err) | ||
} | ||
/** | ||
* _storePacket - queue a packet | ||
* @param {Object} packet - packet options | ||
* @param {Function} cb - callback when the packet is sent | ||
* @param {Function} cbStorePut - called when message is put into outgoingStore | ||
* @api private | ||
*/ | ||
MqttClient.prototype._storePacket = function (packet, cb, cbStorePut) { | ||
debug('_storePacket :: packet: %o', packet) | ||
debug('_storePacket :: cb? %s', !!cb) | ||
cbStorePut = cbStorePut || nop | ||
/** Writes the packet to stream and emits events */ | ||
_writePacket(packet, cb) { | ||
this.log('_writePacket :: packet: %O', packet) | ||
this.log('_writePacket :: emitting `packetsend`') | ||
let storePacket = packet | ||
if (storePacket.cmd === 'publish') { | ||
// The original packet is for sending. | ||
// The cloned storePacket is for storing to resend on reconnect. | ||
// Topic Alias must not be used after disconnected. | ||
storePacket = clone(packet) | ||
const err = removeTopicAliasAndRecoverTopicName(this, storePacket) | ||
if (err) { | ||
return cb && cb(err) | ||
} | ||
} | ||
// check that the packet is not a qos of 0, or that the command is not a publish | ||
if (((storePacket.qos || 0) === 0 && this.queueQoSZero) || storePacket.cmd !== 'publish') { | ||
this.queue.push({ packet: storePacket, cb: cb }) | ||
} else if (storePacket.qos > 0) { | ||
cb = this.outgoing[storePacket.messageId] ? this.outgoing[storePacket.messageId].cb : null | ||
this.outgoingStore.put(storePacket, function (err) { | ||
if (err) { | ||
return cb && cb(err) | ||
} | ||
cbStorePut() | ||
}) | ||
} else if (cb) { | ||
cb(new Error('No connection to broker')) | ||
} | ||
} | ||
this.emit('packetsend', packet) | ||
/** | ||
* _setupPingTimer - setup the ping timer | ||
* | ||
* @api private | ||
*/ | ||
MqttClient.prototype._setupPingTimer = function () { | ||
debug('_setupPingTimer :: keepalive %d (seconds)', this.options.keepalive) | ||
const that = this | ||
// When writing a packet, reschedule the ping timer | ||
this._shiftPingInterval() | ||
if (!this.pingTimer && this.options.keepalive) { | ||
this.pingResp = true | ||
this.pingTimer = reInterval(function () { | ||
that._checkPing() | ||
}, this.options.keepalive * 1000) | ||
} | ||
} | ||
this.log('_writePacket :: writing to stream') | ||
const result = mqttPacket.writeToStream( | ||
packet, | ||
this.stream, | ||
this.options, | ||
) | ||
this.log('_writePacket :: writeToStream result %s', result) | ||
if (!result && cb && cb !== this.noop) { | ||
this.log( | ||
'_writePacket :: handle events on `drain` once through callback.', | ||
) | ||
this.stream.once('drain', cb) | ||
} else if (cb) { | ||
this.log('_writePacket :: invoking cb') | ||
cb() | ||
} | ||
} | ||
/** | ||
* _shiftPingInterval - reschedule the ping interval | ||
* | ||
* @api private | ||
*/ | ||
MqttClient.prototype._shiftPingInterval = function () { | ||
if (this.pingTimer && this.options.keepalive && this.options.reschedulePings) { | ||
this.pingTimer.reschedule(this.options.keepalive * 1000) | ||
} | ||
} | ||
/** | ||
* _checkPing - check if a pingresp has come back, and ping the server again | ||
* | ||
* @api private | ||
*/ | ||
MqttClient.prototype._checkPing = function () { | ||
debug('_checkPing :: checking ping...') | ||
if (this.pingResp) { | ||
debug('_checkPing :: ping response received. Clearing flag and sending `pingreq`') | ||
this.pingResp = false | ||
this._sendPacket({ cmd: 'pingreq' }) | ||
} else { | ||
// do a forced cleanup since socket will be in bad shape | ||
debug('_checkPing :: calling _cleanUp with force true') | ||
this._cleanUp(true) | ||
} | ||
} | ||
/** | ||
* _sendPacket - send or queue a packet | ||
* @param {Object} packet - packet options | ||
* @param {Function} cb - callback when the packet is sent | ||
* @param {Function} cbStorePut - called when message is put into outgoingStore | ||
* @param {Boolean} noStore - send without put to the store | ||
* @api private | ||
*/ | ||
_sendPacket(packet, cb, cbStorePut, noStore) { | ||
this.log('_sendPacket :: (%s) :: start', this.options.clientId) | ||
cbStorePut = cbStorePut || this.noop | ||
cb = cb || this.noop | ||
/** | ||
* _handlePingresp - handle a pingresp | ||
* | ||
* @api private | ||
*/ | ||
MqttClient.prototype._handlePingresp = function () { | ||
this.pingResp = true | ||
} | ||
const err = this._applyTopicAlias(packet) | ||
if (err) { | ||
cb(err) | ||
return | ||
} | ||
/** | ||
* _handleConnack | ||
* | ||
* @param {Object} packet | ||
* @api private | ||
*/ | ||
MqttClient.prototype._handleConnack = function (packet) { | ||
debug('_handleConnack') | ||
const options = this.options | ||
const version = options.protocolVersion | ||
const rc = version === 5 ? packet.reasonCode : packet.returnCode | ||
if (!this.connected) { | ||
// allow auth packets to be sent while authenticating with the broker (mqtt5 enhanced auth) | ||
if (packet.cmd === 'auth') { | ||
this._writePacket(this, packet, cb) | ||
return | ||
} | ||
clearTimeout(this.connackTimer) | ||
delete this.topicAliasSend | ||
this.log( | ||
'_sendPacket :: client not connected. Storing packet offline.', | ||
) | ||
this._storePacket(packet, cb, cbStorePut) | ||
return | ||
} | ||
if (packet.properties) { | ||
if (packet.properties.topicAliasMaximum) { | ||
if (packet.properties.topicAliasMaximum > 0xffff) { | ||
this.emit('error', new Error('topicAliasMaximum from broker is out of range')) | ||
return | ||
} | ||
if (packet.properties.topicAliasMaximum > 0) { | ||
this.topicAliasSend = new TopicAliasSend(packet.properties.topicAliasMaximum) | ||
} | ||
} | ||
if (packet.properties.serverKeepAlive && options.keepalive) { | ||
options.keepalive = packet.properties.serverKeepAlive | ||
this._shiftPingInterval() | ||
} | ||
if (packet.properties.maximumPacketSize) { | ||
if (!options.properties) { options.properties = {} } | ||
options.properties.maximumPacketSize = packet.properties.maximumPacketSize | ||
} | ||
} | ||
// If "noStore" is true, the message is sent without being recorded in the store. | ||
// Messages that have not received puback or pubcomp remain in the store after disconnection | ||
// and are resent from the store upon reconnection. | ||
// For resend upon reconnection, "noStore" is set to true. This is because the message is already stored in the store. | ||
// This is to avoid interrupting other processes while recording to the store. | ||
if (noStore) { | ||
this._writePacket(packet, cb) | ||
return | ||
} | ||
if (rc === 0) { | ||
this.reconnecting = false | ||
this._onConnect(packet) | ||
} else if (rc > 0) { | ||
const err = new Error('Connection refused: ' + errors[rc]) | ||
err.code = rc | ||
this.emit('error', err) | ||
} | ||
} | ||
switch (packet.cmd) { | ||
case 'publish': | ||
break | ||
case 'pubrel': | ||
this._storeAndSend(packet, cb, cbStorePut) | ||
return | ||
default: | ||
this._writePacket(packet, cb) | ||
return | ||
} | ||
MqttClient.prototype._handleAuth = function (packet) { | ||
const options = this.options | ||
const version = options.protocolVersion | ||
const rc = version === 5 ? packet.reasonCode : packet.returnCode | ||
switch (packet.qos) { | ||
case 2: | ||
case 1: | ||
this._storeAndSend(packet, cb, cbStorePut) | ||
break | ||
/** | ||
* no need of case here since it will be caught by default | ||
* and jshint comply that before default it must be a break | ||
* anyway it will result in -1 evaluation | ||
*/ | ||
case 0: | ||
/* falls through */ | ||
default: | ||
this._writePacket(packet, cb) | ||
break | ||
} | ||
this.log('_sendPacket :: (%s) :: end', this.options.clientId) | ||
} | ||
if (version !== 5) { | ||
const err = new Error('Protocol error: Auth packets are only supported in MQTT 5. Your version:' + version) | ||
err.code = rc | ||
this.emit('error', err) | ||
return | ||
} | ||
/** | ||
* _storePacket - queue a packet | ||
* @param {Object} packet - packet options | ||
* @param {Function} cb - callback when the packet is sent | ||
* @param {Function} cbStorePut - called when message is put into outgoingStore | ||
* @api private | ||
*/ | ||
_storePacket(packet, cb, cbStorePut) { | ||
this.log('_storePacket :: packet: %o', packet) | ||
this.log('_storePacket :: cb? %s', !!cb) | ||
cbStorePut = cbStorePut || this.noop | ||
const that = this | ||
this.handleAuth(packet, function (err, packet) { | ||
if (err) { | ||
that.emit('error', err) | ||
return | ||
} | ||
let storePacket = packet | ||
if (storePacket.cmd === 'publish') { | ||
// The original packet is for sending. | ||
// The cloned storePacket is for storing to resend on reconnect. | ||
// Topic Alias must not be used after disconnected. | ||
storePacket = clone(packet) | ||
const err = this._removeTopicAliasAndRecoverTopicName(storePacket) | ||
if (err) { | ||
return cb && cb(err) | ||
} | ||
} | ||
// check that the packet is not a qos of 0, or that the command is not a publish | ||
if ( | ||
((storePacket.qos || 0) === 0 && this.queueQoSZero) || | ||
storePacket.cmd !== 'publish' | ||
) { | ||
this.queue.push({ packet: storePacket, cb }) | ||
} else if (storePacket.qos > 0) { | ||
cb = this.outgoing[storePacket.messageId] | ||
? this.outgoing[storePacket.messageId].cb | ||
: null | ||
this.outgoingStore.put(storePacket, (err) => { | ||
if (err) { | ||
return cb && cb(err) | ||
} | ||
cbStorePut() | ||
}) | ||
} else if (cb) { | ||
cb(new Error('No connection to broker')) | ||
} | ||
} | ||
if (rc === 24) { | ||
that.reconnecting = false | ||
that._sendPacket(packet) | ||
} else { | ||
const error = new Error('Connection refused: ' + errors[rc]) | ||
err.code = rc | ||
that.emit('error', error) | ||
} | ||
}) | ||
} | ||
/** | ||
* _setupPingTimer - setup the ping timer | ||
* | ||
* @api private | ||
*/ | ||
_setupPingTimer() { | ||
this.log( | ||
'_setupPingTimer :: keepalive %d (seconds)', | ||
this.options.keepalive, | ||
) | ||
/** | ||
* @param packet the packet received by the broker | ||
* @return the auth packet to be returned to the broker | ||
* @api public | ||
*/ | ||
MqttClient.prototype.handleAuth = function (packet, callback) { | ||
callback() | ||
} | ||
if (!this.pingTimer && this.options.keepalive) { | ||
this.pingResp = true | ||
this.pingTimer = reInterval(() => { | ||
this._checkPing() | ||
}, this.options.keepalive * 1000) | ||
} | ||
} | ||
/** | ||
* _handlePublish | ||
* | ||
* @param {Object} packet | ||
* @api private | ||
*/ | ||
/* | ||
those late 2 case should be rewrite to comply with coding style: | ||
/** | ||
* _shiftPingInterval - reschedule the ping interval | ||
* | ||
* @api private | ||
*/ | ||
_shiftPingInterval() { | ||
if ( | ||
this.pingTimer && | ||
this.options.keepalive && | ||
this.options.reschedulePings | ||
) { | ||
this.pingTimer.reschedule(this.options.keepalive * 1000) | ||
} | ||
} | ||
case 1: | ||
case 0: | ||
// do not wait sending a puback | ||
// no callback passed | ||
if (1 === qos) { | ||
this._sendPacket({ | ||
cmd: 'puback', | ||
messageId: messageId | ||
}); | ||
} | ||
// emit the message event for both qos 1 and 0 | ||
this.emit('message', topic, message, packet); | ||
this.handleMessage(packet, done); | ||
break; | ||
default: | ||
// do nothing but every switch mus have a default | ||
// log or throw an error about unknown qos | ||
break; | ||
/** | ||
* _checkPing - check if a pingresp has come back, and ping the server again | ||
* | ||
* @api private | ||
*/ | ||
_checkPing() { | ||
this.log('_checkPing :: checking ping...') | ||
if (this.pingResp) { | ||
this.log( | ||
'_checkPing :: ping response received. Clearing flag and sending `pingreq`', | ||
) | ||
this.pingResp = false | ||
this._sendPacket({ cmd: 'pingreq' }) | ||
} else { | ||
// do a forced cleanup since socket will be in bad shape | ||
this.log('_checkPing :: calling _cleanUp with force true') | ||
this._cleanUp(true) | ||
} | ||
} | ||
for now i just suppressed the warnings | ||
*/ | ||
MqttClient.prototype._handlePublish = function (packet, done) { | ||
debug('_handlePublish: packet %o', packet) | ||
done = typeof done !== 'undefined' ? done : nop | ||
let topic = packet.topic.toString() | ||
const message = packet.payload | ||
const qos = packet.qos | ||
const messageId = packet.messageId | ||
const that = this | ||
const options = this.options | ||
const validReasonCodes = [0, 16, 128, 131, 135, 144, 145, 151, 153] | ||
if (this.options.protocolVersion === 5) { | ||
let alias | ||
if (packet.properties) { | ||
alias = packet.properties.topicAlias | ||
} | ||
if (typeof alias !== 'undefined') { | ||
if (topic.length === 0) { | ||
if (alias > 0 && alias <= 0xffff) { | ||
const gotTopic = this.topicAliasRecv.getTopicByAlias(alias) | ||
if (gotTopic) { | ||
topic = gotTopic | ||
debug('_handlePublish :: topic complemented by alias. topic: %s - alias: %d', topic, alias) | ||
} else { | ||
debug('_handlePublish :: unregistered topic alias. alias: %d', alias) | ||
this.emit('error', new Error('Received unregistered Topic Alias')) | ||
return | ||
} | ||
} else { | ||
debug('_handlePublish :: topic alias out of range. alias: %d', alias) | ||
this.emit('error', new Error('Received Topic Alias is out of range')) | ||
return | ||
} | ||
} else { | ||
if (this.topicAliasRecv.put(topic, alias)) { | ||
debug('_handlePublish :: registered topic: %s - alias: %d', topic, alias) | ||
} else { | ||
debug('_handlePublish :: topic alias out of range. alias: %d', alias) | ||
this.emit('error', new Error('Received Topic Alias is out of range')) | ||
return | ||
} | ||
} | ||
} | ||
} | ||
debug('_handlePublish: qos %d', qos) | ||
switch (qos) { | ||
case 2: { | ||
options.customHandleAcks(topic, message, packet, function (error, code) { | ||
if (!(error instanceof Error)) { | ||
code = error | ||
error = null | ||
} | ||
if (error) { return that.emit('error', error) } | ||
if (validReasonCodes.indexOf(code) === -1) { return that.emit('error', new Error('Wrong reason code for pubrec')) } | ||
if (code) { | ||
that._sendPacket({ cmd: 'pubrec', messageId: messageId, reasonCode: code }, done) | ||
} else { | ||
that.incomingStore.put(packet, function () { | ||
that._sendPacket({ cmd: 'pubrec', messageId: messageId }, done) | ||
}) | ||
} | ||
}) | ||
break | ||
} | ||
case 1: { | ||
// emit the message event | ||
options.customHandleAcks(topic, message, packet, function (error, code) { | ||
if (!(error instanceof Error)) { | ||
code = error | ||
error = null | ||
} | ||
if (error) { return that.emit('error', error) } | ||
if (validReasonCodes.indexOf(code) === -1) { return that.emit('error', new Error('Wrong reason code for puback')) } | ||
if (!code) { that.emit('message', topic, message, packet) } | ||
that.handleMessage(packet, function (err) { | ||
if (err) { | ||
return done && done(err) | ||
} | ||
that._sendPacket({ cmd: 'puback', messageId: messageId, reasonCode: code }, done) | ||
}) | ||
}) | ||
break | ||
} | ||
case 0: | ||
// emit the message event | ||
this.emit('message', topic, message, packet) | ||
this.handleMessage(packet, done) | ||
break | ||
default: | ||
// do nothing | ||
debug('_handlePublish: unknown QoS. Doing nothing.') | ||
// log or throw an error about unknown qos | ||
break | ||
} | ||
} | ||
/** | ||
* @param packet the packet received by the broker | ||
* @return the auth packet to be returned to the broker | ||
* @api public | ||
*/ | ||
handleAuth(packet, callback) { | ||
callback() | ||
} | ||
/** | ||
* Handle messages with backpressure support, one at a time. | ||
* Override at will. | ||
* | ||
* @param Packet packet the packet | ||
* @param Function callback call when finished | ||
* @api public | ||
*/ | ||
MqttClient.prototype.handleMessage = function (packet, callback) { | ||
callback() | ||
} | ||
/** | ||
* Handle messages with backpressure support, one at a time. | ||
* Override at will. | ||
* | ||
* @param Packet packet the packet | ||
* @param Function callback call when finished | ||
* @api public | ||
*/ | ||
handleMessage(packet, callback) { | ||
callback() | ||
} | ||
/** | ||
* _handleAck | ||
* | ||
* @param {Object} packet | ||
* @api private | ||
*/ | ||
/** | ||
* _nextId | ||
* @return unsigned int | ||
*/ | ||
_nextId() { | ||
return this.messageIdProvider.allocate() | ||
} | ||
MqttClient.prototype._handleAck = function (packet) { | ||
/* eslint no-fallthrough: "off" */ | ||
const messageId = packet.messageId | ||
const type = packet.cmd | ||
let response = null | ||
const cb = this.outgoing[messageId] ? this.outgoing[messageId].cb : null | ||
const that = this | ||
let err | ||
/** | ||
* getLastMessageId | ||
* @return unsigned int | ||
*/ | ||
getLastMessageId() { | ||
return this.messageIdProvider.getLastAllocated() | ||
} | ||
// Checking `!cb` happens to work, but it's not technically "correct". | ||
// | ||
// Why? This code assumes that "no callback" is the same as that "we're not | ||
// waiting for responses" (puback, pubrec, pubcomp, suback, or unsuback). | ||
// | ||
// It would be better to check `if (!this.outgoing[messageId])` here, but | ||
// there's no reason to change it and risk (another) regression. | ||
// | ||
// The only reason this code works is becaues code in MqttClient.publish, | ||
// MqttClinet.subscribe, and MqttClient.unsubscribe ensures that we will | ||
// have a callback even if the user doesn't pass one in.) | ||
if (!cb) { | ||
debug('_handleAck :: Server sent an ack in error. Ignoring.') | ||
// Server sent an ack in error, ignore it. | ||
return | ||
} | ||
/** | ||
* _resubscribe | ||
* @api private | ||
*/ | ||
_resubscribe() { | ||
this.log('_resubscribe') | ||
const _resubscribeTopicsKeys = Object.keys(this._resubscribeTopics) | ||
if ( | ||
!this._firstConnection && | ||
(this.options.clean || | ||
(this.options.protocolVersion === 5 && | ||
!this.connackPacket.sessionPresent)) && | ||
_resubscribeTopicsKeys.length > 0 | ||
) { | ||
if (this.options.resubscribe) { | ||
if (this.options.protocolVersion === 5) { | ||
this.log('_resubscribe: protocolVersion 5') | ||
for ( | ||
let topicI = 0; | ||
topicI < _resubscribeTopicsKeys.length; | ||
topicI++ | ||
) { | ||
const resubscribeTopic = {} | ||
resubscribeTopic[_resubscribeTopicsKeys[topicI]] = | ||
this._resubscribeTopics[ | ||
_resubscribeTopicsKeys[topicI] | ||
] | ||
resubscribeTopic.resubscribe = true | ||
this.subscribe(resubscribeTopic, { | ||
properties: | ||
resubscribeTopic[_resubscribeTopicsKeys[topicI]] | ||
.properties, | ||
}) | ||
} | ||
} else { | ||
this._resubscribeTopics.resubscribe = true | ||
this.subscribe(this._resubscribeTopics) | ||
} | ||
} else { | ||
this._resubscribeTopics = {} | ||
} | ||
} | ||
// Process | ||
debug('_handleAck :: packet type', type) | ||
switch (type) { | ||
case 'pubcomp': | ||
// same thing as puback for QoS 2 | ||
case 'puback': { | ||
const pubackRC = packet.reasonCode | ||
// Callback - we're done | ||
if (pubackRC && pubackRC > 0 && pubackRC !== 16) { | ||
err = new Error('Publish error: ' + errors[pubackRC]) | ||
err.code = pubackRC | ||
cb(err, packet) | ||
} | ||
delete this.outgoing[messageId] | ||
this.outgoingStore.del(packet, cb) | ||
this.messageIdProvider.deallocate(messageId) | ||
this._invokeStoreProcessingQueue() | ||
break | ||
} | ||
case 'pubrec': { | ||
response = { | ||
cmd: 'pubrel', | ||
qos: 2, | ||
messageId: messageId | ||
} | ||
const pubrecRC = packet.reasonCode | ||
this._firstConnection = false | ||
} | ||
if (pubrecRC && pubrecRC > 0 && pubrecRC !== 16) { | ||
err = new Error('Publish error: ' + errors[pubrecRC]) | ||
err.code = pubrecRC | ||
cb(err, packet) | ||
} else { | ||
this._sendPacket(response) | ||
} | ||
break | ||
} | ||
case 'suback': { | ||
delete this.outgoing[messageId] | ||
this.messageIdProvider.deallocate(messageId) | ||
for (let grantedI = 0; grantedI < packet.granted.length; grantedI++) { | ||
if ((packet.granted[grantedI] & 0x80) !== 0) { | ||
// suback with Failure status | ||
const topics = this.messageIdToTopic[messageId] | ||
if (topics) { | ||
topics.forEach(function (topic) { | ||
delete that._resubscribeTopics[topic] | ||
}) | ||
} | ||
} | ||
} | ||
delete this.messageIdToTopic[messageId] | ||
this._invokeStoreProcessingQueue() | ||
cb(null, packet) | ||
break | ||
} | ||
case 'unsuback': { | ||
delete this.outgoing[messageId] | ||
this.messageIdProvider.deallocate(messageId) | ||
this._invokeStoreProcessingQueue() | ||
cb(null) | ||
break | ||
} | ||
default: | ||
that.emit('error', new Error('unrecognized packet type')) | ||
} | ||
/** | ||
* _onConnect | ||
* | ||
* @api private | ||
*/ | ||
_onConnect(packet) { | ||
if (this.disconnected) { | ||
this.emit('connect', packet) | ||
return | ||
} | ||
if (this.disconnecting && | ||
Object.keys(this.outgoing).length === 0) { | ||
this.emit('outgoingEmpty') | ||
} | ||
} | ||
this.connackPacket = packet | ||
this.messageIdProvider.clear() | ||
this._setupPingTimer() | ||
/** | ||
* _handlePubrel | ||
* | ||
* @param {Object} packet | ||
* @api private | ||
*/ | ||
MqttClient.prototype._handlePubrel = function (packet, callback) { | ||
debug('handling pubrel packet') | ||
callback = typeof callback !== 'undefined' ? callback : nop | ||
const messageId = packet.messageId | ||
const that = this | ||
this.connected = true | ||
const comp = { cmd: 'pubcomp', messageId: messageId } | ||
const startStreamProcess = () => { | ||
let outStore = this.outgoingStore.createStream() | ||
that.incomingStore.get(packet, function (err, pub) { | ||
if (!err) { | ||
that.emit('message', pub.topic, pub.payload, pub) | ||
that.handleMessage(pub, function (err) { | ||
if (err) { | ||
return callback(err) | ||
} | ||
that.incomingStore.del(pub, nop) | ||
that._sendPacket(comp, callback) | ||
}) | ||
} else { | ||
that._sendPacket(comp, callback) | ||
} | ||
}) | ||
} | ||
const remove = () => { | ||
outStore.destroy() | ||
outStore = null | ||
this._flushStoreProcessingQueue() | ||
clearStoreProcessing() | ||
} | ||
/** | ||
* _handleDisconnect | ||
* | ||
* @param {Object} packet | ||
* @api private | ||
*/ | ||
MqttClient.prototype._handleDisconnect = function (packet) { | ||
this.emit('disconnect', packet) | ||
} | ||
const clearStoreProcessing = () => { | ||
this._storeProcessing = false | ||
this._packetIdsDuringStoreProcessing = {} | ||
} | ||
/** | ||
* _nextId | ||
* @return unsigned int | ||
*/ | ||
MqttClient.prototype._nextId = function () { | ||
return this.messageIdProvider.allocate() | ||
} | ||
this.once('close', remove) | ||
outStore.on('error', (err) => { | ||
clearStoreProcessing() | ||
this._flushStoreProcessingQueue() | ||
this.removeListener('close', remove) | ||
this.emit('error', err) | ||
}) | ||
/** | ||
* getLastMessageId | ||
* @return unsigned int | ||
*/ | ||
MqttClient.prototype.getLastMessageId = function () { | ||
return this.messageIdProvider.getLastAllocated() | ||
} | ||
const storeDeliver = () => { | ||
// edge case, we wrapped this twice | ||
if (!outStore) { | ||
return | ||
} | ||
/** | ||
* _resubscribe | ||
* @api private | ||
*/ | ||
MqttClient.prototype._resubscribe = function () { | ||
debug('_resubscribe') | ||
const _resubscribeTopicsKeys = Object.keys(this._resubscribeTopics) | ||
if (!this._firstConnection && | ||
(this.options.clean || (this.options.protocolVersion === 5 && !this.connackPacket.sessionPresent)) && | ||
_resubscribeTopicsKeys.length > 0) { | ||
if (this.options.resubscribe) { | ||
if (this.options.protocolVersion === 5) { | ||
debug('_resubscribe: protocolVersion 5') | ||
for (let topicI = 0; topicI < _resubscribeTopicsKeys.length; topicI++) { | ||
const resubscribeTopic = {} | ||
resubscribeTopic[_resubscribeTopicsKeys[topicI]] = this._resubscribeTopics[_resubscribeTopicsKeys[topicI]] | ||
resubscribeTopic.resubscribe = true | ||
this.subscribe(resubscribeTopic, { properties: resubscribeTopic[_resubscribeTopicsKeys[topicI]].properties }) | ||
} | ||
} else { | ||
this._resubscribeTopics.resubscribe = true | ||
this.subscribe(this._resubscribeTopics) | ||
} | ||
} else { | ||
this._resubscribeTopics = {} | ||
} | ||
} | ||
const packet2 = outStore.read(1) | ||
this._firstConnection = false | ||
} | ||
let cb | ||
/** | ||
* _onConnect | ||
* | ||
* @api private | ||
*/ | ||
MqttClient.prototype._onConnect = function (packet) { | ||
if (this.disconnected) { | ||
this.emit('connect', packet) | ||
return | ||
} | ||
if (!packet2) { | ||
// read when data is available in the future | ||
outStore.once('readable', storeDeliver) | ||
return | ||
} | ||
const that = this | ||
this._storeProcessing = true | ||
this.connackPacket = packet | ||
this.messageIdProvider.clear() | ||
this._setupPingTimer() | ||
// Skip already processed store packets | ||
if (this._packetIdsDuringStoreProcessing[packet2.messageId]) { | ||
storeDeliver() | ||
return | ||
} | ||
this.connected = true | ||
// Avoid unnecessary stream read operations when disconnected | ||
if (!this.disconnecting && !this.reconnectTimer) { | ||
cb = this.outgoing[packet2.messageId] | ||
? this.outgoing[packet2.messageId].cb | ||
: null | ||
this.outgoing[packet2.messageId] = { | ||
volatile: false, | ||
cb(err, status) { | ||
// Ensure that the original callback passed in to publish gets invoked | ||
if (cb) { | ||
cb(err, status) | ||
} | ||
function startStreamProcess () { | ||
let outStore = that.outgoingStore.createStream() | ||
storeDeliver() | ||
}, | ||
} | ||
this._packetIdsDuringStoreProcessing[ | ||
packet2.messageId | ||
] = true | ||
if (this.messageIdProvider.register(packet2.messageId)) { | ||
this._sendPacket(packet2, undefined, undefined, true) | ||
} else { | ||
this.log( | ||
'messageId: %d has already used.', | ||
packet2.messageId, | ||
) | ||
} | ||
} else if (outStore.destroy) { | ||
outStore.destroy() | ||
} | ||
} | ||
function clearStoreProcessing () { | ||
that._storeProcessing = false | ||
that._packetIdsDuringStoreProcessing = {} | ||
} | ||
outStore.on('end', () => { | ||
let allProcessed = true | ||
for (const id in this._packetIdsDuringStoreProcessing) { | ||
if (!this._packetIdsDuringStoreProcessing[id]) { | ||
allProcessed = false | ||
break | ||
} | ||
} | ||
if (allProcessed) { | ||
clearStoreProcessing() | ||
this.removeListener('close', remove) | ||
this._invokeAllStoreProcessingQueue() | ||
this.emit('connect', packet) | ||
} else { | ||
startStreamProcess() | ||
} | ||
}) | ||
storeDeliver() | ||
} | ||
// start flowing | ||
startStreamProcess() | ||
} | ||
that.once('close', remove) | ||
outStore.on('error', function (err) { | ||
clearStoreProcessing() | ||
that._flushStoreProcessingQueue() | ||
that.removeListener('close', remove) | ||
that.emit('error', err) | ||
}) | ||
_invokeStoreProcessingQueue() { | ||
// If _storeProcessing is true, the message is resending. | ||
// During resend, processing is skipped to prevent new messages from interrupting. #1635 | ||
if (!this._storeProcessing && this._storeProcessingQueue.length > 0) { | ||
const f = this._storeProcessingQueue[0] | ||
if (f && f.invoke()) { | ||
this._storeProcessingQueue.shift() | ||
return true | ||
} | ||
} | ||
return false | ||
} | ||
function remove () { | ||
outStore.destroy() | ||
outStore = null | ||
that._flushStoreProcessingQueue() | ||
clearStoreProcessing() | ||
} | ||
_invokeAllStoreProcessingQueue() { | ||
while (this._invokeStoreProcessingQueue()) { | ||
/* empty */ | ||
} | ||
} | ||
function storeDeliver () { | ||
// edge case, we wrapped this twice | ||
if (!outStore) { | ||
return | ||
} | ||
_flushStoreProcessingQueue() { | ||
for (const f of this._storeProcessingQueue) { | ||
if (f.cbStorePut) f.cbStorePut(new Error('Connection closed')) | ||
if (f.callback) f.callback(new Error('Connection closed')) | ||
} | ||
this._storeProcessingQueue.splice(0) | ||
} | ||
const packet = outStore.read(1) | ||
let cb | ||
if (!packet) { | ||
// read when data is available in the future | ||
outStore.once('readable', storeDeliver) | ||
return | ||
} | ||
that._storeProcessing = true | ||
// Skip already processed store packets | ||
if (that._packetIdsDuringStoreProcessing[packet.messageId]) { | ||
storeDeliver() | ||
return | ||
} | ||
// Avoid unnecessary stream read operations when disconnected | ||
if (!that.disconnecting && !that.reconnectTimer) { | ||
cb = that.outgoing[packet.messageId] ? that.outgoing[packet.messageId].cb : null | ||
that.outgoing[packet.messageId] = { | ||
volatile: false, | ||
cb: function (err, status) { | ||
// Ensure that the original callback passed in to publish gets invoked | ||
if (cb) { | ||
cb(err, status) | ||
} | ||
storeDeliver() | ||
} | ||
} | ||
that._packetIdsDuringStoreProcessing[packet.messageId] = true | ||
if (that.messageIdProvider.register(packet.messageId)) { | ||
that._sendPacket(packet, undefined, undefined, true) | ||
} else { | ||
debug('messageId: %d has already used.', packet.messageId) | ||
} | ||
} else if (outStore.destroy) { | ||
outStore.destroy() | ||
} | ||
} | ||
outStore.on('end', function () { | ||
let allProcessed = true | ||
for (const id in that._packetIdsDuringStoreProcessing) { | ||
if (!that._packetIdsDuringStoreProcessing[id]) { | ||
allProcessed = false | ||
break | ||
} | ||
} | ||
if (allProcessed) { | ||
clearStoreProcessing() | ||
that.removeListener('close', remove) | ||
that._invokeAllStoreProcessingQueue() | ||
that.emit('connect', packet) | ||
} else { | ||
startStreamProcess() | ||
} | ||
}) | ||
storeDeliver() | ||
} | ||
// start flowing | ||
startStreamProcess() | ||
/** | ||
* _removeOutgoingAndStoreMessage | ||
* @param {Number} messageId - messageId to remove message | ||
* @param {Function} cb - called when the message removed | ||
* @api private | ||
*/ | ||
_removeOutgoingAndStoreMessage(messageId, cb) { | ||
const self = this | ||
delete this.outgoing[messageId] | ||
self.outgoingStore.del({ messageId }, (err, packet) => { | ||
cb(err, packet) | ||
self.messageIdProvider.deallocate(messageId) | ||
self._invokeStoreProcessingQueue() | ||
}) | ||
} | ||
} | ||
MqttClient.prototype._invokeStoreProcessingQueue = function () { | ||
if (this._storeProcessingQueue.length > 0) { | ||
const f = this._storeProcessingQueue[0] | ||
if (f && f.invoke()) { | ||
this._storeProcessingQueue.shift() | ||
return true | ||
} | ||
} | ||
return false | ||
} | ||
MqttClient.prototype._invokeAllStoreProcessingQueue = function () { | ||
while (this._invokeStoreProcessingQueue()) { /* empty */ } | ||
} | ||
MqttClient.prototype._flushStoreProcessingQueue = function () { | ||
for (const f of this._storeProcessingQueue) { | ||
if (f.cbStorePut) f.cbStorePut(new Error('Connection closed')) | ||
if (f.callback) f.callback(new Error('Connection closed')) | ||
} | ||
this._storeProcessingQueue.splice(0) | ||
} | ||
module.exports = MqttClient |
@@ -1,8 +0,5 @@ | ||
'use strict' | ||
const { Buffer } = require('buffer') | ||
const Transform = require('readable-stream').Transform | ||
const { Transform } = require('readable-stream') | ||
const duplexify = require('duplexify') | ||
/* global FileReader */ | ||
let my | ||
@@ -13,118 +10,118 @@ let proxy | ||
function buildProxy () { | ||
const proxy = new Transform() | ||
proxy._write = function (chunk, encoding, next) { | ||
my.sendSocketMessage({ | ||
data: chunk.buffer, | ||
success: function () { | ||
next() | ||
}, | ||
fail: function () { | ||
next(new Error()) | ||
} | ||
}) | ||
} | ||
proxy._flush = function socketEnd (done) { | ||
my.closeSocket({ | ||
success: function () { | ||
done() | ||
} | ||
}) | ||
} | ||
function buildProxy() { | ||
const _proxy = new Transform() | ||
_proxy._write = (chunk, encoding, next) => { | ||
my.sendSocketMessage({ | ||
data: chunk.buffer, | ||
success() { | ||
next() | ||
}, | ||
fail() { | ||
next(new Error()) | ||
}, | ||
}) | ||
} | ||
_proxy._flush = (done) => { | ||
my.closeSocket({ | ||
success() { | ||
done() | ||
}, | ||
}) | ||
} | ||
return proxy | ||
return _proxy | ||
} | ||
function setDefaultOpts (opts) { | ||
if (!opts.hostname) { | ||
opts.hostname = 'localhost' | ||
} | ||
if (!opts.path) { | ||
opts.path = '/' | ||
} | ||
function setDefaultOpts(opts) { | ||
if (!opts.hostname) { | ||
opts.hostname = 'localhost' | ||
} | ||
if (!opts.path) { | ||
opts.path = '/' | ||
} | ||
if (!opts.wsOptions) { | ||
opts.wsOptions = {} | ||
} | ||
if (!opts.wsOptions) { | ||
opts.wsOptions = {} | ||
} | ||
} | ||
function buildUrl (opts, client) { | ||
const protocol = opts.protocol === 'alis' ? 'wss' : 'ws' | ||
let url = protocol + '://' + opts.hostname + opts.path | ||
if (opts.port && opts.port !== 80 && opts.port !== 443) { | ||
url = protocol + '://' + opts.hostname + ':' + opts.port + opts.path | ||
} | ||
if (typeof (opts.transformWsUrl) === 'function') { | ||
url = opts.transformWsUrl(url, opts, client) | ||
} | ||
return url | ||
function buildUrl(opts, client) { | ||
const protocol = opts.protocol === 'alis' ? 'wss' : 'ws' | ||
let url = `${protocol}://${opts.hostname}${opts.path}` | ||
if (opts.port && opts.port !== 80 && opts.port !== 443) { | ||
url = `${protocol}://${opts.hostname}:${opts.port}${opts.path}` | ||
} | ||
if (typeof opts.transformWsUrl === 'function') { | ||
url = opts.transformWsUrl(url, opts, client) | ||
} | ||
return url | ||
} | ||
function bindEventHandler () { | ||
if (isInitialized) return | ||
function bindEventHandler() { | ||
if (isInitialized) return | ||
isInitialized = true | ||
isInitialized = true | ||
my.onSocketOpen(function () { | ||
stream.setReadable(proxy) | ||
stream.setWritable(proxy) | ||
stream.emit('connect') | ||
}) | ||
my.onSocketOpen(() => { | ||
stream.setReadable(proxy) | ||
stream.setWritable(proxy) | ||
stream.emit('connect') | ||
}) | ||
my.onSocketMessage(function (res) { | ||
if (typeof res.data === 'string') { | ||
const buffer = Buffer.from(res.data, 'base64') | ||
proxy.push(buffer) | ||
} else { | ||
const reader = new FileReader() | ||
reader.addEventListener('load', function () { | ||
let data = reader.result | ||
my.onSocketMessage((res) => { | ||
if (typeof res.data === 'string') { | ||
const buffer = Buffer.from(res.data, 'base64') | ||
proxy.push(buffer) | ||
} else { | ||
const reader = new FileReader() | ||
reader.addEventListener('load', () => { | ||
let data = reader.result | ||
if (data instanceof ArrayBuffer) data = Buffer.from(data) | ||
else data = Buffer.from(data, 'utf8') | ||
proxy.push(data) | ||
}) | ||
reader.readAsArrayBuffer(res.data) | ||
} | ||
}) | ||
if (data instanceof ArrayBuffer) data = Buffer.from(data) | ||
else data = Buffer.from(data, 'utf8') | ||
proxy.push(data) | ||
}) | ||
reader.readAsArrayBuffer(res.data) | ||
} | ||
}) | ||
my.onSocketClose(function () { | ||
stream.end() | ||
stream.destroy() | ||
}) | ||
my.onSocketClose(() => { | ||
stream.end() | ||
stream.destroy() | ||
}) | ||
my.onSocketError(function (res) { | ||
stream.destroy(res) | ||
}) | ||
my.onSocketError((res) => { | ||
stream.destroy(res) | ||
}) | ||
} | ||
function buildStream (client, opts) { | ||
opts.hostname = opts.hostname || opts.host | ||
function buildStream(client, opts) { | ||
opts.hostname = opts.hostname || opts.host | ||
if (!opts.hostname) { | ||
throw new Error('Could not determine host. Specify host manually.') | ||
} | ||
if (!opts.hostname) { | ||
throw new Error('Could not determine host. Specify host manually.') | ||
} | ||
const websocketSubProtocol = | ||
(opts.protocolId === 'MQIsdp') && (opts.protocolVersion === 3) | ||
? 'mqttv3.1' | ||
: 'mqtt' | ||
const websocketSubProtocol = | ||
opts.protocolId === 'MQIsdp' && opts.protocolVersion === 3 | ||
? 'mqttv3.1' | ||
: 'mqtt' | ||
setDefaultOpts(opts) | ||
setDefaultOpts(opts) | ||
const url = buildUrl(opts, client) | ||
my = opts.my | ||
my.connectSocket({ | ||
url: url, | ||
protocols: websocketSubProtocol | ||
}) | ||
const url = buildUrl(opts, client) | ||
my = opts.my | ||
my.connectSocket({ | ||
url, | ||
protocols: websocketSubProtocol, | ||
}) | ||
proxy = buildProxy() | ||
stream = duplexify.obj() | ||
proxy = buildProxy() | ||
stream = duplexify.obj() | ||
bindEventHandler() | ||
bindEventHandler() | ||
return stream | ||
return stream | ||
} | ||
module.exports = buildStream |
@@ -1,3 +0,2 @@ | ||
'use strict' | ||
const url = require('url') | ||
const MqttClient = require('../client') | ||
@@ -7,5 +6,3 @@ const Store = require('../store') | ||
const UniqueMessageIdProvider = require('../unique-message-id-provider') | ||
const IS_BROWSER = require('../is-browser').IS_BROWSER | ||
const url = require('url') | ||
const xtend = require('xtend') | ||
const { IS_BROWSER } = require('../is-browser') | ||
const debug = require('debug')('mqttjs') | ||
@@ -16,13 +13,13 @@ | ||
if (!IS_BROWSER) { | ||
protocols.mqtt = require('./tcp') | ||
protocols.tcp = require('./tcp') | ||
protocols.ssl = require('./tls') | ||
protocols.tls = require('./tls') | ||
protocols.mqtts = require('./tls') | ||
protocols.mqtt = require('./tcp') | ||
protocols.tcp = require('./tcp') | ||
protocols.ssl = require('./tls') | ||
protocols.tls = require('./tls') | ||
protocols.mqtts = require('./tls') | ||
} else { | ||
protocols.wx = require('./wx') | ||
protocols.wxs = require('./wx') | ||
protocols.wx = require('./wx') | ||
protocols.wxs = require('./wx') | ||
protocols.ali = require('./ali') | ||
protocols.alis = require('./ali') | ||
protocols.ali = require('./ali') | ||
protocols.alis = require('./ali') | ||
} | ||
@@ -38,13 +35,13 @@ | ||
*/ | ||
function parseAuthOptions (opts) { | ||
let matches | ||
if (opts.auth) { | ||
matches = opts.auth.match(/^(.+):(.+)$/) | ||
if (matches) { | ||
opts.username = matches[1] | ||
opts.password = matches[2] | ||
} else { | ||
opts.username = opts.auth | ||
} | ||
} | ||
function parseAuthOptions(opts) { | ||
let matches | ||
if (opts.auth) { | ||
matches = opts.auth.match(/^(.+):(.+)$/) | ||
if (matches) { | ||
opts.username = matches[1] | ||
opts.password = matches[2] | ||
} else { | ||
opts.username = opts.auth | ||
} | ||
} | ||
} | ||
@@ -58,109 +55,118 @@ | ||
*/ | ||
function connect (brokerUrl, opts) { | ||
debug('connecting to an MQTT broker...') | ||
if ((typeof brokerUrl === 'object') && !opts) { | ||
opts = brokerUrl | ||
brokerUrl = null | ||
} | ||
function connect(brokerUrl, opts) { | ||
debug('connecting to an MQTT broker...') | ||
if (typeof brokerUrl === 'object' && !opts) { | ||
opts = brokerUrl | ||
brokerUrl = null | ||
} | ||
opts = opts || {} | ||
opts = opts || {} | ||
if (brokerUrl) { | ||
// eslint-disable-next-line | ||
if (brokerUrl) { | ||
// eslint-disable-next-line | ||
const parsed = url.parse(brokerUrl, true) | ||
if (parsed.port != null) { | ||
parsed.port = Number(parsed.port) | ||
} | ||
if (parsed.port != null) { | ||
parsed.port = Number(parsed.port) | ||
} | ||
opts = xtend(parsed, opts) | ||
opts = { ...parsed, ...opts } | ||
if (opts.protocol === null) { | ||
throw new Error('Missing protocol') | ||
} | ||
if (opts.protocol === null) { | ||
throw new Error('Missing protocol') | ||
} | ||
opts.protocol = opts.protocol.replace(/:$/, '') | ||
} | ||
opts.protocol = opts.protocol.replace(/:$/, '') | ||
} | ||
// merge in the auth options if supplied | ||
parseAuthOptions(opts) | ||
// merge in the auth options if supplied | ||
parseAuthOptions(opts) | ||
// support clientId passed in the query string of the url | ||
if (opts.query && typeof opts.query.clientId === 'string') { | ||
opts.clientId = opts.query.clientId | ||
} | ||
// support clientId passed in the query string of the url | ||
if (opts.query && typeof opts.query.clientId === 'string') { | ||
opts.clientId = opts.query.clientId | ||
} | ||
if (opts.cert && opts.key) { | ||
if (opts.protocol) { | ||
if (['mqtts', 'wss', 'wxs', 'alis'].indexOf(opts.protocol) === -1) { | ||
switch (opts.protocol) { | ||
case 'mqtt': | ||
opts.protocol = 'mqtts' | ||
break | ||
case 'ws': | ||
opts.protocol = 'wss' | ||
break | ||
case 'wx': | ||
opts.protocol = 'wxs' | ||
break | ||
case 'ali': | ||
opts.protocol = 'alis' | ||
break | ||
default: | ||
throw new Error('Unknown protocol for secure connection: "' + opts.protocol + '"!') | ||
} | ||
} | ||
} else { | ||
// A cert and key was provided, however no protocol was specified, so we will throw an error. | ||
throw new Error('Missing secure protocol key') | ||
} | ||
} | ||
if (opts.cert && opts.key) { | ||
if (opts.protocol) { | ||
if (['mqtts', 'wss', 'wxs', 'alis'].indexOf(opts.protocol) === -1) { | ||
switch (opts.protocol) { | ||
case 'mqtt': | ||
opts.protocol = 'mqtts' | ||
break | ||
case 'ws': | ||
opts.protocol = 'wss' | ||
break | ||
case 'wx': | ||
opts.protocol = 'wxs' | ||
break | ||
case 'ali': | ||
opts.protocol = 'alis' | ||
break | ||
default: | ||
throw new Error( | ||
`Unknown protocol for secure connection: "${opts.protocol}"!`, | ||
) | ||
} | ||
} | ||
} else { | ||
// A cert and key was provided, however no protocol was specified, so we will throw an error. | ||
throw new Error('Missing secure protocol key') | ||
} | ||
} | ||
if (!protocols[opts.protocol]) { | ||
const isSecure = ['mqtts', 'wss'].indexOf(opts.protocol) !== -1 | ||
opts.protocol = [ | ||
'mqtt', | ||
'mqtts', | ||
'ws', | ||
'wss', | ||
'wx', | ||
'wxs', | ||
'ali', | ||
'alis' | ||
].filter(function (key, index) { | ||
if (isSecure && index % 2 === 0) { | ||
// Skip insecure protocols when requesting a secure one. | ||
return false | ||
} | ||
return (typeof protocols[key] === 'function') | ||
})[0] | ||
} | ||
if (!protocols[opts.protocol]) { | ||
const isSecure = ['mqtts', 'wss'].indexOf(opts.protocol) !== -1 | ||
opts.protocol = [ | ||
'mqtt', | ||
'mqtts', | ||
'ws', | ||
'wss', | ||
'wx', | ||
'wxs', | ||
'ali', | ||
'alis', | ||
].filter((key, index) => { | ||
if (isSecure && index % 2 === 0) { | ||
// Skip insecure protocols when requesting a secure one. | ||
return false | ||
} | ||
return typeof protocols[key] === 'function' | ||
})[0] | ||
} | ||
if (opts.clean === false && !opts.clientId) { | ||
throw new Error('Missing clientId for unclean clients') | ||
} | ||
if (opts.clean === false && !opts.clientId) { | ||
throw new Error('Missing clientId for unclean clients') | ||
} | ||
if (opts.protocol) { | ||
opts.defaultProtocol = opts.protocol | ||
} | ||
if (opts.protocol) { | ||
opts.defaultProtocol = opts.protocol | ||
} | ||
function wrapper (client) { | ||
if (opts.servers) { | ||
if (!client._reconnectCount || client._reconnectCount === opts.servers.length) { | ||
client._reconnectCount = 0 | ||
} | ||
function wrapper(client) { | ||
if (opts.servers) { | ||
if ( | ||
!client._reconnectCount || | ||
client._reconnectCount === opts.servers.length | ||
) { | ||
client._reconnectCount = 0 | ||
} | ||
opts.host = opts.servers[client._reconnectCount].host | ||
opts.port = opts.servers[client._reconnectCount].port | ||
opts.protocol = (!opts.servers[client._reconnectCount].protocol ? opts.defaultProtocol : opts.servers[client._reconnectCount].protocol) | ||
opts.hostname = opts.host | ||
opts.host = opts.servers[client._reconnectCount].host | ||
opts.port = opts.servers[client._reconnectCount].port | ||
opts.protocol = !opts.servers[client._reconnectCount].protocol | ||
? opts.defaultProtocol | ||
: opts.servers[client._reconnectCount].protocol | ||
opts.hostname = opts.host | ||
client._reconnectCount++ | ||
} | ||
client._reconnectCount++ | ||
} | ||
debug('calling streambuilder for', opts.protocol) | ||
return protocols[opts.protocol](client, opts) | ||
} | ||
const client = new MqttClient(wrapper, opts) | ||
client.on('error', function () { /* Automatically set up client error handling */ }) | ||
return client | ||
debug('calling streambuilder for', opts.protocol) | ||
return protocols[opts.protocol](client, opts) | ||
} | ||
const client = new MqttClient(wrapper, opts) | ||
client.on('error', () => { | ||
/* Automatically set up client error handling */ | ||
}) | ||
return client | ||
} | ||
@@ -167,0 +173,0 @@ |
@@ -1,2 +0,1 @@ | ||
'use strict' | ||
const net = require('net') | ||
@@ -9,13 +8,13 @@ const debug = require('debug')('mqttjs:tcp') | ||
*/ | ||
function streamBuilder (client, opts) { | ||
opts.port = opts.port || 1883 | ||
opts.hostname = opts.hostname || opts.host || 'localhost' | ||
function streamBuilder(client, opts) { | ||
opts.port = opts.port || 1883 | ||
opts.hostname = opts.hostname || opts.host || 'localhost' | ||
const port = opts.port | ||
const host = opts.hostname | ||
const { port } = opts | ||
const host = opts.hostname | ||
debug('port %d and host %s', port, host) | ||
return net.createConnection(port, host) | ||
debug('port %d and host %s', port, host) | ||
return net.createConnection(port, host) | ||
} | ||
module.exports = streamBuilder |
@@ -1,2 +0,1 @@ | ||
'use strict' | ||
const tls = require('tls') | ||
@@ -6,44 +5,49 @@ const net = require('net') | ||
function buildBuilder (mqttClient, opts) { | ||
opts.port = opts.port || 8883 | ||
opts.host = opts.hostname || opts.host || 'localhost' | ||
function buildBuilder(mqttClient, opts) { | ||
opts.port = opts.port || 8883 | ||
opts.host = opts.hostname || opts.host || 'localhost' | ||
if (net.isIP(opts.host) === 0) { | ||
opts.servername = opts.host | ||
} | ||
if (net.isIP(opts.host) === 0) { | ||
opts.servername = opts.host | ||
} | ||
opts.rejectUnauthorized = opts.rejectUnauthorized !== false | ||
opts.rejectUnauthorized = opts.rejectUnauthorized !== false | ||
delete opts.path | ||
delete opts.path | ||
debug('port %d host %s rejectUnauthorized %b', opts.port, opts.host, opts.rejectUnauthorized) | ||
debug( | ||
'port %d host %s rejectUnauthorized %b', | ||
opts.port, | ||
opts.host, | ||
opts.rejectUnauthorized, | ||
) | ||
const connection = tls.connect(opts) | ||
/* eslint no-use-before-define: [2, "nofunc"] */ | ||
connection.on('secureConnect', function () { | ||
if (opts.rejectUnauthorized && !connection.authorized) { | ||
connection.emit('error', new Error('TLS not authorized')) | ||
} else { | ||
connection.removeListener('error', handleTLSerrors) | ||
} | ||
}) | ||
const connection = tls.connect(opts) | ||
/* eslint no-use-before-define: [2, "nofunc"] */ | ||
connection.on('secureConnect', () => { | ||
if (opts.rejectUnauthorized && !connection.authorized) { | ||
connection.emit('error', new Error('TLS not authorized')) | ||
} else { | ||
connection.removeListener('error', handleTLSerrors) | ||
} | ||
}) | ||
function handleTLSerrors (err) { | ||
// How can I get verify this error is a tls error? | ||
if (opts.rejectUnauthorized) { | ||
mqttClient.emit('error', err) | ||
} | ||
function handleTLSerrors(err) { | ||
// How can I get verify this error is a tls error? | ||
if (opts.rejectUnauthorized) { | ||
mqttClient.emit('error', err) | ||
} | ||
// close this connection to match the behaviour of net | ||
// otherwise all we get is an error from the connection | ||
// and close event doesn't fire. This is a work around | ||
// to enable the reconnect code to work the same as with | ||
// net.createConnection | ||
connection.end() | ||
} | ||
// close this connection to match the behaviour of net | ||
// otherwise all we get is an error from the connection | ||
// and close event doesn't fire. This is a work around | ||
// to enable the reconnect code to work the same as with | ||
// net.createConnection | ||
connection.end() | ||
} | ||
connection.on('error', handleTLSerrors) | ||
return connection | ||
connection.on('error', handleTLSerrors) | ||
return connection | ||
} | ||
module.exports = buildBuilder |
@@ -1,3 +0,1 @@ | ||
'use strict' | ||
const { Buffer } = require('buffer') | ||
@@ -7,252 +5,262 @@ const WS = require('ws') | ||
const duplexify = require('duplexify') | ||
const Transform = require('readable-stream').Transform | ||
const IS_BROWSER = require('../is-browser').IS_BROWSER | ||
const { Transform } = require('readable-stream') | ||
const { IS_BROWSER } = require('../is-browser') | ||
const WSS_OPTIONS = [ | ||
'rejectUnauthorized', | ||
'ca', | ||
'cert', | ||
'key', | ||
'pfx', | ||
'passphrase' | ||
'rejectUnauthorized', | ||
'ca', | ||
'cert', | ||
'key', | ||
'pfx', | ||
'passphrase', | ||
] | ||
function buildUrl (opts, client) { | ||
let url = opts.protocol + '://' + opts.hostname + ':' + opts.port + opts.path | ||
if (typeof (opts.transformWsUrl) === 'function') { | ||
url = opts.transformWsUrl(url, opts, client) | ||
} | ||
return url | ||
function buildUrl(opts, client) { | ||
let url = `${opts.protocol}://${opts.hostname}:${opts.port}${opts.path}` | ||
if (typeof opts.transformWsUrl === 'function') { | ||
url = opts.transformWsUrl(url, opts, client) | ||
} | ||
return url | ||
} | ||
function setDefaultOpts (opts) { | ||
const options = opts | ||
if (!opts.hostname) { | ||
options.hostname = 'localhost' | ||
} | ||
if (!opts.port) { | ||
if (opts.protocol === 'wss') { | ||
options.port = 443 | ||
} else { | ||
options.port = 80 | ||
} | ||
} | ||
if (!opts.path) { | ||
options.path = '/' | ||
} | ||
function setDefaultOpts(opts) { | ||
const options = opts | ||
if (!opts.hostname) { | ||
options.hostname = 'localhost' | ||
} | ||
if (!opts.port) { | ||
if (opts.protocol === 'wss') { | ||
options.port = 443 | ||
} else { | ||
options.port = 80 | ||
} | ||
} | ||
if (!opts.path) { | ||
options.path = '/' | ||
} | ||
if (!opts.wsOptions) { | ||
options.wsOptions = {} | ||
} | ||
if (!IS_BROWSER && opts.protocol === 'wss') { | ||
// Add cert/key/ca etc options | ||
WSS_OPTIONS.forEach(function (prop) { | ||
if (Object.prototype.hasOwnProperty.call(opts, prop) && !Object.prototype.hasOwnProperty.call(opts.wsOptions, prop)) { | ||
options.wsOptions[prop] = opts[prop] | ||
} | ||
}) | ||
} | ||
if (!opts.wsOptions) { | ||
options.wsOptions = {} | ||
} | ||
if (!IS_BROWSER && opts.protocol === 'wss') { | ||
// Add cert/key/ca etc options | ||
WSS_OPTIONS.forEach((prop) => { | ||
if ( | ||
Object.prototype.hasOwnProperty.call(opts, prop) && | ||
!Object.prototype.hasOwnProperty.call(opts.wsOptions, prop) | ||
) { | ||
options.wsOptions[prop] = opts[prop] | ||
} | ||
}) | ||
} | ||
return options | ||
return options | ||
} | ||
function setDefaultBrowserOpts (opts) { | ||
const options = setDefaultOpts(opts) | ||
function setDefaultBrowserOpts(opts) { | ||
const options = setDefaultOpts(opts) | ||
if (!options.hostname) { | ||
options.hostname = options.host | ||
} | ||
if (!options.hostname) { | ||
options.hostname = options.host | ||
} | ||
if (!options.hostname) { | ||
// Throwing an error in a Web Worker if no `hostname` is given, because we | ||
// can not determine the `hostname` automatically. If connecting to | ||
// localhost, please supply the `hostname` as an argument. | ||
if (typeof (document) === 'undefined') { | ||
throw new Error('Could not determine host. Specify host manually.') | ||
} | ||
const parsed = new URL(document.URL) | ||
options.hostname = parsed.hostname | ||
if (!options.hostname) { | ||
// Throwing an error in a Web Worker if no `hostname` is given, because we | ||
// can not determine the `hostname` automatically. If connecting to | ||
// localhost, please supply the `hostname` as an argument. | ||
if (typeof document === 'undefined') { | ||
throw new Error('Could not determine host. Specify host manually.') | ||
} | ||
const parsed = new URL(document.URL) | ||
options.hostname = parsed.hostname | ||
if (!options.port) { | ||
options.port = parsed.port | ||
} | ||
} | ||
if (!options.port) { | ||
options.port = parsed.port | ||
} | ||
} | ||
// objectMode should be defined for logic | ||
if (options.objectMode === undefined) { | ||
options.objectMode = !(options.binary === true || options.binary === undefined) | ||
} | ||
// objectMode should be defined for logic | ||
if (options.objectMode === undefined) { | ||
options.objectMode = !( | ||
options.binary === true || options.binary === undefined | ||
) | ||
} | ||
return options | ||
return options | ||
} | ||
function createWebSocket (client, url, opts) { | ||
debug('createWebSocket') | ||
debug('protocol: ' + opts.protocolId + ' ' + opts.protocolVersion) | ||
const websocketSubProtocol = | ||
(opts.protocolId === 'MQIsdp') && (opts.protocolVersion === 3) | ||
? 'mqttv3.1' | ||
: 'mqtt' | ||
function createWebSocket(client, url, opts) { | ||
debug('createWebSocket') | ||
debug(`protocol: ${opts.protocolId} ${opts.protocolVersion}`) | ||
const websocketSubProtocol = | ||
opts.protocolId === 'MQIsdp' && opts.protocolVersion === 3 | ||
? 'mqttv3.1' | ||
: 'mqtt' | ||
debug('creating new Websocket for url: ' + url + ' and protocol: ' + websocketSubProtocol) | ||
const socket = new WS(url, [websocketSubProtocol], opts.wsOptions) | ||
return socket | ||
debug( | ||
`creating new Websocket for url: ${url} and protocol: ${websocketSubProtocol}`, | ||
) | ||
const socket = new WS(url, [websocketSubProtocol], opts.wsOptions) | ||
return socket | ||
} | ||
function createBrowserWebSocket (client, opts) { | ||
const websocketSubProtocol = | ||
(opts.protocolId === 'MQIsdp') && (opts.protocolVersion === 3) | ||
? 'mqttv3.1' | ||
: 'mqtt' | ||
function createBrowserWebSocket(client, opts) { | ||
const websocketSubProtocol = | ||
opts.protocolId === 'MQIsdp' && opts.protocolVersion === 3 | ||
? 'mqttv3.1' | ||
: 'mqtt' | ||
const url = buildUrl(opts, client) | ||
/* global WebSocket */ | ||
const socket = new WebSocket(url, [websocketSubProtocol]) | ||
socket.binaryType = 'arraybuffer' | ||
return socket | ||
const url = buildUrl(opts, client) | ||
const socket = new WebSocket(url, [websocketSubProtocol]) | ||
socket.binaryType = 'arraybuffer' | ||
return socket | ||
} | ||
function streamBuilder (client, opts) { | ||
debug('streamBuilder') | ||
const options = setDefaultOpts(opts) | ||
const url = buildUrl(options, client) | ||
const socket = createWebSocket(client, url, options) | ||
const webSocketStream = WS.createWebSocketStream(socket, options.wsOptions) | ||
webSocketStream.url = url | ||
socket.on('close', () => { webSocketStream.destroy() }) | ||
return webSocketStream | ||
function streamBuilder(client, opts) { | ||
debug('streamBuilder') | ||
const options = setDefaultOpts(opts) | ||
const url = buildUrl(options, client) | ||
const socket = createWebSocket(client, url, options) | ||
const webSocketStream = WS.createWebSocketStream(socket, options.wsOptions) | ||
webSocketStream.url = url | ||
socket.on('close', () => { | ||
webSocketStream.destroy() | ||
}) | ||
return webSocketStream | ||
} | ||
function browserStreamBuilder (client, opts) { | ||
debug('browserStreamBuilder') | ||
let stream | ||
const options = setDefaultBrowserOpts(opts) | ||
// sets the maximum socket buffer size before throttling | ||
const bufferSize = options.browserBufferSize || 1024 * 512 | ||
function browserStreamBuilder(client, opts) { | ||
debug('browserStreamBuilder') | ||
let stream | ||
const options = setDefaultBrowserOpts(opts) | ||
// sets the maximum socket buffer size before throttling | ||
const bufferSize = options.browserBufferSize || 1024 * 512 | ||
const bufferTimeout = opts.browserBufferTimeout || 1000 | ||
const bufferTimeout = opts.browserBufferTimeout || 1000 | ||
const coerceToBuffer = !opts.objectMode | ||
const coerceToBuffer = !opts.objectMode | ||
const socket = createBrowserWebSocket(client, opts) | ||
const socket = createBrowserWebSocket(client, opts) | ||
const proxy = buildProxy(opts, socketWriteBrowser, socketEndBrowser) | ||
const proxy = buildProxy(opts, socketWriteBrowser, socketEndBrowser) | ||
if (!opts.objectMode) { | ||
proxy._writev = writev | ||
} | ||
proxy.on('close', () => { socket.close() }) | ||
if (!opts.objectMode) { | ||
proxy._writev = writev | ||
} | ||
proxy.on('close', () => { | ||
socket.close() | ||
}) | ||
const eventListenerSupport = (typeof socket.addEventListener !== 'undefined') | ||
const eventListenerSupport = typeof socket.addEventListener !== 'undefined' | ||
// was already open when passed in | ||
if (socket.readyState === socket.OPEN) { | ||
stream = proxy | ||
} else { | ||
stream = stream = duplexify(undefined, undefined, opts) | ||
if (!opts.objectMode) { | ||
stream._writev = writev | ||
} | ||
// was already open when passed in | ||
if (socket.readyState === socket.OPEN) { | ||
stream = proxy | ||
} else { | ||
stream = duplexify(undefined, undefined, opts) | ||
if (!opts.objectMode) { | ||
stream._writev = writev | ||
} | ||
if (eventListenerSupport) { | ||
socket.addEventListener('open', onopen) | ||
} else { | ||
socket.onopen = onopen | ||
} | ||
} | ||
if (eventListenerSupport) { | ||
socket.addEventListener('open', onOpen) | ||
} else { | ||
socket.onopen = onOpen | ||
} | ||
} | ||
stream.socket = socket | ||
stream.socket = socket | ||
if (eventListenerSupport) { | ||
socket.addEventListener('close', onclose) | ||
socket.addEventListener('error', onerror) | ||
socket.addEventListener('message', onmessage) | ||
} else { | ||
socket.onclose = onclose | ||
socket.onerror = onerror | ||
socket.onmessage = onmessage | ||
} | ||
if (eventListenerSupport) { | ||
socket.addEventListener('close', onclose) | ||
socket.addEventListener('error', onerror) | ||
socket.addEventListener('message', onmessage) | ||
} else { | ||
socket.onclose = onclose | ||
socket.onerror = onerror | ||
socket.onmessage = onmessage | ||
} | ||
// methods for browserStreamBuilder | ||
// methods for browserStreamBuilder | ||
function buildProxy (options, socketWrite, socketEnd) { | ||
const proxy = new Transform({ | ||
objectModeMode: options.objectMode | ||
}) | ||
function buildProxy(pOptions, socketWrite, socketEnd) { | ||
const _proxy = new Transform({ | ||
objectModeMode: pOptions.objectMode, | ||
}) | ||
proxy._write = socketWrite | ||
proxy._flush = socketEnd | ||
_proxy._write = socketWrite | ||
_proxy._flush = socketEnd | ||
return proxy | ||
} | ||
return _proxy | ||
} | ||
function onopen () { | ||
stream.setReadable(proxy) | ||
stream.setWritable(proxy) | ||
stream.emit('connect') | ||
} | ||
function onOpen() { | ||
stream.setReadable(proxy) | ||
stream.setWritable(proxy) | ||
stream.emit('connect') | ||
} | ||
function onclose () { | ||
stream.end() | ||
stream.destroy() | ||
} | ||
function onclose() { | ||
stream.end() | ||
stream.destroy() | ||
} | ||
function onerror (err) { | ||
stream.destroy(err) | ||
} | ||
function onerror(err) { | ||
stream.destroy(err) | ||
} | ||
function onmessage (event) { | ||
let data = event.data | ||
if (data instanceof ArrayBuffer) data = Buffer.from(data) | ||
else data = Buffer.from(data, 'utf8') | ||
proxy.push(data) | ||
} | ||
function onmessage(event) { | ||
let { data } = event | ||
if (data instanceof ArrayBuffer) data = Buffer.from(data) | ||
else data = Buffer.from(data, 'utf8') | ||
proxy.push(data) | ||
} | ||
// this is to be enabled only if objectMode is false | ||
function writev (chunks, cb) { | ||
const buffers = new Array(chunks.length) | ||
for (let i = 0; i < chunks.length; i++) { | ||
if (typeof chunks[i].chunk === 'string') { | ||
buffers[i] = Buffer.from(chunks[i], 'utf8') | ||
} else { | ||
buffers[i] = chunks[i].chunk | ||
} | ||
} | ||
// this is to be enabled only if objectMode is false | ||
function writev(chunks, cb) { | ||
const buffers = new Array(chunks.length) | ||
for (let i = 0; i < chunks.length; i++) { | ||
if (typeof chunks[i].chunk === 'string') { | ||
buffers[i] = Buffer.from(chunks[i], 'utf8') | ||
} else { | ||
buffers[i] = chunks[i].chunk | ||
} | ||
} | ||
this._write(Buffer.concat(buffers), 'binary', cb) | ||
} | ||
this._write(Buffer.concat(buffers), 'binary', cb) | ||
} | ||
function socketWriteBrowser (chunk, enc, next) { | ||
if (socket.bufferedAmount > bufferSize) { | ||
// throttle data until buffered amount is reduced. | ||
setTimeout(socketWriteBrowser, bufferTimeout, chunk, enc, next) | ||
} | ||
function socketWriteBrowser(chunk, enc, next) { | ||
if (socket.bufferedAmount > bufferSize) { | ||
// throttle data until buffered amount is reduced. | ||
setTimeout(socketWriteBrowser, bufferTimeout, chunk, enc, next) | ||
} | ||
if (coerceToBuffer && typeof chunk === 'string') { | ||
chunk = Buffer.from(chunk, 'utf8') | ||
} | ||
if (coerceToBuffer && typeof chunk === 'string') { | ||
chunk = Buffer.from(chunk, 'utf8') | ||
} | ||
try { | ||
socket.send(chunk) | ||
} catch (err) { | ||
return next(err) | ||
} | ||
try { | ||
socket.send(chunk) | ||
} catch (err) { | ||
return next(err) | ||
} | ||
next() | ||
} | ||
next() | ||
} | ||
function socketEndBrowser (done) { | ||
socket.close() | ||
done() | ||
} | ||
function socketEndBrowser(done) { | ||
socket.close() | ||
done() | ||
} | ||
// end methods for browserStreamBuilder | ||
// end methods for browserStreamBuilder | ||
return stream | ||
return stream | ||
} | ||
if (IS_BROWSER) { | ||
module.exports = browserStreamBuilder | ||
module.exports = browserStreamBuilder | ||
} else { | ||
module.exports = streamBuilder | ||
module.exports = streamBuilder | ||
} |
@@ -1,133 +0,132 @@ | ||
'use strict' | ||
const { Buffer } = require('buffer') | ||
const Transform = require('readable-stream').Transform | ||
const { Transform } = require('readable-stream') | ||
const duplexify = require('duplexify') | ||
/* global wx */ | ||
let socketTask, proxy, stream | ||
let socketTask | ||
let proxy | ||
let stream | ||
function buildProxy () { | ||
const proxy = new Transform() | ||
proxy._write = function (chunk, encoding, next) { | ||
socketTask.send({ | ||
data: chunk.buffer, | ||
success: function () { | ||
next() | ||
}, | ||
fail: function (errMsg) { | ||
next(new Error(errMsg)) | ||
} | ||
}) | ||
} | ||
proxy._flush = function socketEnd (done) { | ||
socketTask.close({ | ||
success: function () { | ||
done() | ||
} | ||
}) | ||
} | ||
function buildProxy() { | ||
const _proxy = new Transform() | ||
_proxy._write = (chunk, encoding, next) => { | ||
socketTask.send({ | ||
data: chunk.buffer, | ||
success() { | ||
next() | ||
}, | ||
fail(errMsg) { | ||
next(new Error(errMsg)) | ||
}, | ||
}) | ||
} | ||
_proxy._flush = (done) => { | ||
socketTask.close({ | ||
success() { | ||
done() | ||
}, | ||
}) | ||
} | ||
return proxy | ||
return _proxy | ||
} | ||
function setDefaultOpts (opts) { | ||
if (!opts.hostname) { | ||
opts.hostname = 'localhost' | ||
} | ||
if (!opts.path) { | ||
opts.path = '/' | ||
} | ||
function setDefaultOpts(opts) { | ||
if (!opts.hostname) { | ||
opts.hostname = 'localhost' | ||
} | ||
if (!opts.path) { | ||
opts.path = '/' | ||
} | ||
if (!opts.wsOptions) { | ||
opts.wsOptions = {} | ||
} | ||
if (!opts.wsOptions) { | ||
opts.wsOptions = {} | ||
} | ||
} | ||
function buildUrl (opts, client) { | ||
const protocol = opts.protocol === 'wxs' ? 'wss' : 'ws' | ||
let url = protocol + '://' + opts.hostname + opts.path | ||
if (opts.port && opts.port !== 80 && opts.port !== 443) { | ||
url = protocol + '://' + opts.hostname + ':' + opts.port + opts.path | ||
} | ||
if (typeof (opts.transformWsUrl) === 'function') { | ||
url = opts.transformWsUrl(url, opts, client) | ||
} | ||
return url | ||
function buildUrl(opts, client) { | ||
const protocol = opts.protocol === 'wxs' ? 'wss' : 'ws' | ||
let url = `${protocol}://${opts.hostname}${opts.path}` | ||
if (opts.port && opts.port !== 80 && opts.port !== 443) { | ||
url = `${protocol}://${opts.hostname}:${opts.port}${opts.path}` | ||
} | ||
if (typeof opts.transformWsUrl === 'function') { | ||
url = opts.transformWsUrl(url, opts, client) | ||
} | ||
return url | ||
} | ||
function bindEventHandler () { | ||
socketTask.onOpen(function () { | ||
stream.setReadable(proxy) | ||
stream.setWritable(proxy) | ||
stream.emit('connect') | ||
}) | ||
function bindEventHandler() { | ||
socketTask.onOpen(() => { | ||
stream.setReadable(proxy) | ||
stream.setWritable(proxy) | ||
stream.emit('connect') | ||
}) | ||
socketTask.onMessage(function (res) { | ||
let data = res.data | ||
socketTask.onMessage((res) => { | ||
let { data } = res | ||
if (data instanceof ArrayBuffer) data = Buffer.from(data) | ||
else data = Buffer.from(data, 'utf8') | ||
proxy.push(data) | ||
}) | ||
if (data instanceof ArrayBuffer) data = Buffer.from(data) | ||
else data = Buffer.from(data, 'utf8') | ||
proxy.push(data) | ||
}) | ||
socketTask.onClose(function () { | ||
stream.end() | ||
stream.destroy() | ||
}) | ||
socketTask.onClose(() => { | ||
stream.end() | ||
stream.destroy() | ||
}) | ||
socketTask.onError(function (res) { | ||
stream.destroy(new Error(res.errMsg)) | ||
}) | ||
socketTask.onError((res) => { | ||
stream.destroy(new Error(res.errMsg)) | ||
}) | ||
} | ||
function buildStream (client, opts) { | ||
opts.hostname = opts.hostname || opts.host | ||
function buildStream(client, opts) { | ||
opts.hostname = opts.hostname || opts.host | ||
if (!opts.hostname) { | ||
throw new Error('Could not determine host. Specify host manually.') | ||
} | ||
if (!opts.hostname) { | ||
throw new Error('Could not determine host. Specify host manually.') | ||
} | ||
const websocketSubProtocol = | ||
(opts.protocolId === 'MQIsdp') && (opts.protocolVersion === 3) | ||
? 'mqttv3.1' | ||
: 'mqtt' | ||
const websocketSubProtocol = | ||
opts.protocolId === 'MQIsdp' && opts.protocolVersion === 3 | ||
? 'mqttv3.1' | ||
: 'mqtt' | ||
setDefaultOpts(opts) | ||
setDefaultOpts(opts) | ||
const url = buildUrl(opts, client) | ||
socketTask = wx.connectSocket({ | ||
url: url, | ||
protocols: [websocketSubProtocol] | ||
}) | ||
const url = buildUrl(opts, client) | ||
socketTask = wx.connectSocket({ | ||
url, | ||
protocols: [websocketSubProtocol], | ||
}) | ||
proxy = buildProxy() | ||
stream = duplexify.obj() | ||
stream._destroy = function (err, cb) { | ||
socketTask.close({ | ||
success: function () { | ||
cb && cb(err) | ||
} | ||
}) | ||
} | ||
proxy = buildProxy() | ||
stream = duplexify.obj() | ||
stream._destroy = (err, cb) => { | ||
socketTask.close({ | ||
success() { | ||
if (cb) cb(err) | ||
}, | ||
}) | ||
} | ||
const destroyRef = stream.destroy | ||
stream.destroy = function () { | ||
stream.destroy = destroyRef | ||
const destroyRef = stream.destroy | ||
stream.destroy = () => { | ||
stream.destroy = destroyRef | ||
const self = this | ||
setTimeout(function () { | ||
socketTask.close({ | ||
fail: function () { | ||
self._destroy(new Error()) | ||
} | ||
}) | ||
}, 0) | ||
}.bind(stream) | ||
setTimeout(() => { | ||
socketTask.close({ | ||
fail() { | ||
stream._destroy(new Error()) | ||
}, | ||
}) | ||
}, 0) | ||
} | ||
bindEventHandler() | ||
bindEventHandler() | ||
return stream | ||
return stream | ||
} | ||
module.exports = buildStream |
@@ -1,3 +0,1 @@ | ||
'use strict' | ||
/** | ||
@@ -7,64 +5,60 @@ * DefaultMessageAllocator constructor | ||
*/ | ||
function DefaultMessageIdProvider () { | ||
if (!(this instanceof DefaultMessageIdProvider)) { | ||
return new DefaultMessageIdProvider() | ||
} | ||
class DefaultMessageIdProvider { | ||
constructor() { | ||
/** | ||
* MessageIDs starting with 1 | ||
* ensure that nextId is min. 1, see https://github.com/mqttjs/MQTT.js/issues/810 | ||
*/ | ||
this.nextId = Math.max(1, Math.floor(Math.random() * 65535)) | ||
} | ||
/** | ||
* MessageIDs starting with 1 | ||
* ensure that nextId is min. 1, see https://github.com/mqttjs/MQTT.js/issues/810 | ||
*/ | ||
this.nextId = Math.max(1, Math.floor(Math.random() * 65535)) | ||
} | ||
/** | ||
* allocate | ||
* | ||
* Get the next messageId. | ||
* @return unsigned int | ||
*/ | ||
allocate() { | ||
// id becomes current state of this.nextId and increments afterwards | ||
const id = this.nextId++ | ||
// Ensure 16 bit unsigned int (max 65535, nextId got one higher) | ||
if (this.nextId === 65536) { | ||
this.nextId = 1 | ||
} | ||
return id | ||
} | ||
/** | ||
* allocate | ||
* | ||
* Get the next messageId. | ||
* @return unsigned int | ||
*/ | ||
DefaultMessageIdProvider.prototype.allocate = function () { | ||
// id becomes current state of this.nextId and increments afterwards | ||
const id = this.nextId++ | ||
// Ensure 16 bit unsigned int (max 65535, nextId got one higher) | ||
if (this.nextId === 65536) { | ||
this.nextId = 1 | ||
} | ||
return id | ||
} | ||
/** | ||
* getLastAllocated | ||
* Get the last allocated messageId. | ||
* @return unsigned int | ||
*/ | ||
getLastAllocated() { | ||
return this.nextId === 1 ? 65535 : this.nextId - 1 | ||
} | ||
/** | ||
* getLastAllocated | ||
* Get the last allocated messageId. | ||
* @return unsigned int | ||
*/ | ||
DefaultMessageIdProvider.prototype.getLastAllocated = function () { | ||
return (this.nextId === 1) ? 65535 : (this.nextId - 1) | ||
} | ||
/** | ||
* register | ||
* Register messageId. If success return true, otherwise return false. | ||
* @param { unsigned int } - messageId to register, | ||
* @return boolean | ||
*/ | ||
register(messageId) { | ||
return true | ||
} | ||
/** | ||
* register | ||
* Register messageId. If success return true, otherwise return false. | ||
* @param { unsigned int } - messageId to register, | ||
* @return boolean | ||
*/ | ||
DefaultMessageIdProvider.prototype.register = function (messageId) { | ||
return true | ||
} | ||
/** | ||
* deallocate | ||
* Deallocate messageId. | ||
* @param { unsigned int } - messageId to deallocate, | ||
*/ | ||
deallocate(messageId) {} | ||
/** | ||
* deallocate | ||
* Deallocate messageId. | ||
* @param { unsigned int } - messageId to deallocate, | ||
*/ | ||
DefaultMessageIdProvider.prototype.deallocate = function (messageId) { | ||
/** | ||
* clear | ||
* Deallocate all messageIds. | ||
*/ | ||
clear() {} | ||
} | ||
/** | ||
* clear | ||
* Deallocate all messageIds. | ||
*/ | ||
DefaultMessageIdProvider.prototype.clear = function () { | ||
} | ||
module.exports = DefaultMessageIdProvider |
const legacyIsBrowser = | ||
(typeof process !== 'undefined' && process.title === 'browser') || | ||
// eslint-disable-next-line camelcase | ||
typeof __webpack_require__ === 'function' | ||
(typeof process !== 'undefined' && process.title === 'browser') || | ||
// eslint-disable-next-line camelcase | ||
typeof __webpack_require__ === 'function' | ||
const isBrowser = | ||
typeof window !== 'undefined' && typeof document !== 'undefined' | ||
typeof window !== 'undefined' && typeof document !== 'undefined' | ||
module.exports = { | ||
IS_BROWSER: isBrowser || legacyIsBrowser | ||
IS_BROWSER: isBrowser || legacyIsBrowser, | ||
} |
181
lib/store.js
@@ -1,12 +0,9 @@ | ||
'use strict' | ||
/** | ||
* Module dependencies | ||
*/ | ||
const xtend = require('xtend') | ||
const { Readable } = require('readable-stream') | ||
const Readable = require('readable-stream').Readable | ||
const streamsOpts = { objectMode: true } | ||
const defaultStoreOptions = { | ||
clean: true | ||
clean: true, | ||
} | ||
@@ -20,110 +17,106 @@ | ||
*/ | ||
function Store (options) { | ||
if (!(this instanceof Store)) { | ||
return new Store(options) | ||
} | ||
class Store { | ||
constructor(options) { | ||
this.options = options || {} | ||
this.options = options || {} | ||
// Defaults | ||
this.options = { ...defaultStoreOptions, ...options } | ||
// Defaults | ||
this.options = xtend(defaultStoreOptions, options) | ||
this._inflights = new Map() | ||
} | ||
this._inflights = new Map() | ||
} | ||
/** | ||
* Adds a packet to the store, a packet is | ||
* anything that has a messageId property. | ||
* | ||
*/ | ||
put(packet, cb) { | ||
this._inflights.set(packet.messageId, packet) | ||
/** | ||
* Adds a packet to the store, a packet is | ||
* anything that has a messageId property. | ||
* | ||
*/ | ||
Store.prototype.put = function (packet, cb) { | ||
this._inflights.set(packet.messageId, packet) | ||
if (cb) { | ||
cb() | ||
} | ||
if (cb) { | ||
cb() | ||
} | ||
return this | ||
} | ||
return this | ||
} | ||
/** | ||
* Creates a stream with all the packets in the store | ||
* | ||
*/ | ||
createStream() { | ||
const stream = new Readable(streamsOpts) | ||
const values = [] | ||
let destroyed = false | ||
let i = 0 | ||
/** | ||
* Creates a stream with all the packets in the store | ||
* | ||
*/ | ||
Store.prototype.createStream = function () { | ||
const stream = new Readable(streamsOpts) | ||
const values = [] | ||
let destroyed = false | ||
let i = 0 | ||
this._inflights.forEach((value, key) => { | ||
values.push(value) | ||
}) | ||
this._inflights.forEach(function (value, key) { | ||
values.push(value) | ||
}) | ||
stream._read = () => { | ||
if (!destroyed && i < values.length) { | ||
stream.push(values[i++]) | ||
} else { | ||
stream.push(null) | ||
} | ||
} | ||
stream._read = function () { | ||
if (!destroyed && i < values.length) { | ||
this.push(values[i++]) | ||
} else { | ||
this.push(null) | ||
} | ||
} | ||
stream.destroy = () => { | ||
if (destroyed) { | ||
return | ||
} | ||
stream.destroy = function () { | ||
if (destroyed) { | ||
return | ||
} | ||
destroyed = true | ||
const self = this | ||
setTimeout(() => { | ||
stream.emit('close') | ||
}, 0) | ||
} | ||
destroyed = true | ||
return stream | ||
} | ||
setTimeout(function () { | ||
self.emit('close') | ||
}, 0) | ||
} | ||
/** | ||
* deletes a packet from the store. | ||
*/ | ||
del(packet, cb) { | ||
packet = this._inflights.get(packet.messageId) | ||
if (packet) { | ||
this._inflights.delete(packet.messageId) | ||
cb(null, packet) | ||
} else if (cb) { | ||
cb(new Error('missing packet')) | ||
} | ||
return stream | ||
} | ||
return this | ||
} | ||
/** | ||
* deletes a packet from the store. | ||
*/ | ||
Store.prototype.del = function (packet, cb) { | ||
packet = this._inflights.get(packet.messageId) | ||
if (packet) { | ||
this._inflights.delete(packet.messageId) | ||
cb(null, packet) | ||
} else if (cb) { | ||
cb(new Error('missing packet')) | ||
} | ||
/** | ||
* get a packet from the store. | ||
*/ | ||
get(packet, cb) { | ||
packet = this._inflights.get(packet.messageId) | ||
if (packet) { | ||
cb(null, packet) | ||
} else if (cb) { | ||
cb(new Error('missing packet')) | ||
} | ||
return this | ||
} | ||
return this | ||
} | ||
/** | ||
* get a packet from the store. | ||
*/ | ||
Store.prototype.get = function (packet, cb) { | ||
packet = this._inflights.get(packet.messageId) | ||
if (packet) { | ||
cb(null, packet) | ||
} else if (cb) { | ||
cb(new Error('missing packet')) | ||
} | ||
return this | ||
/** | ||
* Close the store | ||
*/ | ||
close(cb) { | ||
if (this.options.clean) { | ||
this._inflights = null | ||
} | ||
if (cb) { | ||
cb() | ||
} | ||
} | ||
} | ||
/** | ||
* Close the store | ||
*/ | ||
Store.prototype.close = function (cb) { | ||
if (this.options.clean) { | ||
this._inflights = null | ||
} | ||
if (cb) { | ||
cb() | ||
} | ||
} | ||
module.exports = Store |
@@ -1,3 +0,1 @@ | ||
'use strict' | ||
/** | ||
@@ -8,41 +6,40 @@ * Topic Alias receiving manager | ||
*/ | ||
function TopicAliasRecv (max) { | ||
if (!(this instanceof TopicAliasRecv)) { | ||
return new TopicAliasRecv(max) | ||
} | ||
this.aliasToTopic = {} | ||
this.max = max | ||
} | ||
class TopicAliasRecv { | ||
constructor(max) { | ||
this.aliasToTopic = {} | ||
this.max = max | ||
} | ||
/** | ||
* Insert or update topic - alias entry. | ||
* @param {String} [topic] - topic | ||
* @param {Number} [alias] - topic alias | ||
* @returns {Boolean} - if success return true otherwise false | ||
*/ | ||
TopicAliasRecv.prototype.put = function (topic, alias) { | ||
if (alias === 0 || alias > this.max) { | ||
return false | ||
} | ||
this.aliasToTopic[alias] = topic | ||
this.length = Object.keys(this.aliasToTopic).length | ||
return true | ||
} | ||
/** | ||
* Insert or update topic - alias entry. | ||
* @param {String} [topic] - topic | ||
* @param {Number} [alias] - topic alias | ||
* @returns {Boolean} - if success return true otherwise false | ||
*/ | ||
put(topic, alias) { | ||
if (alias === 0 || alias > this.max) { | ||
return false | ||
} | ||
this.aliasToTopic[alias] = topic | ||
this.length = Object.keys(this.aliasToTopic).length | ||
return true | ||
} | ||
/** | ||
* Get topic by alias | ||
* @param {String} [topic] - topic | ||
* @returns {Number} - if mapped topic exists return topic alias, otherwise return undefined | ||
*/ | ||
TopicAliasRecv.prototype.getTopicByAlias = function (alias) { | ||
return this.aliasToTopic[alias] | ||
} | ||
/** | ||
* Get topic by alias | ||
* @param {String} [topic] - topic | ||
* @returns {Number} - if mapped topic exists return topic alias, otherwise return undefined | ||
*/ | ||
getTopicByAlias(alias) { | ||
return this.aliasToTopic[alias] | ||
} | ||
/** | ||
* Clear all entries | ||
*/ | ||
TopicAliasRecv.prototype.clear = function () { | ||
this.aliasToTopic = {} | ||
/** | ||
* Clear all entries | ||
*/ | ||
clear() { | ||
this.aliasToTopic = {} | ||
} | ||
} | ||
module.exports = TopicAliasRecv |
@@ -1,8 +0,6 @@ | ||
'use strict' | ||
/** | ||
* Module dependencies | ||
*/ | ||
const LruMap = require('lru-cache') | ||
const NumberAllocator = require('number-allocator').NumberAllocator | ||
const LRUCache = require('lru-cache') | ||
const { NumberAllocator } = require('number-allocator') | ||
@@ -14,79 +12,78 @@ /** | ||
*/ | ||
function TopicAliasSend (max) { | ||
if (!(this instanceof TopicAliasSend)) { | ||
return new TopicAliasSend(max) | ||
} | ||
class TopicAliasSend { | ||
constructor(max) { | ||
if (max > 0) { | ||
this.aliasToTopic = new LRUCache({ max }) | ||
this.topicToAlias = {} | ||
this.numberAllocator = new NumberAllocator(1, max) | ||
this.max = max | ||
this.length = 0 | ||
} | ||
} | ||
if (max > 0) { | ||
this.aliasToTopic = new LruMap({ max: max }) | ||
this.topicToAlias = {} | ||
this.numberAllocator = new NumberAllocator(1, max) | ||
this.max = max | ||
this.length = 0 | ||
} | ||
} | ||
/** | ||
* Insert or update topic - alias entry. | ||
* @param {String} [topic] - topic | ||
* @param {Number} [alias] - topic alias | ||
* @returns {Boolean} - if success return true otherwise false | ||
*/ | ||
put(topic, alias) { | ||
if (alias === 0 || alias > this.max) { | ||
return false | ||
} | ||
const entry = this.aliasToTopic.get(alias) | ||
if (entry) { | ||
delete this.topicToAlias[entry] | ||
} | ||
this.aliasToTopic.set(alias, topic) | ||
this.topicToAlias[topic] = alias | ||
this.numberAllocator.use(alias) | ||
this.length = this.aliasToTopic.size | ||
return true | ||
} | ||
/** | ||
* Insert or update topic - alias entry. | ||
* @param {String} [topic] - topic | ||
* @param {Number} [alias] - topic alias | ||
* @returns {Boolean} - if success return true otherwise false | ||
*/ | ||
TopicAliasSend.prototype.put = function (topic, alias) { | ||
if (alias === 0 || alias > this.max) { | ||
return false | ||
} | ||
const entry = this.aliasToTopic.get(alias) | ||
if (entry) { | ||
delete this.topicToAlias[entry] | ||
} | ||
this.aliasToTopic.set(alias, topic) | ||
this.topicToAlias[topic] = alias | ||
this.numberAllocator.use(alias) | ||
this.length = this.aliasToTopic.length | ||
return true | ||
} | ||
/** | ||
* Get topic by alias | ||
* @param {Number} [alias] - topic alias | ||
* @returns {String} - if mapped topic exists return topic, otherwise return undefined | ||
*/ | ||
getTopicByAlias(alias) { | ||
return this.aliasToTopic.get(alias) | ||
} | ||
/** | ||
* Get topic by alias | ||
* @param {Number} [alias] - topic alias | ||
* @returns {String} - if mapped topic exists return topic, otherwise return undefined | ||
*/ | ||
TopicAliasSend.prototype.getTopicByAlias = function (alias) { | ||
return this.aliasToTopic.get(alias) | ||
} | ||
/** | ||
* Get topic by alias | ||
* @param {String} [topic] - topic | ||
* @returns {Number} - if mapped topic exists return topic alias, otherwise return undefined | ||
*/ | ||
getAliasByTopic(topic) { | ||
const alias = this.topicToAlias[topic] | ||
if (typeof alias !== 'undefined') { | ||
this.aliasToTopic.get(alias) // LRU update | ||
} | ||
return alias | ||
} | ||
/** | ||
* Get topic by alias | ||
* @param {String} [topic] - topic | ||
* @returns {Number} - if mapped topic exists return topic alias, otherwise return undefined | ||
*/ | ||
TopicAliasSend.prototype.getAliasByTopic = function (topic) { | ||
const alias = this.topicToAlias[topic] | ||
if (typeof alias !== 'undefined') { | ||
this.aliasToTopic.get(alias) // LRU update | ||
} | ||
return alias | ||
} | ||
/** | ||
* Clear all entries | ||
*/ | ||
clear() { | ||
this.aliasToTopic.clear() | ||
this.topicToAlias = {} | ||
this.numberAllocator.clear() | ||
this.length = 0 | ||
} | ||
/** | ||
* Clear all entries | ||
*/ | ||
TopicAliasSend.prototype.clear = function () { | ||
this.aliasToTopic.reset() | ||
this.topicToAlias = {} | ||
this.numberAllocator.clear() | ||
this.length = 0 | ||
/** | ||
* Get Least Recently Used (LRU) topic alias | ||
* @returns {Number} - if vacant alias exists then return it, otherwise then return LRU alias | ||
*/ | ||
getLruAlias() { | ||
const alias = this.numberAllocator.firstVacant() | ||
if (alias) return alias | ||
// get last alias (key) from LRU cache | ||
return [...this.aliasToTopic.keys()][this.aliasToTopic.size - 1] | ||
} | ||
} | ||
/** | ||
* Get Least Recently Used (LRU) topic alias | ||
* @returns {Number} - if vacant alias exists then return it, otherwise then return LRU alias | ||
*/ | ||
TopicAliasSend.prototype.getLruAlias = function () { | ||
const alias = this.numberAllocator.firstVacant() | ||
if (alias) return alias | ||
return this.aliasToTopic.keys()[this.aliasToTopic.length - 1] | ||
} | ||
module.exports = TopicAliasSend |
@@ -1,5 +0,3 @@ | ||
'use strict' | ||
const { NumberAllocator } = require('number-allocator') | ||
const NumberAllocator = require('number-allocator').NumberAllocator | ||
/** | ||
@@ -9,58 +7,56 @@ * UniqueMessageAllocator constructor | ||
*/ | ||
function UniqueMessageIdProvider () { | ||
if (!(this instanceof UniqueMessageIdProvider)) { | ||
return new UniqueMessageIdProvider() | ||
} | ||
class UniqueMessageIdProvider { | ||
constructor() { | ||
this.numberAllocator = new NumberAllocator(1, 65535) | ||
} | ||
this.numberAllocator = new NumberAllocator(1, 65535) | ||
} | ||
/** | ||
* allocate | ||
* | ||
* Get the next messageId. | ||
* @return if messageId is fully allocated then return null, | ||
* otherwise return the smallest usable unsigned int messageId. | ||
*/ | ||
allocate() { | ||
this.lastId = this.numberAllocator.alloc() | ||
return this.lastId | ||
} | ||
/** | ||
* allocate | ||
* | ||
* Get the next messageId. | ||
* @return if messageId is fully allocated then return null, | ||
* otherwise return the smallest usable unsigned int messageId. | ||
*/ | ||
UniqueMessageIdProvider.prototype.allocate = function () { | ||
this.lastId = this.numberAllocator.alloc() | ||
return this.lastId | ||
} | ||
/** | ||
* getLastAllocated | ||
* Get the last allocated messageId. | ||
* @return unsigned int | ||
*/ | ||
getLastAllocated() { | ||
return this.lastId | ||
} | ||
/** | ||
* getLastAllocated | ||
* Get the last allocated messageId. | ||
* @return unsigned int | ||
*/ | ||
UniqueMessageIdProvider.prototype.getLastAllocated = function () { | ||
return this.lastId | ||
} | ||
/** | ||
* register | ||
* Register messageId. If success return true, otherwise return false. | ||
* @param { unsigned int } - messageId to register, | ||
* @return boolean | ||
*/ | ||
register(messageId) { | ||
return this.numberAllocator.use(messageId) | ||
} | ||
/** | ||
* register | ||
* Register messageId. If success return true, otherwise return false. | ||
* @param { unsigned int } - messageId to register, | ||
* @return boolean | ||
*/ | ||
UniqueMessageIdProvider.prototype.register = function (messageId) { | ||
return this.numberAllocator.use(messageId) | ||
} | ||
/** | ||
* deallocate | ||
* Deallocate messageId. | ||
* @param { unsigned int } - messageId to deallocate, | ||
*/ | ||
deallocate(messageId) { | ||
this.numberAllocator.free(messageId) | ||
} | ||
/** | ||
* deallocate | ||
* Deallocate messageId. | ||
* @param { unsigned int } - messageId to deallocate, | ||
*/ | ||
UniqueMessageIdProvider.prototype.deallocate = function (messageId) { | ||
this.numberAllocator.free(messageId) | ||
/** | ||
* clear | ||
* Deallocate all messageIds. | ||
*/ | ||
clear() { | ||
this.numberAllocator.clear() | ||
} | ||
} | ||
/** | ||
* clear | ||
* Deallocate all messageIds. | ||
*/ | ||
UniqueMessageIdProvider.prototype.clear = function () { | ||
this.numberAllocator.clear() | ||
} | ||
module.exports = UniqueMessageIdProvider |
@@ -1,3 +0,1 @@ | ||
'use strict' | ||
/** | ||
@@ -12,21 +10,21 @@ * Validate a topic to see if it's valid or not. | ||
*/ | ||
function validateTopic (topic) { | ||
const parts = topic.split('/') | ||
function validateTopic(topic) { | ||
const parts = topic.split('/') | ||
for (let i = 0; i < parts.length; i++) { | ||
if (parts[i] === '+') { | ||
continue | ||
} | ||
for (let i = 0; i < parts.length; i++) { | ||
if (parts[i] === '+') { | ||
continue | ||
} | ||
if (parts[i] === '#') { | ||
// for Rule #2 | ||
return i === parts.length - 1 | ||
} | ||
if (parts[i] === '#') { | ||
// for Rule #2 | ||
return i === parts.length - 1 | ||
} | ||
if (parts[i].indexOf('+') !== -1 || parts[i].indexOf('#') !== -1) { | ||
return false | ||
} | ||
} | ||
if (parts[i].indexOf('+') !== -1 || parts[i].indexOf('#') !== -1) { | ||
return false | ||
} | ||
} | ||
return true | ||
return true | ||
} | ||
@@ -36,19 +34,19 @@ | ||
* Validate an array of topics to see if any of them is valid or not | ||
* @param {Array} topics - Array of topics | ||
* @param {Array} topics - Array of topics | ||
* @returns {String} If the topics is valid, returns null. Otherwise, returns the invalid one | ||
*/ | ||
function validateTopics (topics) { | ||
if (topics.length === 0) { | ||
return 'empty_topic_list' | ||
} | ||
for (let i = 0; i < topics.length; i++) { | ||
if (!validateTopic(topics[i])) { | ||
return topics[i] | ||
} | ||
} | ||
return null | ||
function validateTopics(topics) { | ||
if (topics.length === 0) { | ||
return 'empty_topic_list' | ||
} | ||
for (let i = 0; i < topics.length; i++) { | ||
if (!validateTopic(topics[i])) { | ||
return topics[i] | ||
} | ||
} | ||
return null | ||
} | ||
module.exports = { | ||
validateTopics: validateTopics | ||
validateTopics, | ||
} |
{ | ||
"name": "mqtt", | ||
"description": "A library for the MQTT protocol", | ||
"version": "5.0.0-beta.2", | ||
"version": "5.0.0-beta.3", | ||
"contributors": [ | ||
@@ -26,3 +26,4 @@ "Adam Rudd <adamvrr@gmail.com>", | ||
"scripts": { | ||
"pretest": "standard | snazzy", | ||
"lint": "eslint --ext .js .", | ||
"lint-fix": "eslint --fix --ext .js .", | ||
"typescript-compile-test": "tsc -p test/typescript/tsconfig.json", | ||
@@ -66,3 +67,3 @@ "typescript-compile-execute": "node test/typescript/broker-connect-subscribe-and-publish.js", | ||
"pre-commit": [ | ||
"pretest" | ||
"lint" | ||
], | ||
@@ -93,55 +94,52 @@ "bin": { | ||
"dependencies": { | ||
"commist": "^1.0.0", | ||
"commist": "^3.2.0", | ||
"concat-stream": "^2.0.0", | ||
"debug": "^4.1.1", | ||
"duplexify": "^4.1.1", | ||
"help-me": "^3.0.0", | ||
"inherits": "^2.0.3", | ||
"lru-cache": "^6.0.0", | ||
"minimist": "^1.2.6", | ||
"mqtt-packet": "^8.1.2", | ||
"debug": "^4.3.4", | ||
"duplexify": "^4.1.2", | ||
"help-me": "^4.2.0", | ||
"lru-cache": "^7.18.3", | ||
"minimist": "^1.2.8", | ||
"mqtt-packet": "^8.2.0", | ||
"number-allocator": "^1.0.14", | ||
"pump": "^3.0.0", | ||
"readable-stream": "^4.1.0", | ||
"readable-stream": "^4.4.2", | ||
"reinterval": "^1.1.0", | ||
"rfdc": "^1.3.0", | ||
"split2": "^3.1.0", | ||
"ws": "^7.5.5", | ||
"xtend": "^4.0.2" | ||
"split2": "^4.2.0", | ||
"ws": "^8.13.0" | ||
}, | ||
"devDependencies": { | ||
"@release-it/conventional-changelog": "^5.1.1", | ||
"@types/node": "^12.20.55", | ||
"@types/tape": "^4.13.2", | ||
"@types/ws": "^7.4.7", | ||
"aedes": "^0.46.2", | ||
"@release-it/conventional-changelog": "^6.0.0", | ||
"@types/node": "^20.4.0", | ||
"@types/tape": "^5.6.0", | ||
"@types/ws": "^8.5.5", | ||
"airtap": "^4.0.4", | ||
"airtap-playwright": "^1.0.1", | ||
"browserify": "^17.0.0", | ||
"chai": "^4.2.0", | ||
"chai": "^4.3.7", | ||
"chokidar": "^3.5.3", | ||
"codecov": "^3.0.4", | ||
"codecov": "^3.8.2", | ||
"conventional-changelog-cli": "^3.0.0", | ||
"end-of-stream": "^1.4.1", | ||
"global": "^4.3.2", | ||
"mkdirp": "^0.5.1", | ||
"mocha": "^9.2.0", | ||
"mqtt-connection": "^4.0.0", | ||
"end-of-stream": "^1.4.4", | ||
"eslint": "^8.45.0", | ||
"eslint-config-airbnb-base": "^15.0.0", | ||
"eslint-config-prettier": "^8.8.0", | ||
"eslint-plugin-import": "^2.27.5", | ||
"eslint-plugin-prettier": "^5.0.0", | ||
"global": "^4.4.0", | ||
"mkdirp": "^3.0.1", | ||
"mocha": "^10.2.0", | ||
"mqtt-connection": "^4.1.0", | ||
"mqtt-level-store": "^3.1.0", | ||
"nyc": "^15.0.1", | ||
"nyc": "^15.1.0", | ||
"pre-commit": "^1.2.2", | ||
"prettier": "^3.0.0", | ||
"release-it": "^15.11.0", | ||
"rimraf": "^3.0.2", | ||
"should": "^13.2.1", | ||
"sinon": "^9.0.0", | ||
"rimraf": "^5.0.1", | ||
"should": "^13.2.3", | ||
"sinon": "^15.2.0", | ||
"snazzy": "^9.0.0", | ||
"standard": "^16.0.4", | ||
"tape": "^5.5.2", | ||
"terser": "^5.14.2", | ||
"typescript": "^4.5.5" | ||
}, | ||
"standard": { | ||
"env": [ | ||
"mocha" | ||
] | ||
"tape": "^5.6.4", | ||
"terser": "^5.18.2", | ||
"typescript": "^5.1.6" | ||
} | ||
} |
@@ -5,6 +5,11 @@ #  | ||
[](http://standardjs.com/) [](https://github.com/mqttjs/MQTT.js/graphs/commit-activity) | ||
[](https://github.com/mqttjs/MQTT.js/pulls)\ | ||
  | ||
MQTT.js is a client library for the [MQTT](http://mqtt.org/) protocol, written | ||
in JavaScript for node.js and the browser. | ||
> MQTT [5.0.0-beta.0](https://github.com/mqttjs/MQTT.js/releases/tag/v5.0.0-beta.0) is now available! Try it out and give us feedback! `npm i mqtt@5.0.0-beta.0` | ||
> MQTT [5.0.0 BETA](https://www.npmjs.com/package/mqtt/v/beta) is now available! Try it out and give us [feedback](https://github.com/mqttjs/MQTT.js/issues/1639): `npm i mqtt@beta` | ||
@@ -100,4 +105,4 @@ ## Table of Contents | ||
client.on("connect", function () { | ||
client.subscribe("presence", function (err) { | ||
client.on("connect", () => { | ||
client.subscribe("presence", (err) => { | ||
if (!err) { | ||
@@ -109,3 +114,3 @@ client.publish("presence", "Hello mqtt"); | ||
client.on("message", function (topic, message) { | ||
client.on("message", (topic, message) => { | ||
// message is Buffer | ||
@@ -306,19 +311,20 @@ console.log(message.toString()); | ||
- <a href="#connect"><code>mqtt.<b>connect()</b></code></a> | ||
- <a href="#client"><code>mqtt.<b>Client()</b></code></a> | ||
- <a href="#publish"><code>mqtt.Client#<b>publish()</b></code></a> | ||
- <a href="#subscribe"><code>mqtt.Client#<b>subscribe()</b></code></a> | ||
- <a href="#unsubscribe"><code>mqtt.Client#<b>unsubscribe()</b></code></a> | ||
- <a href="#end"><code>mqtt.Client#<b>end()</b></code></a> | ||
- <a href="#removeOutgoingMessage"><code>mqtt.Client#<b>removeOutgoingMessage()</b></code></a> | ||
- <a href="#reconnect"><code>mqtt.Client#<b>reconnect()</b></code></a> | ||
- <a href="#handleMessage"><code>mqtt.Client#<b>handleMessage()</b></code></a> | ||
- <a href="#connected"><code>mqtt.Client#<b>connected</b></code></a> | ||
- <a href="#reconnecting"><code>mqtt.Client#<b>reconnecting</b></code></a> | ||
- <a href="#getLastMessageId"><code>mqtt.Client#<b>getLastMessageId()</b></code></a> | ||
- <a href="#store"><code>mqtt.<b>Store()</b></code></a> | ||
- <a href="#put"><code>mqtt.Store#<b>put()</b></code></a> | ||
- <a href="#del"><code>mqtt.Store#<b>del()</b></code></a> | ||
- <a href="#createStream"><code>mqtt.Store#<b>createStream()</b></code></a> | ||
- <a href="#close"><code>mqtt.Store#<b>close()</b></code></a> | ||
- [`mqtt.connect()`](#connect) | ||
- [`mqtt.Client()`](#client) | ||
- [`mqtt.Client#connect()`](#client-connect) | ||
- [`mqtt.Client#publish()`](#publish) | ||
- [`mqtt.Client#subscribe()`](#subscribe) | ||
- [`mqtt.Client#unsubscribe()`](#unsubscribe) | ||
- [`mqtt.Client#end()`](#end) | ||
- [`mqtt.Client#removeOutgoingMessage()`](#removeOutgoingMessage) | ||
- [`mqtt.Client#reconnect()`](#reconnect) | ||
- [`mqtt.Client#handleMessage()`](#handleMessage) | ||
- [`mqtt.Client#connected`](#connected) | ||
- [`mqtt.Client#reconnecting`](#reconnecting) | ||
- [`mqtt.Client#getLastMessageId()`](#getLastMessageId) | ||
- [`mqtt.Store()`](#store) | ||
- [`mqtt.Store#put()`](#put) | ||
- [`mqtt.Store#del()`](#del) | ||
- [`mqtt.Store#createStream()`](#createStream) | ||
- [`mqtt.Store#close()`](#close) | ||
@@ -370,3 +376,3 @@ --- | ||
- `wsOptions`: is the WebSocket connection options. Default is `{}`. | ||
It's specific for WebSockets. For possible options have a look at: https://github.com/websockets/ws/blob/master/doc/ws.md. | ||
It's specific for WebSockets. For possible options have a look at: <https://github.com/websockets/ws/blob/master/doc/ws.md>. | ||
- `keepalive`: `60` seconds, set to `0` to disable | ||
@@ -389,5 +395,7 @@ - `reschedulePings`: reschedule ping messages after sending packets (default `true`) | ||
- `customHandleAcks`: MQTT 5 feature of custom handling puback and pubrec packets. Its callback: | ||
```js | ||
customHandleAcks: function(topic, message, packet, done) {/*some logic wit colling done(error, reasonCode)*/} | ||
``` | ||
- `autoUseTopicAlias`: enabling automatic Topic Alias using functionality | ||
@@ -427,2 +435,4 @@ - `autoAssignTopicAlias`: enabling automatic Topic Alias assign functionality | ||
- `messageIdProvider`: custom messageId provider. when `new UniqueMessageIdProvider()` is set, then non conflict messageId is provided. | ||
- `log`: custom log function. Default uses [debug](https://www.npmjs.com/package/debug) package. | ||
- `manualConnect`: prevents the constructor to call `connect`. In this case after the `mqtt.connect` is called you should call `client.connect` manually. | ||
@@ -501,3 +511,3 @@ In case mqtts (mqtt over tls) is required, the `options` object is | ||
Emitted when <a href="#end"><code>mqtt.Client#<b>end()</b></code></a> is called. | ||
Emitted when [`mqtt.Client#end()`](#end) is called. | ||
If a callback was passed to `mqtt.Client#end()`, this event is emitted once the | ||
@@ -540,2 +550,8 @@ callback returns. | ||
<a name="client-connect"></a> | ||
### mqtt.Client#connect() | ||
By default client connects when constructor is called. To prevent this you can set `manualConnect` option to `true` and call `client.connect()` manually. | ||
<a name="publish"></a> | ||
@@ -752,5 +768,5 @@ | ||
The MQTT.js bundle is available through http://unpkg.com, specifically | ||
at https://unpkg.com/mqtt/dist/mqtt.min.js. | ||
See http://unpkg.com for the full documentation on version ranges. | ||
The MQTT.js bundle is available through <http://unpkg.com>, specifically | ||
at <https://unpkg.com/mqtt/dist/mqtt.min.js>. | ||
See <http://unpkg.com> for the full documentation on version ranges. | ||
@@ -757,0 +773,0 @@ <a name="browserify"></a> |
@@ -64,2 +64,4 @@ import { MqttClient } from './client' | ||
log?: (...args: any[]) => void | ||
autoUseTopicAlias?: boolean | ||
@@ -109,3 +111,8 @@ autoAssignTopicAlias?: boolean | ||
userProperties?: UserProperties | ||
} | ||
}, | ||
authPacket?: any | ||
/** Prevent to call `connect` in constructor */ | ||
manualConnect?: boolean | ||
} | ||
@@ -201,2 +208,11 @@ transformWsUrl?: (url: string, options: IClientOptions, client: MqttClient) => string, | ||
} | ||
export interface IClientSubscribeProperties { | ||
/* | ||
* MQTT 5.0 properies object of subscribe | ||
* */ | ||
properties?: { | ||
subscriptionIdentifier?: number, | ||
userProperties?: UserProperties | ||
} | ||
} | ||
export interface IClientReconnectOptions { | ||
@@ -203,0 +219,0 @@ /** |
@@ -8,2 +8,3 @@ /// <reference types="node" /> | ||
IClientSubscribeOptions, | ||
IClientSubscribeProperties, | ||
IClientReconnectOptions | ||
@@ -102,2 +103,4 @@ } from './client-options' | ||
static defaultId (): string | ||
constructor (streamBuilder: (client: MqttClient) => IStream, options: IClientOptions) | ||
@@ -124,2 +127,7 @@ | ||
/** | ||
* Setup the event handlers in the inner stream, sends `connect` and `auth` packets | ||
*/ | ||
public connect(): this | ||
/** | ||
* publish - publish <message> to <topic> | ||
@@ -170,3 +178,7 @@ * | ||
string | ||
| string[], opts: IClientSubscribeOptions, callback?: ClientSubscribeCallback): this | ||
| string[] | ||
| ISubscriptionMap, | ||
opts: | ||
IClientSubscribeOptions | ||
| IClientSubscribeProperties, callback?: ClientSubscribeCallback): this | ||
public subscribe (topic: | ||
@@ -173,0 +185,0 @@ string |
Sorry, the diff of this file is too big to display
Sorry, the diff of this file is too big to display
1209889
14
42
28720
958
33
+ Addedbrace-expansion@2.0.1(transitive)
+ Addedcommist@3.2.0(transitive)
+ Addedglob@8.1.0(transitive)
+ Addedhelp-me@4.2.0(transitive)
+ Addedlru-cache@7.18.3(transitive)
+ Addedminimatch@5.1.6(transitive)
+ Addedsplit2@4.2.0(transitive)
+ Addedws@8.18.0(transitive)
- Removedinherits@^2.0.3
- Removedpump@^3.0.0
- Removedxtend@^4.0.2
- Removedbrace-expansion@1.1.11(transitive)
- Removedcommist@1.1.0(transitive)
- Removedconcat-map@0.0.1(transitive)
- Removedglob@7.2.3(transitive)
- Removedhelp-me@3.0.0(transitive)
- Removedleven@2.1.0(transitive)
- Removedlru-cache@6.0.0(transitive)
- Removedminimatch@3.1.2(transitive)
- Removedpath-is-absolute@1.0.1(transitive)
- Removedpump@3.0.2(transitive)
- Removedsplit2@3.2.2(transitive)
- Removedws@7.5.10(transitive)
- Removedxtend@4.0.2(transitive)
- Removedyallist@4.0.0(transitive)
Updatedcommist@^3.2.0
Updateddebug@^4.3.4
Updatedduplexify@^4.1.2
Updatedhelp-me@^4.2.0
Updatedlru-cache@^7.18.3
Updatedminimist@^1.2.8
Updatedmqtt-packet@^8.2.0
Updatedreadable-stream@^4.4.2
Updatedsplit2@^4.2.0
Updatedws@^8.13.0