Huge News!Announcing our $40M Series B led by Abstract Ventures.Learn More
Socket
Sign inDemoInstall
Socket

mqtt-packet

Package Overview
Dependencies
Maintainers
1
Versions
73
Alerts
File Explorer

Advanced tools

Socket logo

Install Socket

Detect and block malicious and high-risk dependencies

Install

mqtt-packet - npm Package Compare versions

Comparing version 3.4.4 to 4.0.0

benchmarks/generateNet.js

2

benchmarks/generate.js
var mqtt = require('../')
, max = 10000000
, max = 100000
, i

@@ -5,0 +5,0 @@ , start = Date.now()

/* Protocol - protocol constants */
var protocol = module.exports;
/* Command code => mnemonic */
module.exports.types = {
protocol.types = {
0: 'reserved',

@@ -24,30 +25,84 @@ 1: 'connect',

/* Mnemonic => Command code */
module.exports.codes = {}
for(var k in module.exports.types) {
var v = module.exports.types[k];
module.exports.codes[v] = k;
protocol.codes = {}
for(var k in protocol.types) {
var v = protocol.types[k];
protocol.codes[v] = k;
}
/* Header */
module.exports.CMD_SHIFT = 4;
module.exports.CMD_MASK = 0xF0;
module.exports.DUP_MASK = 0x08;
module.exports.QOS_MASK = 0x03;
module.exports.QOS_SHIFT = 1;
module.exports.RETAIN_MASK = 0x01;
protocol.CMD_SHIFT = 4;
protocol.CMD_MASK = 0xF0;
protocol.DUP_MASK = 0x08;
protocol.QOS_MASK = 0x03;
protocol.QOS_SHIFT = 1;
protocol.RETAIN_MASK = 0x01;
/* Length */
module.exports.LENGTH_MASK = 0x7F;
module.exports.LENGTH_FIN_MASK = 0x80;
protocol.LENGTH_MASK = 0x7F;
protocol.LENGTH_FIN_MASK = 0x80;
/* Connack */
module.exports.SESSIONPRESENT_MASK = 0x01;
protocol.SESSIONPRESENT_MASK = 0x01;
protocol.SESSIONPRESENT_HEADER = new Buffer([protocol.SESSIONPRESENT_MASK]);
protocol.CONNACK_HEADER = new Buffer([protocol.codes['connack'] << protocol.CMD_SHIFT])
/* Connect */
module.exports.USERNAME_MASK = 0x80;
module.exports.PASSWORD_MASK = 0x40;
module.exports.WILL_RETAIN_MASK = 0x20;
module.exports.WILL_QOS_MASK = 0x18;
module.exports.WILL_QOS_SHIFT = 3;
module.exports.WILL_FLAG_MASK = 0x04;
module.exports.CLEAN_SESSION_MASK = 0x02;
protocol.USERNAME_MASK = 0x80;
protocol.PASSWORD_MASK = 0x40;
protocol.WILL_RETAIN_MASK = 0x20;
protocol.WILL_QOS_MASK = 0x18;
protocol.WILL_QOS_SHIFT = 3;
protocol.WILL_FLAG_MASK = 0x04;
protocol.CLEAN_SESSION_MASK = 0x02;
protocol.CONNECT_HEADER = new Buffer([protocol.codes['connect'] << protocol.CMD_SHIFT])
function genHeader (type) {
return [0, 1, 2].map(function(qos) {
return [0, 1].map(function(dup) {
return [0, 1].map(function(retain) {
var buf = new Buffer(1)
buf.writeUInt8(
protocol.codes[type] << protocol.CMD_SHIFT |
(dup ? protocol.DUP_MASK : 0 ) |
qos << protocol.QOS_SHIFT | retain, 0, true)
return buf
});
});
});
}
/* Publish */
protocol.PUBLISH_HEADER = genHeader('publish');
/* SUBSCRIBE */
protocol.SUBSCRIBE_HEADER = genHeader('subscribe');
/* UNSUBSCRIBE */
protocol.UNSUBSCRIBE_HEADER = genHeader('unsubscribe');
/* Confirmations */
protocol.ACKS = {
unsuback: genHeader('unsuback'),
puback: genHeader('puback'),
pubcomp: genHeader('pubcomp'),
pubrel: genHeader('pubrel'),
pubrec: genHeader('pubrec')
};
protocol.SUBACK_HEADER = new Buffer([protocol.codes['suback'] << protocol.CMD_SHIFT]);
/* Protocol versions */
protocol.VERSION3 = new Buffer([3])
protocol.VERSION4 = new Buffer([4])
/* QOS */
protocol.QOS = [0, 1, 2].map(function(qos) {
return new Buffer([qos])
})
/* empty packets */
protocol.EMPTY = {
pingreq: new Buffer([protocol.codes['pingreq'] << 4, 0]),
pingresp: new Buffer([protocol.codes['pingresp'] << 4, 0]),
disconnect: new Buffer([protocol.codes['disconnect'] << 4, 0])
};

@@ -1,614 +0,57 @@

'use strict';
var protocol = require('./constants')
, empty = new Buffer(0)
var writeToStream = require('./writeToStream')
, EE = require('events').EventEmitter
, inherits = require('inherits')
function generate(packet) {
var stream = new Accumulator()
writeToStream(packet, stream)
return stream.concat()
}
switch (packet.cmd) {
case 'connect':
return connect(packet)
case 'connack':
return connack(packet)
case 'publish':
return publish(packet)
case 'puback':
case 'pubrec':
case 'pubrel':
case 'pubcomp':
case 'unsuback':
return confirmation(packet)
case 'subscribe':
return subscribe(packet)
case 'suback':
return suback(packet)
case 'unsubscribe':
return unsubscribe(packet)
case 'pingreq':
case 'pingresp':
case 'disconnect':
return emptyPacket(packet)
default:
throw new Error('unknown command')
}
function Accumulator() {
this._array = new Array(20)
this._i = 0
}
function connect(opts) {
var opts = opts || {}
, protocolId = opts.protocolId || 'MQTT'
, protocolVersion = opts.protocolVersion || 4
, will = opts.will
, clean = opts.clean
, keepalive = opts.keepalive || 0
, clientId = opts.clientId || ""
, username = opts.username
, password = opts.password
inherits(Accumulator, EE)
if (clean === undefined) {
clean = true
}
Accumulator.prototype.write = function (chunk) {
this._array[this._i++] = chunk
return true
};
Accumulator.prototype.concat = function () {
var length = 0
, lengths = new Array(this._array.length)
, list = this._array
, pos = 0
, i
, result;
// Must be a string and non-falsy
if (!protocolId ||
(typeof protocolId !== "string" && !Buffer.isBuffer(protocolId))) {
throw new Error('Invalid protocol id')
} else {
length += protocolId.length + 2
}
// Must be a 1 byte number
if (!protocolVersion ||
'number' !== typeof protocolVersion ||
protocolVersion > 255 ||
protocolVersion < 0) {
throw new Error('Invalid protocol version')
} else {
length += 1
}
// ClientId might be omitted in 3.1.1, but only if cleanSession is set to 1
if ((typeof clientId === "string" || Buffer.isBuffer(clientId)) &&
(clientId || protocolVersion == 4) &&
(clientId || clean)) {
length += clientId.length + 2
} else {
if(protocolVersion < 4) {
throw new Error('clientId must be supplied before 3.1.1');
}
if(clean == 0) {
throw new Error('clientId must be given if cleanSession set to 0');
}
}
// Must be a two byte number
if ('number' !== typeof keepalive ||
keepalive < 0 ||
keepalive > 65535) {
throw new Error('Invalid keepalive')
} else {
length += 2
}
// Connect flags
length += 1
// If will exists...
if (will) {
// It must be an object
if ('object' !== typeof will) {
throw new Error('Invalid will')
}
// It must have topic typeof string
if (!will.topic || 'string' !== typeof will.topic) {
throw new Error('Invalid will topic')
for (i = 0; i < list.length && list[i]; i++) {
if (typeof list[i] !== 'string') {
lengths[i] = list[i].length;
} else {
length += Buffer.byteLength(will.topic) + 2
lengths[i] = Buffer.byteLength(list[i]);
}
// Payload
if (will.payload && will.payload) {
if (will.payload.length >= 0) {
if ('string' === typeof will.payload) {
length += Buffer.byteLength(will.payload) + 2
} else {
length += will.payload.length + 2
}
} else {
throw new Error('Invalid will payload')
}
} else {
length += 2
}
length += lengths[i];
}
// Username
if (username) {
if (username.length) {
length += Buffer.byteLength(username) + 2
} else {
throw new Error('Invalid username')
}
}
result = new Buffer(length);
// Password
if (password) {
if (password.length) {
length += byteLength(password) + 2
for (i = 0; i < list.length && list[i]; i++) {
if (typeof list[i] !== 'string') {
list[i].copy(result, pos);
pos += lengths[i];
} else {
throw new Error('Invalid password')
result.write(list[i], pos);
pos += lengths[i];
}
}
var buffer = new Buffer(1 + calcLengthLength(length) + length)
, pos = 0
return result;
};
// Generate header
buffer.writeUInt8(protocol.codes['connect'] << protocol.CMD_SHIFT, pos++, true)
// Generate length
pos += writeLength(buffer, pos, length)
// Generate protocol ID
pos += writeStringOrBuffer(buffer, pos, protocolId)
buffer.writeUInt8(protocolVersion, pos++, true)
// Connect flags
var flags = 0
flags |= username ? protocol.USERNAME_MASK : 0
flags |= password ? protocol.PASSWORD_MASK : 0
flags |= (will && will.retain) ? protocol.WILL_RETAIN_MASK : 0
flags |= (will && will.qos) ?
will.qos << protocol.WILL_QOS_SHIFT : 0
flags |= will ? protocol.WILL_FLAG_MASK : 0
flags |= clean ? protocol.CLEAN_SESSION_MASK : 0
buffer.writeUInt8(flags, pos++, true)
// Keepalive
pos += writeNumber(buffer, pos, keepalive)
// Client ID
pos += writeStringOrBuffer(buffer, pos, clientId)
// Will
if (will) {
pos += writeString(buffer, pos, will.topic)
pos += writeStringOrBuffer(buffer, pos, will.payload)
}
// Username and password
if (username)
pos += writeStringOrBuffer(buffer, pos, username)
if (password)
pos += writeStringOrBuffer(buffer, pos, password)
return buffer
}
function connack(opts) {
var opts = opts || {}
, rc = opts.returnCode;
// Check return code
if ('number' !== typeof rc)
throw new Error('Invalid return code');
var buffer = new Buffer(4)
, pos = 0;
buffer.writeUInt8(protocol.codes['connack'] << protocol.CMD_SHIFT, pos++, true);
pos += writeLength(buffer, pos, 2);
buffer.writeUInt8(opts.sessionPresent && protocol.SESSIONPRESENT_MASK || 0, pos++, true);
buffer.writeUInt8(rc, pos++, true);
return buffer;
}
function publish(opts) {
var opts = opts || {}
, dup = opts.dup ? protocol.DUP_MASK : 0
, qos = opts.qos
, retain = opts.retain ? protocol.RETAIN_MASK : 0
, topic = opts.topic
, payload = opts.payload || empty
, id = opts.messageId;
var length = 0;
// Topic must be a non-empty string or Buffer
if (typeof topic === "string")
length += Buffer.byteLength(topic) + 2;
else if (Buffer.isBuffer(topic))
length += topic.length + 2;
else
throw new Error('Invalid topic');
// get the payload length
if (!Buffer.isBuffer(payload)) {
length += Buffer.byteLength(payload);
} else {
length += payload.length;
}
// Message id must a number if qos > 0
if (qos && 'number' !== typeof id) {
throw new Error('Invalid message id')
} else if (qos) {
length += 2;
}
var buffer = new Buffer(1 + calcLengthLength(length) + length)
, pos = 0;
// Header
buffer.writeUInt8(
protocol.codes['publish'] << protocol.CMD_SHIFT |
dup |
qos << protocol.QOS_SHIFT |
retain, pos++, true);
// Remaining length
pos += writeLength(buffer, pos, length);
// Topic
pos += writeStringOrBuffer(buffer, pos, topic);
// Message ID
if (qos > 0) {
pos += writeNumber(buffer, pos, id);
}
// Payload
if (!Buffer.isBuffer(payload)) {
writeStringNoPos(buffer, pos, payload);
} else {
writeBuffer(buffer, pos, payload);
}
return buffer;
}
/* Puback, pubrec, pubrel and pubcomp */
function confirmation(opts) {
var opts = opts || {}
, type = opts.cmd || 'puback'
, id = opts.messageId
, dup = (opts.dup && type === 'pubrel') ? protocol.DUP_MASK : 0
, qos = 0
if (type === 'pubrel')
qos = 1
// Check message ID
if ('number' !== typeof id)
throw new Error('Invalid message id');
var buffer = new Buffer(4)
, pos = 0;
// Header
buffer[pos++] =
protocol.codes[type] << protocol.CMD_SHIFT |
dup |
qos << protocol.QOS_SHIFT;
// Length
pos += writeLength(buffer, pos, 2);
// Message ID
pos += writeNumber(buffer, pos, id);
return buffer;
}
function subscribe(opts) {
var opts = opts || {}
, dup = opts.dup ? protocol.DUP_MASK : 0
, qos = opts.qos || 0
, id = opts.messageId
, subs = opts.subscriptions;
var length = 0;
// Check mid
if ('number' !== typeof id) {
throw new Error('Invalid message id');
} else {
length += 2;
}
// Check subscriptions
if ('object' === typeof subs && subs.length) {
for (var i = 0; i < subs.length; i += 1) {
var topic = subs[i].topic
, qos = subs[i].qos;
if ('string' !== typeof topic) {
throw new Error('Invalid subscriptions - invalid topic');
}
if ('number' !== typeof qos) {
throw new Error('Invalid subscriptions - invalid qos');
}
length += Buffer.byteLength(topic) + 2 + 1;
}
} else {
throw new Error('Invalid subscriptions');
}
var buffer = new Buffer(1 + calcLengthLength(length) + length)
, pos = 0;
// Generate header
buffer.writeUInt8(
protocol.codes['subscribe'] << protocol.CMD_SHIFT |
dup |
1 << protocol.QOS_SHIFT, pos++, true);
// Generate length
pos += writeLength(buffer, pos, length);
// Generate message ID
pos += writeNumber(buffer, pos, id);
// Generate subs
for (var i = 0; i < subs.length; i++) {
var sub = subs[i]
, topic = sub.topic
, qos = sub.qos;
// Write topic string
pos += writeString(buffer, pos, topic);
// Write qos
buffer.writeUInt8(qos, pos++, true);
}
return buffer;
}
function suback(opts) {
var opts = opts || {}
, id = opts.messageId
, granted = opts.granted;
var length = 0;
// Check message id
if ('number' !== typeof id) {
throw new Error('Invalid message id');
} else {
length += 2;
}
// Check granted qos vector
if ('object' === typeof granted && granted.length) {
for (var i = 0; i < granted.length; i += 1) {
if ('number' !== typeof granted[i]) {
throw new Error('Invalid qos vector');
}
length += 1;
}
} else {
throw new Error('Invalid qos vector');
}
var buffer = new Buffer(1 + calcLengthLength(length) + length)
, pos = 0;
// Header
buffer.writeUInt8(protocol.codes['suback'] << protocol.CMD_SHIFT, pos++, true);
// Length
pos += writeLength(buffer, pos, length);
// Message ID
pos += writeNumber(buffer, pos, id);
// Subscriptions
for (var i = 0; i < granted.length; i++) {
buffer.writeUInt8(granted[i], pos++, true);
}
return buffer;
}
function unsubscribe(opts) {
var opts = opts || {}
, id = opts.messageId
, dup = opts.dup ? protocol.DUP_MASK : 0
, unsubs = opts.unsubscriptions;
var length = 0;
// Check message id
if ('number' !== typeof id) {
throw new Error('Invalid message id');
} else {
length += 2;
}
// Check unsubs
if ('object' === typeof unsubs && unsubs.length) {
for (var i = 0; i < unsubs.length; i += 1) {
if ('string' !== typeof unsubs[i]) {
throw new Error('Invalid unsubscriptions');
}
length += Buffer.byteLength(unsubs[i]) + 2;
}
} else {
throw new Error('Invalid unsubscriptions');
}
var buffer = new Buffer(1 + calcLengthLength(length) + length)
, pos = 0;
// Header
buffer[pos++] =
protocol.codes['unsubscribe'] << protocol.CMD_SHIFT |
dup |
1 << protocol.QOS_SHIFT;
// Length
pos += writeLength(buffer, pos, length);
// Message ID
pos += writeNumber(buffer, pos, id);
// Unsubs
for (var i = 0; i < unsubs.length; i++) {
pos += writeString(buffer, pos, unsubs[i]);
}
return buffer;
}
function emptyPacket(opts) {
var buf = new Buffer(2);
buf[0] = protocol.codes[opts.cmd] << 4;
buf[1] = 0;
return buf;
}
/**
* calcLengthLength - calculate the length of the remaining
* length field
*
* @api private
*/
function calcLengthLength(length) {
if (length >= 0 && length < 128) {
return 1
} else if (length >= 128 && length < 16384) {
return 2
} else if (length >= 16384 && length < 2097152) {
return 3
} else if (length >= 2097152 && length < 268435456) {
return 4
} else {
return 0
}
}
/**
* writeLength - write an MQTT style length field to the buffer
*
* @param <Buffer> buffer - destination
* @param <Number> pos - offset
* @param <Number> length - length (>0)
* @returns <Number> number of bytes written
*
* @api private
*/
function writeLength(buffer, pos, length) {
var digit = 0
, origPos = pos
do {
digit = length % 128 | 0
length = length / 128 | 0
if (length > 0) {
digit = digit | 0x80
}
buffer.writeUInt8(digit, pos++, true)
} while (length > 0)
return pos - origPos
}
/**
* writeString - write a utf8 string to the buffer
*
* @param <Buffer> buffer - destination
* @param <Number> pos - offset
* @param <String> string - string to write
* @return <Number> number of bytes written
*
* @api private
*/
function writeString(buffer, pos, string) {
var strlen = Buffer.byteLength(string)
writeNumber(buffer, pos, strlen)
writeStringNoPos(buffer, pos + 2, string)
return strlen + 2
}
function writeStringNoPos(buffer, pos, string) {
buffer.write(string, pos)
}
/**
* write_buffer - write buffer to buffer
*
* @param <Buffer> buffer - dest buffer
* @param <Number> pos - offset
* @param <Buffer> src - source buffer
* @return <Number> number of bytes written
*
* @api private
*/
function writeBuffer(buffer, pos, src) {
src.copy(buffer, pos)
return src.length
}
/**
* writeNumber - write a two byte number to the buffer
*
* @param <Buffer> buffer - destination
* @param <Number> pos - offset
* @param <String> number - number to write
* @return <Number> number of bytes written
*
* @api private
*/
function writeNumber(buffer, pos, number) {
buffer.writeUInt8(number >> 8, pos, true)
buffer.writeUInt8(number & 0x00FF, pos + 1, true)
return 2
}
/**
* writeStringOrBuffer - write a String or Buffer with the its length prefix
*
* @param <Buffer> buffer - destination
* @param <Number> pos - offset
* @param <String> toWrite - String or Buffer
* @return <Number> number of bytes written
*/
function writeStringOrBuffer(buffer, pos, toWrite) {
var written = 0
if (toWrite && typeof toWrite === 'string') {
written += writeString(buffer, pos + written, toWrite)
} else if (toWrite) {
written += writeNumber(buffer, pos + written, toWrite.length)
written += writeBuffer(buffer, pos + written, toWrite)
} else {
written += writeNumber(buffer, pos + written, 0)
}
return written
}
function byteLength(bufOrString) {
if (Buffer.isBuffer(bufOrString)) {
return bufOrString.length
} else {
return Buffer.byteLength(bufOrString)
}
}
module.exports = generate

@@ -6,1 +6,2 @@

exports.generate = require('./generate')
exports.writeToStream = require('./writeToStream')
{
"name": "mqtt-packet",
"version": "3.4.4",
"version": "4.0.0",
"description": "Parse and generate MQTT packets like a breeze",

@@ -32,2 +32,3 @@ "main": "mqtt.js",

"devDependencies": {
"dev-null": "^0.1.1",
"faucet": "0.0.1",

@@ -38,5 +39,5 @@ "pre-commit": "^1.1.1",

"dependencies": {
"bl": "^0.9.1",
"bl": "^1.0.0",
"inherits": "^2.0.1"
}
}

@@ -90,2 +90,3 @@ mqtt-packet&nbsp;&nbsp;&nbsp;[![Build Status](https://travis-ci.org/mqttjs/mqtt-packet.png)](https://travis-ci.org/mqttjs/mqtt-packet)

* <a href="#generate"><code>mqtt#<b>generate()</b></code></a>
* <a href="#writeToStream"><code>mqtt#<b>writeToStream()</b></code></a>
* <a href="#parser"><code>mqtt#<b>parser()</b></code></a>

@@ -100,2 +101,11 @@

<a name="writeToStream">
### mqtt.writeToStream(object, stream)
Writes the mqtt packet defined by `object` to the given stream.
The object must be one of the ones specified by the [packets](#packets)
section. Emits an `Error` on the stream if a packet cannot be generated.
On node >= 12, this function automatically calls `cork()` on your stream,
and then it calls `uncork()` on the next tick.
<a name="parser">

@@ -102,0 +112,0 @@ ### mqtt.parser()

Sorry, the diff of this file is not supported yet

SocketSocket SOC 2 Logo

Product

  • Package Alerts
  • Integrations
  • Docs
  • Pricing
  • FAQ
  • Roadmap
  • Changelog

Packages

npm

Stay in touch

Get open source security insights delivered straight into your inbox.


  • Terms
  • Privacy
  • Security

Made with ⚡️ by Socket Inc