Comparing version 5.0.0-beta.2 to 5.0.0-beta.3
#!/usr/bin/env node | ||
'use strict' | ||
@@ -13,4 +12,4 @@ /* | ||
const helpMe = require('help-me')({ | ||
dir: path.join(path.dirname(require.main.filename), '/../doc'), | ||
ext: '.txt' | ||
dir: path.join(path.dirname(require.main.filename), '/../doc'), | ||
ext: '.txt', | ||
}) | ||
@@ -20,4 +19,5 @@ | ||
commist.register('subscribe', require('./sub')) | ||
commist.register('version', function () { | ||
console.log('MQTT.js version:', require('./../package.json').version) | ||
commist.register('version', () => { | ||
console.log('MQTT.js version:', require('../package.json').version) | ||
}) | ||
@@ -27,4 +27,4 @@ commist.register('help', helpMe.toStdout) | ||
if (commist.parse(process.argv.slice(2)) !== null) { | ||
console.log('No such command:', process.argv[2], '\n') | ||
helpMe.toStdout() | ||
console.log('No such command:', process.argv[2], '\n') | ||
helpMe.toStdout() | ||
} |
241
bin/pub.js
#!/usr/bin/env node | ||
'use strict' | ||
const mqtt = require('../') | ||
const pump = require('pump') | ||
const mqtt = require('..') | ||
const { pipeline, Writable } = require('readable-stream') | ||
const path = require('path') | ||
const fs = require('fs') | ||
const concat = require('concat-stream') | ||
const Writable = require('readable-stream').Writable | ||
const helpMe = require('help-me')({ | ||
dir: path.join(__dirname, '..', 'doc') | ||
dir: path.join(__dirname, '..', 'doc'), | ||
}) | ||
@@ -17,125 +14,141 @@ const minimist = require('minimist') | ||
function send (args) { | ||
const client = mqtt.connect(args) | ||
client.on('connect', function () { | ||
client.publish(args.topic, args.message, args, function (err) { | ||
if (err) { | ||
console.warn(err) | ||
} | ||
client.end() | ||
}) | ||
}) | ||
client.on('error', function (err) { | ||
console.warn(err) | ||
client.end() | ||
}) | ||
function send(args) { | ||
const client = mqtt.connect(args) | ||
client.on('connect', () => { | ||
client.publish(args.topic, args.message, args, (err) => { | ||
if (err) { | ||
console.warn(err) | ||
} | ||
client.end() | ||
}) | ||
}) | ||
client.on('error', (err) => { | ||
console.warn(err) | ||
client.end() | ||
}) | ||
} | ||
function multisend (args) { | ||
const client = mqtt.connect(args) | ||
const sender = new Writable({ | ||
objectMode: true | ||
}) | ||
sender._write = function (line, enc, cb) { | ||
client.publish(args.topic, line.trim(), args, cb) | ||
} | ||
function multisend(args) { | ||
const client = mqtt.connect(args) | ||
const sender = new Writable({ | ||
objectMode: true, | ||
}) | ||
sender._write = (line, enc, cb) => { | ||
client.publish(args.topic, line.trim(), args, cb) | ||
} | ||
client.on('connect', function () { | ||
pump(process.stdin, split2(), sender, function (err) { | ||
client.end() | ||
if (err) { | ||
throw err | ||
} | ||
}) | ||
}) | ||
client.on('connect', () => { | ||
pipeline(process.stdin, split2(), sender, (err) => { | ||
client.end() | ||
if (err) { | ||
throw err | ||
} | ||
}) | ||
}) | ||
} | ||
function start (args) { | ||
args = minimist(args, { | ||
string: ['hostname', 'username', 'password', 'key', 'cert', 'ca', 'message', 'clientId', 'i', 'id'], | ||
boolean: ['stdin', 'retain', 'help', 'insecure', 'multiline'], | ||
alias: { | ||
port: 'p', | ||
hostname: ['h', 'host'], | ||
topic: 't', | ||
message: 'm', | ||
qos: 'q', | ||
clientId: ['i', 'id'], | ||
retain: 'r', | ||
username: 'u', | ||
password: 'P', | ||
stdin: 's', | ||
multiline: 'M', | ||
protocol: ['C', 'l'], | ||
help: 'H', | ||
ca: 'cafile' | ||
}, | ||
default: { | ||
host: 'localhost', | ||
qos: 0, | ||
retain: false, | ||
topic: '', | ||
message: '' | ||
} | ||
}) | ||
function start(args) { | ||
args = minimist(args, { | ||
string: [ | ||
'hostname', | ||
'username', | ||
'password', | ||
'key', | ||
'cert', | ||
'ca', | ||
'message', | ||
'clientId', | ||
'i', | ||
'id', | ||
], | ||
boolean: ['stdin', 'retain', 'help', 'insecure', 'multiline'], | ||
alias: { | ||
port: 'p', | ||
hostname: ['h', 'host'], | ||
topic: 't', | ||
message: 'm', | ||
qos: 'q', | ||
clientId: ['i', 'id'], | ||
retain: 'r', | ||
username: 'u', | ||
password: 'P', | ||
stdin: 's', | ||
multiline: 'M', | ||
protocol: ['C', 'l'], | ||
help: 'H', | ||
ca: 'cafile', | ||
}, | ||
default: { | ||
host: 'localhost', | ||
qos: 0, | ||
retain: false, | ||
topic: '', | ||
message: '', | ||
}, | ||
}) | ||
if (args.help) { | ||
return helpMe.toStdout('publish') | ||
} | ||
if (args.help) { | ||
return helpMe.toStdout('publish') | ||
} | ||
if (args.key) { | ||
args.key = fs.readFileSync(args.key) | ||
} | ||
if (args.key) { | ||
args.key = fs.readFileSync(args.key) | ||
} | ||
if (args.cert) { | ||
args.cert = fs.readFileSync(args.cert) | ||
} | ||
if (args.cert) { | ||
args.cert = fs.readFileSync(args.cert) | ||
} | ||
if (args.ca) { | ||
args.ca = fs.readFileSync(args.ca) | ||
} | ||
if (args.ca) { | ||
args.ca = fs.readFileSync(args.ca) | ||
} | ||
if (args.key && args.cert && !args.protocol) { | ||
args.protocol = 'mqtts' | ||
} | ||
if (args.key && args.cert && !args.protocol) { | ||
args.protocol = 'mqtts' | ||
} | ||
if (args.port) { | ||
if (typeof args.port !== 'number') { | ||
console.warn('# Port: number expected, \'%s\' was given.', typeof args.port) | ||
return | ||
} | ||
} | ||
if (args.port) { | ||
if (typeof args.port !== 'number') { | ||
console.warn( | ||
"# Port: number expected, '%s' was given.", | ||
typeof args.port, | ||
) | ||
return | ||
} | ||
} | ||
if (args['will-topic']) { | ||
args.will = {} | ||
args.will.topic = args['will-topic'] | ||
args.will.payload = args['will-message'] | ||
args.will.qos = args['will-qos'] | ||
args.will.retain = args['will-retain'] | ||
} | ||
if (args['will-topic']) { | ||
args.will = {} | ||
args.will.topic = args['will-topic'] | ||
args.will.payload = args['will-message'] | ||
args.will.qos = args['will-qos'] | ||
args.will.retain = args['will-retain'] | ||
} | ||
if (args.insecure) { | ||
args.rejectUnauthorized = false | ||
} | ||
if (args.insecure) { | ||
args.rejectUnauthorized = false | ||
} | ||
args.topic = (args.topic || args._.shift()).toString() | ||
args.message = (args.message || args._.shift()).toString() | ||
args.topic = (args.topic || args._.shift()).toString() | ||
args.message = (args.message || args._.shift()).toString() | ||
if (!args.topic) { | ||
console.error('missing topic\n') | ||
return helpMe.toStdout('publish') | ||
} | ||
if (!args.topic) { | ||
console.error('missing topic\n') | ||
return helpMe.toStdout('publish') | ||
} | ||
if (args.stdin) { | ||
if (args.multiline) { | ||
multisend(args) | ||
} else { | ||
process.stdin.pipe(concat(function (data) { | ||
args.message = data | ||
send(args) | ||
})) | ||
} | ||
} else { | ||
send(args) | ||
} | ||
if (args.stdin) { | ||
if (args.multiline) { | ||
multisend(args) | ||
} else { | ||
process.stdin.pipe( | ||
concat((data) => { | ||
args.message = data | ||
send(args) | ||
}), | ||
) | ||
} | ||
} else { | ||
send(args) | ||
} | ||
} | ||
@@ -146,3 +159,3 @@ | ||
if (require.main === module) { | ||
start(process.argv.slice(2)) | ||
start(process.argv.slice(2)) | ||
} |
204
bin/sub.js
#!/usr/bin/env node | ||
const mqtt = require('../') | ||
const mqtt = require('..') | ||
const path = require('path') | ||
const fs = require('fs') | ||
const helpMe = require('help-me')({ | ||
dir: path.join(__dirname, '..', 'doc') | ||
dir: path.join(__dirname, '..', 'doc'), | ||
}) | ||
const minimist = require('minimist') | ||
function start (args) { | ||
args = minimist(args, { | ||
string: ['hostname', 'username', 'password', 'key', 'cert', 'ca', 'clientId', 'i', 'id'], | ||
boolean: ['stdin', 'help', 'clean', 'insecure'], | ||
alias: { | ||
port: 'p', | ||
hostname: ['h', 'host'], | ||
topic: 't', | ||
qos: 'q', | ||
clean: 'c', | ||
keepalive: 'k', | ||
clientId: ['i', 'id'], | ||
username: 'u', | ||
password: 'P', | ||
protocol: ['C', 'l'], | ||
verbose: 'v', | ||
help: '-H', | ||
ca: 'cafile' | ||
}, | ||
default: { | ||
host: 'localhost', | ||
qos: 0, | ||
retain: false, | ||
clean: true, | ||
keepAlive: 30 // 30 sec | ||
} | ||
}) | ||
function start(args) { | ||
args = minimist(args, { | ||
string: [ | ||
'hostname', | ||
'username', | ||
'password', | ||
'key', | ||
'cert', | ||
'ca', | ||
'clientId', | ||
'i', | ||
'id', | ||
], | ||
boolean: ['stdin', 'help', 'clean', 'insecure'], | ||
alias: { | ||
port: 'p', | ||
hostname: ['h', 'host'], | ||
topic: 't', | ||
qos: 'q', | ||
clean: 'c', | ||
keepalive: 'k', | ||
clientId: ['i', 'id'], | ||
username: 'u', | ||
password: 'P', | ||
protocol: ['C', 'l'], | ||
verbose: 'v', | ||
help: '-H', | ||
ca: 'cafile', | ||
}, | ||
default: { | ||
host: 'localhost', | ||
qos: 0, | ||
retain: false, | ||
clean: true, | ||
keepAlive: 30, // 30 sec | ||
}, | ||
}) | ||
if (args.help) { | ||
return helpMe.toStdout('subscribe') | ||
} | ||
if (args.help) { | ||
return helpMe.toStdout('subscribe') | ||
} | ||
args.topic = args.topic || args._.shift() | ||
args.topic = args.topic || args._.shift() | ||
if (!args.topic) { | ||
console.error('missing topic\n') | ||
return helpMe.toStdout('subscribe') | ||
} | ||
if (!args.topic) { | ||
console.error('missing topic\n') | ||
return helpMe.toStdout('subscribe') | ||
} | ||
if (args.key) { | ||
args.key = fs.readFileSync(args.key) | ||
} | ||
if (args.key) { | ||
args.key = fs.readFileSync(args.key) | ||
} | ||
if (args.cert) { | ||
args.cert = fs.readFileSync(args.cert) | ||
} | ||
if (args.cert) { | ||
args.cert = fs.readFileSync(args.cert) | ||
} | ||
if (args.ca) { | ||
args.ca = fs.readFileSync(args.ca) | ||
} | ||
if (args.ca) { | ||
args.ca = fs.readFileSync(args.ca) | ||
} | ||
if (args.key && args.cert && !args.protocol) { | ||
args.protocol = 'mqtts' | ||
} | ||
if (args.key && args.cert && !args.protocol) { | ||
args.protocol = 'mqtts' | ||
} | ||
if (args.insecure) { | ||
args.rejectUnauthorized = false | ||
} | ||
if (args.insecure) { | ||
args.rejectUnauthorized = false | ||
} | ||
if (args.port) { | ||
if (typeof args.port !== 'number') { | ||
console.warn('# Port: number expected, \'%s\' was given.', typeof args.port) | ||
return | ||
} | ||
} | ||
if (args.port) { | ||
if (typeof args.port !== 'number') { | ||
console.warn( | ||
"# Port: number expected, '%s' was given.", | ||
typeof args.port, | ||
) | ||
return | ||
} | ||
} | ||
if (args['will-topic']) { | ||
args.will = {} | ||
args.will.topic = args['will-topic'] | ||
args.will.payload = args['will-message'] | ||
args.will.qos = args['will-qos'] | ||
args.will.retain = args['will-retain'] | ||
} | ||
if (args['will-topic']) { | ||
args.will = {} | ||
args.will.topic = args['will-topic'] | ||
args.will.payload = args['will-message'] | ||
args.will.qos = args['will-qos'] | ||
args.will.retain = args['will-retain'] | ||
} | ||
args.keepAlive = args['keep-alive'] | ||
args.keepAlive = args['keep-alive'] | ||
const client = mqtt.connect(args) | ||
const client = mqtt.connect(args) | ||
client.on('connect', function () { | ||
client.subscribe(args.topic, { qos: args.qos }, function (err, result) { | ||
if (err) { | ||
console.error(err) | ||
process.exit(1) | ||
} | ||
client.on('connect', () => { | ||
client.subscribe(args.topic, { qos: args.qos }, (err, result) => { | ||
if (err) { | ||
console.error(err) | ||
process.exit(1) | ||
} | ||
result.forEach(function (sub) { | ||
if (sub.qos > 2) { | ||
console.error('subscription negated to', sub.topic, 'with code', sub.qos) | ||
process.exit(1) | ||
} | ||
}) | ||
}) | ||
}) | ||
result.forEach((sub) => { | ||
if (sub.qos > 2) { | ||
console.error( | ||
'subscription negated to', | ||
sub.topic, | ||
'with code', | ||
sub.qos, | ||
) | ||
process.exit(1) | ||
} | ||
}) | ||
}) | ||
}) | ||
client.on('message', function (topic, payload) { | ||
if (args.verbose) { | ||
console.log(topic, payload.toString()) | ||
} else { | ||
console.log(payload.toString()) | ||
} | ||
}) | ||
client.on('message', (topic, payload) => { | ||
if (args.verbose) { | ||
console.log(topic, payload.toString()) | ||
} else { | ||
console.log(payload.toString()) | ||
} | ||
}) | ||
client.on('error', function (err) { | ||
console.warn(err) | ||
client.end() | ||
}) | ||
client.on('error', (err) => { | ||
console.warn(err) | ||
client.end() | ||
}) | ||
} | ||
@@ -122,3 +140,3 @@ | ||
if (require.main === module) { | ||
start(process.argv.slice(2)) | ||
start(process.argv.slice(2)) | ||
} |
3206
lib/client.js
@@ -1,243 +0,51 @@ | ||
'use strict' | ||
/** | ||
* Module dependencies | ||
*/ | ||
const EventEmitter = require('events').EventEmitter | ||
const Store = require('./store') | ||
const { EventEmitter } = require('events') | ||
const TopicAliasRecv = require('./topic-alias-recv') | ||
const TopicAliasSend = require('./topic-alias-send') | ||
const mqttPacket = require('mqtt-packet') | ||
const DefaultMessageIdProvider = require('./default-message-id-provider') | ||
const Writable = require('readable-stream').Writable | ||
const inherits = require('inherits') | ||
const { Writable } = require('readable-stream') | ||
const reInterval = require('reinterval') | ||
const clone = require('rfdc/default') | ||
const validations = require('./validations') | ||
const xtend = require('xtend') | ||
const debug = require('debug')('mqttjs:client') | ||
const nextTick = process ? process.nextTick : function (callback) { setTimeout(callback, 0) } | ||
const setImmediate = global.setImmediate || function (...args) { | ||
const callback = args.shift() | ||
nextTick(callback.bind(null, ...args)) | ||
} | ||
const Store = require('./store') | ||
const handlePacket = require('./handlers') | ||
const nextTick = process | ||
? process.nextTick | ||
: (callback) => { | ||
setTimeout(callback, 0) | ||
} | ||
const setImmediate = | ||
global.setImmediate || | ||
((...args) => { | ||
const callback = args.shift() | ||
nextTick(() => { | ||
callback(...args) | ||
}) | ||
}) | ||
const defaultConnectOptions = { | ||
keepalive: 60, | ||
reschedulePings: true, | ||
protocolId: 'MQTT', | ||
protocolVersion: 4, | ||
reconnectPeriod: 1000, | ||
connectTimeout: 30 * 1000, | ||
clean: true, | ||
resubscribe: true, | ||
writeCache: true | ||
keepalive: 60, | ||
reschedulePings: true, | ||
protocolId: 'MQTT', | ||
protocolVersion: 4, | ||
reconnectPeriod: 1000, | ||
connectTimeout: 30 * 1000, | ||
clean: true, | ||
resubscribe: true, | ||
writeCache: true, | ||
} | ||
const socketErrors = [ | ||
'ECONNREFUSED', | ||
'EADDRINUSE', | ||
'ECONNRESET', | ||
'ENOTFOUND', | ||
'ETIMEDOUT' | ||
'ECONNREFUSED', | ||
'EADDRINUSE', | ||
'ECONNRESET', | ||
'ENOTFOUND', | ||
'ETIMEDOUT', | ||
] | ||
// Other Socket Errors: EADDRINUSE, ECONNRESET, ENOTFOUND, ETIMEDOUT. | ||
const errors = { | ||
0: '', | ||
1: 'Unacceptable protocol version', | ||
2: 'Identifier rejected', | ||
3: 'Server unavailable', | ||
4: 'Bad username or password', | ||
5: 'Not authorized', | ||
16: 'No matching subscribers', | ||
17: 'No subscription existed', | ||
128: 'Unspecified error', | ||
129: 'Malformed Packet', | ||
130: 'Protocol Error', | ||
131: 'Implementation specific error', | ||
132: 'Unsupported Protocol Version', | ||
133: 'Client Identifier not valid', | ||
134: 'Bad User Name or Password', | ||
135: 'Not authorized', | ||
136: 'Server unavailable', | ||
137: 'Server busy', | ||
138: 'Banned', | ||
139: 'Server shutting down', | ||
140: 'Bad authentication method', | ||
141: 'Keep Alive timeout', | ||
142: 'Session taken over', | ||
143: 'Topic Filter invalid', | ||
144: 'Topic Name invalid', | ||
145: 'Packet identifier in use', | ||
146: 'Packet Identifier not found', | ||
147: 'Receive Maximum exceeded', | ||
148: 'Topic Alias invalid', | ||
149: 'Packet too large', | ||
150: 'Message rate too high', | ||
151: 'Quota exceeded', | ||
152: 'Administrative action', | ||
153: 'Payload format invalid', | ||
154: 'Retain not supported', | ||
155: 'QoS not supported', | ||
156: 'Use another server', | ||
157: 'Server moved', | ||
158: 'Shared Subscriptions not supported', | ||
159: 'Connection rate exceeded', | ||
160: 'Maximum connect time', | ||
161: 'Subscription Identifiers not supported', | ||
162: 'Wildcard Subscriptions not supported' | ||
} | ||
function defaultId () { | ||
return 'mqttjs_' + Math.random().toString(16).substr(2, 8) | ||
} | ||
function applyTopicAlias (client, packet) { | ||
if (client.options.protocolVersion === 5) { | ||
if (packet.cmd === 'publish') { | ||
let alias | ||
if (packet.properties) { | ||
alias = packet.properties.topicAlias | ||
} | ||
const topic = packet.topic.toString() | ||
if (client.topicAliasSend) { | ||
if (alias) { | ||
if (topic.length !== 0) { | ||
// register topic alias | ||
debug('applyTopicAlias :: register topic: %s - alias: %d', topic, alias) | ||
if (!client.topicAliasSend.put(topic, alias)) { | ||
debug('applyTopicAlias :: error out of range. topic: %s - alias: %d', topic, alias) | ||
return new Error('Sending Topic Alias out of range') | ||
} | ||
} | ||
} else { | ||
if (topic.length !== 0) { | ||
if (client.options.autoAssignTopicAlias) { | ||
alias = client.topicAliasSend.getAliasByTopic(topic) | ||
if (alias) { | ||
packet.topic = '' | ||
packet.properties = { ...(packet.properties), topicAlias: alias } | ||
debug('applyTopicAlias :: auto assign(use) topic: %s - alias: %d', topic, alias) | ||
} else { | ||
alias = client.topicAliasSend.getLruAlias() | ||
client.topicAliasSend.put(topic, alias) | ||
packet.properties = { ...(packet.properties), topicAlias: alias } | ||
debug('applyTopicAlias :: auto assign topic: %s - alias: %d', topic, alias) | ||
} | ||
} else if (client.options.autoUseTopicAlias) { | ||
alias = client.topicAliasSend.getAliasByTopic(topic) | ||
if (alias) { | ||
packet.topic = '' | ||
packet.properties = { ...(packet.properties), topicAlias: alias } | ||
debug('applyTopicAlias :: auto use topic: %s - alias: %d', topic, alias) | ||
} | ||
} | ||
} | ||
} | ||
} else if (alias) { | ||
debug('applyTopicAlias :: error out of range. topic: %s - alias: %d', topic, alias) | ||
return new Error('Sending Topic Alias out of range') | ||
} | ||
} | ||
} | ||
} | ||
function removeTopicAliasAndRecoverTopicName (client, packet) { | ||
let alias | ||
if (packet.properties) { | ||
alias = packet.properties.topicAlias | ||
} | ||
let topic = packet.topic.toString() | ||
if (topic.length === 0) { | ||
// restore topic from alias | ||
if (typeof alias === 'undefined') { | ||
return new Error('Unregistered Topic Alias') | ||
} else { | ||
topic = client.topicAliasSend.getTopicByAlias(alias) | ||
if (typeof topic === 'undefined') { | ||
return new Error('Unregistered Topic Alias') | ||
} else { | ||
packet.topic = topic | ||
} | ||
} | ||
} | ||
if (alias) { | ||
delete packet.properties.topicAlias | ||
} | ||
} | ||
function sendPacket (client, packet, cb) { | ||
debug('sendPacket :: packet: %O', packet) | ||
debug('sendPacket :: emitting `packetsend`') | ||
client.emit('packetsend', packet) | ||
debug('sendPacket :: writing to stream') | ||
const result = mqttPacket.writeToStream(packet, client.stream, client.options) | ||
debug('sendPacket :: writeToStream result %s', result) | ||
if (!result && cb && cb !== nop) { | ||
debug('sendPacket :: handle events on `drain` once through callback.') | ||
client.stream.once('drain', cb) | ||
} else if (cb) { | ||
debug('sendPacket :: invoking cb') | ||
cb() | ||
} | ||
} | ||
function flush (queue) { | ||
if (queue) { | ||
debug('flush: queue exists? %b', !!(queue)) | ||
Object.keys(queue).forEach(function (messageId) { | ||
if (typeof queue[messageId].cb === 'function') { | ||
queue[messageId].cb(new Error('Connection closed')) | ||
// This is suspicious. Why do we only delete this if we have a callbck? | ||
// If this is by-design, then adding no as callback would cause this to get deleted unintentionally. | ||
delete queue[messageId] | ||
} | ||
}) | ||
} | ||
} | ||
function flushVolatile (queue) { | ||
if (queue) { | ||
debug('flushVolatile :: deleting volatile messages from the queue and setting their callbacks as error function') | ||
Object.keys(queue).forEach(function (messageId) { | ||
if (queue[messageId].volatile && typeof queue[messageId].cb === 'function') { | ||
queue[messageId].cb(new Error('Connection closed')) | ||
delete queue[messageId] | ||
} | ||
}) | ||
} | ||
} | ||
function storeAndSend (client, packet, cb, cbStorePut) { | ||
debug('storeAndSend :: store packet with cmd %s to outgoingStore', packet.cmd) | ||
let storePacket = packet | ||
let err | ||
if (storePacket.cmd === 'publish') { | ||
// The original packet is for sending. | ||
// The cloned storePacket is for storing to resend on reconnect. | ||
// Topic Alias must not be used after disconnected. | ||
storePacket = clone(packet) | ||
err = removeTopicAliasAndRecoverTopicName(client, storePacket) | ||
if (err) { | ||
return cb && cb(err) | ||
} | ||
} | ||
client.outgoingStore.put(storePacket, function storedPacket (err) { | ||
if (err) { | ||
return cb && cb(err) | ||
} | ||
cbStorePut() | ||
sendPacket(client, packet, cb) | ||
}) | ||
} | ||
function nop (error) { | ||
debug('nop ::', error) | ||
} | ||
/** | ||
@@ -250,1670 +58,1612 @@ * MqttClient constructor | ||
*/ | ||
function MqttClient (streamBuilder, options) { | ||
let k | ||
const that = this | ||
class MqttClient extends EventEmitter { | ||
static defaultId() { | ||
return `mqttjs_${Math.random().toString(16).substr(2, 8)}` | ||
} | ||
if (!(this instanceof MqttClient)) { | ||
return new MqttClient(streamBuilder, options) | ||
} | ||
constructor(streamBuilder, options) { | ||
super() | ||
this.options = options || {} | ||
let k | ||
// Defaults | ||
for (k in defaultConnectOptions) { | ||
if (typeof this.options[k] === 'undefined') { | ||
this.options[k] = defaultConnectOptions[k] | ||
} else { | ||
this.options[k] = options[k] | ||
} | ||
} | ||
this.options = options || {} | ||
debug('MqttClient :: options.protocol', options.protocol) | ||
debug('MqttClient :: options.protocolVersion', options.protocolVersion) | ||
debug('MqttClient :: options.username', options.username) | ||
debug('MqttClient :: options.keepalive', options.keepalive) | ||
debug('MqttClient :: options.reconnectPeriod', options.reconnectPeriod) | ||
debug('MqttClient :: options.rejectUnauthorized', options.rejectUnauthorized) | ||
debug('MqttClient :: options.properties.topicAliasMaximum', options.properties ? options.properties.topicAliasMaximum : undefined) | ||
// Defaults | ||
for (k in defaultConnectOptions) { | ||
if (typeof this.options[k] === 'undefined') { | ||
this.options[k] = defaultConnectOptions[k] | ||
} else { | ||
this.options[k] = options[k] | ||
} | ||
} | ||
this.options.clientId = (typeof options.clientId === 'string') ? options.clientId : defaultId() | ||
this.log = this.options.log || debug | ||
this.noop = this._noop.bind(this) | ||
debug('MqttClient :: clientId', this.options.clientId) | ||
this.log('MqttClient :: options.protocol', options.protocol) | ||
this.log( | ||
'MqttClient :: options.protocolVersion', | ||
options.protocolVersion, | ||
) | ||
this.log('MqttClient :: options.username', options.username) | ||
this.log('MqttClient :: options.keepalive', options.keepalive) | ||
this.log( | ||
'MqttClient :: options.reconnectPeriod', | ||
options.reconnectPeriod, | ||
) | ||
this.log( | ||
'MqttClient :: options.rejectUnauthorized', | ||
options.rejectUnauthorized, | ||
) | ||
this.log( | ||
'MqttClient :: options.properties.topicAliasMaximum', | ||
options.properties | ||
? options.properties.topicAliasMaximum | ||
: undefined, | ||
) | ||
this.options.customHandleAcks = (options.protocolVersion === 5 && options.customHandleAcks) ? options.customHandleAcks : function () { arguments[3](0) } | ||
this.options.clientId = | ||
typeof options.clientId === 'string' | ||
? options.clientId | ||
: MqttClient.defaultId() | ||
// Disable pre-generated write cache if requested. Will allocate buffers on-the-fly instead. WARNING: This can affect write performance | ||
if (!this.options.writeCache) { | ||
mqttPacket.writeToStream.cacheNumbers = false | ||
} | ||
this.log('MqttClient :: clientId', this.options.clientId) | ||
this.streamBuilder = streamBuilder | ||
this.options.customHandleAcks = | ||
options.protocolVersion === 5 && options.customHandleAcks | ||
? options.customHandleAcks | ||
: (...args) => { | ||
args[3](0) | ||
} | ||
this.messageIdProvider = (typeof this.options.messageIdProvider === 'undefined') ? new DefaultMessageIdProvider() : this.options.messageIdProvider | ||
// Disable pre-generated write cache if requested. Will allocate buffers on-the-fly instead. WARNING: This can affect write performance | ||
if (!this.options.writeCache) { | ||
mqttPacket.writeToStream.cacheNumbers = false | ||
} | ||
// Inflight message storages | ||
this.outgoingStore = options.outgoingStore || new Store() | ||
this.incomingStore = options.incomingStore || new Store() | ||
this.streamBuilder = streamBuilder | ||
// Should QoS zero messages be queued when the connection is broken? | ||
this.queueQoSZero = options.queueQoSZero === undefined ? true : options.queueQoSZero | ||
this.messageIdProvider = | ||
typeof this.options.messageIdProvider === 'undefined' | ||
? new DefaultMessageIdProvider() | ||
: this.options.messageIdProvider | ||
// map of subscribed topics to support reconnection | ||
this._resubscribeTopics = {} | ||
// Inflight message storages | ||
this.outgoingStore = options.outgoingStore || new Store() | ||
this.incomingStore = options.incomingStore || new Store() | ||
// map of a subscribe messageId and a topic | ||
this.messageIdToTopic = {} | ||
// Should QoS zero messages be queued when the connection is broken? | ||
this.queueQoSZero = | ||
options.queueQoSZero === undefined ? true : options.queueQoSZero | ||
// Ping timer, setup in _setupPingTimer | ||
this.pingTimer = null | ||
// Is the client connected? | ||
this.connected = false | ||
// Are we disconnecting? | ||
this.disconnecting = false | ||
// Packet queue | ||
this.queue = [] | ||
// connack timer | ||
this.connackTimer = null | ||
// Reconnect timer | ||
this.reconnectTimer = null | ||
// Is processing store? | ||
this._storeProcessing = false | ||
// Packet Ids are put into the store during store processing | ||
this._packetIdsDuringStoreProcessing = {} | ||
// Store processing queue | ||
this._storeProcessingQueue = [] | ||
// map of subscribed topics to support reconnection | ||
this._resubscribeTopics = {} | ||
// Inflight callbacks | ||
this.outgoing = {} | ||
// map of a subscribe messageId and a topic | ||
this.messageIdToTopic = {} | ||
// True if connection is first time. | ||
this._firstConnection = true | ||
// Ping timer, setup in _setupPingTimer | ||
this.pingTimer = null | ||
// Is the client connected? | ||
this.connected = false | ||
// Are we disconnecting? | ||
this.disconnecting = false | ||
// Packet queue | ||
this.queue = [] | ||
// connack timer | ||
this.connackTimer = null | ||
// Reconnect timer | ||
this.reconnectTimer = null | ||
// Is processing store? | ||
this._storeProcessing = false | ||
// Packet Ids are put into the store during store processing | ||
this._packetIdsDuringStoreProcessing = {} | ||
// Store processing queue | ||
this._storeProcessingQueue = [] | ||
if (options.properties && options.properties.topicAliasMaximum > 0) { | ||
if (options.properties.topicAliasMaximum > 0xffff) { | ||
debug('MqttClient :: options.properties.topicAliasMaximum is out of range') | ||
} else { | ||
this.topicAliasRecv = new TopicAliasRecv(options.properties.topicAliasMaximum) | ||
} | ||
} | ||
// Inflight callbacks | ||
this.outgoing = {} | ||
// Send queued packets | ||
this.on('connect', function () { | ||
const queue = that.queue | ||
// True if connection is first time. | ||
this._firstConnection = true | ||
function deliver () { | ||
const entry = queue.shift() | ||
debug('deliver :: entry %o', entry) | ||
let packet = null | ||
if (options.properties && options.properties.topicAliasMaximum > 0) { | ||
if (options.properties.topicAliasMaximum > 0xffff) { | ||
this.log( | ||
'MqttClient :: options.properties.topicAliasMaximum is out of range', | ||
) | ||
} else { | ||
this.topicAliasRecv = new TopicAliasRecv( | ||
options.properties.topicAliasMaximum, | ||
) | ||
} | ||
} | ||
if (!entry) { | ||
that._resubscribe() | ||
return | ||
} | ||
// Send queued packets | ||
this.on('connect', () => { | ||
const { queue } = this | ||
packet = entry.packet | ||
debug('deliver :: call _sendPacket for %o', packet) | ||
let send = true | ||
if (packet.messageId && packet.messageId !== 0) { | ||
if (!that.messageIdProvider.register(packet.messageId)) { | ||
send = false | ||
} | ||
} | ||
if (send) { | ||
that._sendPacket( | ||
packet, | ||
function (err) { | ||
if (entry.cb) { | ||
entry.cb(err) | ||
} | ||
deliver() | ||
} | ||
) | ||
} else { | ||
debug('messageId: %d has already used. The message is skipped and removed.', packet.messageId) | ||
deliver() | ||
} | ||
} | ||
const deliver = () => { | ||
const entry = queue.shift() | ||
this.log('deliver :: entry %o', entry) | ||
let packet = null | ||
debug('connect :: sending queued packets') | ||
deliver() | ||
}) | ||
if (!entry) { | ||
this._resubscribe() | ||
return | ||
} | ||
this.on('close', function () { | ||
debug('close :: connected set to `false`') | ||
that.connected = false | ||
packet = entry.packet | ||
this.log('deliver :: call _sendPacket for %o', packet) | ||
let send = true | ||
if (packet.messageId && packet.messageId !== 0) { | ||
if (!this.messageIdProvider.register(packet.messageId)) { | ||
send = false | ||
} | ||
} | ||
if (send) { | ||
this._sendPacket(packet, (err) => { | ||
if (entry.cb) { | ||
entry.cb(err) | ||
} | ||
deliver() | ||
}) | ||
} else { | ||
this.log( | ||
'messageId: %d has already used. The message is skipped and removed.', | ||
packet.messageId, | ||
) | ||
deliver() | ||
} | ||
} | ||
debug('close :: clearing connackTimer') | ||
clearTimeout(that.connackTimer) | ||
this.log('connect :: sending queued packets') | ||
deliver() | ||
}) | ||
debug('close :: clearing ping timer') | ||
if (that.pingTimer !== null) { | ||
that.pingTimer.clear() | ||
that.pingTimer = null | ||
} | ||
this.on('close', () => { | ||
this.log('close :: connected set to `false`') | ||
this.connected = false | ||
if (that.topicAliasRecv) { | ||
that.topicAliasRecv.clear() | ||
} | ||
this.log('close :: clearing connackTimer') | ||
clearTimeout(this.connackTimer) | ||
debug('close :: calling _setupReconnect') | ||
that._setupReconnect() | ||
}) | ||
EventEmitter.call(this) | ||
this.log('close :: clearing ping timer') | ||
if (this.pingTimer !== null) { | ||
this.pingTimer.clear() | ||
this.pingTimer = null | ||
} | ||
debug('MqttClient :: setting up stream') | ||
this._setupStream() | ||
} | ||
inherits(MqttClient, EventEmitter) | ||
if (this.topicAliasRecv) { | ||
this.topicAliasRecv.clear() | ||
} | ||
/** | ||
* setup the event handlers in the inner stream. | ||
* | ||
* @api private | ||
*/ | ||
MqttClient.prototype._setupStream = function () { | ||
const that = this | ||
const writable = new Writable() | ||
const parser = mqttPacket.parser(this.options) | ||
let completeParse = null | ||
const packets = [] | ||
this.log('close :: calling _setupReconnect') | ||
this._setupReconnect() | ||
}) | ||
debug('_setupStream :: calling method to clear reconnect') | ||
this._clearReconnect() | ||
if (!this.options.manualConnect) { | ||
this.log('MqttClient :: setting up stream') | ||
this.connect() | ||
} | ||
} | ||
debug('_setupStream :: using streamBuilder provided to client to create stream') | ||
this.stream = this.streamBuilder(this) | ||
/** | ||
* Setup the event handlers in the inner stream, sends `connect` and `auth` packets | ||
*/ | ||
connect() { | ||
const writable = new Writable() | ||
const parser = mqttPacket.parser(this.options) | ||
let completeParse = null | ||
const packets = [] | ||
parser.on('packet', function (packet) { | ||
debug('parser :: on packet push to packets array.') | ||
packets.push(packet) | ||
}) | ||
this.log('connect :: calling method to clear reconnect') | ||
this._clearReconnect() | ||
function nextTickWork () { | ||
if (packets.length) { | ||
nextTick(work) | ||
} else { | ||
const done = completeParse | ||
completeParse = null | ||
done() | ||
} | ||
} | ||
this.log( | ||
'connect :: using streamBuilder provided to client to create stream', | ||
) | ||
this.stream = this.streamBuilder(this) | ||
function work () { | ||
debug('work :: getting next packet in queue') | ||
const packet = packets.shift() | ||
parser.on('packet', (packet) => { | ||
this.log('parser :: on packet push to packets array.') | ||
packets.push(packet) | ||
}) | ||
if (packet) { | ||
debug('work :: packet pulled from queue') | ||
that._handlePacket(packet, nextTickWork) | ||
} else { | ||
debug('work :: no packets in queue') | ||
const done = completeParse | ||
completeParse = null | ||
debug('work :: done flag is %s', !!(done)) | ||
if (done) done() | ||
} | ||
} | ||
const work = () => { | ||
this.log('work :: getting next packet in queue') | ||
const packet = packets.shift() | ||
writable._write = function (buf, enc, done) { | ||
completeParse = done | ||
debug('writable stream :: parsing buffer') | ||
parser.parse(buf) | ||
work() | ||
} | ||
if (packet) { | ||
this.log('work :: packet pulled from queue') | ||
handlePacket(this, packet, nextTickWork) | ||
} else { | ||
this.log('work :: no packets in queue') | ||
const done = completeParse | ||
completeParse = null | ||
this.log('work :: done flag is %s', !!done) | ||
if (done) done() | ||
} | ||
} | ||
function streamErrorHandler (error) { | ||
debug('streamErrorHandler :: error', error.message) | ||
if (socketErrors.includes(error.code)) { | ||
// handle error | ||
debug('streamErrorHandler :: emitting error') | ||
that.emit('error', error) | ||
} else { | ||
nop(error) | ||
} | ||
} | ||
const nextTickWork = () => { | ||
if (packets.length) { | ||
nextTick(work) | ||
} else { | ||
const done = completeParse | ||
completeParse = null | ||
done() | ||
} | ||
} | ||
debug('_setupStream :: pipe stream to writable stream') | ||
this.stream.pipe(writable) | ||
writable._write = (buf, enc, done) => { | ||
completeParse = done | ||
this.log('writable stream :: parsing buffer') | ||
parser.parse(buf) | ||
work() | ||
} | ||
// Suppress connection errors | ||
this.stream.on('error', streamErrorHandler) | ||
const streamErrorHandler = (error) => { | ||
this.log('streamErrorHandler :: error', error.message) | ||
if (socketErrors.includes(error.code)) { | ||
// handle error | ||
this.log('streamErrorHandler :: emitting error') | ||
this.emit('error', error) | ||
} else { | ||
this.noop(error) | ||
} | ||
} | ||
// Echo stream close | ||
this.stream.on('close', function () { | ||
debug('(%s)stream :: on close', that.options.clientId) | ||
flushVolatile(that.outgoing) | ||
debug('stream: emit close to MqttClient') | ||
that.emit('close') | ||
}) | ||
this.log('connect :: pipe stream to writable stream') | ||
this.stream.pipe(writable) | ||
// Send a connect packet | ||
debug('_setupStream: sending packet `connect`') | ||
const connectPacket = Object.create(this.options) | ||
connectPacket.cmd = 'connect' | ||
if (this.topicAliasRecv) { | ||
if (!connectPacket.properties) { | ||
connectPacket.properties = {} | ||
} | ||
if (this.topicAliasRecv) { | ||
connectPacket.properties.topicAliasMaximum = this.topicAliasRecv.max | ||
} | ||
} | ||
// avoid message queue | ||
sendPacket(this, connectPacket) | ||
// Suppress connection errors | ||
this.stream.on('error', streamErrorHandler) | ||
// Echo connection errors | ||
parser.on('error', this.emit.bind(this, 'error')) | ||
// Echo stream close | ||
this.stream.on('close', () => { | ||
this.log('(%s)stream :: on close', this.options.clientId) | ||
this._flushVolatile(this.outgoing) | ||
this.log('stream: emit close to MqttClient') | ||
this.emit('close') | ||
}) | ||
// auth | ||
if (this.options.properties) { | ||
if (!this.options.properties.authenticationMethod && this.options.properties.authenticationData) { | ||
that.end(() => | ||
this.emit('error', new Error('Packet has no Authentication Method') | ||
)) | ||
return this | ||
} | ||
if (this.options.properties.authenticationMethod && this.options.authPacket && typeof this.options.authPacket === 'object') { | ||
const authPacket = xtend({ cmd: 'auth', reasonCode: 0 }, this.options.authPacket) | ||
sendPacket(this, authPacket) | ||
} | ||
} | ||
// Send a connect packet | ||
this.log('connect: sending packet `connect`') | ||
const connectPacket = Object.create(this.options) | ||
connectPacket.cmd = 'connect' | ||
if (this.topicAliasRecv) { | ||
if (!connectPacket.properties) { | ||
connectPacket.properties = {} | ||
} | ||
if (this.topicAliasRecv) { | ||
connectPacket.properties.topicAliasMaximum = | ||
this.topicAliasRecv.max | ||
} | ||
} | ||
// avoid message queue | ||
this._writePacket(connectPacket) | ||
// many drain listeners are needed for qos 1 callbacks if the connection is intermittent | ||
this.stream.setMaxListeners(1000) | ||
// Echo connection errors | ||
parser.on('error', this.emit.bind(this, 'error')) | ||
clearTimeout(this.connackTimer) | ||
this.connackTimer = setTimeout(function () { | ||
debug('!!connectTimeout hit!! Calling _cleanUp with force `true`') | ||
that._cleanUp(true) | ||
}, this.options.connectTimeout) | ||
} | ||
// auth | ||
if (this.options.properties) { | ||
if ( | ||
!this.options.properties.authenticationMethod && | ||
this.options.properties.authenticationData | ||
) { | ||
this.end(() => | ||
this.emit( | ||
'error', | ||
new Error('Packet has no Authentication Method'), | ||
), | ||
) | ||
return this | ||
} | ||
if ( | ||
this.options.properties.authenticationMethod && | ||
this.options.authPacket && | ||
typeof this.options.authPacket === 'object' | ||
) { | ||
const authPacket = { | ||
cmd: 'auth', | ||
reasonCode: 0, | ||
...this.options.authPacket, | ||
} | ||
this._writePacket(authPacket) | ||
} | ||
} | ||
MqttClient.prototype._handlePacket = function (packet, done) { | ||
const options = this.options | ||
// many drain listeners are needed for qos 1 callbacks if the connection is intermittent | ||
this.stream.setMaxListeners(1000) | ||
if (options.protocolVersion === 5 && options.properties && options.properties.maximumPacketSize && options.properties.maximumPacketSize < packet.length) { | ||
this.emit('error', new Error('exceeding packets size ' + packet.cmd)) | ||
this.end({ reasonCode: 149, properties: { reasonString: 'Maximum packet size was exceeded' } }) | ||
return this | ||
} | ||
debug('_handlePacket :: emitting packetreceive') | ||
this.emit('packetreceive', packet) | ||
clearTimeout(this.connackTimer) | ||
this.connackTimer = setTimeout(() => { | ||
this.log( | ||
'!!connectTimeout hit!! Calling _cleanUp with force `true`', | ||
) | ||
this._cleanUp(true) | ||
}, this.options.connectTimeout) | ||
switch (packet.cmd) { | ||
case 'publish': | ||
this._handlePublish(packet, done) | ||
break | ||
case 'puback': | ||
case 'pubrec': | ||
case 'pubcomp': | ||
case 'suback': | ||
case 'unsuback': | ||
this._handleAck(packet) | ||
done() | ||
break | ||
case 'pubrel': | ||
this._handlePubrel(packet, done) | ||
break | ||
case 'connack': | ||
this._handleConnack(packet) | ||
done() | ||
break | ||
case 'auth': | ||
this._handleAuth(packet) | ||
done() | ||
break | ||
case 'pingresp': | ||
this._handlePingresp(packet) | ||
done() | ||
break | ||
case 'disconnect': | ||
this._handleDisconnect(packet) | ||
done() | ||
break | ||
default: | ||
// do nothing | ||
// maybe we should do an error handling | ||
// or just log it | ||
break | ||
} | ||
} | ||
return this | ||
} | ||
MqttClient.prototype._checkDisconnecting = function (callback) { | ||
if (this.disconnecting) { | ||
if (callback && callback !== nop) { | ||
callback(new Error('client disconnecting')) | ||
} else { | ||
this.emit('error', new Error('client disconnecting')) | ||
} | ||
} | ||
return this.disconnecting | ||
} | ||
_flushVolatile(queue) { | ||
if (queue) { | ||
this.log( | ||
'_flushVolatile :: deleting volatile messages from the queue and setting their callbacks as error function', | ||
) | ||
Object.keys(queue).forEach((messageId) => { | ||
if ( | ||
queue[messageId].volatile && | ||
typeof queue[messageId].cb === 'function' | ||
) { | ||
queue[messageId].cb(new Error('Connection closed')) | ||
delete queue[messageId] | ||
} | ||
}) | ||
} | ||
} | ||
/** | ||
* publish - publish <message> to <topic> | ||
* | ||
* @param {String} topic - topic to publish to | ||
* @param {String, Buffer} message - message to publish | ||
* @param {Object} [opts] - publish options, includes: | ||
* {Number} qos - qos level to publish on | ||
* {Boolean} retain - whether or not to retain the message | ||
* {Boolean} dup - whether or not mark a message as duplicate | ||
* {Function} cbStorePut - function(){} called when message is put into `outgoingStore` | ||
* @param {Function} [callback] - function(err){} | ||
* called when publish succeeds or fails | ||
* @returns {MqttClient} this - for chaining | ||
* @api public | ||
* | ||
* @example client.publish('topic', 'message'); | ||
* @example | ||
* client.publish('topic', 'message', {qos: 1, retain: true, dup: true}); | ||
* @example client.publish('topic', 'message', console.log); | ||
*/ | ||
MqttClient.prototype.publish = function (topic, message, opts, callback) { | ||
debug('publish :: message `%s` to topic `%s`', message, topic) | ||
const options = this.options | ||
_flush(queue) { | ||
if (queue) { | ||
this.log('_flush: queue exists? %b', !!queue) | ||
Object.keys(queue).forEach((messageId) => { | ||
if (typeof queue[messageId].cb === 'function') { | ||
queue[messageId].cb(new Error('Connection closed')) | ||
// This is suspicious. Why do we only delete this if we have a callback? | ||
// If this is by-design, then adding no as callback would cause this to get deleted unintentionally. | ||
delete queue[messageId] | ||
} | ||
}) | ||
} | ||
} | ||
// .publish(topic, payload, cb); | ||
if (typeof opts === 'function') { | ||
callback = opts | ||
opts = null | ||
} | ||
_removeTopicAliasAndRecoverTopicName(packet) { | ||
let alias | ||
if (packet.properties) { | ||
alias = packet.properties.topicAlias | ||
} | ||
// default opts | ||
const defaultOpts = { qos: 0, retain: false, dup: false } | ||
opts = xtend(defaultOpts, opts) | ||
let topic = packet.topic.toString() | ||
if (this._checkDisconnecting(callback)) { | ||
return this | ||
} | ||
this.log( | ||
'_removeTopicAliasAndRecoverTopicName :: alias %d, topic %o', | ||
alias, | ||
topic, | ||
) | ||
const that = this | ||
const publishProc = function () { | ||
let messageId = 0 | ||
if (opts.qos === 1 || opts.qos === 2) { | ||
messageId = that._nextId() | ||
if (messageId === null) { | ||
debug('No messageId left') | ||
return false | ||
} | ||
} | ||
const packet = { | ||
cmd: 'publish', | ||
topic: topic, | ||
payload: message, | ||
qos: opts.qos, | ||
retain: opts.retain, | ||
messageId: messageId, | ||
dup: opts.dup | ||
} | ||
if (topic.length === 0) { | ||
// restore topic from alias | ||
if (typeof alias === 'undefined') { | ||
return new Error('Unregistered Topic Alias') | ||
} | ||
topic = this.topicAliasSend.getTopicByAlias(alias) | ||
if (typeof topic === 'undefined') { | ||
return new Error('Unregistered Topic Alias') | ||
} | ||
packet.topic = topic | ||
} | ||
if (alias) { | ||
delete packet.properties.topicAlias | ||
} | ||
} | ||
if (options.protocolVersion === 5) { | ||
packet.properties = opts.properties | ||
} | ||
_checkDisconnecting(callback) { | ||
if (this.disconnecting) { | ||
if (callback && callback !== this.noop) { | ||
callback(new Error('client disconnecting')) | ||
} else { | ||
this.emit('error', new Error('client disconnecting')) | ||
} | ||
} | ||
return this.disconnecting | ||
} | ||
debug('publish :: qos', opts.qos) | ||
switch (opts.qos) { | ||
case 1: | ||
case 2: | ||
// Add to callbacks | ||
that.outgoing[packet.messageId] = { | ||
volatile: false, | ||
cb: callback || nop | ||
} | ||
debug('MqttClient:publish: packet cmd: %s', packet.cmd) | ||
that._sendPacket(packet, undefined, opts.cbStorePut) | ||
break | ||
default: | ||
debug('MqttClient:publish: packet cmd: %s', packet.cmd) | ||
that._sendPacket(packet, callback, opts.cbStorePut) | ||
break | ||
} | ||
return true | ||
} | ||
/** | ||
* publish - publish <message> to <topic> | ||
* | ||
* @param {String} topic - topic to publish to | ||
* @param {String, Buffer} message - message to publish | ||
* @param {Object} [opts] - publish options, includes: | ||
* {Number} qos - qos level to publish on | ||
* {Boolean} retain - whether or not to retain the message | ||
* {Boolean} dup - whether or not mark a message as duplicate | ||
* {Function} cbStorePut - function(){} called when message is put into `outgoingStore` | ||
* @param {Function} [callback] - function(err){} | ||
* called when publish succeeds or fails | ||
* @returns {MqttClient} this - for chaining | ||
* @api public | ||
* | ||
* @example client.publish('topic', 'message'); | ||
* @example | ||
* client.publish('topic', 'message', {qos: 1, retain: true, dup: true}); | ||
* @example client.publish('topic', 'message', console.log); | ||
*/ | ||
publish(topic, message, opts, callback) { | ||
this.log('publish :: message `%s` to topic `%s`', message, topic) | ||
const { options } = this | ||
if (this._storeProcessing || this._storeProcessingQueue.length > 0 || !publishProc()) { | ||
this._storeProcessingQueue.push( | ||
{ | ||
invoke: publishProc, | ||
cbStorePut: opts.cbStorePut, | ||
callback: callback | ||
} | ||
) | ||
} | ||
return this | ||
} | ||
// .publish(topic, payload, cb); | ||
if (typeof opts === 'function') { | ||
callback = opts | ||
opts = null | ||
} | ||
/** | ||
* subscribe - subscribe to <topic> | ||
* | ||
* @param {String, Array, Object} topic - topic(s) to subscribe to, supports objects in the form {'topic': qos} | ||
* @param {Object} [opts] - optional subscription options, includes: | ||
* {Number} qos - subscribe qos level | ||
* @param {Function} [callback] - function(err, granted){} where: | ||
* {Error} err - subscription error (none at the moment!) | ||
* {Array} granted - array of {topic: 't', qos: 0} | ||
* @returns {MqttClient} this - for chaining | ||
* @api public | ||
* @example client.subscribe('topic'); | ||
* @example client.subscribe('topic', {qos: 1}); | ||
* @example client.subscribe({'topic': {qos: 0}, 'topic2': {qos: 1}}, console.log); | ||
* @example client.subscribe('topic', console.log); | ||
*/ | ||
MqttClient.prototype.subscribe = function () { | ||
const that = this | ||
const args = new Array(arguments.length) | ||
for (let i = 0; i < arguments.length; i++) { | ||
args[i] = arguments[i] | ||
} | ||
const subs = [] | ||
let obj = args.shift() | ||
const resubscribe = obj.resubscribe | ||
let callback = args.pop() || nop | ||
let opts = args.pop() | ||
const version = this.options.protocolVersion | ||
// default opts | ||
const defaultOpts = { qos: 0, retain: false, dup: false } | ||
opts = { ...defaultOpts, ...opts } | ||
delete obj.resubscribe | ||
if (this._checkDisconnecting(callback)) { | ||
return this | ||
} | ||
if (typeof obj === 'string') { | ||
obj = [obj] | ||
} | ||
const publishProc = () => { | ||
let messageId = 0 | ||
if (opts.qos === 1 || opts.qos === 2) { | ||
messageId = this._nextId() | ||
if (messageId === null) { | ||
this.log('No messageId left') | ||
return false | ||
} | ||
} | ||
const packet = { | ||
cmd: 'publish', | ||
topic, | ||
payload: message, | ||
qos: opts.qos, | ||
retain: opts.retain, | ||
messageId, | ||
dup: opts.dup, | ||
} | ||
if (typeof callback !== 'function') { | ||
opts = callback | ||
callback = nop | ||
} | ||
if (options.protocolVersion === 5) { | ||
packet.properties = opts.properties | ||
} | ||
const invalidTopic = validations.validateTopics(obj) | ||
if (invalidTopic !== null) { | ||
setImmediate(callback, new Error('Invalid topic ' + invalidTopic)) | ||
return this | ||
} | ||
this.log('publish :: qos', opts.qos) | ||
switch (opts.qos) { | ||
case 1: | ||
case 2: | ||
// Add to callbacks | ||
this.outgoing[packet.messageId] = { | ||
volatile: false, | ||
cb: callback || this.noop, | ||
} | ||
this.log('MqttClient:publish: packet cmd: %s', packet.cmd) | ||
this._sendPacket(packet, undefined, opts.cbStorePut) | ||
break | ||
default: | ||
this.log('MqttClient:publish: packet cmd: %s', packet.cmd) | ||
this._sendPacket(packet, callback, opts.cbStorePut) | ||
break | ||
} | ||
return true | ||
} | ||
if (this._checkDisconnecting(callback)) { | ||
debug('subscribe: discconecting true') | ||
return this | ||
} | ||
if ( | ||
this._storeProcessing || | ||
this._storeProcessingQueue.length > 0 || | ||
!publishProc() | ||
) { | ||
this._storeProcessingQueue.push({ | ||
invoke: publishProc, | ||
cbStorePut: opts.cbStorePut, | ||
callback, | ||
}) | ||
} | ||
return this | ||
} | ||
const defaultOpts = { | ||
qos: 0 | ||
} | ||
if (version === 5) { | ||
defaultOpts.nl = false | ||
defaultOpts.rap = false | ||
defaultOpts.rh = 0 | ||
} | ||
opts = xtend(defaultOpts, opts) | ||
/** | ||
* subscribe - subscribe to <topic> | ||
* | ||
* @param {String, Array, Object} topic - topic(s) to subscribe to, supports objects in the form {'topic': qos} | ||
* @param {Object} [opts] - optional subscription options, includes: | ||
* {Number} qos - subscribe qos level | ||
* @param {Function} [callback] - function(err, granted){} where: | ||
* {Error} err - subscription error (none at the moment!) | ||
* {Array} granted - array of {topic: 't', qos: 0} | ||
* @returns {MqttClient} this - for chaining | ||
* @api public | ||
* @example client.subscribe('topic'); | ||
* @example client.subscribe('topic', {qos: 1}); | ||
* @example client.subscribe({'topic': {qos: 0}, 'topic2': {qos: 1}}, console.log); | ||
* @example client.subscribe('topic', console.log); | ||
*/ | ||
subscribe(...args) { | ||
const subs = [] | ||
let obj = args.shift() | ||
const { resubscribe } = obj | ||
let callback = args.pop() || this.noop | ||
let opts = args.pop() | ||
const version = this.options.protocolVersion | ||
if (Array.isArray(obj)) { | ||
obj.forEach(function (topic) { | ||
debug('subscribe: array topic %s', topic) | ||
if (!Object.prototype.hasOwnProperty.call(that._resubscribeTopics, topic) || | ||
that._resubscribeTopics[topic].qos < opts.qos || | ||
resubscribe) { | ||
const currentOpts = { | ||
topic: topic, | ||
qos: opts.qos | ||
} | ||
if (version === 5) { | ||
currentOpts.nl = opts.nl | ||
currentOpts.rap = opts.rap | ||
currentOpts.rh = opts.rh | ||
currentOpts.properties = opts.properties | ||
} | ||
debug('subscribe: pushing topic `%s` and qos `%s` to subs list', currentOpts.topic, currentOpts.qos) | ||
subs.push(currentOpts) | ||
} | ||
}) | ||
} else { | ||
Object | ||
.keys(obj) | ||
.forEach(function (k) { | ||
debug('subscribe: object topic %s', k) | ||
if (!Object.prototype.hasOwnProperty.call(that._resubscribeTopics, k) || | ||
that._resubscribeTopics[k].qos < obj[k].qos || | ||
resubscribe) { | ||
const currentOpts = { | ||
topic: k, | ||
qos: obj[k].qos | ||
} | ||
if (version === 5) { | ||
currentOpts.nl = obj[k].nl | ||
currentOpts.rap = obj[k].rap | ||
currentOpts.rh = obj[k].rh | ||
currentOpts.properties = opts.properties | ||
} | ||
debug('subscribe: pushing `%s` to subs list', currentOpts) | ||
subs.push(currentOpts) | ||
} | ||
}) | ||
} | ||
delete obj.resubscribe | ||
if (!subs.length) { | ||
callback(null, []) | ||
return this | ||
} | ||
if (typeof obj === 'string') { | ||
obj = [obj] | ||
} | ||
const subscribeProc = function () { | ||
const messageId = that._nextId() | ||
if (messageId === null) { | ||
debug('No messageId left') | ||
return false | ||
} | ||
if (typeof callback !== 'function') { | ||
opts = callback | ||
callback = this.noop | ||
} | ||
const packet = { | ||
cmd: 'subscribe', | ||
subscriptions: subs, | ||
qos: 1, | ||
retain: false, | ||
dup: false, | ||
messageId: messageId | ||
} | ||
const invalidTopic = validations.validateTopics(obj) | ||
if (invalidTopic !== null) { | ||
setImmediate(callback, new Error(`Invalid topic ${invalidTopic}`)) | ||
return this | ||
} | ||
if (opts.properties) { | ||
packet.properties = opts.properties | ||
} | ||
if (this._checkDisconnecting(callback)) { | ||
this.log('subscribe: discconecting true') | ||
return this | ||
} | ||
// subscriptions to resubscribe to in case of disconnect | ||
if (that.options.resubscribe) { | ||
debug('subscribe :: resubscribe true') | ||
const topics = [] | ||
subs.forEach(function (sub) { | ||
if (that.options.reconnectPeriod > 0) { | ||
const topic = { qos: sub.qos } | ||
if (version === 5) { | ||
topic.nl = sub.nl || false | ||
topic.rap = sub.rap || false | ||
topic.rh = sub.rh || 0 | ||
topic.properties = sub.properties | ||
} | ||
that._resubscribeTopics[sub.topic] = topic | ||
topics.push(sub.topic) | ||
} | ||
}) | ||
that.messageIdToTopic[packet.messageId] = topics | ||
} | ||
const defaultOpts = { | ||
qos: 0, | ||
} | ||
if (version === 5) { | ||
defaultOpts.nl = false | ||
defaultOpts.rap = false | ||
defaultOpts.rh = 0 | ||
} | ||
opts = { ...defaultOpts, ...opts } | ||
that.outgoing[packet.messageId] = { | ||
volatile: true, | ||
cb: function (err, packet) { | ||
if (!err) { | ||
const granted = packet.granted | ||
for (let i = 0; i < granted.length; i += 1) { | ||
subs[i].qos = granted[i] | ||
} | ||
} | ||
const parseSub = (topic, subOptions) => { | ||
// subOptions is defined only when providing a subs map, use opts otherwise | ||
subOptions = subOptions || opts | ||
if ( | ||
!Object.prototype.hasOwnProperty.call( | ||
this._resubscribeTopics, | ||
topic, | ||
) || | ||
this._resubscribeTopics[topic].qos < subOptions.qos || | ||
resubscribe | ||
) { | ||
const currentOpts = { | ||
topic, | ||
qos: subOptions.qos, | ||
} | ||
if (version === 5) { | ||
currentOpts.nl = subOptions.nl | ||
currentOpts.rap = subOptions.rap | ||
currentOpts.rh = subOptions.rh | ||
// use opts.properties | ||
currentOpts.properties = opts.properties | ||
} | ||
this.log( | ||
'subscribe: pushing topic `%s` and qos `%s` to subs list', | ||
currentOpts.topic, | ||
currentOpts.qos, | ||
) | ||
subs.push(currentOpts) | ||
} | ||
} | ||
callback(err, subs) | ||
} | ||
} | ||
debug('subscribe :: call _sendPacket') | ||
that._sendPacket(packet) | ||
return true | ||
} | ||
if (Array.isArray(obj)) { | ||
// array of topics | ||
obj.forEach((topic) => { | ||
this.log('subscribe: array topic %s', topic) | ||
parseSub(topic) | ||
}) | ||
} else { | ||
// object topic --> subOptions (no properties) | ||
Object.keys(obj).forEach((topic) => { | ||
this.log('subscribe: object topic %s, %o', topic, obj[topic]) | ||
parseSub(topic, obj[topic]) | ||
}) | ||
} | ||
if (this._storeProcessing || this._storeProcessingQueue.length > 0 || !subscribeProc()) { | ||
this._storeProcessingQueue.push( | ||
{ | ||
invoke: subscribeProc, | ||
callback: callback | ||
} | ||
) | ||
} | ||
if (!subs.length) { | ||
callback(null, []) | ||
return this | ||
} | ||
return this | ||
} | ||
const subscribeProc = () => { | ||
const messageId = this._nextId() | ||
if (messageId === null) { | ||
this.log('No messageId left') | ||
return false | ||
} | ||
/** | ||
* unsubscribe - unsubscribe from topic(s) | ||
* | ||
* @param {String, Array} topic - topics to unsubscribe from | ||
* @param {Object} [opts] - optional subscription options, includes: | ||
* {Object} properties - properties of unsubscribe packet | ||
* @param {Function} [callback] - callback fired on unsuback | ||
* @returns {MqttClient} this - for chaining | ||
* @api public | ||
* @example client.unsubscribe('topic'); | ||
* @example client.unsubscribe('topic', console.log); | ||
*/ | ||
MqttClient.prototype.unsubscribe = function () { | ||
const that = this | ||
const args = new Array(arguments.length) | ||
for (let i = 0; i < arguments.length; i++) { | ||
args[i] = arguments[i] | ||
} | ||
let topic = args.shift() | ||
let callback = args.pop() || nop | ||
let opts = args.pop() | ||
if (typeof topic === 'string') { | ||
topic = [topic] | ||
} | ||
const packet = { | ||
cmd: 'subscribe', | ||
subscriptions: subs, | ||
qos: 1, | ||
retain: false, | ||
dup: false, | ||
messageId, | ||
} | ||
if (typeof callback !== 'function') { | ||
opts = callback | ||
callback = nop | ||
} | ||
if (opts.properties) { | ||
packet.properties = opts.properties | ||
} | ||
const invalidTopic = validations.validateTopics(topic) | ||
if (invalidTopic !== null) { | ||
setImmediate(callback, new Error('Invalid topic ' + invalidTopic)) | ||
return this | ||
} | ||
// subscriptions to resubscribe to in case of disconnect | ||
if (this.options.resubscribe) { | ||
this.log('subscribe :: resubscribe true') | ||
const topics = [] | ||
subs.forEach((sub) => { | ||
if (this.options.reconnectPeriod > 0) { | ||
const topic = { qos: sub.qos } | ||
if (version === 5) { | ||
topic.nl = sub.nl || false | ||
topic.rap = sub.rap || false | ||
topic.rh = sub.rh || 0 | ||
topic.properties = sub.properties | ||
} | ||
this._resubscribeTopics[sub.topic] = topic | ||
topics.push(sub.topic) | ||
} | ||
}) | ||
this.messageIdToTopic[packet.messageId] = topics | ||
} | ||
if (that._checkDisconnecting(callback)) { | ||
return this | ||
} | ||
this.outgoing[packet.messageId] = { | ||
volatile: true, | ||
cb(err, packet2) { | ||
if (!err) { | ||
const { granted } = packet2 | ||
for (let i = 0; i < granted.length; i += 1) { | ||
subs[i].qos = granted[i] | ||
} | ||
} | ||
const unsubscribeProc = function () { | ||
const messageId = that._nextId() | ||
if (messageId === null) { | ||
debug('No messageId left') | ||
return false | ||
} | ||
const packet = { | ||
cmd: 'unsubscribe', | ||
qos: 1, | ||
messageId: messageId | ||
} | ||
callback(err, subs) | ||
}, | ||
} | ||
this.log('subscribe :: call _sendPacket') | ||
this._sendPacket(packet) | ||
return true | ||
} | ||
if (typeof topic === 'string') { | ||
packet.unsubscriptions = [topic] | ||
} else if (Array.isArray(topic)) { | ||
packet.unsubscriptions = topic | ||
} | ||
if ( | ||
this._storeProcessing || | ||
this._storeProcessingQueue.length > 0 || | ||
!subscribeProc() | ||
) { | ||
this._storeProcessingQueue.push({ | ||
invoke: subscribeProc, | ||
callback, | ||
}) | ||
} | ||
if (that.options.resubscribe) { | ||
packet.unsubscriptions.forEach(function (topic) { | ||
delete that._resubscribeTopics[topic] | ||
}) | ||
} | ||
return this | ||
} | ||
if (typeof opts === 'object' && opts.properties) { | ||
packet.properties = opts.properties | ||
} | ||
/** | ||
* unsubscribe - unsubscribe from topic(s) | ||
* | ||
* @param {String, Array} topic - topics to unsubscribe from | ||
* @param {Object} [opts] - optional subscription options, includes: | ||
* {Object} properties - properties of unsubscribe packet | ||
* @param {Function} [callback] - callback fired on unsuback | ||
* @returns {MqttClient} this - for chaining | ||
* @api public | ||
* @example client.unsubscribe('topic'); | ||
* @example client.unsubscribe('topic', console.log); | ||
*/ | ||
unsubscribe(...args) { | ||
let topic = args.shift() | ||
let callback = args.pop() || this.noop | ||
let opts = args.pop() | ||
if (typeof topic === 'string') { | ||
topic = [topic] | ||
} | ||
that.outgoing[packet.messageId] = { | ||
volatile: true, | ||
cb: callback | ||
} | ||
if (typeof callback !== 'function') { | ||
opts = callback | ||
callback = this.noop | ||
} | ||
debug('unsubscribe: call _sendPacket') | ||
that._sendPacket(packet) | ||
const invalidTopic = validations.validateTopics(topic) | ||
if (invalidTopic !== null) { | ||
setImmediate(callback, new Error(`Invalid topic ${invalidTopic}`)) | ||
return this | ||
} | ||
return true | ||
} | ||
if (this._checkDisconnecting(callback)) { | ||
return this | ||
} | ||
if (this._storeProcessing || this._storeProcessingQueue.length > 0 || !unsubscribeProc()) { | ||
this._storeProcessingQueue.push( | ||
{ | ||
invoke: unsubscribeProc, | ||
callback: callback | ||
} | ||
) | ||
} | ||
const unsubscribeProc = () => { | ||
const messageId = this._nextId() | ||
if (messageId === null) { | ||
this.log('No messageId left') | ||
return false | ||
} | ||
const packet = { | ||
cmd: 'unsubscribe', | ||
qos: 1, | ||
messageId, | ||
} | ||
return this | ||
} | ||
if (typeof topic === 'string') { | ||
packet.unsubscriptions = [topic] | ||
} else if (Array.isArray(topic)) { | ||
packet.unsubscriptions = topic | ||
} | ||
/** | ||
* end - close connection | ||
* | ||
* @returns {MqttClient} this - for chaining | ||
* @param {Boolean} force - do not wait for all in-flight messages to be acked | ||
* @param {Object} opts - added to the disconnect packet | ||
* @param {Function} cb - called when the client has been closed | ||
* | ||
* @api public | ||
*/ | ||
MqttClient.prototype.end = function (force, opts, cb) { | ||
const that = this | ||
if (this.options.resubscribe) { | ||
packet.unsubscriptions.forEach((topic2) => { | ||
delete this._resubscribeTopics[topic2] | ||
}) | ||
} | ||
debug('end :: (%s)', this.options.clientId) | ||
if (typeof opts === 'object' && opts.properties) { | ||
packet.properties = opts.properties | ||
} | ||
if (force == null || typeof force !== 'boolean') { | ||
cb = opts || nop | ||
opts = force | ||
force = false | ||
if (typeof opts !== 'object') { | ||
cb = opts | ||
opts = null | ||
if (typeof cb !== 'function') { | ||
cb = nop | ||
} | ||
} | ||
} | ||
this.outgoing[packet.messageId] = { | ||
volatile: true, | ||
cb: callback, | ||
} | ||
if (typeof opts !== 'object') { | ||
cb = opts | ||
opts = null | ||
} | ||
this.log('unsubscribe: call _sendPacket') | ||
this._sendPacket(packet) | ||
debug('end :: cb? %s', !!cb) | ||
cb = cb || nop | ||
return true | ||
} | ||
function closeStores () { | ||
debug('end :: closeStores: closing incoming and outgoing stores') | ||
that.disconnected = true | ||
that.incomingStore.close(function (e1) { | ||
that.outgoingStore.close(function (e2) { | ||
debug('end :: closeStores: emitting end') | ||
that.emit('end') | ||
if (cb) { | ||
const err = e1 || e2 | ||
debug('end :: closeStores: invoking callback with args') | ||
cb(err) | ||
} | ||
}) | ||
}) | ||
if (that._deferredReconnect) { | ||
that._deferredReconnect() | ||
} | ||
} | ||
if ( | ||
this._storeProcessing || | ||
this._storeProcessingQueue.length > 0 || | ||
!unsubscribeProc() | ||
) { | ||
this._storeProcessingQueue.push({ | ||
invoke: unsubscribeProc, | ||
callback, | ||
}) | ||
} | ||
function finish () { | ||
// defer closesStores of an I/O cycle, | ||
// just to make sure things are | ||
// ok for websockets | ||
debug('end :: (%s) :: finish :: calling _cleanUp with force %s', that.options.clientId, force) | ||
that._cleanUp(force, () => { | ||
debug('end :: finish :: calling process.nextTick on closeStores') | ||
// const boundProcess = nextTick.bind(null, closeStores) | ||
nextTick(closeStores.bind(that)) | ||
}, opts) | ||
} | ||
return this | ||
} | ||
if (this.disconnecting) { | ||
cb() | ||
return this | ||
} | ||
/** | ||
* end - close connection | ||
* | ||
* @returns {MqttClient} this - for chaining | ||
* @param {Boolean} force - do not wait for all in-flight messages to be acked | ||
* @param {Object} opts - added to the disconnect packet | ||
* @param {Function} cb - called when the client has been closed | ||
* | ||
* @api public | ||
*/ | ||
end(force, opts, cb) { | ||
this.log('end :: (%s)', this.options.clientId) | ||
this._clearReconnect() | ||
if (force == null || typeof force !== 'boolean') { | ||
cb = opts || this.noop | ||
opts = force | ||
force = false | ||
if (typeof opts !== 'object') { | ||
cb = opts | ||
opts = null | ||
if (typeof cb !== 'function') { | ||
cb = this.noop | ||
} | ||
} | ||
} | ||
this.disconnecting = true | ||
if (typeof opts !== 'object') { | ||
cb = opts | ||
opts = null | ||
} | ||
if (!force && Object.keys(this.outgoing).length > 0) { | ||
// wait 10ms, just to be sure we received all of it | ||
debug('end :: (%s) :: calling finish in 10ms once outgoing is empty', that.options.clientId) | ||
this.once('outgoingEmpty', setTimeout.bind(null, finish, 10)) | ||
} else { | ||
debug('end :: (%s) :: immediately calling finish', that.options.clientId) | ||
finish() | ||
} | ||
this.log('end :: cb? %s', !!cb) | ||
cb = cb || this.noop | ||
return this | ||
} | ||
const closeStores = () => { | ||
this.log('end :: closeStores: closing incoming and outgoing stores') | ||
this.disconnected = true | ||
this.incomingStore.close((e1) => { | ||
this.outgoingStore.close((e2) => { | ||
this.log('end :: closeStores: emitting end') | ||
this.emit('end') | ||
if (cb) { | ||
const err = e1 || e2 | ||
this.log( | ||
'end :: closeStores: invoking callback with args', | ||
) | ||
cb(err) | ||
} | ||
}) | ||
}) | ||
if (this._deferredReconnect) { | ||
this._deferredReconnect() | ||
} | ||
} | ||
/** | ||
* removeOutgoingMessage - remove a message in outgoing store | ||
* the outgoing callback will be called withe Error('Message removed') if the message is removed | ||
* | ||
* @param {Number} messageId - messageId to remove message | ||
* @returns {MqttClient} this - for chaining | ||
* @api public | ||
* | ||
* @example client.removeOutgoingMessage(client.getLastAllocated()); | ||
*/ | ||
MqttClient.prototype.removeOutgoingMessage = function (messageId) { | ||
const cb = this.outgoing[messageId] ? this.outgoing[messageId].cb : null | ||
delete this.outgoing[messageId] | ||
this.outgoingStore.del({ messageId: messageId }, function () { | ||
cb(new Error('Message removed')) | ||
}) | ||
return this | ||
} | ||
const finish = () => { | ||
// defer closesStores of an I/O cycle, | ||
// just to make sure things are | ||
// ok for websockets | ||
this.log( | ||
'end :: (%s) :: finish :: calling _cleanUp with force %s', | ||
this.options.clientId, | ||
force, | ||
) | ||
this._cleanUp( | ||
force, | ||
() => { | ||
this.log( | ||
'end :: finish :: calling process.nextTick on closeStores', | ||
) | ||
// const boundProcess = nextTick.bind(null, closeStores) | ||
nextTick(closeStores) | ||
}, | ||
opts, | ||
) | ||
} | ||
/** | ||
* reconnect - connect again using the same options as connect() | ||
* | ||
* @param {Object} [opts] - optional reconnect options, includes: | ||
* {Store} incomingStore - a store for the incoming packets | ||
* {Store} outgoingStore - a store for the outgoing packets | ||
* if opts is not given, current stores are used | ||
* @returns {MqttClient} this - for chaining | ||
* | ||
* @api public | ||
*/ | ||
MqttClient.prototype.reconnect = function (opts) { | ||
debug('client reconnect') | ||
const that = this | ||
const f = function () { | ||
if (opts) { | ||
that.options.incomingStore = opts.incomingStore | ||
that.options.outgoingStore = opts.outgoingStore | ||
} else { | ||
that.options.incomingStore = null | ||
that.options.outgoingStore = null | ||
} | ||
that.incomingStore = that.options.incomingStore || new Store() | ||
that.outgoingStore = that.options.outgoingStore || new Store() | ||
that.disconnecting = false | ||
that.disconnected = false | ||
that._deferredReconnect = null | ||
that._reconnect() | ||
} | ||
if (this.disconnecting) { | ||
cb() | ||
return this | ||
} | ||
if (this.disconnecting && !this.disconnected) { | ||
this._deferredReconnect = f | ||
} else { | ||
f() | ||
} | ||
return this | ||
} | ||
this._clearReconnect() | ||
/** | ||
* _reconnect - implement reconnection | ||
* @api privateish | ||
*/ | ||
MqttClient.prototype._reconnect = function () { | ||
debug('_reconnect: emitting reconnect to client') | ||
this.emit('reconnect') | ||
if (this.connected) { | ||
this.end(() => { this._setupStream() }) | ||
debug('client already connected. disconnecting first.') | ||
} else { | ||
debug('_reconnect: calling _setupStream') | ||
this._setupStream() | ||
} | ||
} | ||
this.disconnecting = true | ||
/** | ||
* _setupReconnect - setup reconnect timer | ||
*/ | ||
MqttClient.prototype._setupReconnect = function () { | ||
const that = this | ||
if (!force && Object.keys(this.outgoing).length > 0) { | ||
// wait 10ms, just to be sure we received all of it | ||
this.log( | ||
'end :: (%s) :: calling finish in 10ms once outgoing is empty', | ||
this.options.clientId, | ||
) | ||
this.once('outgoingEmpty', setTimeout.bind(null, finish, 10)) | ||
} else { | ||
this.log( | ||
'end :: (%s) :: immediately calling finish', | ||
this.options.clientId, | ||
) | ||
finish() | ||
} | ||
if (!that.disconnecting && !that.reconnectTimer && (that.options.reconnectPeriod > 0)) { | ||
if (!this.reconnecting) { | ||
debug('_setupReconnect :: emit `offline` state') | ||
this.emit('offline') | ||
debug('_setupReconnect :: set `reconnecting` to `true`') | ||
this.reconnecting = true | ||
} | ||
debug('_setupReconnect :: setting reconnectTimer for %d ms', that.options.reconnectPeriod) | ||
that.reconnectTimer = setInterval(function () { | ||
debug('reconnectTimer :: reconnect triggered!') | ||
that._reconnect() | ||
}, that.options.reconnectPeriod) | ||
} else { | ||
debug('_setupReconnect :: doing nothing...') | ||
} | ||
} | ||
return this | ||
} | ||
/** | ||
* _clearReconnect - clear the reconnect timer | ||
*/ | ||
MqttClient.prototype._clearReconnect = function () { | ||
debug('_clearReconnect : clearing reconnect timer') | ||
if (this.reconnectTimer) { | ||
clearInterval(this.reconnectTimer) | ||
this.reconnectTimer = null | ||
} | ||
} | ||
/** | ||
* removeOutgoingMessage - remove a message in outgoing store | ||
* the outgoing callback will be called withe Error('Message removed') if the message is removed | ||
* | ||
* @param {Number} messageId - messageId to remove message | ||
* @returns {MqttClient} this - for chaining | ||
* @api public | ||
* | ||
* @example client.removeOutgoingMessage(client.getLastAllocated()); | ||
*/ | ||
removeOutgoingMessage(messageId) { | ||
if (this.outgoing[messageId]) { | ||
const { cb } = this.outgoing[messageId] | ||
this._removeOutgoingAndStoreMessage(messageId, () => { | ||
cb(new Error('Message removed')) | ||
}) | ||
} | ||
return this | ||
} | ||
/** | ||
* _cleanUp - clean up on connection end | ||
* @api private | ||
*/ | ||
MqttClient.prototype._cleanUp = function (forced, done) { | ||
const opts = arguments[2] | ||
if (done) { | ||
debug('_cleanUp :: done callback provided for on stream close') | ||
this.stream.on('close', done) | ||
} | ||
/** | ||
* reconnect - connect again using the same options as connect() | ||
* | ||
* @param {Object} [opts] - optional reconnect options, includes: | ||
* {Store} incomingStore - a store for the incoming packets | ||
* {Store} outgoingStore - a store for the outgoing packets | ||
* if opts is not given, current stores are used | ||
* @returns {MqttClient} this - for chaining | ||
* | ||
* @api public | ||
*/ | ||
reconnect(opts) { | ||
this.log('client reconnect') | ||
const f = () => { | ||
if (opts) { | ||
this.options.incomingStore = opts.incomingStore | ||
this.options.outgoingStore = opts.outgoingStore | ||
} else { | ||
this.options.incomingStore = null | ||
this.options.outgoingStore = null | ||
} | ||
this.incomingStore = this.options.incomingStore || new Store() | ||
this.outgoingStore = this.options.outgoingStore || new Store() | ||
this.disconnecting = false | ||
this.disconnected = false | ||
this._deferredReconnect = null | ||
this._reconnect() | ||
} | ||
debug('_cleanUp :: forced? %s', forced) | ||
if (forced) { | ||
if ((this.options.reconnectPeriod === 0) && this.options.clean) { | ||
flush(this.outgoing) | ||
} | ||
debug('_cleanUp :: (%s) :: destroying stream', this.options.clientId) | ||
this.stream.destroy() | ||
} else { | ||
const packet = xtend({ cmd: 'disconnect' }, opts) | ||
debug('_cleanUp :: (%s) :: call _sendPacket with disconnect packet', this.options.clientId) | ||
this._sendPacket( | ||
packet, | ||
setImmediate.bind( | ||
null, | ||
this.stream.end.bind(this.stream) | ||
) | ||
) | ||
} | ||
if (this.disconnecting && !this.disconnected) { | ||
this._deferredReconnect = f | ||
} else { | ||
f() | ||
} | ||
return this | ||
} | ||
if (!this.disconnecting) { | ||
debug('_cleanUp :: client not disconnecting. Clearing and resetting reconnect.') | ||
this._clearReconnect() | ||
this._setupReconnect() | ||
} | ||
/** | ||
* _reconnect - implement reconnection | ||
* @api privateish | ||
*/ | ||
_reconnect() { | ||
this.log('_reconnect: emitting reconnect to client') | ||
this.emit('reconnect') | ||
if (this.connected) { | ||
this.end(() => { | ||
this.connect() | ||
}) | ||
this.log('client already connected. disconnecting first.') | ||
} else { | ||
this.log('_reconnect: calling connect') | ||
this.connect() | ||
} | ||
} | ||
if (this.pingTimer !== null) { | ||
debug('_cleanUp :: clearing pingTimer') | ||
this.pingTimer.clear() | ||
this.pingTimer = null | ||
} | ||
/** | ||
* _setupReconnect - setup reconnect timer | ||
*/ | ||
_setupReconnect() { | ||
if ( | ||
!this.disconnecting && | ||
!this.reconnectTimer && | ||
this.options.reconnectPeriod > 0 | ||
) { | ||
if (!this.reconnecting) { | ||
this.log('_setupReconnect :: emit `offline` state') | ||
this.emit('offline') | ||
this.log('_setupReconnect :: set `reconnecting` to `true`') | ||
this.reconnecting = true | ||
} | ||
this.log( | ||
'_setupReconnect :: setting reconnectTimer for %d ms', | ||
this.options.reconnectPeriod, | ||
) | ||
this.reconnectTimer = setInterval(() => { | ||
this.log('reconnectTimer :: reconnect triggered!') | ||
this._reconnect() | ||
}, this.options.reconnectPeriod) | ||
} else { | ||
this.log('_setupReconnect :: doing nothing...') | ||
} | ||
} | ||
if (done && !this.connected) { | ||
debug('_cleanUp :: (%s) :: removing stream `done` callback `close` listener', this.options.clientId) | ||
this.stream.removeListener('close', done) | ||
done() | ||
} | ||
} | ||
/** | ||
* _clearReconnect - clear the reconnect timer | ||
*/ | ||
_clearReconnect() { | ||
this.log('_clearReconnect : clearing reconnect timer') | ||
if (this.reconnectTimer) { | ||
clearInterval(this.reconnectTimer) | ||
this.reconnectTimer = null | ||
} | ||
} | ||
/** | ||
* _sendPacket - send or queue a packet | ||
* @param {Object} packet - packet options | ||
* @param {Function} cb - callback when the packet is sent | ||
* @param {Function} cbStorePut - called when message is put into outgoingStore | ||
* @param {Boolean} noStore - send without put to the store | ||
* @api private | ||
*/ | ||
MqttClient.prototype._sendPacket = function (packet, cb, cbStorePut, noStore) { | ||
debug('_sendPacket :: (%s) :: start', this.options.clientId) | ||
cbStorePut = cbStorePut || nop | ||
cb = cb || nop | ||
/** | ||
* _cleanUp - clean up on connection end | ||
* @api private | ||
*/ | ||
_cleanUp(forced, done, opts = {}) { | ||
if (done) { | ||
this.log('_cleanUp :: done callback provided for on stream close') | ||
this.stream.on('close', done) | ||
} | ||
const err = applyTopicAlias(this, packet) | ||
if (err) { | ||
cb(err) | ||
return | ||
} | ||
this.log('_cleanUp :: forced? %s', forced) | ||
if (forced) { | ||
if (this.options.reconnectPeriod === 0 && this.options.clean) { | ||
this._flush(this.outgoing) | ||
} | ||
this.log( | ||
'_cleanUp :: (%s) :: destroying stream', | ||
this.options.clientId, | ||
) | ||
this.stream.destroy() | ||
} else { | ||
const packet = { cmd: 'disconnect', ...opts } | ||
this.log( | ||
'_cleanUp :: (%s) :: call _sendPacket with disconnect packet', | ||
this.options.clientId, | ||
) | ||
this._sendPacket(packet, () => { | ||
this.log( | ||
'_cleanUp :: (%s) :: destroying stream', | ||
this.options.clientId, | ||
) | ||
setImmediate(() => { | ||
this.stream.end(() => { | ||
this.log( | ||
'_cleanUp :: (%s) :: stream destroyed', | ||
this.options.clientId, | ||
) | ||
// once stream is closed the 'close' event will fire and that will | ||
// emit client `close` event and call `done` callback if done is provided | ||
}) | ||
}) | ||
}) | ||
} | ||
if (!this.connected) { | ||
// allow auth packets to be sent while authenticating with the broker (mqtt5 enhanced auth) | ||
if (packet.cmd === 'auth') { | ||
this._shiftPingInterval() | ||
sendPacket(this, packet, cb) | ||
return | ||
} | ||
if (!this.disconnecting) { | ||
this.log( | ||
'_cleanUp :: client not disconnecting. Clearing and resetting reconnect.', | ||
) | ||
this._clearReconnect() | ||
this._setupReconnect() | ||
} | ||
debug('_sendPacket :: client not connected. Storing packet offline.') | ||
this._storePacket(packet, cb, cbStorePut) | ||
return | ||
} | ||
if (this.pingTimer !== null) { | ||
this.log('_cleanUp :: clearing pingTimer') | ||
this.pingTimer.clear() | ||
this.pingTimer = null | ||
} | ||
// When sending a packet, reschedule the ping timer | ||
this._shiftPingInterval() | ||
if (done && !this.connected) { | ||
this.log( | ||
'_cleanUp :: (%s) :: removing stream `done` callback `close` listener', | ||
this.options.clientId, | ||
) | ||
this.stream.removeListener('close', done) | ||
done() | ||
} | ||
} | ||
// If "noStore" is true, the message is sent without being recorded in the store. | ||
// Messages that have not received puback or pubcomp remain in the store after disconnection | ||
// and are resent from the store upon reconnection. | ||
// For resend upon reconnection, "noStore" is set to true. This is because the message is already stored in the store. | ||
// This is to avoid interrupting other processes while recording to the store. | ||
if (noStore) { | ||
sendPacket(this, packet, cb) | ||
return | ||
} | ||
_storeAndSend(packet, cb, cbStorePut) { | ||
this.log( | ||
'storeAndSend :: store packet with cmd %s to outgoingStore', | ||
packet.cmd, | ||
) | ||
let storePacket = packet | ||
let err | ||
if (storePacket.cmd === 'publish') { | ||
// The original packet is for sending. | ||
// The cloned storePacket is for storing to resend on reconnect. | ||
// Topic Alias must not be used after disconnected. | ||
storePacket = clone(packet) | ||
err = this._removeTopicAliasAndRecoverTopicName(storePacket) | ||
if (err) { | ||
return cb && cb(err) | ||
} | ||
} | ||
this.outgoingStore.put(storePacket, (err2) => { | ||
if (err2) { | ||
return cb && cb(err2) | ||
} | ||
cbStorePut() | ||
this._writePacket(packet, cb) | ||
}) | ||
} | ||
switch (packet.cmd) { | ||
case 'publish': | ||
break | ||
case 'pubrel': | ||
storeAndSend(this, packet, cb, cbStorePut) | ||
return | ||
default: | ||
sendPacket(this, packet, cb) | ||
return | ||
} | ||
_applyTopicAlias(packet) { | ||
if (this.options.protocolVersion === 5) { | ||
if (packet.cmd === 'publish') { | ||
let alias | ||
if (packet.properties) { | ||
alias = packet.properties.topicAlias | ||
} | ||
const topic = packet.topic.toString() | ||
if (this.topicAliasSend) { | ||
if (alias) { | ||
if (topic.length !== 0) { | ||
// register topic alias | ||
this.log( | ||
'applyTopicAlias :: register topic: %s - alias: %d', | ||
topic, | ||
alias, | ||
) | ||
if (!this.topicAliasSend.put(topic, alias)) { | ||
this.log( | ||
'applyTopicAlias :: error out of range. topic: %s - alias: %d', | ||
topic, | ||
alias, | ||
) | ||
return new Error( | ||
'Sending Topic Alias out of range', | ||
) | ||
} | ||
} | ||
} else if (topic.length !== 0) { | ||
if (this.options.autoAssignTopicAlias) { | ||
alias = this.topicAliasSend.getAliasByTopic(topic) | ||
if (alias) { | ||
packet.topic = '' | ||
packet.properties = { | ||
...packet.properties, | ||
topicAlias: alias, | ||
} | ||
this.log( | ||
'applyTopicAlias :: auto assign(use) topic: %s - alias: %d', | ||
topic, | ||
alias, | ||
) | ||
} else { | ||
alias = this.topicAliasSend.getLruAlias() | ||
this.topicAliasSend.put(topic, alias) | ||
packet.properties = { | ||
...packet.properties, | ||
topicAlias: alias, | ||
} | ||
this.log( | ||
'applyTopicAlias :: auto assign topic: %s - alias: %d', | ||
topic, | ||
alias, | ||
) | ||
} | ||
} else if (this.options.autoUseTopicAlias) { | ||
alias = this.topicAliasSend.getAliasByTopic(topic) | ||
if (alias) { | ||
packet.topic = '' | ||
packet.properties = { | ||
...packet.properties, | ||
topicAlias: alias, | ||
} | ||
this.log( | ||
'applyTopicAlias :: auto use topic: %s - alias: %d', | ||
topic, | ||
alias, | ||
) | ||
} | ||
} | ||
} | ||
} else if (alias) { | ||
this.log( | ||
'applyTopicAlias :: error out of range. topic: %s - alias: %d', | ||
topic, | ||
alias, | ||
) | ||
return new Error('Sending Topic Alias out of range') | ||
} | ||
} | ||
} | ||
} | ||
switch (packet.qos) { | ||
case 2: | ||
case 1: | ||
storeAndSend(this, packet, cb, cbStorePut) | ||
break | ||
/** | ||
* no need of case here since it will be caught by default | ||
* and jshint comply that before default it must be a break | ||
* anyway it will result in -1 evaluation | ||
*/ | ||
case 0: | ||
/* falls through */ | ||
default: | ||
sendPacket(this, packet, cb) | ||
break | ||
} | ||
debug('_sendPacket :: (%s) :: end', this.options.clientId) | ||
} | ||
_noop(err) { | ||
this.log('noop ::', err) | ||
} | ||
/** | ||
* _storePacket - queue a packet | ||
* @param {Object} packet - packet options | ||
* @param {Function} cb - callback when the packet is sent | ||
* @param {Function} cbStorePut - called when message is put into outgoingStore | ||
* @api private | ||
*/ | ||
MqttClient.prototype._storePacket = function (packet, cb, cbStorePut) { | ||
debug('_storePacket :: packet: %o', packet) | ||
debug('_storePacket :: cb? %s', !!cb) | ||
cbStorePut = cbStorePut || nop | ||
/** Writes the packet to stream and emits events */ | ||
_writePacket(packet, cb) { | ||
this.log('_writePacket :: packet: %O', packet) | ||
this.log('_writePacket :: emitting `packetsend`') | ||
let storePacket = packet | ||
if (storePacket.cmd === 'publish') { | ||
// The original packet is for sending. | ||
// The cloned storePacket is for storing to resend on reconnect. | ||
// Topic Alias must not be used after disconnected. | ||
storePacket = clone(packet) | ||
const err = removeTopicAliasAndRecoverTopicName(this, storePacket) | ||
if (err) { | ||
return cb && cb(err) | ||
} | ||
} | ||
// check that the packet is not a qos of 0, or that the command is not a publish | ||
if (((storePacket.qos || 0) === 0 && this.queueQoSZero) || storePacket.cmd !== 'publish') { | ||
this.queue.push({ packet: storePacket, cb: cb }) | ||
} else if (storePacket.qos > 0) { | ||
cb = this.outgoing[storePacket.messageId] ? this.outgoing[storePacket.messageId].cb : null | ||
this.outgoingStore.put(storePacket, function (err) { | ||
if (err) { | ||
return cb && cb(err) | ||
} | ||
cbStorePut() | ||
}) | ||
} else if (cb) { | ||
cb(new Error('No connection to broker')) | ||
} | ||
} | ||
this.emit('packetsend', packet) | ||
/** | ||
* _setupPingTimer - setup the ping timer | ||
* | ||
* @api private | ||
*/ | ||
MqttClient.prototype._setupPingTimer = function () { | ||
debug('_setupPingTimer :: keepalive %d (seconds)', this.options.keepalive) | ||
const that = this | ||
// When writing a packet, reschedule the ping timer | ||
this._shiftPingInterval() | ||
if (!this.pingTimer && this.options.keepalive) { | ||
this.pingResp = true | ||
this.pingTimer = reInterval(function () { | ||
that._checkPing() | ||
}, this.options.keepalive * 1000) | ||
} | ||
} | ||
this.log('_writePacket :: writing to stream') | ||
const result = mqttPacket.writeToStream( | ||
packet, | ||
this.stream, | ||
this.options, | ||
) | ||
this.log('_writePacket :: writeToStream result %s', result) | ||
if (!result && cb && cb !== this.noop) { | ||
this.log( | ||
'_writePacket :: handle events on `drain` once through callback.', | ||
) | ||
this.stream.once('drain', cb) | ||
} else if (cb) { | ||
this.log('_writePacket :: invoking cb') | ||
cb() | ||
} | ||
} | ||
/** | ||
* _shiftPingInterval - reschedule the ping interval | ||
* | ||
* @api private | ||
*/ | ||
MqttClient.prototype._shiftPingInterval = function () { | ||
if (this.pingTimer && this.options.keepalive && this.options.reschedulePings) { | ||
this.pingTimer.reschedule(this.options.keepalive * 1000) | ||
} | ||
} | ||
/** | ||
* _checkPing - check if a pingresp has come back, and ping the server again | ||
* | ||
* @api private | ||
*/ | ||
MqttClient.prototype._checkPing = function () { | ||
debug('_checkPing :: checking ping...') | ||
if (this.pingResp) { | ||
debug('_checkPing :: ping response received. Clearing flag and sending `pingreq`') | ||
this.pingResp = false | ||
this._sendPacket({ cmd: 'pingreq' }) | ||
} else { | ||
// do a forced cleanup since socket will be in bad shape | ||
debug('_checkPing :: calling _cleanUp with force true') | ||
this._cleanUp(true) | ||
} | ||
} | ||
/** | ||
* _sendPacket - send or queue a packet | ||
* @param {Object} packet - packet options | ||
* @param {Function} cb - callback when the packet is sent | ||
* @param {Function} cbStorePut - called when message is put into outgoingStore | ||
* @param {Boolean} noStore - send without put to the store | ||
* @api private | ||
*/ | ||
_sendPacket(packet, cb, cbStorePut, noStore) { | ||
this.log('_sendPacket :: (%s) :: start', this.options.clientId) | ||
cbStorePut = cbStorePut || this.noop | ||
cb = cb || this.noop | ||
/** | ||
* _handlePingresp - handle a pingresp | ||
* | ||
* @api private | ||
*/ | ||
MqttClient.prototype._handlePingresp = function () { | ||
this.pingResp = true | ||
} | ||
const err = this._applyTopicAlias(packet) | ||
if (err) { | ||
cb(err) | ||
return | ||
} | ||
/** | ||
* _handleConnack | ||
* | ||
* @param {Object} packet | ||
* @api private | ||
*/ | ||
MqttClient.prototype._handleConnack = function (packet) { | ||
debug('_handleConnack') | ||
const options = this.options | ||
const version = options.protocolVersion | ||
const rc = version === 5 ? packet.reasonCode : packet.returnCode | ||
if (!this.connected) { | ||
// allow auth packets to be sent while authenticating with the broker (mqtt5 enhanced auth) | ||
if (packet.cmd === 'auth') { | ||
this._writePacket(this, packet, cb) | ||
return | ||
} | ||
clearTimeout(this.connackTimer) | ||
delete this.topicAliasSend | ||
this.log( | ||
'_sendPacket :: client not connected. Storing packet offline.', | ||
) | ||
this._storePacket(packet, cb, cbStorePut) | ||
return | ||
} | ||
if (packet.properties) { | ||
if (packet.properties.topicAliasMaximum) { | ||
if (packet.properties.topicAliasMaximum > 0xffff) { | ||
this.emit('error', new Error('topicAliasMaximum from broker is out of range')) | ||
return | ||
} | ||
if (packet.properties.topicAliasMaximum > 0) { | ||
this.topicAliasSend = new TopicAliasSend(packet.properties.topicAliasMaximum) | ||
} | ||
} | ||
if (packet.properties.serverKeepAlive && options.keepalive) { | ||
options.keepalive = packet.properties.serverKeepAlive | ||
this._shiftPingInterval() | ||
} | ||
if (packet.properties.maximumPacketSize) { | ||
if (!options.properties) { options.properties = {} } | ||
options.properties.maximumPacketSize = packet.properties.maximumPacketSize | ||
} | ||
} | ||
// If "noStore" is true, the message is sent without being recorded in the store. | ||
// Messages that have not received puback or pubcomp remain in the store after disconnection | ||
// and are resent from the store upon reconnection. | ||
// For resend upon reconnection, "noStore" is set to true. This is because the message is already stored in the store. | ||
// This is to avoid interrupting other processes while recording to the store. | ||
if (noStore) { | ||
this._writePacket(packet, cb) | ||
return | ||
} | ||
if (rc === 0) { | ||
this.reconnecting = false | ||
this._onConnect(packet) | ||
} else if (rc > 0) { | ||
const err = new Error('Connection refused: ' + errors[rc]) | ||
err.code = rc | ||
this.emit('error', err) | ||
} | ||
} | ||
switch (packet.cmd) { | ||
case 'publish': | ||
break | ||
case 'pubrel': | ||
this._storeAndSend(packet, cb, cbStorePut) | ||
return | ||
default: | ||
this._writePacket(packet, cb) | ||
return | ||
} | ||
MqttClient.prototype._handleAuth = function (packet) { | ||
const options = this.options | ||
const version = options.protocolVersion | ||
const rc = version === 5 ? packet.reasonCode : packet.returnCode | ||
switch (packet.qos) { | ||
case 2: | ||
case 1: | ||
this._storeAndSend(packet, cb, cbStorePut) | ||
break | ||
/** | ||
* no need of case here since it will be caught by default | ||
* and jshint comply that before default it must be a break | ||
* anyway it will result in -1 evaluation | ||
*/ | ||
case 0: | ||
/* falls through */ | ||
default: | ||
this._writePacket(packet, cb) | ||
break | ||
} | ||
this.log('_sendPacket :: (%s) :: end', this.options.clientId) | ||
} | ||
if (version !== 5) { | ||
const err = new Error('Protocol error: Auth packets are only supported in MQTT 5. Your version:' + version) | ||
err.code = rc | ||
this.emit('error', err) | ||
return | ||
} | ||
/** | ||
* _storePacket - queue a packet | ||
* @param {Object} packet - packet options | ||
* @param {Function} cb - callback when the packet is sent | ||
* @param {Function} cbStorePut - called when message is put into outgoingStore | ||
* @api private | ||
*/ | ||
_storePacket(packet, cb, cbStorePut) { | ||
this.log('_storePacket :: packet: %o', packet) | ||
this.log('_storePacket :: cb? %s', !!cb) | ||
cbStorePut = cbStorePut || this.noop | ||
const that = this | ||
this.handleAuth(packet, function (err, packet) { | ||
if (err) { | ||
that.emit('error', err) | ||
return | ||
} | ||
let storePacket = packet | ||
if (storePacket.cmd === 'publish') { | ||
// The original packet is for sending. | ||
// The cloned storePacket is for storing to resend on reconnect. | ||
// Topic Alias must not be used after disconnected. | ||
storePacket = clone(packet) | ||
const err = this._removeTopicAliasAndRecoverTopicName(storePacket) | ||
if (err) { | ||
return cb && cb(err) | ||
} | ||
} | ||
// check that the packet is not a qos of 0, or that the command is not a publish | ||
if ( | ||
((storePacket.qos || 0) === 0 && this.queueQoSZero) || | ||
storePacket.cmd !== 'publish' | ||
) { | ||
this.queue.push({ packet: storePacket, cb }) | ||
} else if (storePacket.qos > 0) { | ||
cb = this.outgoing[storePacket.messageId] | ||
? this.outgoing[storePacket.messageId].cb | ||
: null | ||
this.outgoingStore.put(storePacket, (err) => { | ||
if (err) { | ||
return cb && cb(err) | ||
} | ||
cbStorePut() | ||
}) | ||
} else if (cb) { | ||
cb(new Error('No connection to broker')) | ||
} | ||
} | ||
if (rc === 24) { | ||
that.reconnecting = false | ||
that._sendPacket(packet) | ||
} else { | ||
const error = new Error('Connection refused: ' + errors[rc]) | ||
err.code = rc | ||
that.emit('error', error) | ||
} | ||
}) | ||
} | ||
/** | ||
* _setupPingTimer - setup the ping timer | ||
* | ||
* @api private | ||
*/ | ||
_setupPingTimer() { | ||
this.log( | ||
'_setupPingTimer :: keepalive %d (seconds)', | ||
this.options.keepalive, | ||
) | ||
/** | ||
* @param packet the packet received by the broker | ||
* @return the auth packet to be returned to the broker | ||
* @api public | ||
*/ | ||
MqttClient.prototype.handleAuth = function (packet, callback) { | ||
callback() | ||
} | ||
if (!this.pingTimer && this.options.keepalive) { | ||
this.pingResp = true | ||
this.pingTimer = reInterval(() => { | ||
this._checkPing() | ||
}, this.options.keepalive * 1000) | ||
} | ||
} | ||
/** | ||
* _handlePublish | ||
* | ||
* @param {Object} packet | ||
* @api private | ||
*/ | ||
/* | ||
those late 2 case should be rewrite to comply with coding style: | ||
/** | ||
* _shiftPingInterval - reschedule the ping interval | ||
* | ||
* @api private | ||
*/ | ||
_shiftPingInterval() { | ||
if ( | ||
this.pingTimer && | ||
this.options.keepalive && | ||
this.options.reschedulePings | ||
) { | ||
this.pingTimer.reschedule(this.options.keepalive * 1000) | ||
} | ||
} | ||
case 1: | ||
case 0: | ||
// do not wait sending a puback | ||
// no callback passed | ||
if (1 === qos) { | ||
this._sendPacket({ | ||
cmd: 'puback', | ||
messageId: messageId | ||
}); | ||
} | ||
// emit the message event for both qos 1 and 0 | ||
this.emit('message', topic, message, packet); | ||
this.handleMessage(packet, done); | ||
break; | ||
default: | ||
// do nothing but every switch mus have a default | ||
// log or throw an error about unknown qos | ||
break; | ||
/** | ||
* _checkPing - check if a pingresp has come back, and ping the server again | ||
* | ||
* @api private | ||
*/ | ||
_checkPing() { | ||
this.log('_checkPing :: checking ping...') | ||
if (this.pingResp) { | ||
this.log( | ||
'_checkPing :: ping response received. Clearing flag and sending `pingreq`', | ||
) | ||
this.pingResp = false | ||
this._sendPacket({ cmd: 'pingreq' }) | ||
} else { | ||
// do a forced cleanup since socket will be in bad shape | ||
this.log('_checkPing :: calling _cleanUp with force true') | ||
this._cleanUp(true) | ||
} | ||
} | ||
for now i just suppressed the warnings | ||
*/ | ||
MqttClient.prototype._handlePublish = function (packet, done) { | ||
debug('_handlePublish: packet %o', packet) | ||
done = typeof done !== 'undefined' ? done : nop | ||
let topic = packet.topic.toString() | ||
const message = packet.payload | ||
const qos = packet.qos | ||
const messageId = packet.messageId | ||
const that = this | ||
const options = this.options | ||
const validReasonCodes = [0, 16, 128, 131, 135, 144, 145, 151, 153] | ||
if (this.options.protocolVersion === 5) { | ||
let alias | ||
if (packet.properties) { | ||
alias = packet.properties.topicAlias | ||
} | ||
if (typeof alias !== 'undefined') { | ||
if (topic.length === 0) { | ||
if (alias > 0 && alias <= 0xffff) { | ||
const gotTopic = this.topicAliasRecv.getTopicByAlias(alias) | ||
if (gotTopic) { | ||
topic = gotTopic | ||
debug('_handlePublish :: topic complemented by alias. topic: %s - alias: %d', topic, alias) | ||
} else { | ||
debug('_handlePublish :: unregistered topic alias. alias: %d', alias) | ||
this.emit('error', new Error('Received unregistered Topic Alias')) | ||
return | ||
} | ||
} else { | ||
debug('_handlePublish :: topic alias out of range. alias: %d', alias) | ||
this.emit('error', new Error('Received Topic Alias is out of range')) | ||
return | ||
} | ||
} else { | ||
if (this.topicAliasRecv.put(topic, alias)) { | ||
debug('_handlePublish :: registered topic: %s - alias: %d', topic, alias) | ||
} else { | ||
debug('_handlePublish :: topic alias out of range. alias: %d', alias) | ||
this.emit('error', new Error('Received Topic Alias is out of range')) | ||
return | ||
} | ||
} | ||
} | ||
} | ||
debug('_handlePublish: qos %d', qos) | ||
switch (qos) { | ||
case 2: { | ||
options.customHandleAcks(topic, message, packet, function (error, code) { | ||
if (!(error instanceof Error)) { | ||
code = error | ||
error = null | ||
} | ||
if (error) { return that.emit('error', error) } | ||
if (validReasonCodes.indexOf(code) === -1) { return that.emit('error', new Error('Wrong reason code for pubrec')) } | ||
if (code) { | ||
that._sendPacket({ cmd: 'pubrec', messageId: messageId, reasonCode: code }, done) | ||
} else { | ||
that.incomingStore.put(packet, function () { | ||
that._sendPacket({ cmd: 'pubrec', messageId: messageId }, done) | ||
}) | ||
} | ||
}) | ||
break | ||
} | ||
case 1: { | ||
// emit the message event | ||
options.customHandleAcks(topic, message, packet, function (error, code) { | ||
if (!(error instanceof Error)) { | ||
code = error | ||
error = null | ||
} | ||
if (error) { return that.emit('error', error) } | ||
if (validReasonCodes.indexOf(code) === -1) { return that.emit('error', new Error('Wrong reason code for puback')) } | ||
if (!code) { that.emit('message', topic, message, packet) } | ||
that.handleMessage(packet, function (err) { | ||
if (err) { | ||
return done && done(err) | ||
} | ||
that._sendPacket({ cmd: 'puback', messageId: messageId, reasonCode: code }, done) | ||
}) | ||
}) | ||
break | ||
} | ||
case 0: | ||
// emit the message event | ||
this.emit('message', topic, message, packet) | ||
this.handleMessage(packet, done) | ||
break | ||
default: | ||
// do nothing | ||
debug('_handlePublish: unknown QoS. Doing nothing.') | ||
// log or throw an error about unknown qos | ||
break | ||
} | ||
} | ||
/** | ||
* @param packet the packet received by the broker | ||
* @return the auth packet to be returned to the broker | ||
* @api public | ||
*/ | ||
handleAuth(packet, callback) { | ||
callback() | ||
} | ||
/** | ||
* Handle messages with backpressure support, one at a time. | ||
* Override at will. | ||
* | ||
* @param Packet packet the packet | ||
* @param Function callback call when finished | ||
* @api public | ||
*/ | ||
MqttClient.prototype.handleMessage = function (packet, callback) { | ||
callback() | ||
} | ||
/** | ||
* Handle messages with backpressure support, one at a time. | ||
* Override at will. | ||
* | ||
* @param Packet packet the packet | ||
* @param Function callback call when finished | ||
* @api public | ||
*/ | ||
handleMessage(packet, callback) { | ||
callback() | ||
} | ||
/** | ||
* _handleAck | ||
* | ||
* @param {Object} packet | ||
* @api private | ||
*/ | ||
/** | ||
* _nextId | ||
* @return unsigned int | ||
*/ | ||
_nextId() { | ||
return this.messageIdProvider.allocate() | ||
} | ||
MqttClient.prototype._handleAck = function (packet) { | ||
/* eslint no-fallthrough: "off" */ | ||
const messageId = packet.messageId | ||
const type = packet.cmd | ||
let response = null | ||
const cb = this.outgoing[messageId] ? this.outgoing[messageId].cb : null | ||
const that = this | ||
let err | ||
/** | ||
* getLastMessageId | ||
* @return unsigned int | ||
*/ | ||
getLastMessageId() { | ||
return this.messageIdProvider.getLastAllocated() | ||
} | ||
// Checking `!cb` happens to work, but it's not technically "correct". | ||
// | ||
// Why? This code assumes that "no callback" is the same as that "we're not | ||
// waiting for responses" (puback, pubrec, pubcomp, suback, or unsuback). | ||
// | ||
// It would be better to check `if (!this.outgoing[messageId])` here, but | ||
// there's no reason to change it and risk (another) regression. | ||
// | ||
// The only reason this code works is becaues code in MqttClient.publish, | ||
// MqttClinet.subscribe, and MqttClient.unsubscribe ensures that we will | ||
// have a callback even if the user doesn't pass one in.) | ||
if (!cb) { | ||
debug('_handleAck :: Server sent an ack in error. Ignoring.') | ||
// Server sent an ack in error, ignore it. | ||
return | ||
} | ||
/** | ||
* _resubscribe | ||
* @api private | ||
*/ | ||
_resubscribe() { | ||
this.log('_resubscribe') | ||
const _resubscribeTopicsKeys = Object.keys(this._resubscribeTopics) | ||
if ( | ||
!this._firstConnection && | ||
(this.options.clean || | ||
(this.options.protocolVersion === 5 && | ||
!this.connackPacket.sessionPresent)) && | ||
_resubscribeTopicsKeys.length > 0 | ||
) { | ||
if (this.options.resubscribe) { | ||
if (this.options.protocolVersion === 5) { | ||
this.log('_resubscribe: protocolVersion 5') | ||
for ( | ||
let topicI = 0; | ||
topicI < _resubscribeTopicsKeys.length; | ||
topicI++ | ||
) { | ||
const resubscribeTopic = {} | ||
resubscribeTopic[_resubscribeTopicsKeys[topicI]] = | ||
this._resubscribeTopics[ | ||
_resubscribeTopicsKeys[topicI] | ||
] | ||
resubscribeTopic.resubscribe = true | ||
this.subscribe(resubscribeTopic, { | ||
properties: | ||
resubscribeTopic[_resubscribeTopicsKeys[topicI]] | ||
.properties, | ||
}) | ||
} | ||
} else { | ||
this._resubscribeTopics.resubscribe = true | ||
this.subscribe(this._resubscribeTopics) | ||
} | ||
} else { | ||
this._resubscribeTopics = {} | ||
} | ||
} | ||
// Process | ||
debug('_handleAck :: packet type', type) | ||
switch (type) { | ||
case 'pubcomp': | ||
// same thing as puback for QoS 2 | ||
case 'puback': { | ||
const pubackRC = packet.reasonCode | ||
// Callback - we're done | ||
if (pubackRC && pubackRC > 0 && pubackRC !== 16) { | ||
err = new Error('Publish error: ' + errors[pubackRC]) | ||
err.code = pubackRC | ||
cb(err, packet) | ||
} | ||
delete this.outgoing[messageId] | ||
this.outgoingStore.del(packet, cb) | ||
this.messageIdProvider.deallocate(messageId) | ||
this._invokeStoreProcessingQueue() | ||
break | ||
} | ||
case 'pubrec': { | ||
response = { | ||
cmd: 'pubrel', | ||
qos: 2, | ||
messageId: messageId | ||
} | ||
const pubrecRC = packet.reasonCode | ||
this._firstConnection = false | ||
} | ||
if (pubrecRC && pubrecRC > 0 && pubrecRC !== 16) { | ||
err = new Error('Publish error: ' + errors[pubrecRC]) | ||
err.code = pubrecRC | ||
cb(err, packet) | ||
} else { | ||
this._sendPacket(response) | ||
} | ||
break | ||
} | ||
case 'suback': { | ||
delete this.outgoing[messageId] | ||
this.messageIdProvider.deallocate(messageId) | ||
for (let grantedI = 0; grantedI < packet.granted.length; grantedI++) { | ||
if ((packet.granted[grantedI] & 0x80) !== 0) { | ||
// suback with Failure status | ||
const topics = this.messageIdToTopic[messageId] | ||
if (topics) { | ||
topics.forEach(function (topic) { | ||
delete that._resubscribeTopics[topic] | ||
}) | ||
} | ||
} | ||
} | ||
delete this.messageIdToTopic[messageId] | ||
this._invokeStoreProcessingQueue() | ||
cb(null, packet) | ||
break | ||
} | ||
case 'unsuback': { | ||
delete this.outgoing[messageId] | ||
this.messageIdProvider.deallocate(messageId) | ||
this._invokeStoreProcessingQueue() | ||
cb(null) | ||
break | ||
} | ||
default: | ||
that.emit('error', new Error('unrecognized packet type')) | ||
} | ||
/** | ||
* _onConnect | ||
* | ||
* @api private | ||
*/ | ||
_onConnect(packet) { | ||
if (this.disconnected) { | ||
this.emit('connect', packet) | ||
return | ||
} | ||
if (this.disconnecting && | ||
Object.keys(this.outgoing).length === 0) { | ||
this.emit('outgoingEmpty') | ||
} | ||
} | ||
this.connackPacket = packet | ||
this.messageIdProvider.clear() | ||
this._setupPingTimer() | ||
/** | ||
* _handlePubrel | ||
* | ||
* @param {Object} packet | ||
* @api private | ||
*/ | ||
MqttClient.prototype._handlePubrel = function (packet, callback) { | ||
debug('handling pubrel packet') | ||
callback = typeof callback !== 'undefined' ? callback : nop | ||
const messageId = packet.messageId | ||
const that = this | ||
this.connected = true | ||
const comp = { cmd: 'pubcomp', messageId: messageId } | ||
const startStreamProcess = () => { | ||
let outStore = this.outgoingStore.createStream() | ||
that.incomingStore.get(packet, function (err, pub) { | ||
if (!err) { | ||
that.emit('message', pub.topic, pub.payload, pub) | ||
that.handleMessage(pub, function (err) { | ||
if (err) { | ||
return callback(err) | ||
} | ||
that.incomingStore.del(pub, nop) | ||
that._sendPacket(comp, callback) | ||
}) | ||
} else { | ||
that._sendPacket(comp, callback) | ||
} | ||
}) | ||
} | ||
const remove = () => { | ||
outStore.destroy() | ||
outStore = null | ||
this._flushStoreProcessingQueue() | ||
clearStoreProcessing() | ||
} | ||
/** | ||
* _handleDisconnect | ||
* | ||
* @param {Object} packet | ||
* @api private | ||
*/ | ||
MqttClient.prototype._handleDisconnect = function (packet) { | ||
this.emit('disconnect', packet) | ||
} | ||
const clearStoreProcessing = () => { | ||
this._storeProcessing = false | ||
this._packetIdsDuringStoreProcessing = {} | ||
} | ||
/** | ||
* _nextId | ||
* @return unsigned int | ||
*/ | ||
MqttClient.prototype._nextId = function () { | ||
return this.messageIdProvider.allocate() | ||
} | ||
this.once('close', remove) | ||
outStore.on('error', (err) => { | ||
clearStoreProcessing() | ||
this._flushStoreProcessingQueue() | ||
this.removeListener('close', remove) | ||
this.emit('error', err) | ||
}) | ||
/** | ||
* getLastMessageId | ||
* @return unsigned int | ||
*/ | ||
MqttClient.prototype.getLastMessageId = function () { | ||
return this.messageIdProvider.getLastAllocated() | ||
} | ||
const storeDeliver = () => { | ||
// edge case, we wrapped this twice | ||
if (!outStore) { | ||
return | ||
} | ||
/** | ||
* _resubscribe | ||
* @api private | ||
*/ | ||
MqttClient.prototype._resubscribe = function () { | ||
debug('_resubscribe') | ||
const _resubscribeTopicsKeys = Object.keys(this._resubscribeTopics) | ||
if (!this._firstConnection && | ||
(this.options.clean || (this.options.protocolVersion === 5 && !this.connackPacket.sessionPresent)) && | ||
_resubscribeTopicsKeys.length > 0) { | ||
if (this.options.resubscribe) { | ||
if (this.options.protocolVersion === 5) { | ||
debug('_resubscribe: protocolVersion 5') | ||
for (let topicI = 0; topicI < _resubscribeTopicsKeys.length; topicI++) { | ||
const resubscribeTopic = {} | ||
resubscribeTopic[_resubscribeTopicsKeys[topicI]] = this._resubscribeTopics[_resubscribeTopicsKeys[topicI]] | ||
resubscribeTopic.resubscribe = true | ||
this.subscribe(resubscribeTopic, { properties: resubscribeTopic[_resubscribeTopicsKeys[topicI]].properties }) | ||
} | ||
} else { | ||
this._resubscribeTopics.resubscribe = true | ||
this.subscribe(this._resubscribeTopics) | ||
} | ||
} else { | ||
this._resubscribeTopics = {} | ||
} | ||
} | ||
const packet2 = outStore.read(1) | ||
this._firstConnection = false | ||
} | ||
let cb | ||
/** | ||
* _onConnect | ||
* | ||
* @api private | ||
*/ | ||
MqttClient.prototype._onConnect = function (packet) { | ||
if (this.disconnected) { | ||
this.emit('connect', packet) | ||
return | ||
} | ||
if (!packet2) { | ||
// read when data is available in the future | ||
outStore.once('readable', storeDeliver) | ||
return | ||
} | ||
const that = this | ||
this._storeProcessing = true | ||
this.connackPacket = packet | ||
this.messageIdProvider.clear() | ||
this._setupPingTimer() | ||
// Skip already processed store packets | ||
if (this._packetIdsDuringStoreProcessing[packet2.messageId]) { | ||
storeDeliver() | ||
return | ||
} | ||
this.connected = true | ||
// Avoid unnecessary stream read operations when disconnected | ||
if (!this.disconnecting && !this.reconnectTimer) { | ||
cb = this.outgoing[packet2.messageId] | ||
? this.outgoing[packet2.messageId].cb | ||
: null | ||
this.outgoing[packet2.messageId] = { | ||
volatile: false, | ||
cb(err, status) { | ||
// Ensure that the original callback passed in to publish gets invoked | ||
if (cb) { | ||
cb(err, status) | ||
} | ||
function startStreamProcess () { | ||
let outStore = that.outgoingStore.createStream() | ||
storeDeliver() | ||
}, | ||
} | ||
this._packetIdsDuringStoreProcessing[ | ||
packet2.messageId | ||
] = true | ||
if (this.messageIdProvider.register(packet2.messageId)) { | ||
this._sendPacket(packet2, undefined, undefined, true) | ||
} else { | ||
this.log( | ||
'messageId: %d has already used.', | ||
packet2.messageId, | ||
) | ||
} | ||
} else if (outStore.destroy) { | ||
outStore.destroy() | ||
} | ||
} | ||
function clearStoreProcessing () { | ||
that._storeProcessing = false | ||
that._packetIdsDuringStoreProcessing = {} | ||
} | ||
outStore.on('end', () => { | ||
let allProcessed = true | ||
for (const id in this._packetIdsDuringStoreProcessing) { | ||
if (!this._packetIdsDuringStoreProcessing[id]) { | ||
allProcessed = false | ||
break | ||
} | ||
} | ||
if (allProcessed) { | ||
clearStoreProcessing() | ||
this.removeListener('close', remove) | ||
this._invokeAllStoreProcessingQueue() | ||
this.emit('connect', packet) | ||
} else { | ||
startStreamProcess() | ||
} | ||
}) | ||
storeDeliver() | ||
} | ||
// start flowing | ||
startStreamProcess() | ||
} | ||
that.once('close', remove) | ||
outStore.on('error', function (err) { | ||
clearStoreProcessing() | ||
that._flushStoreProcessingQueue() | ||
that.removeListener('close', remove) | ||
that.emit('error', err) | ||
}) | ||
_invokeStoreProcessingQueue() { | ||
// If _storeProcessing is true, the message is resending. | ||
// During resend, processing is skipped to prevent new messages from interrupting. #1635 | ||
if (!this._storeProcessing && this._storeProcessingQueue.length > 0) { | ||
const f = this._storeProcessingQueue[0] | ||
if (f && f.invoke()) { | ||
this._storeProcessingQueue.shift() | ||
return true | ||
} | ||
} | ||
return false | ||
} | ||
function remove () { | ||
outStore.destroy() | ||
outStore = null | ||
that._flushStoreProcessingQueue() | ||
clearStoreProcessing() | ||
} | ||
_invokeAllStoreProcessingQueue() { | ||
while (this._invokeStoreProcessingQueue()) { | ||
/* empty */ | ||
} | ||
} | ||
function storeDeliver () { | ||
// edge case, we wrapped this twice | ||
if (!outStore) { | ||
return | ||
} | ||
_flushStoreProcessingQueue() { | ||
for (const f of this._storeProcessingQueue) { | ||
if (f.cbStorePut) f.cbStorePut(new Error('Connection closed')) | ||
if (f.callback) f.callback(new Error('Connection closed')) | ||
} | ||
this._storeProcessingQueue.splice(0) | ||
} | ||
const packet = outStore.read(1) | ||
let cb | ||
if (!packet) { | ||
// read when data is available in the future | ||
outStore.once('readable', storeDeliver) | ||
return | ||
} | ||
that._storeProcessing = true | ||
// Skip already processed store packets | ||
if (that._packetIdsDuringStoreProcessing[packet.messageId]) { | ||
storeDeliver() | ||
return | ||
} | ||
// Avoid unnecessary stream read operations when disconnected | ||
if (!that.disconnecting && !that.reconnectTimer) { | ||
cb = that.outgoing[packet.messageId] ? that.outgoing[packet.messageId].cb : null | ||
that.outgoing[packet.messageId] = { | ||
volatile: false, | ||
cb: function (err, status) { | ||
// Ensure that the original callback passed in to publish gets invoked | ||
if (cb) { | ||
cb(err, status) | ||
} | ||
storeDeliver() | ||
} | ||
} | ||
that._packetIdsDuringStoreProcessing[packet.messageId] = true | ||
if (that.messageIdProvider.register(packet.messageId)) { | ||
that._sendPacket(packet, undefined, undefined, true) | ||
} else { | ||
debug('messageId: %d has already used.', packet.messageId) | ||
} | ||
} else if (outStore.destroy) { | ||
outStore.destroy() | ||
} | ||
} | ||
outStore.on('end', function () { | ||
let allProcessed = true | ||
for (const id in that._packetIdsDuringStoreProcessing) { | ||
if (!that._packetIdsDuringStoreProcessing[id]) { | ||
allProcessed = false | ||
break | ||
} | ||
} | ||
if (allProcessed) { | ||
clearStoreProcessing() | ||
that.removeListener('close', remove) | ||
that._invokeAllStoreProcessingQueue() | ||
that.emit('connect', packet) | ||
} else { | ||
startStreamProcess() | ||
} | ||
}) | ||
storeDeliver() | ||
} | ||
// start flowing | ||
startStreamProcess() | ||
/** | ||
* _removeOutgoingAndStoreMessage | ||
* @param {Number} messageId - messageId to remove message | ||
* @param {Function} cb - called when the message removed | ||
* @api private | ||
*/ | ||
_removeOutgoingAndStoreMessage(messageId, cb) { | ||
const self = this | ||
delete this.outgoing[messageId] | ||
self.outgoingStore.del({ messageId }, (err, packet) => { | ||
cb(err, packet) | ||
self.messageIdProvider.deallocate(messageId) | ||
self._invokeStoreProcessingQueue() | ||
}) | ||
} | ||
} | ||
MqttClient.prototype._invokeStoreProcessingQueue = function () { | ||
if (this._storeProcessingQueue.length > 0) { | ||
const f = this._storeProcessingQueue[0] | ||
if (f && f.invoke()) { | ||
this._storeProcessingQueue.shift() | ||
return true | ||
} | ||
} | ||
return false | ||
} | ||
MqttClient.prototype._invokeAllStoreProcessingQueue = function () { | ||
while (this._invokeStoreProcessingQueue()) { /* empty */ } | ||
} | ||
MqttClient.prototype._flushStoreProcessingQueue = function () { | ||
for (const f of this._storeProcessingQueue) { | ||
if (f.cbStorePut) f.cbStorePut(new Error('Connection closed')) | ||
if (f.callback) f.callback(new Error('Connection closed')) | ||
} | ||
this._storeProcessingQueue.splice(0) | ||
} | ||
module.exports = MqttClient |
@@ -1,8 +0,5 @@ | ||
'use strict' | ||
const { Buffer } = require('buffer') | ||
const Transform = require('readable-stream').Transform | ||
const { Transform } = require('readable-stream') | ||
const duplexify = require('duplexify') | ||
/* global FileReader */ | ||
let my | ||
@@ -13,118 +10,118 @@ let proxy | ||
function buildProxy () { | ||
const proxy = new Transform() | ||
proxy._write = function (chunk, encoding, next) { | ||
my.sendSocketMessage({ | ||
data: chunk.buffer, | ||
success: function () { | ||
next() | ||
}, | ||
fail: function () { | ||
next(new Error()) | ||
} | ||
}) | ||
} | ||
proxy._flush = function socketEnd (done) { | ||
my.closeSocket({ | ||
success: function () { | ||
done() | ||
} | ||
}) | ||
} | ||
function buildProxy() { | ||
const _proxy = new Transform() | ||
_proxy._write = (chunk, encoding, next) => { | ||
my.sendSocketMessage({ | ||
data: chunk.buffer, | ||
success() { | ||
next() | ||
}, | ||
fail() { | ||
next(new Error()) | ||
}, | ||
}) | ||
} | ||
_proxy._flush = (done) => { | ||
my.closeSocket({ | ||
success() { | ||
done() | ||
}, | ||
}) | ||
} | ||
return proxy | ||
return _proxy | ||
} | ||
function setDefaultOpts (opts) { | ||
if (!opts.hostname) { | ||
opts.hostname = 'localhost' | ||
} | ||
if (!opts.path) { | ||
opts.path = '/' | ||
} | ||
function setDefaultOpts(opts) { | ||
if (!opts.hostname) { | ||
opts.hostname = 'localhost' | ||
} | ||
if (!opts.path) { | ||
opts.path = '/' | ||
} | ||
if (!opts.wsOptions) { | ||
opts.wsOptions = {} | ||
} | ||
if (!opts.wsOptions) { | ||
opts.wsOptions = {} | ||
} | ||
} | ||
function buildUrl (opts, client) { | ||
const protocol = opts.protocol === 'alis' ? 'wss' : 'ws' | ||
let url = protocol + '://' + opts.hostname + opts.path | ||
if (opts.port && opts.port !== 80 && opts.port !== 443) { | ||
url = protocol + '://' + opts.hostname + ':' + opts.port + opts.path | ||
} | ||
if (typeof (opts.transformWsUrl) === 'function') { | ||
url = opts.transformWsUrl(url, opts, client) | ||
} | ||
return url | ||
function buildUrl(opts, client) { | ||
const protocol = opts.protocol === 'alis' ? 'wss' : 'ws' | ||
let url = `${protocol}://${opts.hostname}${opts.path}` | ||
if (opts.port && opts.port !== 80 && opts.port !== 443) { | ||
url = `${protocol}://${opts.hostname}:${opts.port}${opts.path}` | ||
} | ||
if (typeof opts.transformWsUrl === 'function') { | ||
url = opts.transformWsUrl(url, opts, client) | ||
} | ||
return url | ||
} | ||
function bindEventHandler () { | ||
if (isInitialized) return | ||
function bindEventHandler() { | ||
if (isInitialized) return | ||
isInitialized = true | ||
isInitialized = true | ||
my.onSocketOpen(function () { | ||
stream.setReadable(proxy) | ||
stream.setWritable(proxy) | ||
stream.emit('connect') | ||
}) | ||
my.onSocketOpen(() => { | ||
stream.setReadable(proxy) | ||
stream.setWritable(proxy) | ||
stream.emit('connect') | ||
}) | ||
my.onSocketMessage(function (res) { | ||
if (typeof res.data === 'string') { | ||
const buffer = Buffer.from(res.data, 'base64') | ||
proxy.push(buffer) | ||
} else { | ||
const reader = new FileReader() | ||
reader.addEventListener('load', function () { | ||
let data = reader.result | ||
my.onSocketMessage((res) => { | ||
if (typeof res.data === 'string') { | ||
const buffer = Buffer.from(res.data, 'base64') | ||
proxy.push(buffer) | ||
} else { | ||
const reader = new FileReader() | ||
reader.addEventListener('load', () => { | ||
let data = reader.result | ||
if (data instanceof ArrayBuffer) data = Buffer.from(data) | ||
else data = Buffer.from(data, 'utf8') | ||
proxy.push(data) | ||
}) | ||
reader.readAsArrayBuffer(res.data) | ||
} | ||
}) | ||
if (data instanceof ArrayBuffer) data = Buffer.from(data) | ||
else data = Buffer.from(data, 'utf8') | ||
proxy.push(data) | ||
}) | ||
reader.readAsArrayBuffer(res.data) | ||
} | ||
}) | ||
my.onSocketClose(function () { | ||
stream.end() | ||
stream.destroy() | ||
}) | ||
my.onSocketClose(() => { | ||
stream.end() | ||
stream.destroy() | ||
}) | ||
my.onSocketError(function (res) { | ||
stream.destroy(res) | ||
}) | ||
my.onSocketError((res) => { | ||
stream.destroy(res) | ||
}) | ||
} | ||
function buildStream (client, opts) { | ||
opts.hostname = opts.hostname || opts.host | ||
function buildStream(client, opts) { | ||
opts.hostname = opts.hostname || opts.host | ||
if (!opts.hostname) { | ||
throw new Error('Could not determine host. Specify host manually.') | ||
} | ||
if (!opts.hostname) { | ||
throw new Error('Could not determine host. Specify host manually.') | ||
} | ||
const websocketSubProtocol = | ||
(opts.protocolId === 'MQIsdp') && (opts.protocolVersion === 3) | ||
? 'mqttv3.1' | ||
: 'mqtt' | ||
const websocketSubProtocol = | ||
opts.protocolId === 'MQIsdp' && opts.protocolVersion === 3 | ||
? 'mqttv3.1' | ||
: 'mqtt' | ||
setDefaultOpts(opts) | ||
setDefaultOpts(opts) | ||
const url = buildUrl(opts, client) | ||
my = opts.my | ||
my.connectSocket({ | ||
url: url, | ||
protocols: websocketSubProtocol | ||
}) | ||
const url = buildUrl(opts, client) | ||
my = opts.my | ||
my.connectSocket({ | ||
url, | ||
protocols: websocketSubProtocol, | ||
}) | ||
proxy = buildProxy() | ||
stream = duplexify.obj() | ||
proxy = buildProxy() | ||
stream = duplexify.obj() | ||
bindEventHandler() | ||
bindEventHandler() | ||
return stream | ||
return stream | ||
} | ||
module.exports = buildStream |
@@ -1,3 +0,2 @@ | ||
'use strict' | ||
const url = require('url') | ||
const MqttClient = require('../client') | ||
@@ -7,5 +6,3 @@ const Store = require('../store') | ||
const UniqueMessageIdProvider = require('../unique-message-id-provider') | ||
const IS_BROWSER = require('../is-browser').IS_BROWSER | ||
const url = require('url') | ||
const xtend = require('xtend') | ||
const { IS_BROWSER } = require('../is-browser') | ||
const debug = require('debug')('mqttjs') | ||
@@ -16,13 +13,13 @@ | ||
if (!IS_BROWSER) { | ||
protocols.mqtt = require('./tcp') | ||
protocols.tcp = require('./tcp') | ||
protocols.ssl = require('./tls') | ||
protocols.tls = require('./tls') | ||
protocols.mqtts = require('./tls') | ||
protocols.mqtt = require('./tcp') | ||
protocols.tcp = require('./tcp') | ||
protocols.ssl = require('./tls') | ||
protocols.tls = require('./tls') | ||
protocols.mqtts = require('./tls') | ||
} else { | ||
protocols.wx = require('./wx') | ||
protocols.wxs = require('./wx') | ||
protocols.wx = require('./wx') | ||
protocols.wxs = require('./wx') | ||
protocols.ali = require('./ali') | ||
protocols.alis = require('./ali') | ||
protocols.ali = require('./ali') | ||
protocols.alis = require('./ali') | ||
} | ||
@@ -38,13 +35,13 @@ | ||
*/ | ||
function parseAuthOptions (opts) { | ||
let matches | ||
if (opts.auth) { | ||
matches = opts.auth.match(/^(.+):(.+)$/) | ||
if (matches) { | ||
opts.username = matches[1] | ||
opts.password = matches[2] | ||
} else { | ||
opts.username = opts.auth | ||
} | ||
} | ||
function parseAuthOptions(opts) { | ||
let matches | ||
if (opts.auth) { | ||
matches = opts.auth.match(/^(.+):(.+)$/) | ||
if (matches) { | ||
opts.username = matches[1] | ||
opts.password = matches[2] | ||
} else { | ||
opts.username = opts.auth | ||
} | ||
} | ||
} | ||
@@ -58,109 +55,118 @@ | ||
*/ | ||
function connect (brokerUrl, opts) { | ||
debug('connecting to an MQTT broker...') | ||
if ((typeof brokerUrl === 'object') && !opts) { | ||
opts = brokerUrl | ||
brokerUrl = null | ||
} | ||
function connect(brokerUrl, opts) { | ||
debug('connecting to an MQTT broker...') | ||
if (typeof brokerUrl === 'object' && !opts) { | ||
opts = brokerUrl | ||
brokerUrl = null | ||
} | ||
opts = opts || {} | ||
opts = opts || {} | ||
if (brokerUrl) { | ||
// eslint-disable-next-line | ||
if (brokerUrl) { | ||
// eslint-disable-next-line | ||
const parsed = url.parse(brokerUrl, true) | ||
if (parsed.port != null) { | ||
parsed.port = Number(parsed.port) | ||
} | ||
if (parsed.port != null) { | ||
parsed.port = Number(parsed.port) | ||
} | ||
opts = xtend(parsed, opts) | ||
opts = { ...parsed, ...opts } | ||
if (opts.protocol === null) { | ||
throw new Error('Missing protocol') | ||
} | ||
if (opts.protocol === null) { | ||
throw new Error('Missing protocol') | ||
} | ||
opts.protocol = opts.protocol.replace(/:$/, '') | ||
} | ||
opts.protocol = opts.protocol.replace(/:$/, '') | ||
} | ||
// merge in the auth options if supplied | ||
parseAuthOptions(opts) | ||
// merge in the auth options if supplied | ||
parseAuthOptions(opts) | ||
// support clientId passed in the query string of the url | ||
if (opts.query && typeof opts.query.clientId === 'string') { | ||
opts.clientId = opts.query.clientId | ||
} | ||
// support clientId passed in the query string of the url | ||
if (opts.query && typeof opts.query.clientId === 'string') { | ||
opts.clientId = opts.query.clientId | ||
} | ||
if (opts.cert && opts.key) { | ||
if (opts.protocol) { | ||
if (['mqtts', 'wss', 'wxs', 'alis'].indexOf(opts.protocol) === -1) { | ||
switch (opts.protocol) { | ||
case 'mqtt': | ||
opts.protocol = 'mqtts' | ||
break | ||
case 'ws': | ||
opts.protocol = 'wss' | ||
break | ||
case 'wx': | ||
opts.protocol = 'wxs' | ||
break | ||
case 'ali': | ||
opts.protocol = 'alis' | ||
break | ||
default: | ||
throw new Error('Unknown protocol for secure connection: "' + opts.protocol + '"!') | ||
} | ||
} | ||
} else { | ||
// A cert and key was provided, however no protocol was specified, so we will throw an error. | ||
throw new Error('Missing secure protocol key') | ||
} | ||
} | ||
if (opts.cert && opts.key) { | ||
if (opts.protocol) { | ||
if (['mqtts', 'wss', 'wxs', 'alis'].indexOf(opts.protocol) === -1) { | ||
switch (opts.protocol) { | ||
case 'mqtt': | ||
opts.protocol = 'mqtts' | ||
break | ||
case 'ws': | ||
opts.protocol = 'wss' | ||
break | ||
case 'wx': | ||
opts.protocol = 'wxs' | ||
break | ||
case 'ali': | ||
opts.protocol = 'alis' | ||
break | ||
default: | ||
throw new Error( | ||
`Unknown protocol for secure connection: "${opts.protocol}"!`, | ||
) | ||
} | ||
} | ||
} else { | ||
// A cert and key was provided, however no protocol was specified, so we will throw an error. | ||
throw new Error('Missing secure protocol key') | ||
} | ||
} | ||
if (!protocols[opts.protocol]) { | ||
const isSecure = ['mqtts', 'wss'].indexOf(opts.protocol) !== -1 | ||
opts.protocol = [ | ||
'mqtt', | ||
'mqtts', | ||
'ws', | ||
'wss', | ||
'wx', | ||
'wxs', | ||
'ali', | ||
'alis' | ||
].filter(function (key, index) { | ||
if (isSecure && index % 2 === 0) { | ||
// Skip insecure protocols when requesting a secure one. | ||
return false | ||
} | ||
return (typeof protocols[key] === 'function') | ||
})[0] | ||
} | ||
if (!protocols[opts.protocol]) { | ||
const isSecure = ['mqtts', 'wss'].indexOf(opts.protocol) !== -1 | ||
opts.protocol = [ | ||
'mqtt', | ||
'mqtts', | ||
'ws', | ||
'wss', | ||
'wx', | ||
'wxs', | ||
'ali', | ||
'alis', | ||
].filter((key, index) => { | ||
if (isSecure && index % 2 === 0) { | ||
// Skip insecure protocols when requesting a secure one. | ||
return false | ||
} | ||
return typeof protocols[key] === 'function' | ||
})[0] | ||
} | ||
if (opts.clean === false && !opts.clientId) { | ||
throw new Error('Missing clientId for unclean clients') | ||
} | ||
if (opts.clean === false && !opts.clientId) { | ||
throw new Error('Missing clientId for unclean clients') | ||
} | ||
if (opts.protocol) { | ||
opts.defaultProtocol = opts.protocol | ||
} | ||
if (opts.protocol) { | ||
opts.defaultProtocol = opts.protocol | ||
} | ||
function wrapper (client) { | ||
if (opts.servers) { | ||
if (!client._reconnectCount || client._reconnectCount === opts.servers.length) { | ||
client._reconnectCount = 0 | ||
} | ||
function wrapper(client) { | ||
if (opts.servers) { | ||
if ( | ||
!client._reconnectCount || | ||
client._reconnectCount === opts.servers.length | ||
) { | ||
client._reconnectCount = 0 | ||
} | ||
opts.host = opts.servers[client._reconnectCount].host | ||
opts.port = opts.servers[client._reconnectCount].port | ||
opts.protocol = (!opts.servers[client._reconnectCount].protocol ? opts.defaultProtocol : opts.servers[client._reconnectCount].protocol) | ||
opts.hostname = opts.host | ||
opts.host = opts.servers[client._reconnectCount].host | ||
opts.port = opts.servers[client._reconnectCount].port | ||
opts.protocol = !opts.servers[client._reconnectCount].protocol | ||
? opts.defaultProtocol | ||
: opts.servers[client._reconnectCount].protocol | ||
opts.hostname = opts.host | ||
client._reconnectCount++ | ||
} | ||
client._reconnectCount++ | ||
} | ||
debug('calling streambuilder for', opts.protocol) | ||
return protocols[opts.protocol](client, opts) | ||
} | ||
const client = new MqttClient(wrapper, opts) | ||
client.on('error', function () { /* Automatically set up client error handling */ }) | ||
return client | ||
debug('calling streambuilder for', opts.protocol) | ||
return protocols[opts.protocol](client, opts) | ||
} | ||
const client = new MqttClient(wrapper, opts) | ||
client.on('error', () => { | ||
/* Automatically set up client error handling */ | ||
}) | ||
return client | ||
} | ||
@@ -167,0 +173,0 @@ |
@@ -1,2 +0,1 @@ | ||
'use strict' | ||
const net = require('net') | ||
@@ -9,13 +8,13 @@ const debug = require('debug')('mqttjs:tcp') | ||
*/ | ||
function streamBuilder (client, opts) { | ||
opts.port = opts.port || 1883 | ||
opts.hostname = opts.hostname || opts.host || 'localhost' | ||
function streamBuilder(client, opts) { | ||
opts.port = opts.port || 1883 | ||
opts.hostname = opts.hostname || opts.host || 'localhost' | ||
const port = opts.port | ||
const host = opts.hostname | ||
const { port } = opts | ||
const host = opts.hostname | ||
debug('port %d and host %s', port, host) | ||
return net.createConnection(port, host) | ||
debug('port %d and host %s', port, host) | ||
return net.createConnection(port, host) | ||
} | ||
module.exports = streamBuilder |
@@ -1,2 +0,1 @@ | ||
'use strict' | ||
const tls = require('tls') | ||
@@ -6,44 +5,49 @@ const net = require('net') | ||
function buildBuilder (mqttClient, opts) { | ||
opts.port = opts.port || 8883 | ||
opts.host = opts.hostname || opts.host || 'localhost' | ||
function buildBuilder(mqttClient, opts) { | ||
opts.port = opts.port || 8883 | ||
opts.host = opts.hostname || opts.host || 'localhost' | ||
if (net.isIP(opts.host) === 0) { | ||
opts.servername = opts.host | ||
} | ||
if (net.isIP(opts.host) === 0) { | ||
opts.servername = opts.host | ||
} | ||
opts.rejectUnauthorized = opts.rejectUnauthorized !== false | ||
opts.rejectUnauthorized = opts.rejectUnauthorized !== false | ||
delete opts.path | ||
delete opts.path | ||
debug('port %d host %s rejectUnauthorized %b', opts.port, opts.host, opts.rejectUnauthorized) | ||
debug( | ||
'port %d host %s rejectUnauthorized %b', | ||
opts.port, | ||
opts.host, | ||
opts.rejectUnauthorized, | ||
) | ||
const connection = tls.connect(opts) | ||
/* eslint no-use-before-define: [2, "nofunc"] */ | ||
connection.on('secureConnect', function () { | ||
if (opts.rejectUnauthorized && !connection.authorized) { | ||
connection.emit('error', new Error('TLS not authorized')) | ||
} else { | ||
connection.removeListener('error', handleTLSerrors) | ||
} | ||
}) | ||
const connection = tls.connect(opts) | ||
/* eslint no-use-before-define: [2, "nofunc"] */ | ||
connection.on('secureConnect', () => { | ||
if (opts.rejectUnauthorized && !connection.authorized) { | ||
connection.emit('error', new Error('TLS not authorized')) | ||
} else { | ||
connection.removeListener('error', handleTLSerrors) | ||
} | ||
}) | ||
function handleTLSerrors (err) { | ||
// How can I get verify this error is a tls error? | ||
if (opts.rejectUnauthorized) { | ||
mqttClient.emit('error', err) | ||
} | ||
function handleTLSerrors(err) { | ||
// How can I get verify this error is a tls error? | ||
if (opts.rejectUnauthorized) { | ||
mqttClient.emit('error', err) | ||
} | ||
// close this connection to match the behaviour of net | ||
// otherwise all we get is an error from the connection | ||
// and close event doesn't fire. This is a work around | ||
// to enable the reconnect code to work the same as with | ||
// net.createConnection | ||
connection.end() | ||
} | ||
// close this connection to match the behaviour of net | ||
// otherwise all we get is an error from the connection | ||
// and close event doesn't fire. This is a work around | ||
// to enable the reconnect code to work the same as with | ||
// net.createConnection | ||
connection.end() | ||
} | ||
connection.on('error', handleTLSerrors) | ||
return connection | ||
connection.on('error', handleTLSerrors) | ||
return connection | ||
} | ||
module.exports = buildBuilder |
@@ -1,3 +0,1 @@ | ||
'use strict' | ||
const { Buffer } = require('buffer') | ||
@@ -7,252 +5,262 @@ const WS = require('ws') | ||
const duplexify = require('duplexify') | ||
const Transform = require('readable-stream').Transform | ||
const IS_BROWSER = require('../is-browser').IS_BROWSER | ||
const { Transform } = require('readable-stream') | ||
const { IS_BROWSER } = require('../is-browser') | ||
const WSS_OPTIONS = [ | ||
'rejectUnauthorized', | ||
'ca', | ||
'cert', | ||
'key', | ||
'pfx', | ||
'passphrase' | ||
'rejectUnauthorized', | ||
'ca', | ||
'cert', | ||
'key', | ||
'pfx', | ||
'passphrase', | ||
] | ||
function buildUrl (opts, client) { | ||
let url = opts.protocol + '://' + opts.hostname + ':' + opts.port + opts.path | ||
if (typeof (opts.transformWsUrl) === 'function') { | ||
url = opts.transformWsUrl(url, opts, client) | ||
} | ||
return url | ||
function buildUrl(opts, client) { | ||
let url = `${opts.protocol}://${opts.hostname}:${opts.port}${opts.path}` | ||
if (typeof opts.transformWsUrl === 'function') { | ||
url = opts.transformWsUrl(url, opts, client) | ||
} | ||
return url | ||
} | ||
function setDefaultOpts (opts) { | ||
const options = opts | ||
if (!opts.hostname) { | ||
options.hostname = 'localhost' | ||
} | ||
if (!opts.port) { | ||
if (opts.protocol === 'wss') { | ||
options.port = 443 | ||
} else { | ||
options.port = 80 | ||
} | ||
} | ||
if (!opts.path) { | ||
options.path = '/' | ||
} | ||
function setDefaultOpts(opts) { | ||
const options = opts | ||
if (!opts.hostname) { | ||
options.hostname = 'localhost' | ||
} | ||
if (!opts.port) { | ||
if (opts.protocol === 'wss') { | ||
options.port = 443 | ||
} else { | ||
options.port = 80 | ||
} | ||
} | ||
if (!opts.path) { | ||
options.path = '/' | ||
} | ||
if (!opts.wsOptions) { | ||
options.wsOptions = {} | ||
} | ||
if (!IS_BROWSER && opts.protocol === 'wss') { | ||
// Add cert/key/ca etc options | ||
WSS_OPTIONS.forEach(function (prop) { | ||
if (Object.prototype.hasOwnProperty.call(opts, prop) && !Object.prototype.hasOwnProperty.call(opts.wsOptions, prop)) { | ||
options.wsOptions[prop] = opts[prop] | ||
} | ||
}) | ||
} | ||
if (!opts.wsOptions) { | ||
options.wsOptions = {} | ||
} | ||
if (!IS_BROWSER && opts.protocol === 'wss') { | ||
// Add cert/key/ca etc options | ||
WSS_OPTIONS.forEach((prop) => { | ||
if ( | ||
Object.prototype.hasOwnProperty.call(opts, prop) && | ||
!Object.prototype.hasOwnProperty.call(opts.wsOptions, prop) | ||
) { | ||
options.wsOptions[prop] = opts[prop] | ||
} | ||
}) | ||
} | ||
return options | ||
return options | ||
} | ||
function setDefaultBrowserOpts (opts) { | ||
const options = setDefaultOpts(opts) | ||
function setDefaultBrowserOpts(opts) { | ||
const options = setDefaultOpts(opts) | ||
if (!options.hostname) { | ||
options.hostname = options.host | ||
} | ||
if (!options.hostname) { | ||
options.hostname = options.host | ||
} | ||
if (!options.hostname) { | ||
// Throwing an error in a Web Worker if no `hostname` is given, because we | ||
// can not determine the `hostname` automatically. If connecting to | ||
// localhost, please supply the `hostname` as an argument. | ||
if (typeof (document) === 'undefined') { | ||
throw new Error('Could not determine host. Specify host manually.') | ||
} | ||
const parsed = new URL(document.URL) | ||
options.hostname = parsed.hostname | ||
if (!options.hostname) { | ||
// Throwing an error in a Web Worker if no `hostname` is given, because we | ||
// can not determine the `hostname` automatically. If connecting to | ||
// localhost, please supply the `hostname` as an argument. | ||
if (typeof document === 'undefined') { | ||
throw new Error('Could not determine host. Specify host manually.') | ||
} | ||
const parsed = new URL(document.URL) | ||
options.hostname = parsed.hostname | ||
if (!options.port) { | ||
options.port = parsed.port | ||
} | ||
} | ||
if (!options.port) { | ||
options.port = parsed.port | ||
} | ||
} | ||
// objectMode should be defined for logic | ||
if (options.objectMode === undefined) { | ||
options.objectMode = !(options.binary === true || options.binary === undefined) | ||
} | ||
// objectMode should be defined for logic | ||
if (options.objectMode === undefined) { | ||
options.objectMode = !( | ||
options.binary === true || options.binary === undefined | ||
) | ||
} | ||
return options | ||
return options | ||
} | ||
function createWebSocket (client, url, opts) { | ||
debug('createWebSocket') | ||
debug('protocol: ' + opts.protocolId + ' ' + opts.protocolVersion) | ||
const websocketSubProtocol = | ||
(opts.protocolId === 'MQIsdp') && (opts.protocolVersion === 3) | ||
? 'mqttv3.1' | ||
: 'mqtt' | ||
function createWebSocket(client, url, opts) { | ||
debug('createWebSocket') | ||
debug(`protocol: ${opts.protocolId} ${opts.protocolVersion}`) | ||
const websocketSubProtocol = | ||
opts.protocolId === 'MQIsdp' && opts.protocolVersion === 3 | ||
? 'mqttv3.1' | ||
: 'mqtt' | ||
debug('creating new Websocket for url: ' + url + ' and protocol: ' + websocketSubProtocol) | ||
const socket = new WS(url, [websocketSubProtocol], opts.wsOptions) | ||
return socket | ||
debug( | ||
`creating new Websocket for url: ${url} and protocol: ${websocketSubProtocol}`, | ||
) | ||
const socket = new WS(url, [websocketSubProtocol], opts.wsOptions) | ||
return socket | ||
} | ||
function createBrowserWebSocket (client, opts) { | ||
const websocketSubProtocol = | ||
(opts.protocolId === 'MQIsdp') && (opts.protocolVersion === 3) | ||
? 'mqttv3.1' | ||
: 'mqtt' | ||
function createBrowserWebSocket(client, opts) { | ||
const websocketSubProtocol = | ||
opts.protocolId === 'MQIsdp' && opts.protocolVersion === 3 | ||
? 'mqttv3.1' | ||
: 'mqtt' | ||
const url = buildUrl(opts, client) | ||
/* global WebSocket */ | ||
const socket = new WebSocket(url, [websocketSubProtocol]) | ||
socket.binaryType = 'arraybuffer' | ||
return socket | ||
const url = buildUrl(opts, client) | ||
const socket = new WebSocket(url, [websocketSubProtocol]) | ||
socket.binaryType = 'arraybuffer' | ||
return socket | ||
} | ||
function streamBuilder (client, opts) { | ||
debug('streamBuilder') | ||
const options = setDefaultOpts(opts) | ||
const url = buildUrl(options, client) | ||
const socket = createWebSocket(client, url, options) | ||
const webSocketStream = WS.createWebSocketStream(socket, options.wsOptions) | ||
webSocketStream.url = url | ||
socket.on('close', () => { webSocketStream.destroy() }) | ||
return webSocketStream | ||
function streamBuilder(client, opts) { | ||
debug('streamBuilder') | ||
const options = setDefaultOpts(opts) | ||
const url = buildUrl(options, client) | ||
const socket = createWebSocket(client, url, options) | ||
const webSocketStream = WS.createWebSocketStream(socket, options.wsOptions) | ||
webSocketStream.url = url | ||
socket.on('close', () => { | ||
webSocketStream.destroy() | ||
}) | ||
return webSocketStream | ||
} | ||
function browserStreamBuilder (client, opts) { | ||
debug('browserStreamBuilder') | ||
let stream | ||
const options = setDefaultBrowserOpts(opts) | ||
// sets the maximum socket buffer size before throttling | ||
const bufferSize = options.browserBufferSize || 1024 * 512 | ||
function browserStreamBuilder(client, opts) { | ||
debug('browserStreamBuilder') | ||
let stream | ||
const options = setDefaultBrowserOpts(opts) | ||
// sets the maximum socket buffer size before throttling | ||
const bufferSize = options.browserBufferSize || 1024 * 512 | ||
const bufferTimeout = opts.browserBufferTimeout || 1000 | ||
const bufferTimeout = opts.browserBufferTimeout || 1000 | ||
const coerceToBuffer = !opts.objectMode | ||
const coerceToBuffer = !opts.objectMode | ||
const socket = createBrowserWebSocket(client, opts) | ||
const socket = createBrowserWebSocket(client, opts) | ||
const proxy = buildProxy(opts, socketWriteBrowser, socketEndBrowser) | ||
const proxy = buildProxy(opts, socketWriteBrowser, socketEndBrowser) | ||
if (!opts.objectMode) { | ||
proxy._writev = writev | ||
} | ||
proxy.on('close', () => { socket.close() }) | ||
if (!opts.objectMode) { | ||
proxy._writev = writev | ||
} | ||
proxy.on('close', () => { | ||
socket.close() | ||
}) | ||
const eventListenerSupport = (typeof socket.addEventListener !== 'undefined') | ||
const eventListenerSupport = typeof socket.addEventListener !== 'undefined' | ||
// was already open when passed in | ||
if (socket.readyState === socket.OPEN) { | ||
stream = proxy | ||
} else { | ||
stream = stream = duplexify(undefined, undefined, opts) | ||
if (!opts.objectMode) { | ||
stream._writev = writev | ||
} | ||
// was already open when passed in | ||
if (socket.readyState === socket.OPEN) { | ||
stream = proxy | ||
} else { | ||
stream = duplexify(undefined, undefined, opts) | ||
if (!opts.objectMode) { | ||
stream._writev = writev | ||
} | ||
if (eventListenerSupport) { | ||
socket.addEventListener('open', onopen) | ||
} else { | ||
socket.onopen = onopen | ||
} | ||
} | ||
if (eventListenerSupport) { | ||
socket.addEventListener('open', onOpen) | ||
} else { | ||
socket.onopen = onOpen | ||
} | ||
} | ||
stream.socket = socket | ||
stream.socket = socket | ||
if (eventListenerSupport) { | ||
socket.addEventListener('close', onclose) | ||
socket.addEventListener('error', onerror) | ||
socket.addEventListener('message', onmessage) | ||
} else { | ||
socket.onclose = onclose | ||
socket.onerror = onerror | ||
socket.onmessage = onmessage | ||
} | ||
if (eventListenerSupport) { | ||
socket.addEventListener('close', onclose) | ||
socket.addEventListener('error', onerror) | ||
socket.addEventListener('message', onmessage) | ||
} else { | ||
socket.onclose = onclose | ||
socket.onerror = onerror | ||
socket.onmessage = onmessage | ||
} | ||
// methods for browserStreamBuilder | ||
// methods for browserStreamBuilder | ||
function buildProxy (options, socketWrite, socketEnd) { | ||
const proxy = new Transform({ | ||
objectModeMode: options.objectMode | ||
}) | ||
function buildProxy(pOptions, socketWrite, socketEnd) { | ||
const _proxy = new Transform({ | ||
objectModeMode: pOptions.objectMode, | ||
}) | ||
proxy._write = socketWrite | ||
proxy._flush = socketEnd | ||
_proxy._write = socketWrite | ||
_proxy._flush = socketEnd | ||
return proxy | ||
} | ||
return _proxy | ||
} | ||
function onopen () { | ||
stream.setReadable(proxy) | ||
stream.setWritable(proxy) | ||
stream.emit('connect') | ||
} | ||
function onOpen() { | ||
stream.setReadable(proxy) | ||
stream.setWritable(proxy) | ||
stream.emit('connect') | ||
} | ||
function onclose () { | ||
stream.end() | ||
stream.destroy() | ||
} | ||
function onclose() { | ||
stream.end() | ||
stream.destroy() | ||
} | ||
function onerror (err) { | ||
stream.destroy(err) | ||
} | ||
function onerror(err) { | ||
stream.destroy(err) | ||
} | ||
function onmessage (event) { | ||
let data = event.data | ||
if (data instanceof ArrayBuffer) data = Buffer.from(data) | ||
else data = Buffer.from(data, 'utf8') | ||
proxy.push(data) | ||
} | ||
function onmessage(event) { | ||
let { data } = event | ||
if (data instanceof ArrayBuffer) data = Buffer.from(data) | ||
else data = Buffer.from(data, 'utf8') | ||
proxy.push(data) | ||
} | ||
// this is to be enabled only if objectMode is false | ||
function writev (chunks, cb) { | ||
const buffers = new Array(chunks.length) | ||
for (let i = 0; i < chunks.length; i++) { | ||
if (typeof chunks[i].chunk === 'string') { | ||
buffers[i] = Buffer.from(chunks[i], 'utf8') | ||
} else { | ||
buffers[i] = chunks[i].chunk | ||
} | ||
} | ||
// this is to be enabled only if objectMode is false | ||
function writev(chunks, cb) { | ||
const buffers = new Array(chunks.length) | ||
for (let i = 0; i < chunks.length; i++) { | ||
if (typeof chunks[i].chunk === 'string') { | ||
buffers[i] = Buffer.from(chunks[i], 'utf8') | ||
} else { | ||
buffers[i] = chunks[i].chunk | ||
} | ||
} | ||
this._write(Buffer.concat(buffers), 'binary', cb) | ||
} | ||
this._write(Buffer.concat(buffers), 'binary', cb) | ||
} | ||
function socketWriteBrowser (chunk, enc, next) { | ||
if (socket.bufferedAmount > bufferSize) { | ||
// throttle data until buffered amount is reduced. | ||
setTimeout(socketWriteBrowser, bufferTimeout, chunk, enc, next) | ||
} | ||
function socketWriteBrowser(chunk, enc, next) { | ||
if (socket.bufferedAmount > bufferSize) { | ||
// throttle data until buffered amount is reduced. | ||
setTimeout(socketWriteBrowser, bufferTimeout, chunk, enc, next) | ||
} | ||
if (coerceToBuffer && typeof chunk === 'string') { | ||
chunk = Buffer.from(chunk, 'utf8') | ||
} | ||
if (coerceToBuffer && typeof chunk === 'string') { | ||
chunk = Buffer.from(chunk, 'utf8') | ||
} | ||
try { | ||
socket.send(chunk) | ||
} catch (err) { | ||
return next(err) | ||
} | ||
try { | ||
socket.send(chunk) | ||
} catch (err) { | ||
return next(err) | ||
} | ||
next() | ||
} | ||
next() | ||
} | ||
function socketEndBrowser (done) { | ||
socket.close() | ||
done() | ||
} | ||
function socketEndBrowser(done) { | ||
socket.close() | ||
done() | ||
} | ||
// end methods for browserStreamBuilder | ||
// end methods for browserStreamBuilder | ||
return stream | ||
return stream | ||
} | ||
if (IS_BROWSER) { | ||
module.exports = browserStreamBuilder | ||
module.exports = browserStreamBuilder | ||
} else { | ||
module.exports = streamBuilder | ||
module.exports = streamBuilder | ||
} |
@@ -1,133 +0,132 @@ | ||
'use strict' | ||
const { Buffer } = require('buffer') | ||
const Transform = require('readable-stream').Transform | ||
const { Transform } = require('readable-stream') | ||
const duplexify = require('duplexify') | ||
/* global wx */ | ||
let socketTask, proxy, stream | ||
let socketTask | ||
let proxy | ||
let stream | ||
function buildProxy () { | ||
const proxy = new Transform() | ||
proxy._write = function (chunk, encoding, next) { | ||
socketTask.send({ | ||
data: chunk.buffer, | ||
success: function () { | ||
next() | ||
}, | ||
fail: function (errMsg) { | ||
next(new Error(errMsg)) | ||
} | ||
}) | ||
} | ||
proxy._flush = function socketEnd (done) { | ||
socketTask.close({ | ||
success: function () { | ||
done() | ||
} | ||
}) | ||
} | ||
function buildProxy() { | ||
const _proxy = new Transform() | ||
_proxy._write = (chunk, encoding, next) => { | ||
socketTask.send({ | ||
data: chunk.buffer, | ||
success() { | ||
next() | ||
}, | ||
fail(errMsg) { | ||
next(new Error(errMsg)) | ||
}, | ||
}) | ||
} | ||
_proxy._flush = (done) => { | ||
socketTask.close({ | ||
success() { | ||
done() | ||
}, | ||
}) | ||
} | ||
return proxy | ||
return _proxy | ||
} | ||
function setDefaultOpts (opts) { | ||
if (!opts.hostname) { | ||
opts.hostname = 'localhost' | ||
} | ||
if (!opts.path) { | ||
opts.path = '/' | ||
} | ||
function setDefaultOpts(opts) { | ||
if (!opts.hostname) { | ||
opts.hostname = 'localhost' | ||
} | ||
if (!opts.path) { | ||
opts.path = '/' | ||
} | ||
if (!opts.wsOptions) { | ||
opts.wsOptions = {} | ||
} | ||
if (!opts.wsOptions) { | ||
opts.wsOptions = {} | ||
} | ||
} | ||
function buildUrl (opts, client) { | ||
const protocol = opts.protocol === 'wxs' ? 'wss' : 'ws' | ||
let url = protocol + '://' + opts.hostname + opts.path | ||
if (opts.port && opts.port !== 80 && opts.port !== 443) { | ||
url = protocol + '://' + opts.hostname + ':' + opts.port + opts.path | ||
} | ||
if (typeof (opts.transformWsUrl) === 'function') { | ||
url = opts.transformWsUrl(url, opts, client) | ||
} | ||
return url | ||
function buildUrl(opts, client) { | ||
const protocol = opts.protocol === 'wxs' ? 'wss' : 'ws' | ||
let url = `${protocol}://${opts.hostname}${opts.path}` | ||
if (opts.port && opts.port !== 80 && opts.port !== 443) { | ||
url = `${protocol}://${opts.hostname}:${opts.port}${opts.path}` | ||
} | ||
if (typeof opts.transformWsUrl === 'function') { | ||
url = opts.transformWsUrl(url, opts, client) | ||
} | ||
return url | ||
} | ||
function bindEventHandler () { | ||
socketTask.onOpen(function () { | ||
stream.setReadable(proxy) | ||
stream.setWritable(proxy) | ||
stream.emit('connect') | ||
}) | ||
function bindEventHandler() { | ||
socketTask.onOpen(() => { | ||
stream.setReadable(proxy) | ||
stream.setWritable(proxy) | ||
stream.emit('connect') | ||
}) | ||
socketTask.onMessage(function (res) { | ||
let data = res.data | ||
socketTask.onMessage((res) => { | ||
let { data } = res | ||
if (data instanceof ArrayBuffer) data = Buffer.from(data) | ||
else data = Buffer.from(data, 'utf8') | ||
proxy.push(data) | ||
}) | ||
if (data instanceof ArrayBuffer) data = Buffer.from(data) | ||
else data = Buffer.from(data, 'utf8') | ||
proxy.push(data) | ||
}) | ||
socketTask.onClose(function () { | ||
stream.end() | ||
stream.destroy() | ||
}) | ||
socketTask.onClose(() => { | ||
stream.end() | ||
stream.destroy() | ||
}) | ||
socketTask.onError(function (res) { | ||
stream.destroy(new Error(res.errMsg)) | ||
}) | ||
socketTask.onError((res) => { | ||
stream.destroy(new Error(res.errMsg)) | ||
}) | ||
} | ||
function buildStream (client, opts) { | ||
opts.hostname = opts.hostname || opts.host | ||
function buildStream(client, opts) { | ||
opts.hostname = opts.hostname || opts.host | ||
if (!opts.hostname) { | ||
throw new Error('Could not determine host. Specify host manually.') | ||
} | ||
if (!opts.hostname) { | ||
throw new Error('Could not determine host. Specify host manually.') | ||
} | ||
const websocketSubProtocol = | ||
(opts.protocolId === 'MQIsdp') && (opts.protocolVersion === 3) | ||
? 'mqttv3.1' | ||
: 'mqtt' | ||
const websocketSubProtocol = | ||
opts.protocolId === 'MQIsdp' && opts.protocolVersion === 3 | ||
? 'mqttv3.1' | ||
: 'mqtt' | ||
setDefaultOpts(opts) | ||
setDefaultOpts(opts) | ||
const url = buildUrl(opts, client) | ||
socketTask = wx.connectSocket({ | ||
url: url, | ||
protocols: [websocketSubProtocol] | ||
}) | ||
const url = buildUrl(opts, client) | ||
socketTask = wx.connectSocket({ | ||
url, | ||
protocols: [websocketSubProtocol], | ||
}) | ||
proxy = buildProxy() | ||
stream = duplexify.obj() | ||
stream._destroy = function (err, cb) { | ||
socketTask.close({ | ||
success: function () { | ||
cb && cb(err) | ||
} | ||
}) | ||
} | ||
proxy = buildProxy() | ||
stream = duplexify.obj() | ||
stream._destroy = (err, cb) => { | ||
socketTask.close({ | ||
success() { | ||
if (cb) cb(err) | ||
}, | ||
}) | ||
} | ||
const destroyRef = stream.destroy | ||
stream.destroy = function () { | ||
stream.destroy = destroyRef | ||
const destroyRef = stream.destroy | ||
stream.destroy = () => { | ||
stream.destroy = destroyRef | ||
const self = this | ||
setTimeout(function () { | ||
socketTask.close({ | ||
fail: function () { | ||
self._destroy(new Error()) | ||
} | ||
}) | ||
}, 0) | ||
}.bind(stream) | ||
setTimeout(() => { | ||
socketTask.close({ | ||
fail() { | ||
stream._destroy(new Error()) | ||
}, | ||
}) | ||
}, 0) | ||
} | ||
bindEventHandler() | ||
bindEventHandler() | ||
return stream | ||
return stream | ||
} | ||
module.exports = buildStream |
@@ -1,3 +0,1 @@ | ||
'use strict' | ||
/** | ||
@@ -7,64 +5,60 @@ * DefaultMessageAllocator constructor | ||
*/ | ||
function DefaultMessageIdProvider () { | ||
if (!(this instanceof DefaultMessageIdProvider)) { | ||
return new DefaultMessageIdProvider() | ||
} | ||
class DefaultMessageIdProvider { | ||
constructor() { | ||
/** | ||
* MessageIDs starting with 1 | ||
* ensure that nextId is min. 1, see https://github.com/mqttjs/MQTT.js/issues/810 | ||
*/ | ||
this.nextId = Math.max(1, Math.floor(Math.random() * 65535)) | ||
} | ||
/** | ||
* MessageIDs starting with 1 | ||
* ensure that nextId is min. 1, see https://github.com/mqttjs/MQTT.js/issues/810 | ||
*/ | ||
this.nextId = Math.max(1, Math.floor(Math.random() * 65535)) | ||
} | ||
/** | ||
* allocate | ||
* | ||
* Get the next messageId. | ||
* @return unsigned int | ||
*/ | ||
allocate() { | ||
// id becomes current state of this.nextId and increments afterwards | ||
const id = this.nextId++ | ||
// Ensure 16 bit unsigned int (max 65535, nextId got one higher) | ||
if (this.nextId === 65536) { | ||
this.nextId = 1 | ||
} | ||
return id | ||
} | ||
/** | ||
* allocate | ||
* | ||
* Get the next messageId. | ||
* @return unsigned int | ||
*/ | ||
DefaultMessageIdProvider.prototype.allocate = function () { | ||
// id becomes current state of this.nextId and increments afterwards | ||
const id = this.nextId++ | ||
// Ensure 16 bit unsigned int (max 65535, nextId got one higher) | ||
if (this.nextId === 65536) { | ||
this.nextId = 1 | ||
} | ||
return id | ||
} | ||
/** | ||
* getLastAllocated | ||
* Get the last allocated messageId. | ||
* @return unsigned int | ||
*/ | ||
getLastAllocated() { | ||
return this.nextId === 1 ? 65535 : this.nextId - 1 | ||
} | ||
/** | ||
* getLastAllocated | ||
* Get the last allocated messageId. | ||
* @return unsigned int | ||
*/ | ||
DefaultMessageIdProvider.prototype.getLastAllocated = function () { | ||
return (this.nextId === 1) ? 65535 : (this.nextId - 1) | ||
} | ||
/** | ||
* register | ||
* Register messageId. If success return true, otherwise return false. | ||
* @param { unsigned int } - messageId to register, | ||
* @return boolean | ||
*/ | ||
register(messageId) { | ||
return true | ||
} | ||
/** | ||
* register | ||
* Register messageId. If success return true, otherwise return false. | ||
* @param { unsigned int } - messageId to register, | ||
* @return boolean | ||
*/ | ||
DefaultMessageIdProvider.prototype.register = function (messageId) { | ||
return true | ||
} | ||
/** | ||
* deallocate | ||
* Deallocate messageId. | ||
* @param { unsigned int } - messageId to deallocate, | ||
*/ | ||
deallocate(messageId) {} | ||
/** | ||
* deallocate | ||
* Deallocate messageId. | ||
* @param { unsigned int } - messageId to deallocate, | ||
*/ | ||
DefaultMessageIdProvider.prototype.deallocate = function (messageId) { | ||
/** | ||
* clear | ||
* Deallocate all messageIds. | ||
*/ | ||
clear() {} | ||
} | ||
/** | ||
* clear | ||
* Deallocate all messageIds. | ||
*/ | ||
DefaultMessageIdProvider.prototype.clear = function () { | ||
} | ||
module.exports = DefaultMessageIdProvider |
const legacyIsBrowser = | ||
(typeof process !== 'undefined' && process.title === 'browser') || | ||
// eslint-disable-next-line camelcase | ||
typeof __webpack_require__ === 'function' | ||
(typeof process !== 'undefined' && process.title === 'browser') || | ||
// eslint-disable-next-line camelcase | ||
typeof __webpack_require__ === 'function' | ||
const isBrowser = | ||
typeof window !== 'undefined' && typeof document !== 'undefined' | ||
typeof window !== 'undefined' && typeof document !== 'undefined' | ||
module.exports = { | ||
IS_BROWSER: isBrowser || legacyIsBrowser | ||
IS_BROWSER: isBrowser || legacyIsBrowser, | ||
} |
181
lib/store.js
@@ -1,12 +0,9 @@ | ||
'use strict' | ||
/** | ||
* Module dependencies | ||
*/ | ||
const xtend = require('xtend') | ||
const { Readable } = require('readable-stream') | ||
const Readable = require('readable-stream').Readable | ||
const streamsOpts = { objectMode: true } | ||
const defaultStoreOptions = { | ||
clean: true | ||
clean: true, | ||
} | ||
@@ -20,110 +17,106 @@ | ||
*/ | ||
function Store (options) { | ||
if (!(this instanceof Store)) { | ||
return new Store(options) | ||
} | ||
class Store { | ||
constructor(options) { | ||
this.options = options || {} | ||
this.options = options || {} | ||
// Defaults | ||
this.options = { ...defaultStoreOptions, ...options } | ||
// Defaults | ||
this.options = xtend(defaultStoreOptions, options) | ||
this._inflights = new Map() | ||
} | ||
this._inflights = new Map() | ||
} | ||
/** | ||
* Adds a packet to the store, a packet is | ||
* anything that has a messageId property. | ||
* | ||
*/ | ||
put(packet, cb) { | ||
this._inflights.set(packet.messageId, packet) | ||
/** | ||
* Adds a packet to the store, a packet is | ||
* anything that has a messageId property. | ||
* | ||
*/ | ||
Store.prototype.put = function (packet, cb) { | ||
this._inflights.set(packet.messageId, packet) | ||
if (cb) { | ||
cb() | ||
} | ||
if (cb) { | ||
cb() | ||
} | ||
return this | ||
} | ||
return this | ||
} | ||
/** | ||
* Creates a stream with all the packets in the store | ||
* | ||
*/ | ||
createStream() { | ||
const stream = new Readable(streamsOpts) | ||
const values = [] | ||
let destroyed = false | ||
let i = 0 | ||
/** | ||
* Creates a stream with all the packets in the store | ||
* | ||
*/ | ||
Store.prototype.createStream = function () { | ||
const stream = new Readable(streamsOpts) | ||
const values = [] | ||
let destroyed = false | ||
let i = 0 | ||
this._inflights.forEach((value, key) => { | ||
values.push(value) | ||
}) | ||
this._inflights.forEach(function (value, key) { | ||
values.push(value) | ||
}) | ||
stream._read = () => { | ||
if (!destroyed && i < values.length) { | ||
stream.push(values[i++]) | ||
} else { | ||
stream.push(null) | ||
} | ||
} | ||
stream._read = function () { | ||
if (!destroyed && i < values.length) { | ||
this.push(values[i++]) | ||
} else { | ||
this.push(null) | ||
} | ||
} | ||
stream.destroy = () => { | ||
if (destroyed) { | ||
return | ||
} | ||
stream.destroy = function () { | ||
if (destroyed) { | ||
return | ||
} | ||
destroyed = true | ||
const self = this | ||
setTimeout(() => { | ||
stream.emit('close') | ||
}, 0) | ||
} | ||
destroyed = true | ||
return stream | ||
} | ||
setTimeout(function () { | ||
self.emit('close') | ||
}, 0) | ||
} | ||
/** | ||
* deletes a packet from the store. | ||
*/ | ||
del(packet, cb) { | ||
packet = this._inflights.get(packet.messageId) | ||
if (packet) { | ||
this._inflights.delete(packet.messageId) | ||
cb(null, packet) | ||
} else if (cb) { | ||
cb(new Error('missing packet')) | ||
} | ||
return stream | ||
} | ||
return this | ||
} | ||
/** | ||
* deletes a packet from the store. | ||
*/ | ||
Store.prototype.del = function (packet, cb) { | ||
packet = this._inflights.get(packet.messageId) | ||
if (packet) { | ||
this._inflights.delete(packet.messageId) | ||
cb(null, packet) | ||
} else if (cb) { | ||
cb(new Error('missing packet')) | ||
} | ||
/** | ||
* get a packet from the store. | ||
*/ | ||
get(packet, cb) { | ||
packet = this._inflights.get(packet.messageId) | ||
if (packet) { | ||
cb(null, packet) | ||
} else if (cb) { | ||
cb(new Error('missing packet')) | ||
} | ||
return this | ||
} | ||
return this | ||
} | ||
/** | ||
* get a packet from the store. | ||
*/ | ||
Store.prototype.get = function (packet, cb) { | ||
packet = this._inflights.get(packet.messageId) | ||
if (packet) { | ||
cb(null, packet) | ||
} else if (cb) { | ||
cb(new Error('missing packet')) | ||
} | ||
return this | ||
/** | ||
* Close the store | ||
*/ | ||
close(cb) { | ||
if (this.options.clean) { | ||
this._inflights = null | ||
} | ||
if (cb) { | ||
cb() | ||
} | ||
} | ||
} | ||
/** | ||
* Close the store | ||
*/ | ||
Store.prototype.close = function (cb) { | ||
if (this.options.clean) { | ||
this._inflights = null | ||
} | ||
if (cb) { | ||
cb() | ||
} | ||
} | ||
module.exports = Store |
@@ -1,3 +0,1 @@ | ||
'use strict' | ||
/** | ||
@@ -8,41 +6,40 @@ * Topic Alias receiving manager | ||
*/ | ||
function TopicAliasRecv (max) { | ||
if (!(this instanceof TopicAliasRecv)) { | ||
return new TopicAliasRecv(max) | ||
} | ||
this.aliasToTopic = {} | ||
this.max = max | ||
} | ||
class TopicAliasRecv { | ||
constructor(max) { | ||
this.aliasToTopic = {} | ||
this.max = max | ||
} | ||
/** | ||
* Insert or update topic - alias entry. | ||
* @param {String} [topic] - topic | ||
* @param {Number} [alias] - topic alias | ||
* @returns {Boolean} - if success return true otherwise false | ||
*/ | ||
TopicAliasRecv.prototype.put = function (topic, alias) { | ||
if (alias === 0 || alias > this.max) { | ||
return false | ||
} | ||
this.aliasToTopic[alias] = topic | ||
this.length = Object.keys(this.aliasToTopic).length | ||
return true | ||
} | ||
/** | ||
* Insert or update topic - alias entry. | ||
* @param {String} [topic] - topic | ||
* @param {Number} [alias] - topic alias | ||
* @returns {Boolean} - if success return true otherwise false | ||
*/ | ||
put(topic, alias) { | ||
if (alias === 0 || alias > this.max) { | ||
return false | ||
} | ||
this.aliasToTopic[alias] = topic | ||
this.length = Object.keys(this.aliasToTopic).length | ||
return true | ||
} | ||
/** | ||
* Get topic by alias | ||
* @param {String} [topic] - topic | ||
* @returns {Number} - if mapped topic exists return topic alias, otherwise return undefined | ||
*/ | ||
TopicAliasRecv.prototype.getTopicByAlias = function (alias) { | ||
return this.aliasToTopic[alias] | ||
} | ||
/** | ||
* Get topic by alias | ||
* @param {String} [topic] - topic | ||
* @returns {Number} - if mapped topic exists return topic alias, otherwise return undefined | ||
*/ | ||
getTopicByAlias(alias) { | ||
return this.aliasToTopic[alias] | ||
} | ||
/** | ||
* Clear all entries | ||
*/ | ||
TopicAliasRecv.prototype.clear = function () { | ||
this.aliasToTopic = {} | ||
/** | ||
* Clear all entries | ||
*/ | ||
clear() { | ||
this.aliasToTopic = {} | ||
} | ||
} | ||
module.exports = TopicAliasRecv |
@@ -1,8 +0,6 @@ | ||
'use strict' | ||
/** | ||
* Module dependencies | ||
*/ | ||
const LruMap = require('lru-cache') | ||
const NumberAllocator = require('number-allocator').NumberAllocator | ||
const LRUCache = require('lru-cache') | ||
const { NumberAllocator } = require('number-allocator') | ||
@@ -14,79 +12,78 @@ /** | ||
*/ | ||
function TopicAliasSend (max) { | ||
if (!(this instanceof TopicAliasSend)) { | ||
return new TopicAliasSend(max) | ||
} | ||
class TopicAliasSend { | ||
constructor(max) { | ||
if (max > 0) { | ||
this.aliasToTopic = new LRUCache({ max }) | ||
this.topicToAlias = {} | ||
this.numberAllocator = new NumberAllocator(1, max) | ||
this.max = max | ||
this.length = 0 | ||
} | ||
} | ||
if (max > 0) { | ||
this.aliasToTopic = new LruMap({ max: max }) | ||
this.topicToAlias = {} | ||
this.numberAllocator = new NumberAllocator(1, max) | ||
this.max = max | ||
this.length = 0 | ||
} | ||
} | ||
/** | ||
* Insert or update topic - alias entry. | ||
* @param {String} [topic] - topic | ||
* @param {Number} [alias] - topic alias | ||
* @returns {Boolean} - if success return true otherwise false | ||
*/ | ||
put(topic, alias) { | ||
if (alias === 0 || alias > this.max) { | ||
return false | ||
} | ||
const entry = this.aliasToTopic.get(alias) | ||
if (entry) { | ||
delete this.topicToAlias[entry] | ||
} | ||
this.aliasToTopic.set(alias, topic) | ||
this.topicToAlias[topic] = alias | ||
this.numberAllocator.use(alias) | ||
this.length = this.aliasToTopic.size | ||
return true | ||
} | ||
/** | ||
* Insert or update topic - alias entry. | ||
* @param {String} [topic] - topic | ||
* @param {Number} [alias] - topic alias | ||
* @returns {Boolean} - if success return true otherwise false | ||
*/ | ||
TopicAliasSend.prototype.put = function (topic, alias) { | ||
if (alias === 0 || alias > this.max) { | ||
return false | ||
} | ||
const entry = this.aliasToTopic.get(alias) | ||
if (entry) { | ||
delete this.topicToAlias[entry] | ||
} | ||
this.aliasToTopic.set(alias, topic) | ||
this.topicToAlias[topic] = alias | ||
this.numberAllocator.use(alias) | ||
this.length = this.aliasToTopic.length | ||
return true | ||
} | ||
/** | ||
* Get topic by alias | ||
* @param {Number} [alias] - topic alias | ||
* @returns {String} - if mapped topic exists return topic, otherwise return undefined | ||
*/ | ||
getTopicByAlias(alias) { | ||
return this.aliasToTopic.get(alias) | ||
} | ||
/** | ||
* Get topic by alias | ||
* @param {Number} [alias] - topic alias | ||
* @returns {String} - if mapped topic exists return topic, otherwise return undefined | ||
*/ | ||
TopicAliasSend.prototype.getTopicByAlias = function (alias) { | ||
return this.aliasToTopic.get(alias) | ||
} | ||
/** | ||
* Get topic by alias | ||
* @param {String} [topic] - topic | ||
* @returns {Number} - if mapped topic exists return topic alias, otherwise return undefined | ||
*/ | ||
getAliasByTopic(topic) { | ||
const alias = this.topicToAlias[topic] | ||
if (typeof alias !== 'undefined') { | ||
this.aliasToTopic.get(alias) // LRU update | ||
} | ||
return alias | ||
} | ||
/** | ||
* Get topic by alias | ||
* @param {String} [topic] - topic | ||
* @returns {Number} - if mapped topic exists return topic alias, otherwise return undefined | ||
*/ | ||
TopicAliasSend.prototype.getAliasByTopic = function (topic) { | ||
const alias = this.topicToAlias[topic] | ||
if (typeof alias !== 'undefined') { | ||
this.aliasToTopic.get(alias) // LRU update | ||
} | ||
return alias | ||
} | ||
/** | ||
* Clear all entries | ||
*/ | ||
clear() { | ||
this.aliasToTopic.clear() | ||
this.topicToAlias = {} | ||
this.numberAllocator.clear() | ||
this.length = 0 | ||
} | ||
/** | ||
* Clear all entries | ||
*/ | ||
TopicAliasSend.prototype.clear = function () { | ||
this.aliasToTopic.reset() | ||
this.topicToAlias = {} | ||
this.numberAllocator.clear() | ||
this.length = 0 | ||
/** | ||
* Get Least Recently Used (LRU) topic alias | ||
* @returns {Number} - if vacant alias exists then return it, otherwise then return LRU alias | ||
*/ | ||
getLruAlias() { | ||
const alias = this.numberAllocator.firstVacant() | ||
if (alias) return alias | ||
// get last alias (key) from LRU cache | ||
return [...this.aliasToTopic.keys()][this.aliasToTopic.size - 1] | ||
} | ||
} | ||
/** | ||
* Get Least Recently Used (LRU) topic alias | ||
* @returns {Number} - if vacant alias exists then return it, otherwise then return LRU alias | ||
*/ | ||
TopicAliasSend.prototype.getLruAlias = function () { | ||
const alias = this.numberAllocator.firstVacant() | ||
if (alias) return alias | ||
return this.aliasToTopic.keys()[this.aliasToTopic.length - 1] | ||
} | ||
module.exports = TopicAliasSend |
@@ -1,5 +0,3 @@ | ||
'use strict' | ||
const { NumberAllocator } = require('number-allocator') | ||
const NumberAllocator = require('number-allocator').NumberAllocator | ||
/** | ||
@@ -9,58 +7,56 @@ * UniqueMessageAllocator constructor | ||
*/ | ||
function UniqueMessageIdProvider () { | ||
if (!(this instanceof UniqueMessageIdProvider)) { | ||
return new UniqueMessageIdProvider() | ||
} | ||
class UniqueMessageIdProvider { | ||
constructor() { | ||
this.numberAllocator = new NumberAllocator(1, 65535) | ||
} | ||
this.numberAllocator = new NumberAllocator(1, 65535) | ||
} | ||
/** | ||
* allocate | ||
* | ||
* Get the next messageId. | ||
* @return if messageId is fully allocated then return null, | ||
* otherwise return the smallest usable unsigned int messageId. | ||
*/ | ||
allocate() { | ||
this.lastId = this.numberAllocator.alloc() | ||
return this.lastId | ||
} | ||
/** | ||
* allocate | ||
* | ||
* Get the next messageId. | ||
* @return if messageId is fully allocated then return null, | ||
* otherwise return the smallest usable unsigned int messageId. | ||
*/ | ||
UniqueMessageIdProvider.prototype.allocate = function () { | ||
this.lastId = this.numberAllocator.alloc() | ||
return this.lastId | ||
} | ||
/** | ||
* getLastAllocated | ||
* Get the last allocated messageId. | ||
* @return unsigned int | ||
*/ | ||
getLastAllocated() { | ||
return this.lastId | ||
} | ||
/** | ||
* getLastAllocated | ||
* Get the last allocated messageId. | ||
* @return unsigned int | ||
*/ | ||
UniqueMessageIdProvider.prototype.getLastAllocated = function () { | ||
return this.lastId | ||
} | ||
/** | ||
* register | ||
* Register messageId. If success return true, otherwise return false. | ||
* @param { unsigned int } - messageId to register, | ||
* @return boolean | ||
*/ | ||
register(messageId) { | ||
return this.numberAllocator.use(messageId) | ||
} | ||
/** | ||
* register | ||
* Register messageId. If success return true, otherwise return false. | ||
* @param { unsigned int } - messageId to register, | ||
* @return boolean | ||
*/ | ||
UniqueMessageIdProvider.prototype.register = function (messageId) { | ||
return this.numberAllocator.use(messageId) | ||
} | ||
/** | ||
* deallocate | ||
* Deallocate messageId. | ||
* @param { unsigned int } - messageId to deallocate, | ||
*/ | ||
deallocate(messageId) { | ||
this.numberAllocator.free(messageId) | ||
} | ||
/** | ||
* deallocate | ||
* Deallocate messageId. | ||
* @param { unsigned int } - messageId to deallocate, | ||
*/ | ||
UniqueMessageIdProvider.prototype.deallocate = function (messageId) { | ||
this.numberAllocator.free(messageId) | ||
/** | ||
* clear | ||
* Deallocate all messageIds. | ||
*/ | ||
clear() { | ||
this.numberAllocator.clear() | ||
} | ||
} | ||
/** | ||
* clear | ||
* Deallocate all messageIds. | ||
*/ | ||
UniqueMessageIdProvider.prototype.clear = function () { | ||
this.numberAllocator.clear() | ||
} | ||
module.exports = UniqueMessageIdProvider |
@@ -1,3 +0,1 @@ | ||
'use strict' | ||
/** | ||
@@ -12,21 +10,21 @@ * Validate a topic to see if it's valid or not. | ||
*/ | ||
function validateTopic (topic) { | ||
const parts = topic.split('/') | ||
function validateTopic(topic) { | ||
const parts = topic.split('/') | ||
for (let i = 0; i < parts.length; i++) { | ||
if (parts[i] === '+') { | ||
continue | ||
} | ||
for (let i = 0; i < parts.length; i++) { | ||
if (parts[i] === '+') { | ||
continue | ||
} | ||
if (parts[i] === '#') { | ||
// for Rule #2 | ||
return i === parts.length - 1 | ||
} | ||
if (parts[i] === '#') { | ||
// for Rule #2 | ||
return i === parts.length - 1 | ||
} | ||
if (parts[i].indexOf('+') !== -1 || parts[i].indexOf('#') !== -1) { | ||
return false | ||
} | ||
} | ||
if (parts[i].indexOf('+') !== -1 || parts[i].indexOf('#') !== -1) { | ||
return false | ||
} | ||
} | ||
return true | ||
return true | ||
} | ||
@@ -36,19 +34,19 @@ | ||
* Validate an array of topics to see if any of them is valid or not | ||
* @param {Array} topics - Array of topics | ||
* @param {Array} topics - Array of topics | ||
* @returns {String} If the topics is valid, returns null. Otherwise, returns the invalid one | ||
*/ | ||
function validateTopics (topics) { | ||
if (topics.length === 0) { | ||
return 'empty_topic_list' | ||
} | ||
for (let i = 0; i < topics.length; i++) { | ||
if (!validateTopic(topics[i])) { | ||
return topics[i] | ||
} | ||
} | ||
return null | ||
function validateTopics(topics) { | ||
if (topics.length === 0) { | ||
return 'empty_topic_list' | ||
} | ||
for (let i = 0; i < topics.length; i++) { | ||
if (!validateTopic(topics[i])) { | ||
return topics[i] | ||
} | ||
} | ||
return null | ||
} | ||
module.exports = { | ||
validateTopics: validateTopics | ||
validateTopics, | ||
} |
{ | ||
"name": "mqtt", | ||
"description": "A library for the MQTT protocol", | ||
"version": "5.0.0-beta.2", | ||
"version": "5.0.0-beta.3", | ||
"contributors": [ | ||
@@ -26,3 +26,4 @@ "Adam Rudd <adamvrr@gmail.com>", | ||
"scripts": { | ||
"pretest": "standard | snazzy", | ||
"lint": "eslint --ext .js .", | ||
"lint-fix": "eslint --fix --ext .js .", | ||
"typescript-compile-test": "tsc -p test/typescript/tsconfig.json", | ||
@@ -66,3 +67,3 @@ "typescript-compile-execute": "node test/typescript/broker-connect-subscribe-and-publish.js", | ||
"pre-commit": [ | ||
"pretest" | ||
"lint" | ||
], | ||
@@ -93,55 +94,52 @@ "bin": { | ||
"dependencies": { | ||
"commist": "^1.0.0", | ||
"commist": "^3.2.0", | ||
"concat-stream": "^2.0.0", | ||
"debug": "^4.1.1", | ||
"duplexify": "^4.1.1", | ||
"help-me": "^3.0.0", | ||
"inherits": "^2.0.3", | ||
"lru-cache": "^6.0.0", | ||
"minimist": "^1.2.6", | ||
"mqtt-packet": "^8.1.2", | ||
"debug": "^4.3.4", | ||
"duplexify": "^4.1.2", | ||
"help-me": "^4.2.0", | ||
"lru-cache": "^7.18.3", | ||
"minimist": "^1.2.8", | ||
"mqtt-packet": "^8.2.0", | ||
"number-allocator": "^1.0.14", | ||
"pump": "^3.0.0", | ||
"readable-stream": "^4.1.0", | ||
"readable-stream": "^4.4.2", | ||
"reinterval": "^1.1.0", | ||
"rfdc": "^1.3.0", | ||
"split2": "^3.1.0", | ||
"ws": "^7.5.5", | ||
"xtend": "^4.0.2" | ||
"split2": "^4.2.0", | ||
"ws": "^8.13.0" | ||
}, | ||
"devDependencies": { | ||
"@release-it/conventional-changelog": "^5.1.1", | ||
"@types/node": "^12.20.55", | ||
"@types/tape": "^4.13.2", | ||
"@types/ws": "^7.4.7", | ||
"aedes": "^0.46.2", | ||
"@release-it/conventional-changelog": "^6.0.0", | ||
"@types/node": "^20.4.0", | ||
"@types/tape": "^5.6.0", | ||
"@types/ws": "^8.5.5", | ||
"airtap": "^4.0.4", | ||
"airtap-playwright": "^1.0.1", | ||
"browserify": "^17.0.0", | ||
"chai": "^4.2.0", | ||
"chai": "^4.3.7", | ||
"chokidar": "^3.5.3", | ||
"codecov": "^3.0.4", | ||
"codecov": "^3.8.2", | ||
"conventional-changelog-cli": "^3.0.0", | ||
"end-of-stream": "^1.4.1", | ||
"global": "^4.3.2", | ||
"mkdirp": "^0.5.1", | ||
"mocha": "^9.2.0", | ||
"mqtt-connection": "^4.0.0", | ||
"end-of-stream": "^1.4.4", | ||
"eslint": "^8.45.0", | ||
"eslint-config-airbnb-base": "^15.0.0", | ||
"eslint-config-prettier": "^8.8.0", | ||
"eslint-plugin-import": "^2.27.5", | ||
"eslint-plugin-prettier": "^5.0.0", | ||
"global": "^4.4.0", | ||
"mkdirp": "^3.0.1", | ||
"mocha": "^10.2.0", | ||
"mqtt-connection": "^4.1.0", | ||
"mqtt-level-store": "^3.1.0", | ||
"nyc": "^15.0.1", | ||
"nyc": "^15.1.0", | ||
"pre-commit": "^1.2.2", | ||
"prettier": "^3.0.0", | ||
"release-it": "^15.11.0", | ||
"rimraf": "^3.0.2", | ||
"should": "^13.2.1", | ||
"sinon": "^9.0.0", | ||
"rimraf": "^5.0.1", | ||
"should": "^13.2.3", | ||
"sinon": "^15.2.0", | ||
"snazzy": "^9.0.0", | ||
"standard": "^16.0.4", | ||
"tape": "^5.5.2", | ||
"terser": "^5.14.2", | ||
"typescript": "^4.5.5" | ||
}, | ||
"standard": { | ||
"env": [ | ||
"mocha" | ||
] | ||
"tape": "^5.6.4", | ||
"terser": "^5.18.2", | ||
"typescript": "^5.1.6" | ||
} | ||
} |
@@ -5,6 +5,11 @@ # ![mqtt.js](https://raw.githubusercontent.com/mqttjs/MQTT.js/137ee0e3940c1f01049a30248c70f24dc6e6f829/MQTT.js.png) | ||
[![js-standard-style](https://img.shields.io/badge/code%20style-standard-brightgreen.svg?style=flat)](http://standardjs.com/) [![Maintenance](https://img.shields.io/badge/Maintained%3F-yes-green.svg)](https://github.com/mqttjs/MQTT.js/graphs/commit-activity) | ||
[![PRs Welcome](https://img.shields.io/badge/PRs-welcome-brightgreen.svg)](https://github.com/mqttjs/MQTT.js/pulls)\ | ||
![NPM Version](https://img.shields.io/npm/v/mqtt?logo=npm) ![NPM Downloads](https://img.shields.io/npm/dm/mqtt.svg) | ||
MQTT.js is a client library for the [MQTT](http://mqtt.org/) protocol, written | ||
in JavaScript for node.js and the browser. | ||
> MQTT [5.0.0-beta.0](https://github.com/mqttjs/MQTT.js/releases/tag/v5.0.0-beta.0) is now available! Try it out and give us feedback! `npm i mqtt@5.0.0-beta.0` | ||
> MQTT [5.0.0 BETA](https://www.npmjs.com/package/mqtt/v/beta) is now available! Try it out and give us [feedback](https://github.com/mqttjs/MQTT.js/issues/1639): `npm i mqtt@beta` | ||
@@ -100,4 +105,4 @@ ## Table of Contents | ||
client.on("connect", function () { | ||
client.subscribe("presence", function (err) { | ||
client.on("connect", () => { | ||
client.subscribe("presence", (err) => { | ||
if (!err) { | ||
@@ -109,3 +114,3 @@ client.publish("presence", "Hello mqtt"); | ||
client.on("message", function (topic, message) { | ||
client.on("message", (topic, message) => { | ||
// message is Buffer | ||
@@ -306,19 +311,20 @@ console.log(message.toString()); | ||
- <a href="#connect"><code>mqtt.<b>connect()</b></code></a> | ||
- <a href="#client"><code>mqtt.<b>Client()</b></code></a> | ||
- <a href="#publish"><code>mqtt.Client#<b>publish()</b></code></a> | ||
- <a href="#subscribe"><code>mqtt.Client#<b>subscribe()</b></code></a> | ||
- <a href="#unsubscribe"><code>mqtt.Client#<b>unsubscribe()</b></code></a> | ||
- <a href="#end"><code>mqtt.Client#<b>end()</b></code></a> | ||
- <a href="#removeOutgoingMessage"><code>mqtt.Client#<b>removeOutgoingMessage()</b></code></a> | ||
- <a href="#reconnect"><code>mqtt.Client#<b>reconnect()</b></code></a> | ||
- <a href="#handleMessage"><code>mqtt.Client#<b>handleMessage()</b></code></a> | ||
- <a href="#connected"><code>mqtt.Client#<b>connected</b></code></a> | ||
- <a href="#reconnecting"><code>mqtt.Client#<b>reconnecting</b></code></a> | ||
- <a href="#getLastMessageId"><code>mqtt.Client#<b>getLastMessageId()</b></code></a> | ||
- <a href="#store"><code>mqtt.<b>Store()</b></code></a> | ||
- <a href="#put"><code>mqtt.Store#<b>put()</b></code></a> | ||
- <a href="#del"><code>mqtt.Store#<b>del()</b></code></a> | ||
- <a href="#createStream"><code>mqtt.Store#<b>createStream()</b></code></a> | ||
- <a href="#close"><code>mqtt.Store#<b>close()</b></code></a> | ||
- [`mqtt.connect()`](#connect) | ||
- [`mqtt.Client()`](#client) | ||
- [`mqtt.Client#connect()`](#client-connect) | ||
- [`mqtt.Client#publish()`](#publish) | ||
- [`mqtt.Client#subscribe()`](#subscribe) | ||
- [`mqtt.Client#unsubscribe()`](#unsubscribe) | ||
- [`mqtt.Client#end()`](#end) | ||
- [`mqtt.Client#removeOutgoingMessage()`](#removeOutgoingMessage) | ||
- [`mqtt.Client#reconnect()`](#reconnect) | ||
- [`mqtt.Client#handleMessage()`](#handleMessage) | ||
- [`mqtt.Client#connected`](#connected) | ||
- [`mqtt.Client#reconnecting`](#reconnecting) | ||
- [`mqtt.Client#getLastMessageId()`](#getLastMessageId) | ||
- [`mqtt.Store()`](#store) | ||
- [`mqtt.Store#put()`](#put) | ||
- [`mqtt.Store#del()`](#del) | ||
- [`mqtt.Store#createStream()`](#createStream) | ||
- [`mqtt.Store#close()`](#close) | ||
@@ -370,3 +376,3 @@ --- | ||
- `wsOptions`: is the WebSocket connection options. Default is `{}`. | ||
It's specific for WebSockets. For possible options have a look at: https://github.com/websockets/ws/blob/master/doc/ws.md. | ||
It's specific for WebSockets. For possible options have a look at: <https://github.com/websockets/ws/blob/master/doc/ws.md>. | ||
- `keepalive`: `60` seconds, set to `0` to disable | ||
@@ -389,5 +395,7 @@ - `reschedulePings`: reschedule ping messages after sending packets (default `true`) | ||
- `customHandleAcks`: MQTT 5 feature of custom handling puback and pubrec packets. Its callback: | ||
```js | ||
customHandleAcks: function(topic, message, packet, done) {/*some logic wit colling done(error, reasonCode)*/} | ||
``` | ||
- `autoUseTopicAlias`: enabling automatic Topic Alias using functionality | ||
@@ -427,2 +435,4 @@ - `autoAssignTopicAlias`: enabling automatic Topic Alias assign functionality | ||
- `messageIdProvider`: custom messageId provider. when `new UniqueMessageIdProvider()` is set, then non conflict messageId is provided. | ||
- `log`: custom log function. Default uses [debug](https://www.npmjs.com/package/debug) package. | ||
- `manualConnect`: prevents the constructor to call `connect`. In this case after the `mqtt.connect` is called you should call `client.connect` manually. | ||
@@ -501,3 +511,3 @@ In case mqtts (mqtt over tls) is required, the `options` object is | ||
Emitted when <a href="#end"><code>mqtt.Client#<b>end()</b></code></a> is called. | ||
Emitted when [`mqtt.Client#end()`](#end) is called. | ||
If a callback was passed to `mqtt.Client#end()`, this event is emitted once the | ||
@@ -540,2 +550,8 @@ callback returns. | ||
<a name="client-connect"></a> | ||
### mqtt.Client#connect() | ||
By default client connects when constructor is called. To prevent this you can set `manualConnect` option to `true` and call `client.connect()` manually. | ||
<a name="publish"></a> | ||
@@ -752,5 +768,5 @@ | ||
The MQTT.js bundle is available through http://unpkg.com, specifically | ||
at https://unpkg.com/mqtt/dist/mqtt.min.js. | ||
See http://unpkg.com for the full documentation on version ranges. | ||
The MQTT.js bundle is available through <http://unpkg.com>, specifically | ||
at <https://unpkg.com/mqtt/dist/mqtt.min.js>. | ||
See <http://unpkg.com> for the full documentation on version ranges. | ||
@@ -757,0 +773,0 @@ <a name="browserify"></a> |
@@ -64,2 +64,4 @@ import { MqttClient } from './client' | ||
log?: (...args: any[]) => void | ||
autoUseTopicAlias?: boolean | ||
@@ -109,3 +111,8 @@ autoAssignTopicAlias?: boolean | ||
userProperties?: UserProperties | ||
} | ||
}, | ||
authPacket?: any | ||
/** Prevent to call `connect` in constructor */ | ||
manualConnect?: boolean | ||
} | ||
@@ -201,2 +208,11 @@ transformWsUrl?: (url: string, options: IClientOptions, client: MqttClient) => string, | ||
} | ||
export interface IClientSubscribeProperties { | ||
/* | ||
* MQTT 5.0 properies object of subscribe | ||
* */ | ||
properties?: { | ||
subscriptionIdentifier?: number, | ||
userProperties?: UserProperties | ||
} | ||
} | ||
export interface IClientReconnectOptions { | ||
@@ -203,0 +219,0 @@ /** |
@@ -8,2 +8,3 @@ /// <reference types="node" /> | ||
IClientSubscribeOptions, | ||
IClientSubscribeProperties, | ||
IClientReconnectOptions | ||
@@ -102,2 +103,4 @@ } from './client-options' | ||
static defaultId (): string | ||
constructor (streamBuilder: (client: MqttClient) => IStream, options: IClientOptions) | ||
@@ -124,2 +127,7 @@ | ||
/** | ||
* Setup the event handlers in the inner stream, sends `connect` and `auth` packets | ||
*/ | ||
public connect(): this | ||
/** | ||
* publish - publish <message> to <topic> | ||
@@ -170,3 +178,7 @@ * | ||
string | ||
| string[], opts: IClientSubscribeOptions, callback?: ClientSubscribeCallback): this | ||
| string[] | ||
| ISubscriptionMap, | ||
opts: | ||
IClientSubscribeOptions | ||
| IClientSubscribeProperties, callback?: ClientSubscribeCallback): this | ||
public subscribe (topic: | ||
@@ -173,0 +185,0 @@ string |
Sorry, the diff of this file is too big to display
Sorry, the diff of this file is too big to display
License Policy Violation
LicenseThis package is not allowed per your license policy. Review the package's license to ensure compliance.
Found 1 instance in 1 package
License Policy Violation
LicenseThis package is not allowed per your license policy. Review the package's license to ensure compliance.
Found 1 instance in 1 package
1209889
14
42
28720
958
33
+ Addedbrace-expansion@2.0.1(transitive)
+ Addedcommist@3.2.0(transitive)
+ Addedglob@8.1.0(transitive)
+ Addedhelp-me@4.2.0(transitive)
+ Addedlru-cache@7.18.3(transitive)
+ Addedminimatch@5.1.6(transitive)
+ Addedsplit2@4.2.0(transitive)
+ Addedws@8.18.0(transitive)
- Removedinherits@^2.0.3
- Removedpump@^3.0.0
- Removedxtend@^4.0.2
- Removedbrace-expansion@1.1.11(transitive)
- Removedcommist@1.1.0(transitive)
- Removedconcat-map@0.0.1(transitive)
- Removedglob@7.2.3(transitive)
- Removedhelp-me@3.0.0(transitive)
- Removedleven@2.1.0(transitive)
- Removedlru-cache@6.0.0(transitive)
- Removedminimatch@3.1.2(transitive)
- Removedpath-is-absolute@1.0.1(transitive)
- Removedpump@3.0.2(transitive)
- Removedsplit2@3.2.2(transitive)
- Removedws@7.5.10(transitive)
- Removedxtend@4.0.2(transitive)
- Removedyallist@4.0.0(transitive)
Updatedcommist@^3.2.0
Updateddebug@^4.3.4
Updatedduplexify@^4.1.2
Updatedhelp-me@^4.2.0
Updatedlru-cache@^7.18.3
Updatedminimist@^1.2.8
Updatedmqtt-packet@^8.2.0
Updatedreadable-stream@^4.4.2
Updatedsplit2@^4.2.0
Updatedws@^8.13.0