Huge News!Announcing our $40M Series B led by Abstract Ventures.Learn More
Socket
Sign inDemoInstall
Socket

mqtt

Package Overview
Dependencies
Maintainers
7
Versions
203
Alerts
File Explorer

Advanced tools

Socket logo

Install Socket

Detect and block malicious and high-risk dependencies

Install

mqtt - npm Package Compare versions

Comparing version 5.0.0-beta.4 to 5.0.0

2

build/package.json
{
"name": "mqtt",
"description": "A library for the MQTT protocol",
"version": "5.0.0-beta.4",
"version": "5.0.0",
"contributors": [

@@ -6,0 +6,0 @@ "Adam Rudd <adamvrr@gmail.com>",

/// <reference types="node" />
/// <reference types="node" />
import { IAuthPacket, IConnackPacket, IDisconnectPacket, IPublishPacket, Packet, QoS, UserProperties } from 'mqtt-packet';
import { IAuthPacket, IConnackPacket, IDisconnectPacket, IPublishPacket, ISubscribePacket, Packet, QoS, IConnectPacket } from 'mqtt-packet';
import { IMessageIdProvider } from './default-message-id-provider';

@@ -23,3 +23,3 @@ import { DuplexOptions } from 'readable-stream';

}
export type AckHandler = (topic: string, message: Buffer, packet: any, cb: (error?: Error, code?: number) => void) => void;
export type AckHandler = (topic: string, message: Buffer, packet: any, cb: (error: Error | number, code?: number) => void) => void;
export interface IClientOptions extends ISecureClientOptions {

@@ -30,3 +30,3 @@ encoding?: BufferEncoding;

my?: any;
manualConnect?: any;
manualConnect?: boolean;
authPacket?: Partial<IAuthPacket>;

@@ -45,11 +45,4 @@ writeCache?: boolean;

wsOptions?: ClientOptions | ClientRequestArgs | DuplexOptions;
keepalive?: number;
clientId?: string;
protocolId?: string;
protocolVersion?: number;
clean?: boolean;
reconnectPeriod?: number;
connectTimeout?: number;
username?: string;
password?: Buffer | string;
incomingStore?: Store;

@@ -68,34 +61,15 @@ outgoingStore?: Store;

resubscribe?: boolean;
will?: {
topic: string;
payload: Buffer | string;
qos: QoS;
retain: boolean;
properties?: {
willDelayInterval?: number;
payloadFormatIndicator?: boolean;
messageExpiryInterval?: number;
contentType?: string;
responseTopic?: string;
correlationData?: Buffer;
userProperties?: UserProperties;
};
authPacket?: any;
manualConnect?: boolean;
};
transformWsUrl?: (url: string, options: IClientOptions, client: MqttClient) => string;
properties?: {
sessionExpiryInterval?: number;
receiveMaximum?: number;
maximumPacketSize?: number;
topicAliasMaximum?: number;
requestResponseInformation?: boolean;
requestProblemInformation?: boolean;
userProperties?: UserProperties;
authenticationMethod?: string;
authenticationData?: Buffer;
};
messageIdProvider?: IMessageIdProvider;
browserBufferTimeout?: number;
objectMode?: boolean;
clientId?: string;
protocolVersion?: IConnectPacket['protocolVersion'];
protocolId?: IConnectPacket['protocolId'];
clean?: boolean;
keepalive?: number;
username?: string;
password?: Buffer | string;
will?: IConnectPacket['will'];
properties?: IConnectPacket['properties'];
}

@@ -106,30 +80,5 @@ export interface IClientPublishOptions {

dup?: boolean;
properties?: {
payloadFormatIndicator?: boolean;
messageExpiryInterval?: number;
topicAlias?: number;
responseTopic?: string;
correlationData?: Buffer;
userProperties?: UserProperties;
subscriptionIdentifier?: number;
contentType?: string;
};
properties?: IPublishPacket['properties'];
cbStorePut?: StorePutCallback;
}
export interface IClientSubscribeOptions {
qos: QoS;
nl?: boolean;
rap?: boolean;
rh?: number;
properties?: {
subscriptionIdentifier?: number;
userProperties?: UserProperties;
};
}
export interface IClientSubscribeProperties {
properties?: {
subscriptionIdentifier?: number;
userProperties?: UserProperties;
};
}
export interface IClientReconnectOptions {

@@ -139,11 +88,6 @@ incomingStore?: Store;

}
export interface ISubscriptionGrant {
topic: string;
qos: QoS | number;
nl?: boolean;
rap?: boolean;
rh?: number;
export interface IClientSubscribeProperties {
properties?: ISubscribePacket['properties'];
}
export interface ISubscriptionRequest extends IClientSubscribeProperties {
topic: string;
export interface IClientSubscribeOptions extends IClientSubscribeProperties {
qos: QoS;

@@ -154,10 +98,10 @@ nl?: boolean;

}
export interface ISubscriptioOptions extends IClientSubscribeProperties {
qos: QoS;
nl?: boolean;
rap?: boolean;
rh?: number;
export interface ISubscriptionRequest extends IClientSubscribeOptions {
topic: string;
}
export interface ISubscriptionGrant extends Omit<ISubscriptionRequest, 'qos' | 'properties'> {
qos: QoS | 128;
}
export type ISubscriptionMap = {
[topic: string]: ISubscriptioOptions;
[topic: string]: IClientSubscribeOptions;
} & {

@@ -235,2 +179,4 @@ resubscribe?: boolean;

publish(topic: string, message: string | Buffer, opts?: IClientPublishOptions, callback?: DoneCallback): MqttClient;
publishAsync(topic: string, message: string | Buffer): Promise<void>;
publishAsync(topic: string, message: string | Buffer, opts?: IClientPublishOptions): Promise<void>;
subscribe(topicObject: string | string[] | ISubscriptionMap): MqttClient;

@@ -240,2 +186,4 @@ subscribe(topicObject: string | string[] | ISubscriptionMap, callback?: ClientSubscribeCallback): MqttClient;

subscribe(topicObject: string | string[] | ISubscriptionMap, opts?: IClientSubscribeOptions | IClientSubscribeProperties, callback?: ClientSubscribeCallback): MqttClient;
subscribeAsync(topicObject: string | string[] | ISubscriptionMap): Promise<ISubscriptionGrant[]>;
subscribeAsync(topicObject: string | string[] | ISubscriptionMap, opts?: IClientSubscribeOptions | IClientSubscribeProperties): Promise<ISubscriptionGrant[]>;
unsubscribe(topic: string | string[]): MqttClient;

@@ -245,2 +193,4 @@ unsubscribe(topic: string | string[], opts?: IClientSubscribeOptions): MqttClient;

unsubscribe(topic: string | string[], opts?: IClientSubscribeOptions, callback?: PacketCallback): MqttClient;
unsubscribeAsync(topic: string | string[]): Promise<void>;
unsubscribeAsync(topic: string | string[], opts?: IClientSubscribeOptions): Promise<void>;
end(cb?: DoneCallback): MqttClient;

@@ -251,2 +201,6 @@ end(force?: boolean): MqttClient;

end(force?: boolean, opts?: Partial<IDisconnectPacket>, cb?: DoneCallback): MqttClient;
endAsync(): Promise<void>;
endAsync(force?: boolean): Promise<void>;
endAsync(opts?: Partial<IDisconnectPacket>): Promise<void>;
endAsync(force?: boolean, opts?: Partial<IDisconnectPacket>): Promise<void>;
removeOutgoingMessage(messageId: number): MqttClient;

@@ -253,0 +207,0 @@ reconnect(opts?: Pick<IClientOptions, 'incomingStore' | 'outgoingStore'>): MqttClient;

@@ -208,2 +208,3 @@ "use strict";

connect() {
var _a;
const writable = new readable_stream_1.Writable();

@@ -273,4 +274,16 @@ const parser = mqtt_packet_1.default.parser(this.options);

this.log('connect: sending packet `connect`');
const connectPacket = Object.create(this.options);
connectPacket.cmd = 'connect';
const connectPacket = {
cmd: 'connect',
protocolId: this.options.protocolId,
protocolVersion: this.options.protocolVersion,
clean: this.options.clean,
clientId: this.options.clientId,
keepalive: this.options.keepalive,
username: this.options.username,
password: this.options.password,
properties: this.options.properties,
};
if (this.options.will) {
connectPacket.will = Object.assign(Object.assign({}, this.options.will), { payload: (_a = this.options.will) === null || _a === void 0 ? void 0 : _a.payload });
}
if (this.topicAliasRecv) {

@@ -313,4 +326,5 @@ if (!connectPacket.properties) {

callback = opts;
opts = {};
opts = null;
}
opts = opts || {};
const defaultOpts = {

@@ -376,2 +390,14 @@ qos: 0,

}
publishAsync(topic, message, opts) {
return new Promise((resolve, reject) => {
this.publish(topic, message, opts, (err) => {
if (err) {
reject(err);
}
else {
resolve();
}
});
});
}
subscribe(topicObject, opts, callback) {

@@ -510,2 +536,14 @@ const version = this.options.protocolVersion;

}
subscribeAsync(topicObject, opts) {
return new Promise((resolve, reject) => {
this.subscribe(topicObject, opts, (err, granted) => {
if (err) {
reject(err);
}
else {
resolve(granted);
}
});
});
}
unsubscribe(topic, opts, callback) {

@@ -570,22 +608,29 @@ if (typeof topic === 'string') {

}
unsubscribeAsync(topic, opts) {
return new Promise((resolve, reject) => {
this.unsubscribe(topic, opts, (err) => {
if (err) {
reject(err);
}
else {
resolve();
}
});
});
}
end(force, opts, cb) {
this.log('end :: (%s)', this.options.clientId);
if (force == null || typeof force !== 'boolean') {
cb = (opts || this.noop);
cb = cb || opts;
opts = force;
force = false;
if (typeof opts !== 'object') {
cb = opts;
opts = null;
if (typeof cb !== 'function') {
cb = this.noop;
}
}
}
if (typeof opts !== 'object') {
cb = opts;
cb = cb || opts;
opts = null;
}
this.log('end :: cb? %s', !!cb);
cb = cb || this.noop;
if (!cb || typeof cb !== 'function') {
cb = this.noop;
}
const closeStores = () => {

@@ -632,2 +677,14 @@ this.log('end :: closeStores: closing incoming and outgoing stores');

}
endAsync(force, opts) {
return new Promise((resolve, reject) => {
this.end(force, opts, (err) => {
if (err) {
reject(err);
}
else {
resolve();
}
});
});
}
removeOutgoingMessage(messageId) {

@@ -634,0 +691,0 @@ if (this.outgoing[messageId]) {

@@ -5,2 +5,6 @@ import MqttClient, { IClientOptions } from '../client';

declare function connect(brokerUrl: string, opts?: IClientOptions): MqttClient;
declare function connectAsync(brokerUrl: string): Promise<MqttClient>;
declare function connectAsync(opts: IClientOptions): Promise<MqttClient>;
declare function connectAsync(brokerUrl: string, opts?: IClientOptions): Promise<MqttClient>;
export default connect;
export { connectAsync };

@@ -6,6 +6,7 @@ "use strict";

Object.defineProperty(exports, "__esModule", { value: true });
exports.connectAsync = void 0;
const debug_1 = __importDefault(require("debug"));
const url_1 = __importDefault(require("url"));
const client_1 = __importDefault(require("../client"));
const is_browser_1 = __importDefault(require("../is-browser"));
const debug_1 = __importDefault(require("debug"));
const debug = (0, debug_1.default)('mqttjs');

@@ -134,3 +135,37 @@ const protocols = {};

}
function connectAsync(brokerUrl, opts, allowRetries = true) {
return new Promise((resolve, reject) => {
const client = connect(brokerUrl, opts);
const promiseResolutionListeners = {
connect: (connack) => {
removePromiseResolutionListeners();
resolve(client);
},
end: () => {
removePromiseResolutionListeners();
resolve(client);
},
error: (err) => {
removePromiseResolutionListeners();
client.end();
reject(err);
},
};
if (allowRetries === false) {
promiseResolutionListeners.close = () => {
promiseResolutionListeners.error(new Error("Couldn't connect to server"));
};
}
function removePromiseResolutionListeners() {
Object.keys(promiseResolutionListeners).forEach((eventName) => {
client.off(eventName, promiseResolutionListeners[eventName]);
});
}
Object.keys(promiseResolutionListeners).forEach((eventName) => {
client.on(eventName, promiseResolutionListeners[eventName]);
});
});
}
exports.connectAsync = connectAsync;
exports.default = connect;
//# sourceMappingURL=index.js.map

@@ -102,3 +102,3 @@ "use strict";

const webSocketStream = ws_1.default.createWebSocketStream(socket, options.wsOptions);
webSocketStream.url = url;
webSocketStream['url'] = url;
socket.on('close', () => {

@@ -105,0 +105,0 @@ webSocketStream.destroy();

@@ -0,9 +1,10 @@

/// <reference types="node" />
import type { Packet } from 'mqtt-packet';
import type internal from 'stream';
import type MqttClient from './client';
import type { IClientOptions } from './client';
import type MqttClient from './client';
import { Packet } from 'mqtt-packet';
import { Duplex, Writable } from 'readable-stream';
export type DoneCallback = (error?: Error) => void;
export type GenericCallback<T> = (error?: Error, result?: T) => void;
export type VoidCallback = () => void;
export type IStream = Duplex | Writable;
export type IStream = internal.Duplex;
export type StreamBuilder = (client: MqttClient, opts?: IClientOptions) => IStream;

@@ -10,0 +11,0 @@ export type Callback = () => void;

@@ -1,2 +0,3 @@

export default class UniqueMessageIdProvider {
import { IMessageIdProvider } from './default-message-id-provider';
export default class UniqueMessageIdProvider implements IMessageIdProvider {
private numberAllocator;

@@ -7,5 +8,5 @@ private lastId;

getLastAllocated(): number;
register(messageId: number): Boolean;
register(messageId: number): boolean;
deallocate(messageId: number): void;
clear(): void;
}

@@ -5,5 +5,7 @@ import MqttClient from './lib/client';

import Store, { IStore } from './lib/store';
import connect from './lib/connect';
import connect, { connectAsync } from './lib/connect';
export declare const Client: typeof MqttClient;
export { connect, MqttClient, Store, DefaultMessageIdProvider, UniqueMessageIdProvider, IStore, };
export { connect, connectAsync, MqttClient, Store, DefaultMessageIdProvider, UniqueMessageIdProvider, IStore, };
export * from './lib/client';
export * from './lib/shared';
export { ReasonCodes } from './lib/handlers/ack';

@@ -13,2 +13,14 @@ "use strict";

}));
var __setModuleDefault = (this && this.__setModuleDefault) || (Object.create ? (function(o, v) {
Object.defineProperty(o, "default", { enumerable: true, value: v });
}) : function(o, v) {
o["default"] = v;
});
var __importStar = (this && this.__importStar) || function (mod) {
if (mod && mod.__esModule) return mod;
var result = {};
if (mod != null) for (var k in mod) if (k !== "default" && Object.prototype.hasOwnProperty.call(mod, k)) __createBinding(result, mod, k);
__setModuleDefault(result, mod);
return result;
};
var __exportStar = (this && this.__exportStar) || function(m, exports) {

@@ -21,3 +33,3 @@ for (var p in m) if (p !== "default" && !Object.prototype.hasOwnProperty.call(exports, p)) __createBinding(exports, m, p);

Object.defineProperty(exports, "__esModule", { value: true });
exports.UniqueMessageIdProvider = exports.DefaultMessageIdProvider = exports.Store = exports.MqttClient = exports.connect = exports.Client = void 0;
exports.ReasonCodes = exports.UniqueMessageIdProvider = exports.DefaultMessageIdProvider = exports.Store = exports.MqttClient = exports.connectAsync = exports.connect = exports.Client = void 0;
const client_1 = __importDefault(require("./lib/client"));

@@ -31,6 +43,10 @@ exports.MqttClient = client_1.default;

exports.Store = store_1.default;
const connect_1 = __importDefault(require("./lib/connect"));
const connect_1 = __importStar(require("./lib/connect"));
exports.connect = connect_1.default;
Object.defineProperty(exports, "connectAsync", { enumerable: true, get: function () { return connect_1.connectAsync; } });
exports.Client = client_1.default;
__exportStar(require("./lib/client"), exports);
__exportStar(require("./lib/shared"), exports);
var ack_1 = require("./lib/handlers/ack");
Object.defineProperty(exports, "ReasonCodes", { enumerable: true, get: function () { return ack_1.ReasonCodes; } });
//# sourceMappingURL=mqtt.js.map

@@ -7,3 +7,3 @@ Usage: mqtt subscribe [opts] [topic]

-p/--port PORT the broker port
-i/--client-id ID the client id
-i/--clientId ID the client id
-q/--qos 0/1/2 the QoS of the message

@@ -10,0 +10,0 @@ --no-clean do not discard any pending message for

{
"name": "mqtt",
"description": "A library for the MQTT protocol",
"version": "5.0.0-beta.4",
"version": "5.0.0",
"contributors": [

@@ -6,0 +6,0 @@ "Adam Rudd <adamvrr@gmail.com>",

@@ -313,8 +313,13 @@ # ![mqtt.js](https://raw.githubusercontent.com/mqttjs/MQTT.js/137ee0e3940c1f01049a30248c70f24dc6e6f829/MQTT.js.png)

- [`mqtt.connect()`](#connect)
- [`mqtt.connectAsync()`](#connect-async)
- [`mqtt.Client()`](#client)
- [`mqtt.Client#connect()`](#client-connect)
- [`mqtt.Client#publish()`](#publish)
- [`mqtt.Client#publishAsync()`](#publish-async)
- [`mqtt.Client#subscribe()`](#subscribe)
- [`mqtt.Client#subscribeAsync()`](#subscribe-async)
- [`mqtt.Client#unsubscribe()`](#unsubscribe)
- [`mqtt.Client#unsubscribeAsync()`](#unsubscribe-async)
- [`mqtt.Client#end()`](#end)
- [`mqtt.Client#endAsync()`](#end-async)
- [`mqtt.Client#removeOutgoingMessage()`](#removeOutgoingMessage)

@@ -354,2 +359,8 @@ - [`mqtt.Client#reconnect()`](#reconnect)

<a name="connect-async"></a>
### connectAsync([url], options)
Async [`connect`](#connect). Returns a `Promise` that resolves to a `mqtt.Client` instance.
---

@@ -579,2 +590,8 @@

<a name="publish-async"></a>
### mqtt.Client#publishAsync(topic, message, [options])
Async [`publish`](#publish). Returns a `Promise<void>`.
---

@@ -607,2 +624,8 @@

<a name="subscribe-async"></a>
### mqtt.Client#subscribeAsync(topic/topic array/topic object, [options])
Async [`subscribe`](#subscribe). Returns a `Promise<granted[]>`.
---

@@ -622,2 +645,8 @@

<a name="unsubscribe-async"></a>
### mqtt.Client#unsubscribeAsync(topic/topic array, [options])
Async [`unsubscribe`](#unsubscribe). Returns a `Promise<void>`.
---

@@ -644,2 +673,8 @@

<a name="end-async"></a>
### mqtt.Client#endAsync([force], [options])
Async [`end`](#end). Returns a `Promise<void>`.
---

@@ -646,0 +681,0 @@

@@ -15,3 +15,2 @@ /**

QoS,
UserProperties,
ISubackPacket,

@@ -120,6 +119,8 @@ IConnectPacket,

packet: any,
cb: (error?: Error, code?: number) => void,
cb: (error: Error | number, code?: number) => void,
) => void
export interface IClientOptions extends ISecureClientOptions {
/** CLIENT PROPERTIES */
/** Encoding to use. Example 'binary' */

@@ -134,3 +135,3 @@ encoding?: BufferEncoding

/** Manually call `connect` after creating client instance */
manualConnect?: any
manualConnect?: boolean
/** Custom auth packet properties */

@@ -140,4 +141,5 @@ authPacket?: Partial<IAuthPacket>

writeCache?: boolean
/** Should be set to `host` */
servername?: string
/** The default protocol to use when using `servers` and no protocol is specified */
defaultProtocol?: MqttProtocol

@@ -163,23 +165,4 @@ /** Support clientId passed in the query string of the url */

wsOptions?: ClientOptions | ClientRequestArgs | DuplexOptions
/**
* 10 seconds, set to 0 to disable
*/
keepalive?: number
/**
* 'mqttjs_' + Math.random().toString(16).substr(2, 8)
*/
clientId?: string
/**
* 'MQTT'
*/
protocolId?: string
/**
* 4
*/
protocolVersion?: number
/**
* true, set to false to receive QoS 1 and 2 messages while offline
*/
clean?: boolean
/**
* 1000 milliseconds, interval between two reconnections

@@ -192,11 +175,4 @@ */

connectTimeout?: number
/**
* the username required by your broker, if any
*/
username?: string
/**
* the password required by your broker, if any
*/
password?: Buffer | string
/**
* a Store for the incoming packets

@@ -216,6 +192,12 @@ */

/** automatically use topic alias */
autoUseTopicAlias?: boolean
/** automatically assign topic alias */
autoAssignTopicAlias?: boolean
/** Set to false to disable ping reschedule. When enabled ping messages are rescheduled on each message sent */
reschedulePings?: boolean
/** List of broker servers. On each reconnect try the next server will be used */
servers?: Array<{

@@ -238,40 +220,4 @@ host: string

resubscribe?: boolean
/**
* a message that will sent by the broker automatically when the client disconnect badly.
*/
will?: {
/**
* the topic to publish
*/
topic: string
/**
* the message to publish
*/
payload: Buffer | string
/**
* the QoS
*/
qos: QoS
/**
* the retain flag
*/
retain: boolean
/*
* properies object of will
* */
properties?: {
willDelayInterval?: number
payloadFormatIndicator?: boolean
messageExpiryInterval?: number
contentType?: string
responseTopic?: string
correlationData?: Buffer
userProperties?: UserProperties
}
authPacket?: any
/** Prevent to call `connect` in constructor */
manualConnect?: boolean
}
/** when defined this function will be called to transform the url string generated by MqttClient from provided options */
transformWsUrl?: (

@@ -282,18 +228,56 @@ url: string,

) => string
properties?: {
sessionExpiryInterval?: number
receiveMaximum?: number
maximumPacketSize?: number
topicAliasMaximum?: number
requestResponseInformation?: boolean
requestProblemInformation?: boolean
userProperties?: UserProperties
authenticationMethod?: string
authenticationData?: Buffer
}
/** Custom message id provider */
messageIdProvider?: IMessageIdProvider
/** When using websockets, this is the timeout used when writing to socket. Default 1000 (1s) */
browserBufferTimeout?: number
/**
* When using websockets, this sets the `objectMode` option.
* When in objectMode, streams can push Strings and Buffers
* as well as any other JavaScript object.
* Another major difference is that when in objectMode,
* the internal buffering algorithm counts objects rather than bytes.
* This means if we have a Transform stream with the highWaterMark option set to 5,
* the stream will only buffer a maximum of 5 objects internally
*/
objectMode?: boolean
/** CONNECT PACKET PROPERTIES */
/**
* 'mqttjs_' + Math.random().toString(16).substr(2, 8)
*/
clientId?: string
/**
* 3=MQTT 3.1 4=MQTT 3.1.1 5=MQTT 5.0. Defaults to 4
*/
protocolVersion?: IConnectPacket['protocolVersion']
/**
* 'MQTT'
*/
protocolId?: IConnectPacket['protocolId']
/**
* true, set to false to receive QoS 1 and 2 messages while offline
*/
clean?: boolean
/**
* 10 seconds, set to 0 to disable
*/
keepalive?: number
/**
* the username required by your broker, if any
*/
username?: string
/**
* the password required by your broker, if any
*/
password?: Buffer | string
/**
* a message that will sent by the broker automatically when the client disconnect badly.
*/
will?: IConnectPacket['will']
/** see `connect` packet: https://github.com/mqttjs/mqtt-packet/blob/master/types/index.d.ts#L65 */
properties?: IConnectPacket['properties']
}

@@ -317,12 +301,3 @@

*/
properties?: {
payloadFormatIndicator?: boolean
messageExpiryInterval?: number
topicAlias?: number
responseTopic?: string
correlationData?: Buffer
userProperties?: UserProperties
subscriptionIdentifier?: number
contentType?: string
}
properties?: IPublishPacket['properties']
/**

@@ -333,36 +308,3 @@ * callback called when message is put into `outgoingStore`

}
export interface IClientSubscribeOptions {
/**
* the QoS
*/
qos: QoS
/*
* no local flag
* */
nl?: boolean
/*
* Retain As Published flag
* */
rap?: boolean
/*
* Retain Handling option
* */
rh?: number
/*
* MQTT 5.0 properies object of subscribe
* */
properties?: {
subscriptionIdentifier?: number
userProperties?: UserProperties
}
}
export interface IClientSubscribeProperties {
/*
* MQTT 5.0 properies object of subscribe
* */
properties?: {
subscriptionIdentifier?: number
userProperties?: UserProperties
}
}
export interface IClientReconnectOptions {

@@ -378,12 +320,14 @@ /**

}
export interface IClientSubscribeProperties {
/*
* MQTT 5.0 properies object of subscribe
* */
properties?: ISubscribePacket['properties']
}
export interface ISubscriptionGrant {
export interface IClientSubscribeOptions extends IClientSubscribeProperties {
/**
* is a subscribed to topic
* the QoS
*/
topic: string
/**
* is the granted qos level on it, may return 128 on error
*/
qos: QoS | number
qos: QoS
/*

@@ -402,3 +346,3 @@ * no local flag

}
export interface ISubscriptionRequest extends IClientSubscribeProperties {
export interface ISubscriptionRequest extends IClientSubscribeOptions {
/**

@@ -408,27 +352,12 @@ * is a subscribed to topic

topic: string
}
export interface ISubscriptionGrant
extends Omit<ISubscriptionRequest, 'qos' | 'properties'> {
/**
* is the granted qos level on it
* is the granted qos level on it, may return 128 on error
*/
qos: QoS
/*
* no local flag
* */
nl?: boolean
/*
* Retain As Published flag
* */
rap?: boolean
/*
* Retain Handling option
* */
rh?: number
qos: QoS | 128
}
export interface ISubscriptioOptions extends IClientSubscribeProperties {
qos: QoS
nl?: boolean
rap?: boolean
rh?: number
}
export type ISubscriptionMap = {

@@ -438,3 +367,3 @@ /**

*/
[topic: string]: ISubscriptioOptions
[topic: string]: IClientSubscribeOptions
} & {

@@ -865,4 +794,22 @@ resubscribe?: boolean

this.log('connect: sending packet `connect`')
const connectPacket: IConnectPacket = Object.create(this.options)
connectPacket.cmd = 'connect'
const connectPacket: IConnectPacket = {
cmd: 'connect',
protocolId: this.options.protocolId,
protocolVersion: this.options.protocolVersion,
clean: this.options.clean,
clientId: this.options.clientId,
keepalive: this.options.keepalive,
username: this.options.username,
password: this.options.password as Buffer,
properties: this.options.properties,
}
if (this.options.will) {
connectPacket.will = {
...this.options.will,
payload: this.options.will?.payload as Buffer,
}
}
if (this.topicAliasRecv) {

@@ -969,5 +916,7 @@ if (!connectPacket.properties) {

callback = opts as DoneCallback
opts = {} as IClientPublishOptions
opts = null
}
opts = opts || {}
// default opts

@@ -1041,5 +990,28 @@ const defaultOpts: IClientPublishOptions = {

}
return this
}
public publishAsync(topic: string, message: string | Buffer): Promise<void>
public publishAsync(
topic: string,
message: string | Buffer,
opts?: IClientPublishOptions,
): Promise<void>
public publishAsync(
topic: string,
message: string | Buffer,
opts?: IClientPublishOptions,
): Promise<void> {
return new Promise((resolve, reject) => {
this.publish(topic, message, opts, (err) => {
if (err) {
reject(err)
} else {
resolve()
}
})
})
}
/**

@@ -1220,3 +1192,3 @@ * subscribe - subscribe to <topic>

if (this.options.reconnectPeriod > 0) {
const topic: ISubscriptioOptions = { qos: sub.qos }
const topic: IClientSubscribeOptions = { qos: sub.qos }
if (version === 5) {

@@ -1267,2 +1239,24 @@ topic.nl = sub.nl || false

public subscribeAsync(
topicObject: string | string[] | ISubscriptionMap,
): Promise<ISubscriptionGrant[]>
public subscribeAsync(
topicObject: string | string[] | ISubscriptionMap,
opts?: IClientSubscribeOptions | IClientSubscribeProperties,
): Promise<ISubscriptionGrant[]>
public subscribeAsync(
topicObject: string | string[] | ISubscriptionMap,
opts?: IClientSubscribeOptions | IClientSubscribeProperties,
): Promise<ISubscriptionGrant[]> {
return new Promise((resolve, reject) => {
this.subscribe(topicObject, opts, (err, granted) => {
if (err) {
reject(err)
} else {
resolve(granted)
}
})
})
}
/**

@@ -1373,2 +1367,22 @@ * unsubscribe - unsubscribe from topic(s)

public unsubscribeAsync(topic: string | string[]): Promise<void>
public unsubscribeAsync(
topic: string | string[],
opts?: IClientSubscribeOptions,
): Promise<void>
public unsubscribeAsync(
topic: string | string[],
opts?: IClientSubscribeOptions,
): Promise<void> {
return new Promise((resolve, reject) => {
this.unsubscribe(topic, opts, (err) => {
if (err) {
reject(err)
} else {
resolve()
}
})
})
}
/**

@@ -1401,16 +1415,9 @@ * end - close connection

if (force == null || typeof force !== 'boolean') {
cb = (opts || this.noop) as DoneCallback
cb = cb || (opts as DoneCallback)
opts = force as Partial<IDisconnectPacket>
force = false
if (typeof opts !== 'object') {
cb = opts
opts = null
if (typeof cb !== 'function') {
cb = this.noop
}
}
}
if (typeof opts !== 'object') {
cb = opts
cb = cb || opts
opts = null

@@ -1420,4 +1427,7 @@ }

this.log('end :: cb? %s', !!cb)
cb = cb || this.noop
if (!cb || typeof cb !== 'function') {
cb = this.noop
}
const closeStores = () => {

@@ -1493,2 +1503,24 @@ this.log('end :: closeStores: closing incoming and outgoing stores')

public endAsync(): Promise<void>
public endAsync(force?: boolean): Promise<void>
public endAsync(opts?: Partial<IDisconnectPacket>): Promise<void>
public endAsync(
force?: boolean,
opts?: Partial<IDisconnectPacket>,
): Promise<void>
public endAsync(
force?: boolean | Partial<IDisconnectPacket>,
opts?: Partial<IDisconnectPacket>,
): Promise<void> {
return new Promise((resolve, reject) => {
this.end(force as boolean, opts, (err) => {
if (err) {
reject(err)
} else {
resolve()
}
})
})
}
/**

@@ -1627,3 +1659,3 @@ * removeOutgoingMessage - remove a message in outgoing store

private _checkDisconnecting(callback: GenericCallback<any>) {
private _checkDisconnecting(callback?: GenericCallback<any>) {
if (this.disconnecting) {

@@ -1630,0 +1662,0 @@ if (callback && callback !== this.noop) {

import { Buffer } from 'buffer'
import { Duplex, Transform } from 'readable-stream'
import { Transform } from 'readable-stream'
import duplexify, { Duplexify } from 'duplexify'

@@ -125,5 +125,5 @@ import { StreamBuilder } from '../shared'

return stream as unknown as Duplex
return stream
}
export default buildStream
/* eslint-disable @typescript-eslint/no-var-requires */
import _debug from 'debug'
import url from 'url'
import MqttClient, { IClientOptions, MqttProtocol } from '../client'
import MqttClient, {
IClientOptions,
MqttClientEventCallbacks,
MqttProtocol,
} from '../client'
import IS_BROWSER from '../is-browser'
import Store from '../store'
import DefaultMessageIdProvider from '../default-message-id-provider'
import UniqueMessageIdProvider from '../unique-message-id-provider'
import _debug from 'debug'
import { StreamBuilder } from '../shared'

@@ -180,2 +181,62 @@

function connectAsync(brokerUrl: string): Promise<MqttClient>
function connectAsync(opts: IClientOptions): Promise<MqttClient>
function connectAsync(
brokerUrl: string,
opts?: IClientOptions,
): Promise<MqttClient>
function connectAsync(
brokerUrl: string | IClientOptions,
opts?: IClientOptions,
allowRetries = true,
): Promise<MqttClient> {
return new Promise((resolve, reject) => {
const client = connect(brokerUrl as string, opts)
const promiseResolutionListeners: Partial<MqttClientEventCallbacks> = {
connect: (connack) => {
removePromiseResolutionListeners()
resolve(client) // Resolve on connect
},
end: () => {
removePromiseResolutionListeners()
resolve(client) // Resolve on end
},
error: (err) => {
removePromiseResolutionListeners()
client.end()
reject(err) // Reject on error
},
}
// If retries are not allowed, reject on close
if (allowRetries === false) {
promiseResolutionListeners.close = () => {
promiseResolutionListeners.error(
new Error("Couldn't connect to server"),
)
}
}
// Remove listeners added to client by this promise
function removePromiseResolutionListeners() {
Object.keys(promiseResolutionListeners).forEach((eventName) => {
client.off(
eventName as keyof MqttClientEventCallbacks,
promiseResolutionListeners[eventName],
)
})
}
// Add listeners to client
Object.keys(promiseResolutionListeners).forEach((eventName) => {
client.on(
eventName as keyof MqttClientEventCallbacks,
promiseResolutionListeners[eventName],
)
})
})
}
export default connect
export { connectAsync }

@@ -20,5 +20,5 @@ import { StreamBuilder } from '../shared'

debug('port %d and host %s', port, host)
return net.createConnection(port, host) as unknown as Duplex
return net.createConnection(port, host)
}
export default buildStream

@@ -5,3 +5,2 @@ import tls from 'tls'

import { StreamBuilder } from '../shared'
import { Duplex } from 'readable-stream'

@@ -30,3 +29,2 @@ const debug = _debug('mqttjs:tls')

const connection = tls.connect(opts)
/* eslint no-use-before-define: [2, "nofunc"] */
connection.on('secureConnect', () => {

@@ -55,5 +53,5 @@ if (opts.rejectUnauthorized && !connection.authorized) {

connection.on('error', handleTLSerrors)
return connection as unknown as Duplex
return connection
}
export default buildStream

@@ -7,3 +7,3 @@ import { StreamBuilder } from '../shared'

import duplexify from 'duplexify'
import { Duplex, DuplexOptions, Transform } from 'readable-stream'
import { DuplexOptions, Transform } from 'readable-stream'
import IS_BROWSER from '../is-browser'

@@ -141,9 +141,8 @@ import MqttClient, { IClientOptions } from '../client'

)
// eslint-disable-next-line @typescript-eslint/ban-ts-comment
// @ts-ignore
webSocketStream.url = url
webSocketStream['url'] = url
socket.on('close', () => {
webSocketStream.destroy()
})
return webSocketStream as Duplex
return webSocketStream
}

@@ -150,0 +149,0 @@

import { StreamBuilder } from '../shared'
import { Buffer } from 'buffer'
import { Duplex, Transform } from 'readable-stream'
import { Transform } from 'readable-stream'
import duplexify, { Duplexify } from 'duplexify'

@@ -138,5 +138,5 @@ import MqttClient, { IClientOptions } from '../client'

return stream as unknown as Duplex
return stream
}
export default buildStream

@@ -17,4 +17,4 @@ export interface IMessageIdProvider {

* If the messageId has already been occupied, then return false.
* @param {Number} num - The messageId to request use.
* @return {Boolean} - If `num` was not occupied, then return true, otherwise return false.
* @param {number} num - The messageId to request use.
* @return {boolean} - If `num` was not occupied, then return true, otherwise return false.
*/

@@ -21,0 +21,0 @@ register(num: number): boolean

@@ -109,3 +109,3 @@ import { IPublishPacket } from 'mqtt-packet'

if (error) {
return client.emit('error', error)
return client.emit('error', error as Error)
}

@@ -147,3 +147,3 @@ if (validReasonCodes.indexOf(code) === -1) {

if (error) {
return client.emit('error', error)
return client.emit('error', error as Error)
}

@@ -150,0 +150,0 @@ if (validReasonCodes.indexOf(code) === -1) {

@@ -0,5 +1,5 @@

import type { Packet } from 'mqtt-packet'
import type internal from 'stream'
import type MqttClient from './client'
import type { IClientOptions } from './client'
import type MqttClient from './client'
import { Packet } from 'mqtt-packet'
import { Duplex, Writable } from 'readable-stream'

@@ -12,3 +12,3 @@ export type DoneCallback = (error?: Error) => void

export type IStream = Duplex | Writable
export type IStream = internal.Duplex

@@ -15,0 +15,0 @@ export type StreamBuilder = (

import { NumberAllocator } from 'number-allocator'
import { IMessageIdProvider } from './default-message-id-provider'

@@ -7,3 +8,3 @@ /**

*/
export default class UniqueMessageIdProvider {
export default class UniqueMessageIdProvider implements IMessageIdProvider {
private numberAllocator: NumberAllocator

@@ -45,3 +46,3 @@

register(messageId: number) {
return this.numberAllocator.use(messageId)
return this.numberAllocator.use(messageId) as boolean
}

@@ -48,0 +49,0 @@

@@ -12,3 +12,3 @@ /*

import Store, { IStore } from './lib/store'
import connect from './lib/connect'
import connect, { connectAsync } from './lib/connect'

@@ -18,2 +18,3 @@ export const Client = MqttClient

connect,
connectAsync,
MqttClient,

@@ -26,1 +27,3 @@ Store,

export * from './lib/client'
export * from './lib/shared'
export { ReasonCodes } from './lib/handlers/ack'

Sorry, the diff of this file is not supported yet

Sorry, the diff of this file is not supported yet

Sorry, the diff of this file is not supported yet

Sorry, the diff of this file is not supported yet

Sorry, the diff of this file is not supported yet

Sorry, the diff of this file is not supported yet

Sorry, the diff of this file is not supported yet

Sorry, the diff of this file is not supported yet

Sorry, the diff of this file is not supported yet

Sorry, the diff of this file is not supported yet

Sorry, the diff of this file is not supported yet

Sorry, the diff of this file is too big to display

Sorry, the diff of this file is too big to display

SocketSocket SOC 2 Logo

Product

  • Package Alerts
  • Integrations
  • Docs
  • Pricing
  • FAQ
  • Roadmap
  • Changelog

Packages

npm

Stay in touch

Get open source security insights delivered straight into your inbox.


  • Terms
  • Privacy
  • Security

Made with ⚡️ by Socket Inc