Comparing version 0.1.9 to 0.1.10
import { PacketStream } from './packet-stream'; | ||
export declare abstract class MqttPacket { | ||
get remainingPacketLength(): number; | ||
get packetType(): number; | ||
@@ -10,3 +11,3 @@ set identifier(value: number | null); | ||
protected packetFlags: number; | ||
protected remainingPacketLength: number; | ||
protected _remainingPacketLength: number; | ||
private _identifier; | ||
@@ -13,0 +14,0 @@ private static nextId; |
@@ -8,5 +8,8 @@ "use strict"; | ||
this.packetFlags = 0; | ||
this.remainingPacketLength = 0; | ||
this._remainingPacketLength = 0; | ||
this._packetType = packetType; | ||
} | ||
get remainingPacketLength() { | ||
return this._remainingPacketLength; | ||
} | ||
get packetType() { | ||
@@ -61,3 +64,3 @@ return this._packetType; | ||
if (this.hasIdentifier && !this.inlineIdentifier) | ||
this.remainingPacketLength += 2; | ||
this._remainingPacketLength += 2; | ||
this.writeRemainingLength(stream); | ||
@@ -74,3 +77,3 @@ if (this.hasIdentifier && !this.inlineIdentifier) | ||
readRemainingLength(stream) { | ||
this.remainingPacketLength = 0; | ||
this._remainingPacketLength = 0; | ||
let multiplier = 1; | ||
@@ -80,5 +83,5 @@ let encodedByte; | ||
encodedByte = stream.readByte(); | ||
this.remainingPacketLength += (encodedByte & 0x7f) * multiplier; | ||
this._remainingPacketLength += (encodedByte & 0x7f) * multiplier; | ||
if (multiplier > 128 * 128 * 128) { | ||
throw new Error(`Invalid length @${stream.position}/${stream.length}; currentLength: ${this.remainingPacketLength}`); | ||
throw new Error(`Invalid length @${stream.position}/${stream.length}; currentLength: ${this._remainingPacketLength}`); | ||
} | ||
@@ -89,3 +92,3 @@ multiplier *= 0x80; | ||
writeRemainingLength(stream) { | ||
let num = this.remainingPacketLength; | ||
let num = this._remainingPacketLength; | ||
let digit = 0; | ||
@@ -92,0 +95,0 @@ do { |
@@ -8,10 +8,4 @@ /// <reference types="node" /> | ||
protected errorCallback: (e: Error) => void; | ||
protected lock: import("./mqtt.utilities").Lock; | ||
mapping: [number, () => MqttPacket][]; | ||
/** | ||
* Some workaround for async requests: | ||
* This prevents the execution if there's already something in the buffer. | ||
* Note: if something fails, this will lock forever | ||
* @type {{unlock: () => void; resolve: null; lock: () => void; locked: boolean}} | ||
*/ | ||
private lock; | ||
constructor(errorCallback?: (e: Error) => void, debug?: ((msg: string) => void) | undefined); | ||
@@ -18,0 +12,0 @@ reset(): void; |
@@ -5,8 +5,9 @@ "use strict"; | ||
const packet_stream_1 = require("./packet-stream"); | ||
const Bluebird = require("bluebird"); | ||
const errors_1 = require("./errors"); | ||
const packets_1 = require("./packets"); | ||
const mqtt_utilities_1 = require("./mqtt.utilities"); | ||
class MqttParser { | ||
constructor(errorCallback, debug) { | ||
this.debug = debug; | ||
this.lock = mqtt_utilities_1.createLock(); | ||
this.mapping = [ | ||
@@ -27,32 +28,2 @@ [mqtt_constants_1.PacketTypes.TYPE_CONNACK, () => new packets_1.ConnectResponsePacket()], | ||
]; | ||
/** | ||
* Some workaround for async requests: | ||
* This prevents the execution if there's already something in the buffer. | ||
* Note: if something fails, this will lock forever | ||
* @type {{unlock: () => void; resolve: null; lock: () => void; locked: boolean}} | ||
*/ | ||
this.lock = { | ||
locked: false, | ||
lock() { | ||
this.locked = true; | ||
}, | ||
unlock() { | ||
this.locked = false; | ||
if (this.resolve) { | ||
this.resolve(); | ||
this.resolve = null; | ||
} | ||
}, | ||
resolve: null, | ||
wait() { | ||
if (this.locked) { | ||
return new Promise(resolve => { | ||
this.resolve = resolve; | ||
}); | ||
} | ||
else { | ||
return Promise.resolve(); | ||
} | ||
}, | ||
}; | ||
this.stream = packet_stream_1.PacketStream.empty(); | ||
@@ -65,6 +36,6 @@ /* eslint @typescript-eslint/no-empty-function: "off" */ | ||
this.lock.locked = false; | ||
this.lock.resolve = null; | ||
this.lock.resolver = null; | ||
} | ||
async parse(data) { | ||
var _a, _b, _c, _d; | ||
var _a, _b, _c, _d, _e, _f, _g, _h; | ||
await this.lock.wait(); | ||
@@ -92,3 +63,3 @@ this.lock.lock(); | ||
let exitParser = false; | ||
await Bluebird.try(() => { | ||
try { | ||
packet.read(this.stream); | ||
@@ -98,12 +69,11 @@ results.push(packet); | ||
startPos = this.stream.position; | ||
}) | ||
.catch(errors_1.EndOfStreamError, e => { | ||
var _a, _b; | ||
(_b = (_a = this).debug) === null || _b === void 0 ? void 0 : _b.call(_a, `End of stream: ${e.stack}`); | ||
this.stream.position = startPos; | ||
exitParser = true; | ||
}) | ||
.catch((e) => { | ||
var _a, _b; | ||
(_b = (_a = this).debug) === null || _b === void 0 ? void 0 : _b.call(_a, `Error in parser (type: ${type}): | ||
} | ||
catch (e) { | ||
if (e instanceof errors_1.EndOfStreamError) { | ||
(_d = (_c = this).debug) === null || _d === void 0 ? void 0 : _d.call(_c, `EOS:\n ${packet.remainingPacketLength} got: ${this.stream.length} (+) ${data.byteLength};\n return: ${startPos};`); | ||
this.stream.position = startPos; | ||
exitParser = true; | ||
} | ||
else { | ||
(_f = (_e = this).debug) === null || _f === void 0 ? void 0 : _f.call(_e, `Error in parser (type: ${type}): | ||
${e.stack}; | ||
@@ -113,6 +83,7 @@ exiting; | ||
stream: ${this.stream.data.toString('base64')}`); | ||
this.errorCallback(e); | ||
exitParser = true; | ||
this.stream = packet_stream_1.PacketStream.empty(); | ||
}); | ||
this.errorCallback(e); | ||
exitParser = true; | ||
this.stream = packet_stream_1.PacketStream.empty(); | ||
} | ||
} | ||
if (exitParser) | ||
@@ -123,3 +94,3 @@ break; | ||
catch (e) { | ||
(_d = (_c = this).debug) === null || _d === void 0 ? void 0 : _d.call(_c, `Error in parser: | ||
(_h = (_g = this).debug) === null || _h === void 0 ? void 0 : _h.call(_g, `Error in parser: | ||
${e.stack}; | ||
@@ -126,0 +97,0 @@ resetting; |
@@ -34,1 +34,15 @@ import { ListenerInfo } from './mqtt.types'; | ||
export declare const isDisconnect: (target: MqttPacket) => target is DisconnectRequestPacket; | ||
/** | ||
* Some workaround for async requests: | ||
* This prevents the execution if there's already something in the buffer. | ||
* Note: if something fails, this will lock forever | ||
* @type {{unlock: () => void; resolve: null; lock: () => void; locked: boolean}} | ||
*/ | ||
export declare function createLock(): Lock; | ||
export interface Lock { | ||
resolver: Function | null; | ||
locked: boolean; | ||
lock: () => void; | ||
unlock: () => void; | ||
wait: () => Promise<void>; | ||
} |
@@ -67,2 +67,35 @@ "use strict"; | ||
exports.isDisconnect = (target) => isPacket(target, mqtt_constants_1.PacketTypes.TYPE_DISCONNECT); | ||
/** | ||
* Some workaround for async requests: | ||
* This prevents the execution if there's already something in the buffer. | ||
* Note: if something fails, this will lock forever | ||
* @type {{unlock: () => void; resolve: null; lock: () => void; locked: boolean}} | ||
*/ | ||
function createLock() { | ||
return { | ||
locked: false, | ||
lock() { | ||
this.locked = true; | ||
}, | ||
unlock() { | ||
this.locked = false; | ||
if (this.resolver) { | ||
this.resolver(); | ||
this.resolver = null; | ||
} | ||
}, | ||
resolver: null, | ||
wait() { | ||
if (this.locked) { | ||
return new Promise(resolve => { | ||
this.resolver = resolve; | ||
}); | ||
} | ||
else { | ||
return Promise.resolve(); | ||
} | ||
}, | ||
}; | ||
} | ||
exports.createLock = createLock; | ||
//# sourceMappingURL=mqtt.utilities.js.map |
@@ -52,3 +52,3 @@ "use strict"; | ||
data.writeString(password); | ||
this.remainingPacketLength = data.length; | ||
this._remainingPacketLength = data.length; | ||
super.write(stream); | ||
@@ -55,0 +55,0 @@ stream.write(data.data); |
@@ -73,3 +73,3 @@ "use strict"; | ||
const data = this.writeIdentifier(packet_stream_1.PacketStream.empty().writeString(this._topic)).write(this._payload); | ||
this.remainingPacketLength = data.length; | ||
this._remainingPacketLength = data.length; | ||
super.write(stream); | ||
@@ -76,0 +76,0 @@ stream.write(data.data); |
@@ -40,3 +40,3 @@ "use strict"; | ||
.writeByte(this._qosLevel); | ||
this.remainingPacketLength = data.length; | ||
this._remainingPacketLength = data.length; | ||
super.write(stream); | ||
@@ -43,0 +43,0 @@ stream.write(data.data); |
@@ -29,3 +29,3 @@ "use strict"; | ||
const data = packet_stream_1.PacketStream.empty().writeString(this._topic); | ||
this.remainingPacketLength = data.length; | ||
this._remainingPacketLength = data.length; | ||
super.write(stream); | ||
@@ -32,0 +32,0 @@ stream.write(data.data); |
{ | ||
"name": "mqtts", | ||
"version": "0.1.9", | ||
"version": "0.1.10", | ||
"description": "MQTT client in Typescript", | ||
@@ -30,3 +30,3 @@ "main": "dist/index.js", | ||
"dependencies": { | ||
"bluebird": "^3.7.2", | ||
"@types/ws": "^7.2.1", | ||
"debug": "^4.1.1", | ||
@@ -37,7 +37,5 @@ "lodash": "^4.17.15", | ||
"ts-xor": "^1.0.8", | ||
"ws": "^7.2.1", | ||
"@types/ws": "^7.2.1" | ||
"ws": "^7.2.1" | ||
}, | ||
"devDependencies": { | ||
"@types/bluebird": "^3.5.29", | ||
"@types/debug": "^4.1.5", | ||
@@ -44,0 +42,0 @@ "@types/jest": "^25.1.2", |
Sorry, the diff of this file is not supported yet
Sorry, the diff of this file is not supported yet
Sorry, the diff of this file is not supported yet
Sorry, the diff of this file is not supported yet
Sorry, the diff of this file is not supported yet
Sorry, the diff of this file is not supported yet
Sorry, the diff of this file is not supported yet
License Policy Violation
LicenseThis package is not allowed per your license policy. Review the package's license to ensure compliance.
Found 1 instance in 1 package
License Policy Violation
LicenseThis package is not allowed per your license policy. Review the package's license to ensure compliance.
Found 1 instance in 1 package
130135
7
13
2040
- Removedbluebird@^3.7.2
- Removedbluebird@3.7.2(transitive)