Huge News!Announcing our $40M Series B led by Abstract Ventures.Learn More
Socket
Sign inDemoInstall
Socket

nats

Package Overview
Dependencies
Maintainers
3
Versions
195
Alerts
File Explorer

Advanced tools

Socket logo

Install Socket

Detect and block malicious and high-risk dependencies

Install

nats - npm Package Compare versions

Comparing version 1.2.2 to 1.2.3

14

index.d.ts
/*
* Copyright 2013-2018 The NATS Authors
* Copyright 2013-2019 The NATS Authors
* Licensed under the Apache License, Version 2.0 (the "License");

@@ -19,2 +19,3 @@ * you may not use this file except in compliance with the License.

export const version: string;

@@ -95,3 +96,2 @@

}
declare class Client extends events.EventEmitter {

@@ -119,4 +119,4 @@ /**

publish(subject: string, callback: Function):void;
publish(subject: string, msg: string | Buffer, callback: Function):void;
publish(subject: string, msg?: string | Buffer, reply?: string, callback?: Function):void;
publish(subject: string, msg: any, callback: Function):void;
publish(subject: string, msg: any, reply: string, callback?: Function):void;

@@ -148,4 +148,4 @@ /**

request(subject: string, callback: Function): number;
request(subject: string, msg: string | Buffer, callback: Function): number;
request(subject: string, msg?: string, options?: SubscribeOptions, callback?: Function): number;
request(subject: string, msg: any, callback: Function): number;
request(subject: string, msg: any, options: SubscribeOptions, callback: Function): number;

@@ -160,3 +160,3 @@ /**

*/
requestOne(subject: string, msg: string | Buffer, options?: SubscribeOptions, timeout?: number, callback?:Function) : number
requestOne(subject: string, msg: any, options: SubscribeOptions, timeout: number, callback?:Function) : number

@@ -163,0 +163,0 @@ /**

/*
* Copyright 2013-2018 The NATS Authors
* Copyright 2013-2019 The NATS Authors
* Licensed under the Apache License, Version 2.0 (the "License");

@@ -15,3 +15,4 @@ * you may not use this file except in compliance with the License.

*/
"use strict";
module.exports = require('./lib/nats');
/*
* Copyright 2013-2018 The NATS Authors
* Copyright 2013-2019 The NATS Authors
* Licensed under the Apache License, Version 2.0 (the "License");

@@ -16,3 +16,4 @@ * you may not use this file except in compliance with the License.

/* jslint node: true */
/* eslint no-sync: 0 */
'use strict';

@@ -23,3 +24,3 @@

*/
var net = require('net'),
const net = require('net'),
tls = require('tls'),

@@ -36,3 +37,3 @@ url = require('url'),

*/
var VERSION = '1.2.2',
const VERSION = '1.2.3',

@@ -172,3 +173,3 @@ DEFAULT_PORT = 4222,

*/
var createInbox = exports.createInbox = function() {
const createInbox = exports.createInbox = function() {
return ("_INBOX." + nuid.next());

@@ -238,5 +239,5 @@ };

function shuffle(array) {
for (var i = array.length - 1; i > 0; i--) {
var j = Math.floor(Math.random() * (i + 1));
var temp = array[i];
for (let i = array.length - 1; i > 0; i--) {
const j = Math.floor(Math.random() * (i + 1));
const temp = array[i];
array[i] = array[j];

@@ -255,3 +256,3 @@ array[j] = temp;

Client.prototype.parseOptions = function(opts) {
var options = this.options = {
const options = this.options = {
verbose: false,

@@ -324,13 +325,11 @@ pedantic: false,

var client = this;
// Set user/pass as needed if in options.
client.user = options.user;
client.pass = options.pass;
this.user = options.user;
this.pass = options.pass;
// Set token as needed if in options.
client.token = options.token;
this.token = options.token;
// Authentication - make sure authentication is valid.
if (client.user && client.token) {
if (this.user && this.token) {
throw (new NatsError(BAD_AUTHENTICATION_MSG, BAD_AUTHENTICATION));

@@ -341,3 +340,3 @@ }

if (Buffer.isEncoding(options.encoding)) {
client.encoding = options.encoding;
this.encoding = options.encoding;
} else {

@@ -347,17 +346,17 @@ throw new NatsError(INVALID_ENCODING_MSG_PREFIX + options.encoding, INVALID_ENCODING);

// For cluster support
client.servers = [];
this.servers = [];
if (Array.isArray(options.servers)) {
options.servers.forEach(function(server) {
client.servers.push(new Server(url.parse(server)));
options.servers.forEach((server) => {
this.servers.push(new Server(url.parse(server)));
});
// Randomize if needed
if (options.noRandomize !== true) {
shuffle(client.servers);
shuffle(this.servers);
}
// if they gave an URL we should add it if different
if (options.url !== undefined && client.servers.indexOf(options.url) === -1) {
if (options.url !== undefined && this.servers.indexOf(options.url) === -1) {
// Make url first element so it is attempted first
client.servers.unshift(new Server(url.parse(options.url)));
this.servers.unshift(new Server(url.parse(options.url)));
}

@@ -368,3 +367,3 @@ } else {

}
client.servers.push(new Server(url.parse(options.url)));
this.servers.push(new Server(url.parse(options.url)));
}

@@ -374,3 +373,3 @@ // If we are not setup for tls, but were handed a url with a tls:// prefix

if (options.tls === false) {
client.servers.forEach(function(server) {
this.servers.forEach((server) => {
if (server.url.protocol === 'tls' || server.url.protocol === 'tls:') {

@@ -388,3 +387,3 @@ options.tls = true;

}
var u = url.parse(host);
const u = url.parse(host);
if (u.port === null || u.port == '') {

@@ -423,24 +422,23 @@ host += ":" + DEFAULT_PORT;

Client.prototype.selectServer = function() {
var client = this;
var server = client.servers.shift();
const server = this.servers.shift();
// Place in client context.
client.currentServer = server;
client.url = server.url;
this.currentServer = server;
this.url = server.url;
if ('auth' in server.url && !!server.url.auth) {
var auth = server.url.auth.split(':');
const auth = server.url.auth.split(':');
if (auth.length !== 1) {
if (client.options.user === undefined) {
client.user = auth[0];
if (this.options.user === undefined) {
this.user = auth[0];
}
if (client.options.pass === undefined) {
client.pass = auth[1];
if (this.options.pass === undefined) {
this.pass = auth[1];
}
} else {
if (client.options.token === undefined) {
client.token = auth[0];
if (this.options.token === undefined) {
this.token = auth[0];
}
}
}
client.servers.push(server);
this.servers.push(server);
};

@@ -490,4 +488,4 @@

Client.prototype.loadUserJWT = function() {
var contents = fs.readFileSync(this.options.usercreds);
var m = CREDS.exec(contents); // jwt
const contents = fs.readFileSync(this.options.usercreds);
const m = CREDS.exec(contents); // jwt
if (m === null) {

@@ -508,6 +506,6 @@ this.emit('error', new NatsError(NO_USER_JWT_IN_CREDS_MSG, NO_USER_JWT_IN_CREDS));

Client.prototype.loadKeyAndSignNonce = function(nonce) {
var contents = fs.readFileSync(this.options.usercreds);
var re = new RegExp(CREDS, 'g');
const contents = fs.readFileSync(this.options.usercreds);
const re = new RegExp(CREDS, 'g');
re.exec(contents); // jwt
var m = re.exec(contents); // seed
const m = re.exec(contents); // seed
if (m === null) {

@@ -518,3 +516,3 @@ this.emit('error', new NatsError(NO_SEED_IN_CREDS_MSG, NO_SEED_IN_CREDS));

}
var sk = nkeys.fromSeed(Buffer.from(m[1]));
const sk = nkeys.fromSeed(Buffer.from(m[1]));
return sk.sign(nonce);

@@ -555,3 +553,3 @@ };

// For now we will not capture an error on file not found etc.
var contents = fs.readFileSync(this.options.usercreds);
const contents = fs.readFileSync(this.options.usercreds);
if (CREDS.exec(contents) === null) {

@@ -563,3 +561,3 @@ this.emit('error', new NatsError(BAD_CREDS_MSG, BAD_CREDS));

// We have a valid file, set up callback handlers.
var client = this;
const client = this;
this.options.sig = function(nonce) {

@@ -599,4 +597,4 @@ return client.loadKeyAndSignNonce(nonce);

Client.prototype.connectCB = function() {
var wasReconnecting = this.reconnecting;
var event = (wasReconnecting === true) ? 'reconnect' : 'connect';
const wasReconnecting = this.reconnecting;
const event = (wasReconnecting === true) ? 'reconnect' : 'connect';
this.reconnecting = false;

@@ -612,3 +610,15 @@ this.reconnects = 0;

/**
* @api private
*/
Client.prototype.cancelHeartbeat = function() {
if (this.pingTimer) {
clearTimeout(this.pingTimer);
delete this.pingTimer;
}
};
/**
* @api private
*/
Client.prototype.scheduleHeartbeat = function() {

@@ -650,4 +660,3 @@ this.pingTimer = setTimeout(function(client) {

Client.prototype.setupHandlers = function() {
var client = this;
var stream = client.stream;
const stream = this.stream;

@@ -658,26 +667,26 @@ if (undefined === stream) {

stream.on('connect', function() {
if (client.pingTimer) {
clearTimeout(client.pingTimer);
delete client.pingTimer;
}
client.connected = true;
client.scheduleHeartbeat();
stream.on('connect', () => {
this.cancelHeartbeat();
this.connected = true;
this.scheduleHeartbeat();
});
stream.on('close', function(hadError) {
client.closeStream();
client.emit('disconnect');
if (client.closed === true ||
client.options.reconnect === false ||
((client.reconnects >= client.options.maxReconnectAttempts) && client.options.maxReconnectAttempts !== -1)) {
client.emit('close');
stream.on('close', (hadError) => {
this.closeStream();
if(stream.bytesRead > 0) {
this.emit('disconnect');
}
if (this.closed === true ||
this.options.reconnect === false ||
((this.reconnects >= this.options.maxReconnectAttempts) && this.options.maxReconnectAttempts !== -1)) {
this.cleanupTimers();
this.emit('close');
} else {
client.scheduleReconnect();
this.scheduleReconnect();
}
});
stream.on('error', function(exception) {
stream.on('error', (exception) => {
// If we were connected just return, close event will process
if (client.wasConnected === true && client.currentServer.didConnect === true) {
if (this.wasConnected === true && this.currentServer.didConnect === true) {
return;

@@ -688,11 +697,11 @@ }

// general have not connected to any server, remove it from
// this list. Unless overidden
if (client.wasConnected === false && client.currentServer.didConnect === false) {
// this list. Unless overridden
if (this.wasConnected === false && this.currentServer.didConnect === false) {
// We can override this behavior with waitOnFirstConnect, which will
// treat it like a reconnect scenario.
if (client.options.waitOnFirstConnect) {
if (this.options.waitOnFirstConnect) {
// Pretend to move us into a reconnect state.
client.currentServer.didConnect = true;
this.currentServer.didConnect = true;
} else {
client.servers.splice(client.servers.length - 1, 1);
this.servers.splice(this.servers.length - 1, 1);
}

@@ -703,20 +712,20 @@ }

// to the server and we only have one.
if (client.wasConnected === false && client.servers.length === 0) {
client.emit('error', new NatsError(CONN_ERR_MSG_PREFIX + exception, CONN_ERR, exception));
if (this.wasConnected === false && this.servers.length === 0) {
this.emit('error', new NatsError(CONN_ERR_MSG_PREFIX + exception, CONN_ERR, exception));
}
client.closeStream();
this.closeStream();
});
stream.on('data', function(data) {
stream.on('data', (data) => {
// If inbound exists, concat them together. We try to avoid this for split
// messages, so this should only really happen for a split control line.
// Long term answer is hand rolled parser and not regexp.
if (client.inbound) {
client.inbound = Buffer.concat([client.inbound, data]);
if (this.inbound) {
this.inbound = Buffer.concat([this.inbound, data]);
} else {
client.inbound = data;
this.inbound = data;
}
// Process the inbound queue.
client.processInbound();
this.processInbound();
});

@@ -733,3 +742,3 @@ };

// Queue the connect command.
var cs = {
const cs = {
'lang': 'node',

@@ -742,3 +751,3 @@ 'version': VERSION,

if (this.info.nonce !== undefined && this.options.sig !== undefined) {
var sig = this.options.sig(Buffer.from(this.info.nonce));
const sig = this.options.sig(Buffer.from(this.info.nonce));
cs.sig = sig.toString('base64');

@@ -792,13 +801,12 @@ }

// wanted to ensure their messages reach the server.
var pong = [];
var pend = [];
var pSize = 0;
var client = this;
if (client.pending !== null) {
var pongIndex = 0;
client.pending.forEach(function(cmd) {
var cmdLen = Buffer.isBuffer(cmd) ? cmd.length : Buffer.byteLength(cmd);
if (cmd === PING_REQUEST && client.pongs !== null && pongIndex < client.pongs.length) {
const pong = [];
const pend = [];
let pSize = 0;
if (this.pending !== null) {
let pongIndex = 0;
this.pending.forEach((cmd) => {
const cmdLen = Buffer.isBuffer(cmd) ? cmd.length : Buffer.byteLength(cmd);
if (cmd === PING_REQUEST && this.pongs !== null && pongIndex < this.pongs.length) {
// filter out any useless ping requests (no pong callback, nop flush)
var p = client.pongs[pongIndex++];
const p = this.pongs[pongIndex++];
if (p !== undefined) {

@@ -865,6 +873,3 @@ pend.push(cmd);

Client.prototype.close = function() {
if (this.pingTimer) {
clearTimeout(this.pingTimer);
delete this.pingTimer;
}
this.cleanupTimers();
this.closed = true;

@@ -882,2 +887,30 @@ this.removeAllListeners();

/**
* Cancels all the timers, ping, subs, requests.
* Should only be called on a close.
* @api private
*/
Client.prototype.cleanupTimers = function() {
this.cancelHeartbeat();
if(this.respmux && this.respmux.requestMap) {
for (const p in this.respmux.requestMap) {
if (this.respmux.requestMap.hasOwnProperty(p)) {
this.cancelMuxRequest(p);
}
}
}
if(this.subs) {
for (const p in this.subs) {
if (this.subs.hasOwnProperty(p)) {
const sub = this.subs[p];
if(sub.timeout) {
clearTimeout(sub.timeout);
delete sub.timeout;
}
}
}
}
};
/**
* Close down the stream and clear state.

@@ -915,7 +948,6 @@ *

var client = this;
var write = function(data) {
client.pending = [];
client.pSize = 0;
return client.stream.write(data);
const write = (data) => {
this.pending = [];
this.pSize = 0;
return this.stream.write(data);
};

@@ -927,4 +959,4 @@ if (!this.pBufs) {

// We have some or all Buffers. Figure out if we can optimize.
var allBufs = true;
for (var i = 0; i < this.pending.length; i++) {
let allBufs = true;
for (let i = 0; i < this.pending.length; i++) {
if (!Buffer.isBuffer(this.pending[i])) {

@@ -940,7 +972,7 @@ allBufs = false;

// We have a mix, so write each one individually.
var pending = this.pending;
const pending = this.pending;
this.pending = [];
this.pSize = 0;
var result = true;
for (i = 0; i < pending.length; i++) {
let result = true;
for (let i = 0; i < pending.length; i++) {
result = this.stream.write(pending[i]) && result;

@@ -960,6 +992,6 @@ }

Client.prototype.stripPendingSubs = function() {
var pending = this.pending;
const pending = this.pending;
this.pending = [];
this.pSize = 0;
for (var i = 0; i < pending.length; i++) {
for (let i = 0; i < pending.length; i++) {
if (!SUBRE.test(pending[i])) {

@@ -996,3 +1028,3 @@ // Re-queue the command.

if (this.pending.length === 1) {
var self = this;
const self = this;
setImmediate(function() {

@@ -1014,7 +1046,7 @@ self.flushPending();

Client.prototype.sendSubscriptions = function() {
var protos = "";
for (var sid in this.subs) {
let protos = "";
for (const sid in this.subs) {
if (this.subs.hasOwnProperty(sid)) {
var sub = this.subs[sid];
var proto;
const sub = this.subs[sid];
let proto;
if (sub.qgroup) {

@@ -1039,11 +1071,9 @@ proto = [SUB, sub.subject, sub.qgroup, sid + CR_LF];

Client.prototype.processInbound = function() {
var client = this;
// Hold any regex matches.
var m;
let m;
// For optional yield
var start;
let start;
if (!client.stream) {
if (!this.stream) {
// if we are here, the stream was reaped and errors raised

@@ -1055,19 +1085,18 @@ // if we continue.

// FIXME(dlc) client.stream.isPaused() causes 0.10 to fail
client.stream.resume();
this.stream.resume();
/* jshint -W083 */
if (client.options.yieldTime !== undefined) {
if (this.options.yieldTime !== undefined) {
start = Date.now();
}
while (!client.closed && client.inbound && client.inbound.length > 0) {
switch (client.pstate) {
case AWAITING_CONTROL:
while (!this.closed && this.inbound && this.inbound.length > 0) {
switch (this.pstate) {
case AWAITING_CONTROL: {
// Regex only works on strings, so convert once to be more efficient.
// Long term answer is a hand rolled parser, not regex.
var buf = client.inbound.toString('binary', 0, MAX_CONTROL_LINE_SIZE);
const buf = this.inbound.toString('binary', 0, MAX_CONTROL_LINE_SIZE);
if ((m = MSG.exec(buf)) !== null) {
client.payload = {
this.payload = {
subj: m[1],

@@ -1078,12 +1107,12 @@ sid: parseInt(m[2], 10),

};
client.payload.psize = client.payload.size + CR_LF_LEN;
client.pstate = AWAITING_MSG_PAYLOAD;
this.payload.psize = this.payload.size + CR_LF_LEN;
this.pstate = AWAITING_MSG_PAYLOAD;
} else if ((m = OK.exec(buf)) !== null) {
// Ignore for now..
} else if ((m = ERR.exec(buf)) !== null) {
client.processErr(m[1]);
this.processErr(m[1]);
return;
} else if ((m = PONG.exec(buf)) !== null) {
client.pout = 0;
var cb = client.pongs && client.pongs.shift();
this.pout = 0;
const cb = this.pongs && this.pongs.shift();
if (cb) {

@@ -1093,15 +1122,15 @@ cb();

} else if ((m = PING.exec(buf)) !== null) {
client.sendCommand(PONG_RESPONSE);
this.sendCommand(PONG_RESPONSE);
} else if ((m = INFO.exec(buf)) !== null) {
client.info = JSON.parse(m[1]);
this.info = JSON.parse(m[1]);
// Always try to read the connect_urls from info
client.processServerUpdate();
this.processServerUpdate();
// Process first INFO
if (client.infoReceived === false) {
if (this.infoReceived === false) {
// Check on TLS mismatch.
if (client.checkTLSMismatch() === true) {
if (this.checkTLSMismatch() === true) {
return;
}
if (client.checkNkeyMismatch() === true) {
if (this.checkNkeyMismatch() === true) {
return;

@@ -1111,34 +1140,34 @@ }

// Switch over to TLS as needed.
if (client.info.tls_required === true) {
var tlsOpts = {
socket: client.stream
if (this.info.tls_required === true) {
const tlsOpts = {
socket: this.stream
};
if ('object' === typeof client.options.tls) {
for (var key in client.options.tls) {
tlsOpts[key] = client.options.tls[key];
if ('object' === typeof this.options.tls) {
for (const key in this.options.tls) {
tlsOpts[key] = this.options.tls[key];
}
}
// if we have a stream, this is from an old connection, reap it
if (client.stream) {
client.stream.removeAllListeners();
if (this.stream) {
this.stream.removeAllListeners();
}
client.stream = tls.connect(tlsOpts, function() {
client.flushPending();
this.stream = tls.connect(tlsOpts, () => {
this.flushPending();
});
client.setupHandlers();
this.setupHandlers();
}
// Send the connect message and subscriptions immediately
client.sendConnect();
client.sendSubscriptions();
this.sendConnect();
this.sendSubscriptions();
client.pongs.unshift(function() {
client.connectCB();
this.pongs.unshift(() => {
this.connectCB();
});
client.stream.write(PING_REQUEST);
this.stream.write(PING_REQUEST);
// Mark as received
client.infoReceived = true;
client.stripPendingSubs();
client.flushPending();
this.infoReceived = true;
this.stripPendingSubs();
this.flushPending();
}

@@ -1151,4 +1180,5 @@ } else {

break;
}
case AWAITING_MSG_PAYLOAD:
case AWAITING_MSG_PAYLOAD: {

@@ -1160,9 +1190,9 @@ // If we do not have the complete message, hold onto the chunks

// simple concat above.
if (client.inbound.length < client.payload.psize) {
if (undefined === client.payload.chunks) {
client.payload.chunks = [];
if (this.inbound.length < this.payload.psize) {
if (undefined === this.payload.chunks) {
this.payload.chunks = [];
}
client.payload.chunks.push(client.inbound);
client.payload.psize -= client.inbound.length;
client.inbound = null;
this.payload.chunks.push(this.inbound);
this.payload.psize -= this.inbound.length;
this.inbound = null;
return;

@@ -1173,12 +1203,12 @@ }

// Check to see if we have existing chunks
if (client.payload.chunks) {
if (this.payload.chunks) {
client.payload.chunks.push(client.inbound.slice(0, client.payload.psize));
this.payload.chunks.push(this.inbound.slice(0, this.payload.psize));
// don't append trailing control characters
var mbuf = Buffer.concat(client.payload.chunks, client.payload.size);
const mbuf = Buffer.concat(this.payload.chunks, this.payload.size);
if (client.options.preserveBuffers) {
client.payload.msg = mbuf;
if (this.options.preserveBuffers) {
this.payload.msg = mbuf;
} else {
client.payload.msg = mbuf.toString(client.encoding);
this.payload.msg = mbuf.toString(this.encoding);
}

@@ -1188,6 +1218,6 @@

if (client.options.preserveBuffers) {
client.payload.msg = client.inbound.slice(0, client.payload.size);
if (this.options.preserveBuffers) {
this.payload.msg = this.inbound.slice(0, this.payload.size);
} else {
client.payload.msg = client.inbound.toString(client.encoding, 0, client.payload.size);
this.payload.msg = this.inbound.toString(this.encoding, 0, this.payload.size);
}

@@ -1198,20 +1228,20 @@

// Eat the size of the inbound that represents the message.
if (client.inbound.length === client.payload.psize) {
client.inbound = null;
if (this.inbound.length === this.payload.psize) {
this.inbound = null;
} else {
client.inbound = client.inbound.slice(client.payload.psize);
this.inbound = this.inbound.slice(this.payload.psize);
}
// process the message
client.processMsg();
this.processMsg();
// Reset
client.pstate = AWAITING_CONTROL;
client.payload = null;
this.pstate = AWAITING_CONTROL;
this.payload = null;
// Check to see if we have an option to yield for other events after yieldTime.
if (start !== undefined) {
if ((Date.now() - start) > client.options.yieldTime) {
client.stream.pause();
setImmediate(client.processInbound.bind(this));
if ((Date.now() - start) > this.options.yieldTime) {
this.stream.pause();
setImmediate(this.processInbound.bind(this));
return;

@@ -1221,2 +1251,3 @@ }

break;
}
}

@@ -1227,7 +1258,7 @@

// Chop inbound
var psize = m[0].length;
if (psize >= client.inbound.length) {
client.inbound = null;
const psize = m[0].length;
if (psize >= this.inbound.length) {
this.inbound = null;
} else {
client.inbound = client.inbound.slice(psize);
this.inbound = this.inbound.slice(psize);
}

@@ -1244,9 +1275,8 @@ }

Client.prototype.processServerUpdate = function() {
var client = this;
if (client.info.connect_urls && client.info.connect_urls.length > 0) {
if (this.info.connect_urls && this.info.connect_urls.length > 0) {
// parse the infos
var tmp = {};
client.info.connect_urls.forEach(function(server) {
var u = 'nats://' + server;
var s = new Server(url.parse(u));
const tmp = {};
this.info.connect_urls.forEach((server) => {
const u = 'nats://' + server;
const s = new Server(url.parse(u));
// implicit servers are ones added via the info connect_urls

@@ -1258,6 +1288,6 @@ s.implicit = true;

// remove implicit servers that are no longer reported
var toDelete = [];
client.servers.forEach(function(s, index) {
var u = s.url.href;
if (s.implicit && client.currentServer.url.href !== u && tmp[u] === undefined) {
const toDelete = [];
this.servers.forEach((s, index) => {
const u = s.url.href;
if (s.implicit && this.currentServer.url.href !== u && tmp[u] === undefined) {
// server was removed

@@ -1272,11 +1302,11 @@ toDelete.push(index);

toDelete.reverse();
toDelete.forEach(function(index) {
client.servers.splice(index, 1);
toDelete.forEach((index) => {
this.servers.splice(index, 1);
});
// remaining servers are new
var newURLs = [];
for (var k in tmp) {
const newURLs = [];
for (const k in tmp) {
if (tmp.hasOwnProperty(k)) {
client.servers.push(tmp[k]);
this.servers.push(tmp[k]);
newURLs.push(k);

@@ -1288,5 +1318,5 @@ }

// new reported servers useful for tests
client.emit('serversDiscovered', newURLs);
this.emit('serversDiscovered', newURLs);
// simpler version
client.emit('servers', newURLs);
this.emit('servers', newURLs);
}

@@ -1302,3 +1332,3 @@ }

Client.prototype.processMsg = function() {
var sub = this.subs[this.payload.sid];
const sub = this.subs[this.payload.sid];
if (sub !== undefined) {

@@ -1326,3 +1356,3 @@ sub.received += 1;

if (sub.callback) {
var msg = this.payload.msg;
let msg = this.payload.msg;
if (this.options.json) {

@@ -1356,3 +1386,3 @@ try {

// except stale connection and permission errors
var m = s ? s.toLowerCase() : '';
const m = s ? s.toLowerCase() : '';
if (m.indexOf(STALE_CONNECTION_ERR) !== -1) {

@@ -1455,3 +1485,3 @@ // closeStream() triggers a reconnect if allowed

// Hold PUB SUB [REPLY]
var psub;
let psub;
if (opt_reply === undefined) {

@@ -1465,3 +1495,3 @@ psub = 'PUB ' + subject + SPC;

if (!Buffer.isBuffer(msg)) {
var str = msg;
let str = msg;
if (this.options.json) {

@@ -1476,4 +1506,4 @@ try {

} else {
var b = Buffer.allocUnsafe(psub.length + msg.length + (2 * CR_LF_LEN) + msg.length.toString().length);
var len = b.write(psub + msg.length + CR_LF);
const b = Buffer.allocUnsafe(psub.length + msg.length + (2 * CR_LF_LEN) + msg.length.toString().length);
const len = b.write(psub + msg.length + CR_LF);
msg.copy(b, len);

@@ -1505,3 +1535,3 @@ b.write(CR_LF, len + msg.length);

}
var qgroup, max;
let qgroup, max;
if (typeof opts === 'function') {

@@ -1522,3 +1552,3 @@ callback = opts;

var proto;
let proto;
if (typeof qgroup === 'string') {

@@ -1562,3 +1592,3 @@ this.subs[this.ssid].qgroup = qgroup;

var proto;
let proto;
if (opt_max) {

@@ -1571,3 +1601,3 @@ proto = [UNSUB, sid, opt_max + CR_LF];

var sub = this.subs[sid];
const sub = this.subs[sid];
if (sub === undefined) {

@@ -1605,6 +1635,6 @@ return;

}
var sub = null;
let sub = null;
// check the sid is not a mux sid - which is always negative
if (sid < 0) {
var conf = this.getMuxRequestConfig(sid);
const conf = this.getMuxRequestConfig(sid);
if (conf && conf.timeout) {

@@ -1621,7 +1651,6 @@ // clear auto-set timeout

sub.expected = expected;
var that = this;
sub.timeout = setTimeout(function() {
sub.timeout = setTimeout(() => {
callback(sid);
// if callback fails unsubscribe will leak
that.unsubscribe(sid);
this.unsubscribe(sid);
}, timeout);

@@ -1668,12 +1697,11 @@ }

opt_options = opt_options || {};
var conf = this.initMuxRequestDetails(callback, opt_options.max);
const conf = this.initMuxRequestDetails(callback, opt_options.max);
this.publish(subject, opt_msg, conf.inbox);
if (opt_options.timeout) {
var client = this;
conf.timeout = setTimeout(function() {
conf.timeout = setTimeout(() => {
if (conf.callback) {
conf.callback(new NatsError(REQ_TIMEOUT_MSG_PREFIX + conf.id, REQ_TIMEOUT));
}
client.cancelMuxRequest(conf.token);
this.cancelMuxRequest(conf.token);
}, opt_options.timeout);

@@ -1700,4 +1728,4 @@ }

}
var inbox = createInbox();
var s = this.subscribe(inbox, opt_options, function(msg, reply) {
const inbox = this.createInbox();
const s = this.subscribe(inbox, opt_options, function(msg, reply) {
callback(msg, reply);

@@ -1771,8 +1799,7 @@ });

if (!this.respmux) {
var client = this;
var inbox = createInbox();
var ginbox = inbox + ".*";
var sid = this.subscribe(ginbox, function(msg, reply, subject) {
var token = client.extractToken(subject);
var conf = client.getMuxRequestConfig(token);
const inbox = this.createInbox();
const ginbox = inbox + ".*";
const sid = this.subscribe(ginbox, (msg, reply, subject) => {
const token = this.extractToken(subject);
const conf = this.getMuxRequestConfig(token);
if (conf) {

@@ -1785,4 +1812,4 @@ if (conf.callback) {

if (conf.received >= conf.expected) {
client.cancelMuxRequest(token);
client.emit('unsubscribe', sid, subject);
this.cancelMuxRequest(token);
this.emit('unsubscribe', sid, subject);
}

@@ -1808,7 +1835,7 @@ }

Client.prototype.initMuxRequestDetails = function(callback, expected) {
var ginbox = this.createResponseMux();
var token = nuid.next();
var inbox = ginbox + '.' + token;
const ginbox = this.createResponseMux();
const token = nuid.next();
const inbox = ginbox + '.' + token;
var conf = {
const conf = {
token: token,

@@ -1836,6 +1863,6 @@ callback: callback,

if (typeof token === 'number') {
var entry = null;
for (var p in this.respmux.requestMap) {
let entry = null;
for (const p in this.respmux.requestMap) {
if (this.respmux.requestMap.hasOwnProperty(p)) {
var v = this.respmux.requestMap[p];
const v = this.respmux.requestMap[p];
if (v.id === token) {

@@ -1860,3 +1887,3 @@ entry = v;

Client.prototype.cancelMuxRequest = function(token) {
var conf = this.getMuxRequestConfig(token);
const conf = this.getMuxRequestConfig(token);
if (conf) {

@@ -1893,3 +1920,3 @@ if (conf.timeout) {

var sid = this.request(subject, opt_msg, opt_options, callback);
const sid = this.request(subject, opt_msg, opt_options, callback);
this.timeout(sid, timeout, 1, function() {

@@ -1933,5 +1960,4 @@ callback(new NatsError(REQ_TIMEOUT_MSG_PREFIX + sid, REQ_TIMEOUT));

Client.prototype.scheduleReconnect = function() {
var client = this;
// Just return if no more servers
if (client.servers.length === 0) {
if (this.servers.length === 0) {
return;

@@ -1941,13 +1967,13 @@ }

// for the first time.
if (client.wasConnected === true) {
client.reconnecting = true;
if (this.wasConnected === true) {
this.reconnecting = true;
}
// Only stall if we have connected before.
var wait = 0;
if (client.servers[0].didConnect === true) {
let wait = 0;
if (this.servers[0].didConnect === true) {
wait = this.options.reconnectTimeWait;
}
setTimeout(function() {
client.reconnect();
setTimeout(() => {
this.reconnect();
}, wait);
};
{
"name": "nats",
"version": "1.2.2",
"version": "1.2.3",
"description": "Node.js client for NATS, a lightweight, high-performance cloud native messaging system",

@@ -33,11 +33,11 @@ "keywords": [

"depcheck:unused": "dependency-check ./package.json --unused --no-dev lib/*",
"test:unit": "mkdir -p reports/ && NODE_ENV=test multi='spec=- xunit=reports/mocha-xunit.xml' istanbul cover _mocha -- -R mocha-multi --timeout 10000 --slow 750 && istanbul check-coverage",
"test:unit": "mkdir -p reports/ && NODE_ENV=test multi='spec=- xunit=reports/mocha-xunit.xml' nyc mocha --timeout 10000 --slow 750",
"test": "npm run depcheck && npm run depcheck:unused && npm run lint && npm run test:unit",
"coveralls": "npm run cover -- --report lcovonly && cat ./reports/coverage/lcov.info | coveralls",
"cover": "istanbul cover _mocha",
"lint": "if-node-version >=4 eslint ./lib ./examples ./benchmark ./test",
"coveralls": "npm run test && nyc report --reporter=text-lcov | coveralls",
"cover": "nyc report --reporter=html && open coverage/index.html",
"lint": "eslint ./lib ./examples ./benchmark ./test",
"fmt": "js-beautify -n --config crockford.jscsrc -r lib/* test/*.js test/support/*.js examples/* benchmark/*.js"
},
"engines": {
"node": ">= 4.5.0"
"node": ">= 6.5.0"
},

@@ -49,2 +49,3 @@ "dependencies": {

"devDependencies": {
"@types/node": "^11.11.6",
"minimist": "^1.2.0",

@@ -54,4 +55,3 @@ "coveralls": "^3.0.2",

"eslint": "^5.10.0",
"if-node-version": "^1.1.1",
"istanbul": "^0.4.5",
"nyc": "^13.3.0",
"js-beautify": "^1.6.12",

@@ -65,3 +65,13 @@ "jshint": "^2.9.6",

},
"typings": "./index.d.ts"
"typings": "./index.d.ts",
"nyc": {
"include": [
"lib/**"
],
"exclude": [
"test/**",
"examples/**",
"benchmark/**"
]
}
}

@@ -9,2 +9,3 @@ # NATS - Node.js Client

[![npm](https://img.shields.io/npm/v/nats.svg)](https://www.npmjs.com/package/nats)
[![npm](https://img.shields.io/npm/dt/nats.svg)](https://www.npmjs.com/package/nats)
[![npm](https://img.shields.io/npm/dm/nats.svg)](https://www.npmjs.com/package/nats)

@@ -68,2 +69,57 @@

## JSON
The `json` connect property makes it easier to exchange JSON data with other
clients.
```javascript
var nc = NATS.connect({json: true});
nc.on('connect', function() {
nc.on('error', function(err) {
console.log(err);
});
nc.subscribe("greeting", function(msg, reply) {
// msg is a parsed JSON object object
if(msg.name && msg.reply) {
nc.publish(reply, {greeting: "hello " + msg.name});
}
});
// As with all inputs from unknown sources, if you don't trust the data
// you should verify it prior to accessing it. While JSON is safe because
// it doesn't export functions, it is still possible for a client to
// cause issues to a downstream consumer that is not written carefully
nc.subscribe("unsafe", function(msg) {
// for example a client could inject a bogus `toString` property
// which could cause your client to crash should you try to
// concatenation with the `+` like this:
// console.log("received", msg + "here");
// `TypeError: Cannot convert object to primitive value`
// Note that simple `console.log(msg)` is fine.
if (msg.hasOwnProperty('toString')) {
console.log('tricky - trying to crash me:', msg.toString);
return;
}
// of course this is no different than using a value that is
// expected in one format (say a number), but the client provides
// a string:
if (isNaN(msg.amount) === false) {
// do something with the number
}
//...
});
// the bad guy
nc.publish("unsafe", {toString: "no good"});
nc.flush(function() {
nc.close();
});
});
```
## Wildcard Subscriptions

@@ -158,3 +214,3 @@

// each time that NATS connects.
var nc = NATS.connect('connect.ngs.global', NATS.creds("./myid.creds");
var nc = NATS.connect('connect.ngs.global', NATS.creds("./myid.creds"));

@@ -161,0 +217,0 @@ // Setting nkey and signing callback directly.

SocketSocket SOC 2 Logo

Product

  • Package Alerts
  • Integrations
  • Docs
  • Pricing
  • FAQ
  • Roadmap
  • Changelog

Packages

npm

Stay in touch

Get open source security insights delivered straight into your inbox.


  • Terms
  • Privacy
  • Security

Made with ⚡️ by Socket Inc