Comparing version 0.3.2 to 0.3.3
@@ -6,3 +6,3 @@ var mqtt = require('../..') | ||
client.subscribe('messages'); | ||
client.subscribe('presence'); | ||
client.publish('presence', 'bin hier'); | ||
@@ -9,0 +9,0 @@ client.on('message', function (topic, message) { |
var mqtt = require('../..') | ||
, client = mqtt.createClient(); | ||
client.publish('messages', 'hello!'); | ||
client.publish('presence', 'hello!'); | ||
client.end(); |
var mqtt = require('../..') | ||
, client = mqtt.createClient(); | ||
client.subscribe('test'); | ||
client.subscribe('presence'); | ||
client.on('message', function(topic, message) { | ||
console.log(message); | ||
}); |
@@ -146,3 +146,3 @@ /** | ||
// MqttConnection | ||
this.conn = this.stream.pipe(new Connection( { encoding: this.options.encoding })); | ||
this.conn = this.stream.pipe(new Connection()); | ||
@@ -149,0 +149,0 @@ // Set encoding of incoming publish payloads |
@@ -7,3 +7,4 @@ var events = require('events') | ||
, Stream = require("stream").Stream | ||
, Writable = require("stream").Writable; | ||
, Writable = require("stream").Writable | ||
, bops = require('bops'); | ||
@@ -21,5 +22,10 @@ if (!Writable) { | ||
this.generate = generate; | ||
this._bops = bops; | ||
Writable.call(this); | ||
var options = {}; | ||
options.objectMode = typeof global.WebSocket !== 'undefined'; | ||
Writable.call(this, options); | ||
this._newPacket(); | ||
@@ -75,3 +81,3 @@ | ||
Connection.prototype._parseLength = function() { | ||
var result = true, data = this.data, byte; | ||
var result = true, data = this.data, readByte; | ||
@@ -84,10 +90,10 @@ if (this.packet.length === undefined) { | ||
byte = data[this.index++]; | ||
readByte = bops.readUInt8(data, this.index++); | ||
while (this.tmp.pos++ < 4) { | ||
this.tmp.length += | ||
this.tmp.mul * (byte & protocol.LENGTH_MASK); | ||
this.tmp.mul * (readByte & protocol.LENGTH_MASK); | ||
this.tmp.mul *= 0x80; | ||
if ((byte & protocol.LENGTH_FIN_MASK) === 0) { | ||
if ((readByte & protocol.LENGTH_FIN_MASK) === 0) { | ||
break; | ||
@@ -101,3 +107,3 @@ } | ||
byte = data[this.index++]; | ||
readByte = bops.readUInt8(data, this.index++); | ||
} | ||
@@ -122,3 +128,3 @@ | ||
if (this.partialPayload) { | ||
this.data = Buffer.concat([this.partialPayload, this.data]); | ||
this.data = bops.join([this.partialPayload, this.data]); | ||
this.partialPayload = null; | ||
@@ -132,3 +138,3 @@ } | ||
if (this.index < this.data.length) { | ||
this.partialPayload = this.data.slice(this.index); | ||
this.partialPayload = bops.subarray(this.data, this.index); | ||
} | ||
@@ -148,3 +154,3 @@ | ||
" 'use strict'; \n" + | ||
" var buf = this.data.slice(this.index, this.index + this.packet.length); \n" + | ||
" var buf = this._bops.subarray(this.data, this.index, this.index + this.packet.length); \n" + | ||
" var result = null; \n" + | ||
@@ -196,3 +202,3 @@ " this.index += this.packet.length; \n" + | ||
if (that.data.length > that.index) { | ||
that._write(that.data.slice(that.index), encoding, done); | ||
that._write(bops.subarray(that.data, that.index), encoding, done); | ||
} else { | ||
@@ -199,0 +205,0 @@ done(); |
var protocol = require('./protocol'); | ||
var bops = require('bops'); | ||
@@ -90,7 +91,7 @@ // Connect | ||
var buffer = new Buffer(1 + calc_length_length(length) + length) | ||
var buffer = bops.create(1 + calc_length_length(length) + length) | ||
, pos = 0; | ||
// Generate header | ||
buffer[pos++] = protocol.codes['connect'] << protocol.CMD_SHIFT; | ||
bops.writeUInt8(buffer, protocol.codes['connect'] << protocol.CMD_SHIFT, pos++); | ||
@@ -102,3 +103,3 @@ // Generate length | ||
pos += write_string(buffer, pos, protocolId); | ||
buffer[pos++] = protocolVersion; | ||
bops.writeUInt8(buffer, protocolVersion, pos++); | ||
@@ -115,3 +116,3 @@ // Connect flags | ||
buffer[pos++] = flags; | ||
bops.writeUInt8(buffer, flags, pos++); | ||
@@ -148,6 +149,6 @@ // Keepalive | ||
var buffer = new Buffer(4) | ||
var buffer = bops.create(4) | ||
, pos = 0; | ||
buffer[pos++] = protocol.codes['connack'] << protocol.CMD_SHIFT; | ||
bops.writeUInt8(buffer, protocol.codes['connack'] << protocol.CMD_SHIFT, pos++); | ||
pos += write_length(buffer, pos, 2); | ||
@@ -160,3 +161,3 @@ pos += write_number(buffer, pos, rc); | ||
// Publish | ||
var empty = new Buffer(0); | ||
var empty = bops.create(0); | ||
module.exports.publish = function(opts) { | ||
@@ -181,3 +182,3 @@ var opts = opts || {} | ||
// get the payload length | ||
if (!Buffer.isBuffer(payload)) { | ||
if (!bops.is(payload)) { | ||
length += Buffer.byteLength(payload); | ||
@@ -195,3 +196,3 @@ } else { | ||
var buffer = new Buffer(1 + calc_length_length(length) + length) | ||
var buffer = bops.create(1 + calc_length_length(length) + length) | ||
, pos = 0; | ||
@@ -218,4 +219,4 @@ | ||
// Payload | ||
if (!Buffer.isBuffer(payload)) { | ||
buffer.write(payload, pos); | ||
if (!bops.is(payload)) { | ||
write_string_no_pos(buffer, pos, payload); | ||
} else { | ||
@@ -240,3 +241,3 @@ write_buffer(buffer, pos, payload); | ||
var buffer = new Buffer(4) | ||
var buffer = bops.create(4) | ||
, pos = 0; | ||
@@ -304,10 +305,10 @@ | ||
var buffer = new Buffer(1 + calc_length_length(length) + length) | ||
var buffer = bops.create(1 + calc_length_length(length) + length) | ||
, pos = 0; | ||
// Generate header | ||
buffer[pos++] = | ||
bops.writeUInt8(buffer, | ||
protocol.codes['subscribe'] << protocol.CMD_SHIFT | | ||
dup | | ||
1 << protocol.QOS_SHIFT; | ||
1 << protocol.QOS_SHIFT, pos++); | ||
@@ -329,3 +330,3 @@ // Generate length | ||
// Write qos | ||
buffer[pos++] = qos; | ||
bops.writeUInt8(buffer, qos, pos++); | ||
} | ||
@@ -362,7 +363,7 @@ | ||
var buffer = new Buffer(1 + calc_length_length(length) + length) | ||
var buffer = bops.create(1 + calc_length_length(length) + length) | ||
, pos = 0; | ||
// Header | ||
buffer[pos++] = protocol.codes['suback'] << protocol.CMD_SHIFT; | ||
bops.writeUInt8(buffer, protocol.codes['suback'] << protocol.CMD_SHIFT, pos++); | ||
@@ -377,3 +378,3 @@ // Length | ||
for (var i = 0; i < granted.length; i++) { | ||
buffer[pos++] = granted[i]; | ||
bops.writeUInt8(buffer, granted[i], pos++); | ||
} | ||
@@ -411,3 +412,3 @@ | ||
var buffer = new Buffer(1 + calc_length_length(length) + length) | ||
var buffer = bops.create(1 + calc_length_length(length) + length) | ||
, pos = 0; | ||
@@ -449,3 +450,3 @@ | ||
return function(opts) { | ||
var buf = new Buffer(2); | ||
var buf = bops.create(2); | ||
buf[0] = protocol.codes[type] << 4; | ||
@@ -500,3 +501,3 @@ buf[1] = 0; | ||
} | ||
buffer[pos++] = digit; | ||
bops.writeUInt8(buffer, digit, pos++); | ||
} while (length > 0); | ||
@@ -519,10 +520,19 @@ | ||
function write_string(buffer, pos, string) { | ||
var strlen = string.length; | ||
var strlen = Buffer.byteLength(string); | ||
write_number(buffer, pos, strlen); | ||
buffer.write(string, pos+2); | ||
write_string_no_pos(buffer, pos + 2, string); | ||
return strlen + 2; | ||
}; | ||
function write_string_no_pos(buffer, pos, string) { | ||
if (Buffer.isBuffer(buffer)) { | ||
buffer.write(string, pos); | ||
} else { | ||
var bufString = bops.from(string, 'utf8'); | ||
bops.copy(bufString, buffer, pos, 0, bufString.length); | ||
} | ||
} | ||
/** | ||
@@ -540,3 +550,3 @@ * write_buffer - write buffer to buffer | ||
function write_buffer(buffer, pos, src) { | ||
src.copy(buffer, pos); | ||
bops.copy(src, buffer, pos); | ||
return src.length; | ||
@@ -556,6 +566,6 @@ } | ||
function write_number(buffer, pos, number) { | ||
buffer[pos] = number >> 8; | ||
buffer[pos+1] = number & 0x00FF; | ||
bops.writeUInt8(buffer, number >> 8, pos); | ||
bops.writeUInt8(buffer, number & 0x00FF, pos + 1); | ||
return 2; | ||
}; |
/* Parse - packet parsing */ | ||
var protocol = require('./protocol'); | ||
var bops = require('bops'); | ||
var utils = require('./parsing_utils'); | ||
@@ -7,9 +9,10 @@ var parser = module.exports; | ||
parser.header = function(buf, packet) { | ||
packet.cmd = protocol.types[buf[0] >> protocol.CMD_SHIFT]; | ||
packet.retain = (buf[0] & protocol.RETAIN_MASK) !== 0; | ||
packet.qos = (buf[0] >> protocol.QOS_SHIFT) & protocol.QOS_MASK; | ||
packet.dup = (buf[0] & protocol.DUP_MASK) !== 0; | ||
var zero = bops.readUInt8(buf, 0); | ||
packet.cmd = protocol.types[zero >> protocol.CMD_SHIFT]; | ||
packet.retain = (zero & protocol.RETAIN_MASK) !== 0; | ||
packet.qos = (zero >> protocol.QOS_SHIFT) & protocol.QOS_MASK; | ||
packet.dup = (zero & protocol.DUP_MASK) !== 0; | ||
return packet; | ||
}; | ||
parser.connect = function(buf, packet) { | ||
@@ -34,17 +37,17 @@ parser._pos = 0; | ||
if(parser._pos > parser._len) return null; | ||
packet.protocolVersion = buf[parser._pos]; | ||
packet.protocolVersion = bops.readUInt8(buf, parser._pos); | ||
parser._pos += 1; | ||
// Parse connect flags | ||
flags.username = (buf[parser._pos] & protocol.USERNAME_MASK); | ||
flags.password = (buf[parser._pos] & protocol.PASSWORD_MASK); | ||
flags.will = (buf[parser._pos] & protocol.WILL_FLAG_MASK); | ||
flags.username = (bops.readUInt8(buf, parser._pos) & protocol.USERNAME_MASK); | ||
flags.password = (bops.readUInt8(buf, parser._pos) & protocol.PASSWORD_MASK); | ||
flags.will = (bops.readUInt8(buf, parser._pos) & protocol.WILL_FLAG_MASK); | ||
if(flags.will) { | ||
packet.will = {}; | ||
packet.will.retain = (buf[parser._pos] & protocol.WILL_RETAIN_MASK) !== 0; | ||
packet.will.qos = (buf[parser._pos] & protocol.WILL_QOS_MASK) >> protocol.WILL_QOS_SHIFT; | ||
packet.will.retain = (bops.readUInt8(buf, parser._pos) & protocol.WILL_RETAIN_MASK) !== 0; | ||
packet.will.qos = (bops.readUInt8(buf, parser._pos) & protocol.WILL_QOS_MASK) >> protocol.WILL_QOS_SHIFT; | ||
} | ||
packet.clean = (buf[parser._pos] & protocol.CLEAN_SESSION_MASK) !== 0; | ||
packet.clean = (bops.readUInt8(buf, parser._pos) & protocol.CLEAN_SESSION_MASK) !== 0; | ||
parser._pos += 1; | ||
@@ -115,11 +118,5 @@ | ||
} | ||
utils.parseEncodedPayload(parser, buf, encoding, packet); | ||
// Parse the payload | ||
/* No checks - whatever remains in the packet is the payload */ | ||
if (encoding !== 'binary') { | ||
packet.payload = buf.toString(encoding, parser._pos, parser._len); | ||
} else { | ||
packet.payload = buf.slice(parser._pos, parser._len); | ||
} | ||
return packet; | ||
@@ -228,3 +225,3 @@ } | ||
var result = buf.readUInt16BE(parser._pos); | ||
var result = bops.readUInt16BE(buf, parser._pos); | ||
parser._pos += 2; | ||
@@ -235,7 +232,9 @@ return result; | ||
parser.parse_string = function(buf) { | ||
var length = parser.parse_num(buf); | ||
var length = parser.parse_num(buf) | ||
, result; | ||
if(length === null || length + parser._pos > parser._len) return null; | ||
var result = buf.toString('utf8', parser._pos, parser._pos + length); | ||
result = utils.toString(parser, buf, length); | ||
parser._pos += length; | ||
@@ -242,0 +241,0 @@ |
@@ -5,3 +5,3 @@ { | ||
"description": "A library for the MQTT protocol", | ||
"version": "0.3.2", | ||
"version": "0.3.3", | ||
"contributors": [ | ||
@@ -31,4 +31,9 @@ "Matteo Collina <matteo.collina@gmail.com> (https://github.com/mcollina)" | ||
}, | ||
"browser": { | ||
"./lib/mqtt.js": "./lib/browser.js", | ||
"./lib/parsing_utils.js": "./lib/parsing_utils_browser.js" | ||
}, | ||
"dependencies": { | ||
"readable-stream": "~1.0.2" | ||
"readable-stream": "~1.0.2", | ||
"bops": "~0.0.7" | ||
}, | ||
@@ -35,0 +40,0 @@ "devDependencies": { |
@@ -28,2 +28,35 @@ # mqtt.js [![Build Status](https://travis-ci.org/adamvr/MQTT.js.png)](https://travis-ci.org/adamvr/MQTT.js) | ||
## Example | ||
First you will need to install and run a broker, such as | ||
[Mosquitto](http://mosquitto.org) or | ||
[Mosca](http://mcollina.github.io/mosca/), and launch it. | ||
For the sake of simplicity, let's put the subscriber and the publisher in the same file: | ||
```js | ||
var mqtt = require('mqtt') | ||
client = mqtt.createClient(1883, 'localhost'); | ||
client.subscribe('presence'); | ||
client.publish('presence', 'Hello mqtt'); | ||
client.on('message', function (topic, message) { | ||
console.log(message); | ||
}); | ||
client.end(); | ||
``` | ||
output: | ||
``` | ||
Hello mqtt | ||
``` | ||
If you do not want to install a separate broker, you can try using the | ||
[server/orig](https://github.com/adamvr/MQTT.js/blob/master/examples/server/orig.js) | ||
example. | ||
It implements enough of the semantics of the MQTT protocol to | ||
run the example. | ||
## Documentation | ||
@@ -30,0 +63,0 @@ |
Sorry, the diff of this file is not supported yet
Sorry, the diff of this file is not supported yet
Sorry, the diff of this file is not supported yet
License Policy Violation
LicenseThis package is not allowed per your license policy. Review the package's license to ensure compliance.
Found 1 instance in 1 package
Native code
Supply chain riskContains native code (e.g., compiled binaries or shared libraries). Including native code can obscure malicious behavior.
Found 3 instances in 1 package
License Policy Violation
LicenseThis package is not allowed per your license policy. Review the package's license to ensure compliance.
Found 1 instance in 1 package
Native code
Supply chain riskContains native code (e.g., compiled binaries or shared libraries). Including native code can obscure malicious behavior.
Found 1 instance in 1 package
465855
87
4807
161
2
24
+ Addedbops@~0.0.7
+ Addedbase64-js@0.0.2(transitive)
+ Addedbops@0.0.7(transitive)
+ Addedto-utf8@0.0.1(transitive)