Comparing version 0.3.8 to 0.3.9
@@ -38,4 +38,4 @@ /** | ||
if (!this instanceof MqttClient) { | ||
return new MqttClient(stream, options); | ||
if (!(this instanceof MqttClient)) { | ||
return new MqttClient(streamBuilder, options); | ||
} | ||
@@ -154,3 +154,3 @@ | ||
// Suppress connection errors | ||
this.stream.on('error', nop); | ||
this.stream.on('error', nop) | ||
@@ -377,2 +377,4 @@ // Echo stream close | ||
this.emit('reconnect'); | ||
if (this.conn) { | ||
@@ -393,2 +395,3 @@ this.conn.removeAllListeners(); | ||
if (!that.disconnecting && !that.reconnectTimer && (that.options.reconnectPeriod > 0)) { | ||
this.emit('offline'); | ||
that.reconnectTimer = setInterval(function () { | ||
@@ -415,5 +418,9 @@ that._reconnect(); | ||
*/ | ||
MqttClient.prototype._cleanUp = function() { | ||
MqttClient.prototype._cleanUp = function(forced) { | ||
this.conn.disconnect(); | ||
this.stream.end(); | ||
if (forced) { | ||
this.stream.destroy(); | ||
} else { | ||
this.stream.end(); | ||
} | ||
if (this.pingTimer !== null) { | ||
@@ -468,4 +475,4 @@ clearInterval(this.pingTimer); | ||
} else { | ||
this._cleanUp(); | ||
this.emit('close'); | ||
// do a forced cleanup since socket will be in bad shape | ||
this._cleanUp(true); | ||
} | ||
@@ -472,0 +479,0 @@ }; |
@@ -78,13 +78,17 @@ var protocol = require('./protocol'); | ||
// Username | ||
if (username && 'string' !== typeof username) { | ||
return new Error('Invalid username'); | ||
} else if (username) { | ||
length += username.length + 2; | ||
} | ||
if (username) { | ||
if (username.length) { | ||
length += username.length + 2; | ||
} else { | ||
return new Error('Invalid username') | ||
} | ||
} | ||
// Password | ||
if (password && 'string' !== typeof password) { | ||
return new Error('Invalid password'); | ||
} else if (password) { | ||
length += password.length + 2; | ||
if (password) { | ||
if (password.length) { | ||
length += password.length + 2; | ||
} else { | ||
return new Error('Invalid password') | ||
} | ||
} | ||
@@ -130,5 +134,16 @@ | ||
// Username and password | ||
if (username) pos += write_string(buffer, pos, username); | ||
if (password) pos += write_string(buffer, pos, password); | ||
if (username && typeof username === 'string') | ||
pos += write_string(buffer, pos, username); | ||
else if (username) { | ||
pos += write_number(buffer, pos, username.length); | ||
pos += write_buffer(buffer, pos, username); | ||
} | ||
if (password && typeof password === 'string') | ||
pos += write_string(buffer, pos, password); | ||
else if (password) { | ||
pos += write_number(buffer, pos, password.length); | ||
pos += write_buffer(buffer, pos, password); | ||
} | ||
return buffer; | ||
@@ -135,0 +150,0 @@ }; |
@@ -35,3 +35,3 @@ /* | ||
host = brokerUrl.host; | ||
host = brokerUrl.hostname; | ||
query = brokerUrl.query; | ||
@@ -128,4 +128,2 @@ | ||
} | ||
tls_opts.rejectUnauthorized = false; | ||
@@ -135,30 +133,41 @@ if (opts.keyPath && opts.certPath) { | ||
tls_opts.cert = fs.readFileSync(opts.certPath); | ||
tls_opts.ca = []; | ||
if (opts.ca) { | ||
for (var i = 0;i<opts.ca.length;i++) { | ||
tls_opts.ca[i] = fs.readFileSync(opts.ca[i]); | ||
} | ||
} | ||
tls_opts.ca = []; | ||
if (opts.ca) { | ||
for (var i = 0;i<opts.ca.length;i++) { | ||
tls_opts.ca[i] = fs.readFileSync(opts.ca[i]); | ||
} | ||
tls_opts.rejectUnauthorized = opts.rejectUnauthorized || false; | ||
} | ||
tls_opts.rejectUnauthorized = opts.rejectUnauthorized || false; | ||
tls_opts.port = port | ||
tls_opts.host = host | ||
builder = function() { | ||
var tls_client = tls.connect(port, host, tls_opts, function() { | ||
if (process.env.NODE_DEBUG) { | ||
if (tls_client.authorized) { | ||
console.log("Connection authorized by a Certificate Authority."); | ||
} else { | ||
console.log("Connection not authorized: " + tls_client.authorizationError) | ||
} | ||
} | ||
}) | ||
var tls_client = tls.connect(tls_opts) | ||
tls_client.on('secureConnect', function() { | ||
if (tls_opts.rejectUnauthorized && !tls_client.authorized) { | ||
throw new Error('TLS not authorized'); | ||
tls_client.emit('error', new Error('TLS not authorized')); | ||
} else { | ||
tls_client.removeListener('error', handleTLSerrors) | ||
} | ||
}); | ||
function handleTLSerrors(err) { | ||
// How can I get verify this error is a tls error? | ||
if (tls_opts.rejectUnauthorized) { | ||
mqttClient.emit('error', err) | ||
} | ||
// close this connection to match the behaviour of net | ||
// otherwise all we get is an error from the tls_client | ||
// and close event doesn't fire. This is a work around | ||
// to enable the reconnect code to work the same as with | ||
// net.createConnection | ||
tls_client.end(); | ||
} | ||
tls_client.on('error', handleTLSerrors) | ||
return tls_client; | ||
@@ -165,0 +174,0 @@ }; |
@@ -17,3 +17,3 @@ /* Parse - packet parsing */ | ||
parser.connect = function(buf, packet) { | ||
parser.connect = function(buf, packet, encoding) { | ||
parser._pos = 0; | ||
@@ -77,3 +77,3 @@ parser._len = buf.length; | ||
if(flags.username) { | ||
username = this.parse_string(buf); | ||
username = this.parse_string(buf, encoding); | ||
if(username === null) return new Error('Parse error - cannot parse username'); | ||
@@ -85,3 +85,3 @@ packet.username = username; | ||
if(flags.password) { | ||
password = this.parse_string(buf); | ||
password = this.parse_string(buf, encoding); | ||
if(password === null) return ; | ||
@@ -200,3 +200,3 @@ packet.password = password; | ||
// Parse topic | ||
topic = parser.parse_string(buf, parser._len, parser._pos); | ||
topic = parser.parse_string(buf); | ||
if(topic === null) return new Error('Parse error - cannot parse topic'); | ||
@@ -231,3 +231,3 @@ | ||
parser.parse_string = function(buf) { | ||
parser.parse_string = function(buf, encoding) { | ||
var length = parser.parse_num(buf) | ||
@@ -238,3 +238,3 @@ , result; | ||
result = utils.toString(parser, buf, length); | ||
result = utils.toString(parser, buf, length, encoding); | ||
@@ -241,0 +241,0 @@ parser._pos += length; |
@@ -13,4 +13,9 @@ var bops = require('bops'); | ||
module.exports.toString = function(parser, buf, length) { | ||
module.exports.toString = function(parser, buf, length, encoding) { | ||
if (encoding !== 'binary') { | ||
buf = bops.to(buf, encoding); | ||
} | ||
return bops.to(bops.subarray(buf, parser._pos, parser._pos + length), 'utf8'); | ||
}; |
@@ -13,4 +13,8 @@ | ||
module.exports.toString = function(parser, buf, length) { | ||
return buf.toString('utf8', parser._pos, parser._pos + length); | ||
module.exports.toString = function(parser, buf, length, encoding) { | ||
if (encoding !== 'binary') { | ||
return buf.toString(encoding || 'utf8', parser._pos, parser._pos + length); | ||
} else { | ||
return buf.slice(parser._pos, parser._pos + length); | ||
} | ||
}; |
@@ -5,3 +5,3 @@ { | ||
"description": "A library for the MQTT protocol", | ||
"version": "0.3.8", | ||
"version": "0.3.9", | ||
"contributors": [ | ||
@@ -24,2 +24,3 @@ "Matteo Collina <matteo.collina@gmail.com> (https://github.com/mcollina)" | ||
}, | ||
"pre-commit": "test", | ||
"bin": { | ||
@@ -42,4 +43,6 @@ "mqtt_pub": "./bin/mqtt_pub", | ||
"mocha": "*", | ||
"should": "*" | ||
"should": "*", | ||
"pre-commit": "0.0.7", | ||
"sinon": "~1.10.0" | ||
} | ||
} |
@@ -5,3 +5,4 @@ | ||
*/ | ||
var should = require('should'); | ||
var should = require('should') | ||
, MqttClient = require('../lib/client'); | ||
@@ -13,4 +14,7 @@ module.exports = function(server, createClient, port) { | ||
client.stream.end(); | ||
client.once('connect', function() { | ||
client.stream.end(); | ||
}); | ||
client.once('close', function() { | ||
client.end() | ||
done(); | ||
@@ -24,2 +28,3 @@ }); | ||
client.stream.once('close', function() { | ||
client.end(); | ||
if (!client.connected) { | ||
@@ -41,2 +46,3 @@ done(); | ||
should.not.exist(client.pingTimer); | ||
client.end(); | ||
done(); | ||
@@ -83,2 +89,3 @@ }); | ||
server.once('client', function(client) { | ||
client.disconnect() | ||
done(); | ||
@@ -94,2 +101,3 @@ }); | ||
packet.clientId.should.match(/mqttjs.*/); | ||
client.disconnect() | ||
done(); | ||
@@ -106,2 +114,3 @@ }); | ||
packet.clean.should.be.true; | ||
client.disconnect(); | ||
done(); | ||
@@ -119,2 +128,3 @@ }); | ||
packet.clientId.should.match(/testclient/); | ||
client.disconnect(); | ||
done(); | ||
@@ -133,2 +143,3 @@ }); | ||
packet.clean.should.be.false; | ||
client.disconnect(); | ||
done(); | ||
@@ -146,3 +157,3 @@ }); | ||
} catch(err) { | ||
process.nextTick(done); | ||
done(); | ||
} | ||
@@ -157,2 +168,3 @@ }); | ||
packet.clientId.should.match(/testclient/); | ||
client.disconnect(); | ||
done(); | ||
@@ -165,3 +177,6 @@ }); | ||
var client = createClient(port); | ||
client.once('connect', done); | ||
client.once('connect', function() { | ||
client.end(); | ||
done(); | ||
}); | ||
client.once('error', done); | ||
@@ -178,2 +193,3 @@ }); | ||
} | ||
client.end(); | ||
}); | ||
@@ -188,2 +204,3 @@ }); | ||
client.once('error', function(error) { | ||
client.end(); | ||
done(); | ||
@@ -194,6 +211,8 @@ }); | ||
it('should have different client ids', function() { | ||
var client1 = createClient(port).options.clientId | ||
, client2 = createClient(port).options.clientId; | ||
var client1 = createClient(port) | ||
, client2 = createClient(port); | ||
client1.should.not.equal(client2); | ||
client1.options.clientId.should.not.equal(client2.options.clientId); | ||
client1.end(); | ||
client2.end(); | ||
}); | ||
@@ -380,2 +399,3 @@ }); | ||
should.exist(client.pingTimer); | ||
client.end() | ||
done(); | ||
@@ -388,2 +408,3 @@ }); | ||
should.not.exist(client.pingTimer); | ||
client.end() | ||
done(); | ||
@@ -393,9 +414,11 @@ }); | ||
it('should reconnect if pingresp is not sent', function(done) { | ||
var client = createClient(port, {keepalive:1, reconnectPeriod: 200}); | ||
var client = createClient(port, {keepalive:1, reconnectPeriod: 50}); | ||
// Fake no pingresp being send by stubbing the _handlePingresp function | ||
client._handlePingresp = function () {}; | ||
client.once('close', function() { | ||
client.once('connect', function() { | ||
client.once('connect', function() { | ||
true.should.equal(true); | ||
done(); | ||
client.end() | ||
done(); | ||
}); | ||
@@ -689,2 +712,42 @@ }); | ||
it('should emit \'reconnect\' when reconnecting', function(done) { | ||
var client = createClient(port) | ||
, tryReconnect = true | ||
, reconnectEvent = false; | ||
client.on('reconnect', function() { | ||
reconnectEvent = true; | ||
}); | ||
client.on('connect', function() { | ||
if (tryReconnect) { | ||
client.stream.end(); | ||
tryReconnect = false; | ||
} else { | ||
reconnectEvent.should.equal(true); | ||
done(); | ||
} | ||
}); | ||
}); | ||
it('should emit \'offline\' after going offline', function(done) { | ||
var client = createClient(port) | ||
, tryReconnect = true | ||
, offlineEvent = false; | ||
client.on('offline', function() { | ||
offlineEvent = true; | ||
}); | ||
client.on('connect', function() { | ||
if (tryReconnect) { | ||
client.stream.end(); | ||
tryReconnect = false; | ||
} else { | ||
offlineEvent.should.equal(true); | ||
done(); | ||
} | ||
}); | ||
}); | ||
it('should not reconnect if it was ended by the user', function(done) { | ||
@@ -714,7 +777,7 @@ var client = createClient(port); | ||
it('should allow specification of a reconnect period', function(done) { | ||
this.timeout(2200); | ||
var client = createClient(port, {reconnectPeriod: 2000}) | ||
var period = 200 | ||
, client = createClient(port, {reconnectPeriod: period}) | ||
, reconnect = false; | ||
var start = process.hrtime() | ||
var start = Date.now() | ||
, end; | ||
@@ -727,4 +790,5 @@ | ||
} else { | ||
end = process.hrtime(start); | ||
if (end[0] === 2) { | ||
client.end(); | ||
end = Date.now() | ||
if (end - start >= period) { | ||
// Connected in about 2 seconds, that's good enough | ||
@@ -731,0 +795,0 @@ done(); |
@@ -6,2 +6,3 @@ /** | ||
var mqtt = require('..') | ||
, should = require('should') | ||
, abstractClientTests = require("./abstract_client"); | ||
@@ -77,6 +78,24 @@ | ||
describe('MqttClient', function() { | ||
describe('creating', function() { | ||
it('should allow instantiation of MqttClient without the \'new\' operator' , function(done) { | ||
should(function() { | ||
var client; | ||
try { | ||
client = mqtt.MqttClient(function() { | ||
throw Error('break'); | ||
}, {}); | ||
} catch (err) { | ||
if (err.message !== 'break') { | ||
throw err; | ||
} | ||
done(); | ||
} | ||
}).not.throw("Object #<Object> has no method '_setupStream'"); | ||
}); | ||
}); | ||
abstractClientTests(server, createClient, port); | ||
describe('message ids', function() { | ||
it('should increment the message id', function() { | ||
@@ -83,0 +102,0 @@ var client = createClient(); |
/** | ||
* Testing requires | ||
* Testing requires | ||
*/ | ||
@@ -30,9 +30,9 @@ var should = require('should') | ||
var fixture = [ | ||
16, 18, // Header | ||
16, 18, // Header | ||
0, 6, // Protocol id length | ||
77, 81, 73, 115, 100, 112, // Protocol id | ||
3, // Protocol version | ||
0, // Connect flags | ||
0, 30, // Keepalive | ||
0, 4, //Client id length | ||
0, // Connect flags | ||
0, 30, // Keepalive | ||
0, 4, //Client id length | ||
116, 101, 115, 116 // Client id | ||
@@ -72,16 +72,16 @@ ]; | ||
var fixture = [ | ||
16, 54, // Header | ||
0, 6, // Protocol id length | ||
77, 81, 73, 115, 100, 112, // Protocol id | ||
3, // Protocol version | ||
246, // Connect flags | ||
0, 30, // Keepalive | ||
0, 4, // Client id length | ||
116, 101, 115, 116, // Client id | ||
0, 5, // will topic length | ||
116, 111, 112, 105, 99, // will topic | ||
16, 54, // Header | ||
0, 6, // Protocol id length | ||
77, 81, 73, 115, 100, 112, // Protocol id | ||
3, // Protocol version | ||
246, // Connect flags | ||
0, 30, // Keepalive | ||
0, 4, // Client id length | ||
116, 101, 115, 116, // Client id | ||
0, 5, // will topic length | ||
116, 111, 112, 105, 99, // will topic | ||
0, 7, // will payload length | ||
112, 97, 121, 108, 111, 97, 100, // will payload | ||
0, 8, // username length | ||
117, 115, 101, 114, 110, 97, 109, 101, // username | ||
0, 8, // username length | ||
117, 115, 101, 114, 110, 97, 109, 101, // username | ||
0, 8, // password length | ||
@@ -99,2 +99,45 @@ 112, 97, 115, 115, 119, 111, 114, 100 //password | ||
it('should handle binary username/password', function(done) { | ||
var expected = { | ||
cmd: "connect", | ||
retain: false, | ||
qos: 0, | ||
dup: false, | ||
length: 28, | ||
protocolId: "MQIsdp", | ||
protocolVersion: 3, | ||
clean: false, | ||
keepalive: 30, | ||
clientId: "test", | ||
username: new Buffer([12, 13, 14]), | ||
password: new Buffer([15, 16, 17]), | ||
}; | ||
var fixture = [ | ||
16, 28, // Header | ||
0, 6, // Protocol id length | ||
77, 81, 73, 115, 100, 112, // Protocol id | ||
3, // Protocol version | ||
0x80 | 0x40, // Connect flags | ||
0, 30, // Keepalive | ||
0, 4, //Client id length | ||
116, 101, 115, 116, // Client id | ||
0, 3, // username length | ||
12, 13, 14, // username | ||
0, 3, // password length | ||
15, 16, 17 //password | ||
]; | ||
this.stream.write(new Buffer(fixture)); | ||
this.conn.setPacketEncoding('binary'); | ||
this.conn.once('connect', function(packet) { | ||
packet.username.toString('hex').should.eql(expected.username.toString('hex')); | ||
packet.password.toString('hex').should.eql(expected.password.toString('hex')); | ||
done(); | ||
}); | ||
}); | ||
describe('parse errors', function() { | ||
@@ -127,3 +170,3 @@ it('should say protocol not parseable', function(done) { | ||
} | ||
var fixture = [32, 2, 0, 0]; | ||
@@ -148,3 +191,3 @@ | ||
} | ||
var fixture = [32, 2, 0, 5]; | ||
@@ -174,3 +217,3 @@ | ||
var fixture = [ | ||
48, 10, // Header | ||
48, 10, // Header | ||
0, 4, // Topic length | ||
@@ -285,3 +328,3 @@ 116, 101, 115, 116, // Topic (test) | ||
var fixture = [ | ||
48, 6, // Header | ||
48, 6, // Header | ||
0, 4, // Topic length | ||
@@ -312,3 +355,3 @@ 116, 101, 115, 116 // Topic | ||
var fixture1 = [ | ||
48, 10, // Header | ||
48, 10, // Header | ||
0, 4, // Topic length | ||
@@ -516,4 +559,4 @@ 116, 101, 115, 116 // Topic (test) | ||
dup: false, | ||
length: 5, | ||
granted: [0, 1, 2], | ||
length: 6, | ||
granted: [0, 1, 2, 128], | ||
messageId: 6 | ||
@@ -523,5 +566,5 @@ }; | ||
var fixture = [ | ||
144, 5, // Header | ||
144, 6, // Header | ||
0, 6, // Message id | ||
0, 1, 2 // Granted qos (0, 1, 2) | ||
0, 1, 2, 128 // Granted qos (0, 1, 2) and a rejected being 0x80 | ||
]; | ||
@@ -570,3 +613,3 @@ | ||
}); | ||
describe('unsuback', function() { | ||
@@ -573,0 +616,0 @@ it('should fire a unsuback event', function(done) { |
@@ -91,2 +91,37 @@ /** | ||
it('should send a connect packet with binary username/password', function(done) { | ||
var expected = new Buffer([ | ||
16, 28, // Header | ||
0, 6, 77, 81, 73, 115, 100, 112, // Protocol Id | ||
3, // Protocol version | ||
0x40 | 0x80, // Connect flags | ||
0, 30, // Keepalive | ||
0, 4, // Client id length | ||
116, 101, 115, 116, // Client Id | ||
0, 3, // username length | ||
12, 13, 14, // username | ||
0, 3, // password length | ||
15, 16, 17 //password | ||
]); | ||
var fixture = { | ||
protocolId: 'MQIsdp', | ||
protocolVersion: 3, | ||
clientId: 'test', | ||
keepalive: 30, | ||
username: new Buffer([12, 13, 14]), | ||
password: new Buffer([15, 16, 17]) | ||
}; | ||
this.conn.setPacketEncoding('binary'); | ||
this.conn.connect(fixture); | ||
var that = this; | ||
this.stream.on('readable', function() { | ||
var packet = that.stream.read(); | ||
packet.should.eql(expected); | ||
done(); | ||
}); | ||
}); | ||
describe('invalid options', function () { | ||
@@ -93,0 +128,0 @@ describe('protocol id', function () { |
@@ -6,3 +6,4 @@ /** | ||
var should = require('should') | ||
, net = require('net'); | ||
, net = require('net') | ||
, sinon = require('sinon'); | ||
@@ -52,2 +53,8 @@ /** | ||
it('should return an MqttClient with correct host when called with a host and port', function () { | ||
sinon.spy(mqtt, "createClient"); | ||
var c = mqtt.connect('tcp://user:pass@localhost:1883'); | ||
mqtt.createClient.calledWith('1883', 'localhost').should.be.ok; | ||
}); | ||
it('should throw an error when connect is called without a brokerUrl', function () { | ||
@@ -68,2 +75,4 @@ (function(){ | ||
c.on('error', function() {}); | ||
c.should.be.instanceOf(mqtt.MqttClient); | ||
@@ -75,2 +84,4 @@ }); | ||
c.on('error', function() {}); | ||
c.should.be.instanceOf(mqtt.MqttClient); | ||
@@ -98,2 +109,4 @@ }); | ||
c.on('error', function() {}); | ||
c.should.be.instanceOf(mqtt.MqttClient); | ||
@@ -108,2 +121,4 @@ }); | ||
c.on('error', function() {}); | ||
c.should.be.instanceOf(mqtt.MqttClient); | ||
@@ -119,2 +134,4 @@ }); | ||
c.on('error', function() {}); | ||
c.should.be.instanceOf(mqtt.MqttClient); | ||
@@ -130,2 +147,4 @@ }); | ||
c.on('error', function() {}); | ||
c.should.be.instanceOf(mqtt.MqttClient); | ||
@@ -132,0 +151,0 @@ }); |
@@ -21,2 +21,4 @@ | ||
var WRONG_CERT = __dirname + '/helpers/wrong-cert.pem'; | ||
/** | ||
@@ -30,2 +32,3 @@ * Test server | ||
} else { | ||
server.emit('connect', client); | ||
client.connack({returnCode: 0}); | ||
@@ -80,2 +83,49 @@ } | ||
abstractClientTests(server, createClient, port); | ||
if (!process.version.match(/^v0.8/)) { | ||
describe('with secure parameters', function() { | ||
it('should validate successfully the CA', function (done) { | ||
var client = createClient(port, { | ||
ca: [CERT], | ||
rejectUnauthorized: true | ||
}); | ||
client.on('error', done) | ||
server.once('connect', function(client) { | ||
done(); | ||
}); | ||
}); | ||
it('should validate unsuccessfully the CA', function (done) { | ||
var client = createClient(port, { | ||
ca: [WRONG_CERT], | ||
rejectUnauthorized: true | ||
}); | ||
server.once('connect', function(client) { | ||
done(new Error('it should not happen')); | ||
}); | ||
client.once('error', function() { | ||
done() | ||
}) | ||
}); | ||
it('should emit close on TLS error', function (done) { | ||
var client = createClient(port, { | ||
ca: [WRONG_CERT], | ||
rejectUnauthorized: true | ||
}); | ||
client.on('error', function() {}) | ||
// TODO node v0.8.x emits multiple close events | ||
client.once('close', function() { | ||
done() | ||
}) | ||
}); | ||
}) | ||
} | ||
}); |
Sorry, the diff of this file is not supported yet
Sorry, the diff of this file is not supported yet
Sorry, the diff of this file is not supported yet
License Policy Violation
LicenseThis package is not allowed per your license policy. Review the package's license to ensure compliance.
Found 1 instance in 1 package
License Policy Violation
LicenseThis package is not allowed per your license policy. Review the package's license to ensure compliance.
Found 1 instance in 1 package
Environment variable access
Supply chain riskPackage accesses environment variables, which may be a sign of credential stuffing or data theft.
Found 1 instance in 1 package
181743
68
5318
5
4