Huge News!Announcing our $40M Series B led by Abstract Ventures.Learn More
Socket
Sign inDemoInstall
Socket

mqtt

Package Overview
Dependencies
Maintainers
2
Versions
203
Alerts
File Explorer

Advanced tools

Socket logo

Install Socket

Detect and block malicious and high-risk dependencies

Install

mqtt - npm Package Compare versions

Comparing version 0.3.8 to 0.3.9

test/helpers/tls-csr.pem

21

lib/client.js

@@ -38,4 +38,4 @@ /**

if (!this instanceof MqttClient) {
return new MqttClient(stream, options);
if (!(this instanceof MqttClient)) {
return new MqttClient(streamBuilder, options);
}

@@ -154,3 +154,3 @@

// Suppress connection errors
this.stream.on('error', nop);
this.stream.on('error', nop)

@@ -377,2 +377,4 @@ // Echo stream close

this.emit('reconnect');
if (this.conn) {

@@ -393,2 +395,3 @@ this.conn.removeAllListeners();

if (!that.disconnecting && !that.reconnectTimer && (that.options.reconnectPeriod > 0)) {
this.emit('offline');
that.reconnectTimer = setInterval(function () {

@@ -415,5 +418,9 @@ that._reconnect();

*/
MqttClient.prototype._cleanUp = function() {
MqttClient.prototype._cleanUp = function(forced) {
this.conn.disconnect();
this.stream.end();
if (forced) {
this.stream.destroy();
} else {
this.stream.end();
}
if (this.pingTimer !== null) {

@@ -468,4 +475,4 @@ clearInterval(this.pingTimer);

} else {
this._cleanUp();
this.emit('close');
// do a forced cleanup since socket will be in bad shape
this._cleanUp(true);
}

@@ -472,0 +479,0 @@ };

@@ -78,13 +78,17 @@ var protocol = require('./protocol');

// Username
if (username && 'string' !== typeof username) {
return new Error('Invalid username');
} else if (username) {
length += username.length + 2;
}
if (username) {
if (username.length) {
length += username.length + 2;
} else {
return new Error('Invalid username')
}
}
// Password
if (password && 'string' !== typeof password) {
return new Error('Invalid password');
} else if (password) {
length += password.length + 2;
if (password) {
if (password.length) {
length += password.length + 2;
} else {
return new Error('Invalid password')
}
}

@@ -130,5 +134,16 @@

// Username and password
if (username) pos += write_string(buffer, pos, username);
if (password) pos += write_string(buffer, pos, password);
if (username && typeof username === 'string')
pos += write_string(buffer, pos, username);
else if (username) {
pos += write_number(buffer, pos, username.length);
pos += write_buffer(buffer, pos, username);
}
if (password && typeof password === 'string')
pos += write_string(buffer, pos, password);
else if (password) {
pos += write_number(buffer, pos, password.length);
pos += write_buffer(buffer, pos, password);
}
return buffer;

@@ -135,0 +150,0 @@ };

@@ -35,3 +35,3 @@ /*

host = brokerUrl.host;
host = brokerUrl.hostname;
query = brokerUrl.query;

@@ -128,4 +128,2 @@

}
tls_opts.rejectUnauthorized = false;

@@ -135,30 +133,41 @@ if (opts.keyPath && opts.certPath) {

tls_opts.cert = fs.readFileSync(opts.certPath);
tls_opts.ca = [];
if (opts.ca) {
for (var i = 0;i<opts.ca.length;i++) {
tls_opts.ca[i] = fs.readFileSync(opts.ca[i]);
}
}
tls_opts.ca = [];
if (opts.ca) {
for (var i = 0;i<opts.ca.length;i++) {
tls_opts.ca[i] = fs.readFileSync(opts.ca[i]);
}
tls_opts.rejectUnauthorized = opts.rejectUnauthorized || false;
}
tls_opts.rejectUnauthorized = opts.rejectUnauthorized || false;
tls_opts.port = port
tls_opts.host = host
builder = function() {
var tls_client = tls.connect(port, host, tls_opts, function() {
if (process.env.NODE_DEBUG) {
if (tls_client.authorized) {
console.log("Connection authorized by a Certificate Authority.");
} else {
console.log("Connection not authorized: " + tls_client.authorizationError)
}
}
})
var tls_client = tls.connect(tls_opts)
tls_client.on('secureConnect', function() {
if (tls_opts.rejectUnauthorized && !tls_client.authorized) {
throw new Error('TLS not authorized');
tls_client.emit('error', new Error('TLS not authorized'));
} else {
tls_client.removeListener('error', handleTLSerrors)
}
});
function handleTLSerrors(err) {
// How can I get verify this error is a tls error?
if (tls_opts.rejectUnauthorized) {
mqttClient.emit('error', err)
}
// close this connection to match the behaviour of net
// otherwise all we get is an error from the tls_client
// and close event doesn't fire. This is a work around
// to enable the reconnect code to work the same as with
// net.createConnection
tls_client.end();
}
tls_client.on('error', handleTLSerrors)
return tls_client;

@@ -165,0 +174,0 @@ };

@@ -17,3 +17,3 @@ /* Parse - packet parsing */

parser.connect = function(buf, packet) {
parser.connect = function(buf, packet, encoding) {
parser._pos = 0;

@@ -77,3 +77,3 @@ parser._len = buf.length;

if(flags.username) {
username = this.parse_string(buf);
username = this.parse_string(buf, encoding);
if(username === null) return new Error('Parse error - cannot parse username');

@@ -85,3 +85,3 @@ packet.username = username;

if(flags.password) {
password = this.parse_string(buf);
password = this.parse_string(buf, encoding);
if(password === null) return ;

@@ -200,3 +200,3 @@ packet.password = password;

// Parse topic
topic = parser.parse_string(buf, parser._len, parser._pos);
topic = parser.parse_string(buf);
if(topic === null) return new Error('Parse error - cannot parse topic');

@@ -231,3 +231,3 @@

parser.parse_string = function(buf) {
parser.parse_string = function(buf, encoding) {
var length = parser.parse_num(buf)

@@ -238,3 +238,3 @@ , result;

result = utils.toString(parser, buf, length);
result = utils.toString(parser, buf, length, encoding);

@@ -241,0 +241,0 @@ parser._pos += length;

@@ -13,4 +13,9 @@ var bops = require('bops');

module.exports.toString = function(parser, buf, length) {
module.exports.toString = function(parser, buf, length, encoding) {
if (encoding !== 'binary') {
buf = bops.to(buf, encoding);
}
return bops.to(bops.subarray(buf, parser._pos, parser._pos + length), 'utf8');
};

@@ -13,4 +13,8 @@

module.exports.toString = function(parser, buf, length) {
return buf.toString('utf8', parser._pos, parser._pos + length);
module.exports.toString = function(parser, buf, length, encoding) {
if (encoding !== 'binary') {
return buf.toString(encoding || 'utf8', parser._pos, parser._pos + length);
} else {
return buf.slice(parser._pos, parser._pos + length);
}
};

@@ -5,3 +5,3 @@ {

"description": "A library for the MQTT protocol",
"version": "0.3.8",
"version": "0.3.9",
"contributors": [

@@ -24,2 +24,3 @@ "Matteo Collina <matteo.collina@gmail.com> (https://github.com/mcollina)"

},
"pre-commit": "test",
"bin": {

@@ -42,4 +43,6 @@ "mqtt_pub": "./bin/mqtt_pub",

"mocha": "*",
"should": "*"
"should": "*",
"pre-commit": "0.0.7",
"sinon": "~1.10.0"
}
}

@@ -5,3 +5,4 @@

*/
var should = require('should');
var should = require('should')
, MqttClient = require('../lib/client');

@@ -13,4 +14,7 @@ module.exports = function(server, createClient, port) {

client.stream.end();
client.once('connect', function() {
client.stream.end();
});
client.once('close', function() {
client.end()
done();

@@ -24,2 +28,3 @@ });

client.stream.once('close', function() {
client.end();
if (!client.connected) {

@@ -41,2 +46,3 @@ done();

should.not.exist(client.pingTimer);
client.end();
done();

@@ -83,2 +89,3 @@ });

server.once('client', function(client) {
client.disconnect()
done();

@@ -94,2 +101,3 @@ });

packet.clientId.should.match(/mqttjs.*/);
client.disconnect()
done();

@@ -106,2 +114,3 @@ });

packet.clean.should.be.true;
client.disconnect();
done();

@@ -119,2 +128,3 @@ });

packet.clientId.should.match(/testclient/);
client.disconnect();
done();

@@ -133,2 +143,3 @@ });

packet.clean.should.be.false;
client.disconnect();
done();

@@ -146,3 +157,3 @@ });

} catch(err) {
process.nextTick(done);
done();
}

@@ -157,2 +168,3 @@ });

packet.clientId.should.match(/testclient/);
client.disconnect();
done();

@@ -165,3 +177,6 @@ });

var client = createClient(port);
client.once('connect', done);
client.once('connect', function() {
client.end();
done();
});
client.once('error', done);

@@ -178,2 +193,3 @@ });

}
client.end();
});

@@ -188,2 +204,3 @@ });

client.once('error', function(error) {
client.end();
done();

@@ -194,6 +211,8 @@ });

it('should have different client ids', function() {
var client1 = createClient(port).options.clientId
, client2 = createClient(port).options.clientId;
var client1 = createClient(port)
, client2 = createClient(port);
client1.should.not.equal(client2);
client1.options.clientId.should.not.equal(client2.options.clientId);
client1.end();
client2.end();
});

@@ -380,2 +399,3 @@ });

should.exist(client.pingTimer);
client.end()
done();

@@ -388,2 +408,3 @@ });

should.not.exist(client.pingTimer);
client.end()
done();

@@ -393,9 +414,11 @@ });

it('should reconnect if pingresp is not sent', function(done) {
var client = createClient(port, {keepalive:1, reconnectPeriod: 200});
var client = createClient(port, {keepalive:1, reconnectPeriod: 50});
// Fake no pingresp being send by stubbing the _handlePingresp function
client._handlePingresp = function () {};
client.once('close', function() {
client.once('connect', function() {
client.once('connect', function() {
true.should.equal(true);
done();
client.end()
done();
});

@@ -689,2 +712,42 @@ });

it('should emit \'reconnect\' when reconnecting', function(done) {
var client = createClient(port)
, tryReconnect = true
, reconnectEvent = false;
client.on('reconnect', function() {
reconnectEvent = true;
});
client.on('connect', function() {
if (tryReconnect) {
client.stream.end();
tryReconnect = false;
} else {
reconnectEvent.should.equal(true);
done();
}
});
});
it('should emit \'offline\' after going offline', function(done) {
var client = createClient(port)
, tryReconnect = true
, offlineEvent = false;
client.on('offline', function() {
offlineEvent = true;
});
client.on('connect', function() {
if (tryReconnect) {
client.stream.end();
tryReconnect = false;
} else {
offlineEvent.should.equal(true);
done();
}
});
});
it('should not reconnect if it was ended by the user', function(done) {

@@ -714,7 +777,7 @@ var client = createClient(port);

it('should allow specification of a reconnect period', function(done) {
this.timeout(2200);
var client = createClient(port, {reconnectPeriod: 2000})
var period = 200
, client = createClient(port, {reconnectPeriod: period})
, reconnect = false;
var start = process.hrtime()
var start = Date.now()
, end;

@@ -727,4 +790,5 @@

} else {
end = process.hrtime(start);
if (end[0] === 2) {
client.end();
end = Date.now()
if (end - start >= period) {
// Connected in about 2 seconds, that's good enough

@@ -731,0 +795,0 @@ done();

@@ -6,2 +6,3 @@ /**

var mqtt = require('..')
, should = require('should')
, abstractClientTests = require("./abstract_client");

@@ -77,6 +78,24 @@

describe('MqttClient', function() {
describe('creating', function() {
it('should allow instantiation of MqttClient without the \'new\' operator' , function(done) {
should(function() {
var client;
try {
client = mqtt.MqttClient(function() {
throw Error('break');
}, {});
} catch (err) {
if (err.message !== 'break') {
throw err;
}
done();
}
}).not.throw("Object #<Object> has no method '_setupStream'");
});
});
abstractClientTests(server, createClient, port);
describe('message ids', function() {
it('should increment the message id', function() {

@@ -83,0 +102,0 @@ var client = createClient();

/**
* Testing requires
* Testing requires
*/

@@ -30,9 +30,9 @@ var should = require('should')

var fixture = [
16, 18, // Header
16, 18, // Header
0, 6, // Protocol id length
77, 81, 73, 115, 100, 112, // Protocol id
3, // Protocol version
0, // Connect flags
0, 30, // Keepalive
0, 4, //Client id length
0, // Connect flags
0, 30, // Keepalive
0, 4, //Client id length
116, 101, 115, 116 // Client id

@@ -72,16 +72,16 @@ ];

var fixture = [
16, 54, // Header
0, 6, // Protocol id length
77, 81, 73, 115, 100, 112, // Protocol id
3, // Protocol version
246, // Connect flags
0, 30, // Keepalive
0, 4, // Client id length
116, 101, 115, 116, // Client id
0, 5, // will topic length
116, 111, 112, 105, 99, // will topic
16, 54, // Header
0, 6, // Protocol id length
77, 81, 73, 115, 100, 112, // Protocol id
3, // Protocol version
246, // Connect flags
0, 30, // Keepalive
0, 4, // Client id length
116, 101, 115, 116, // Client id
0, 5, // will topic length
116, 111, 112, 105, 99, // will topic
0, 7, // will payload length
112, 97, 121, 108, 111, 97, 100, // will payload
0, 8, // username length
117, 115, 101, 114, 110, 97, 109, 101, // username
0, 8, // username length
117, 115, 101, 114, 110, 97, 109, 101, // username
0, 8, // password length

@@ -99,2 +99,45 @@ 112, 97, 115, 115, 119, 111, 114, 100 //password

it('should handle binary username/password', function(done) {
var expected = {
cmd: "connect",
retain: false,
qos: 0,
dup: false,
length: 28,
protocolId: "MQIsdp",
protocolVersion: 3,
clean: false,
keepalive: 30,
clientId: "test",
username: new Buffer([12, 13, 14]),
password: new Buffer([15, 16, 17]),
};
var fixture = [
16, 28, // Header
0, 6, // Protocol id length
77, 81, 73, 115, 100, 112, // Protocol id
3, // Protocol version
0x80 | 0x40, // Connect flags
0, 30, // Keepalive
0, 4, //Client id length
116, 101, 115, 116, // Client id
0, 3, // username length
12, 13, 14, // username
0, 3, // password length
15, 16, 17 //password
];
this.stream.write(new Buffer(fixture));
this.conn.setPacketEncoding('binary');
this.conn.once('connect', function(packet) {
packet.username.toString('hex').should.eql(expected.username.toString('hex'));
packet.password.toString('hex').should.eql(expected.password.toString('hex'));
done();
});
});
describe('parse errors', function() {

@@ -127,3 +170,3 @@ it('should say protocol not parseable', function(done) {

}
var fixture = [32, 2, 0, 0];

@@ -148,3 +191,3 @@

}
var fixture = [32, 2, 0, 5];

@@ -174,3 +217,3 @@

var fixture = [
48, 10, // Header
48, 10, // Header
0, 4, // Topic length

@@ -285,3 +328,3 @@ 116, 101, 115, 116, // Topic (test)

var fixture = [
48, 6, // Header
48, 6, // Header
0, 4, // Topic length

@@ -312,3 +355,3 @@ 116, 101, 115, 116 // Topic

var fixture1 = [
48, 10, // Header
48, 10, // Header
0, 4, // Topic length

@@ -516,4 +559,4 @@ 116, 101, 115, 116 // Topic (test)

dup: false,
length: 5,
granted: [0, 1, 2],
length: 6,
granted: [0, 1, 2, 128],
messageId: 6

@@ -523,5 +566,5 @@ };

var fixture = [
144, 5, // Header
144, 6, // Header
0, 6, // Message id
0, 1, 2 // Granted qos (0, 1, 2)
0, 1, 2, 128 // Granted qos (0, 1, 2) and a rejected being 0x80
];

@@ -570,3 +613,3 @@

});
describe('unsuback', function() {

@@ -573,0 +616,0 @@ it('should fire a unsuback event', function(done) {

@@ -91,2 +91,37 @@ /**

it('should send a connect packet with binary username/password', function(done) {
var expected = new Buffer([
16, 28, // Header
0, 6, 77, 81, 73, 115, 100, 112, // Protocol Id
3, // Protocol version
0x40 | 0x80, // Connect flags
0, 30, // Keepalive
0, 4, // Client id length
116, 101, 115, 116, // Client Id
0, 3, // username length
12, 13, 14, // username
0, 3, // password length
15, 16, 17 //password
]);
var fixture = {
protocolId: 'MQIsdp',
protocolVersion: 3,
clientId: 'test',
keepalive: 30,
username: new Buffer([12, 13, 14]),
password: new Buffer([15, 16, 17])
};
this.conn.setPacketEncoding('binary');
this.conn.connect(fixture);
var that = this;
this.stream.on('readable', function() {
var packet = that.stream.read();
packet.should.eql(expected);
done();
});
});
describe('invalid options', function () {

@@ -93,0 +128,0 @@ describe('protocol id', function () {

@@ -6,3 +6,4 @@ /**

var should = require('should')
, net = require('net');
, net = require('net')
, sinon = require('sinon');

@@ -52,2 +53,8 @@ /**

it('should return an MqttClient with correct host when called with a host and port', function () {
sinon.spy(mqtt, "createClient");
var c = mqtt.connect('tcp://user:pass@localhost:1883');
mqtt.createClient.calledWith('1883', 'localhost').should.be.ok;
});
it('should throw an error when connect is called without a brokerUrl', function () {

@@ -68,2 +75,4 @@ (function(){

c.on('error', function() {});
c.should.be.instanceOf(mqtt.MqttClient);

@@ -75,2 +84,4 @@ });

c.on('error', function() {});
c.should.be.instanceOf(mqtt.MqttClient);

@@ -98,2 +109,4 @@ });

c.on('error', function() {});
c.should.be.instanceOf(mqtt.MqttClient);

@@ -108,2 +121,4 @@ });

c.on('error', function() {});
c.should.be.instanceOf(mqtt.MqttClient);

@@ -119,2 +134,4 @@ });

c.on('error', function() {});
c.should.be.instanceOf(mqtt.MqttClient);

@@ -130,2 +147,4 @@ });

c.on('error', function() {});
c.should.be.instanceOf(mqtt.MqttClient);

@@ -132,0 +151,0 @@ });

@@ -21,2 +21,4 @@

var WRONG_CERT = __dirname + '/helpers/wrong-cert.pem';
/**

@@ -30,2 +32,3 @@ * Test server

} else {
server.emit('connect', client);
client.connack({returnCode: 0});

@@ -80,2 +83,49 @@ }

abstractClientTests(server, createClient, port);
if (!process.version.match(/^v0.8/)) {
describe('with secure parameters', function() {
it('should validate successfully the CA', function (done) {
var client = createClient(port, {
ca: [CERT],
rejectUnauthorized: true
});
client.on('error', done)
server.once('connect', function(client) {
done();
});
});
it('should validate unsuccessfully the CA', function (done) {
var client = createClient(port, {
ca: [WRONG_CERT],
rejectUnauthorized: true
});
server.once('connect', function(client) {
done(new Error('it should not happen'));
});
client.once('error', function() {
done()
})
});
it('should emit close on TLS error', function (done) {
var client = createClient(port, {
ca: [WRONG_CERT],
rejectUnauthorized: true
});
client.on('error', function() {})
// TODO node v0.8.x emits multiple close events
client.once('close', function() {
done()
})
});
})
}
});

Sorry, the diff of this file is not supported yet

Sorry, the diff of this file is not supported yet

Sorry, the diff of this file is not supported yet

SocketSocket SOC 2 Logo

Product

  • Package Alerts
  • Integrations
  • Docs
  • Pricing
  • FAQ
  • Roadmap
  • Changelog

Packages

npm

Stay in touch

Get open source security insights delivered straight into your inbox.


  • Terms
  • Privacy
  • Security

Made with ⚡️ by Socket Inc