Comparing version 1.2.2 to 1.2.3
/* | ||
* Copyright 2013-2018 The NATS Authors | ||
* Copyright 2013-2019 The NATS Authors | ||
* Licensed under the Apache License, Version 2.0 (the "License"); | ||
@@ -19,2 +19,3 @@ * you may not use this file except in compliance with the License. | ||
export const version: string; | ||
@@ -95,3 +96,2 @@ | ||
} | ||
declare class Client extends events.EventEmitter { | ||
@@ -119,4 +119,4 @@ /** | ||
publish(subject: string, callback: Function):void; | ||
publish(subject: string, msg: string | Buffer, callback: Function):void; | ||
publish(subject: string, msg?: string | Buffer, reply?: string, callback?: Function):void; | ||
publish(subject: string, msg: any, callback: Function):void; | ||
publish(subject: string, msg: any, reply: string, callback?: Function):void; | ||
@@ -148,4 +148,4 @@ /** | ||
request(subject: string, callback: Function): number; | ||
request(subject: string, msg: string | Buffer, callback: Function): number; | ||
request(subject: string, msg?: string, options?: SubscribeOptions, callback?: Function): number; | ||
request(subject: string, msg: any, callback: Function): number; | ||
request(subject: string, msg: any, options: SubscribeOptions, callback: Function): number; | ||
@@ -160,3 +160,3 @@ /** | ||
*/ | ||
requestOne(subject: string, msg: string | Buffer, options?: SubscribeOptions, timeout?: number, callback?:Function) : number | ||
requestOne(subject: string, msg: any, options: SubscribeOptions, timeout: number, callback?:Function) : number | ||
@@ -163,0 +163,0 @@ /** |
/* | ||
* Copyright 2013-2018 The NATS Authors | ||
* Copyright 2013-2019 The NATS Authors | ||
* Licensed under the Apache License, Version 2.0 (the "License"); | ||
@@ -15,3 +15,4 @@ * you may not use this file except in compliance with the License. | ||
*/ | ||
"use strict"; | ||
module.exports = require('./lib/nats'); |
522
lib/nats.js
/* | ||
* Copyright 2013-2018 The NATS Authors | ||
* Copyright 2013-2019 The NATS Authors | ||
* Licensed under the Apache License, Version 2.0 (the "License"); | ||
@@ -16,3 +16,4 @@ * you may not use this file except in compliance with the License. | ||
/* jslint node: true */ | ||
/* eslint no-sync: 0 */ | ||
'use strict'; | ||
@@ -23,3 +24,3 @@ | ||
*/ | ||
var net = require('net'), | ||
const net = require('net'), | ||
tls = require('tls'), | ||
@@ -36,3 +37,3 @@ url = require('url'), | ||
*/ | ||
var VERSION = '1.2.2', | ||
const VERSION = '1.2.3', | ||
@@ -172,3 +173,3 @@ DEFAULT_PORT = 4222, | ||
*/ | ||
var createInbox = exports.createInbox = function() { | ||
const createInbox = exports.createInbox = function() { | ||
return ("_INBOX." + nuid.next()); | ||
@@ -238,5 +239,5 @@ }; | ||
function shuffle(array) { | ||
for (var i = array.length - 1; i > 0; i--) { | ||
var j = Math.floor(Math.random() * (i + 1)); | ||
var temp = array[i]; | ||
for (let i = array.length - 1; i > 0; i--) { | ||
const j = Math.floor(Math.random() * (i + 1)); | ||
const temp = array[i]; | ||
array[i] = array[j]; | ||
@@ -255,3 +256,3 @@ array[j] = temp; | ||
Client.prototype.parseOptions = function(opts) { | ||
var options = this.options = { | ||
const options = this.options = { | ||
verbose: false, | ||
@@ -324,13 +325,11 @@ pedantic: false, | ||
var client = this; | ||
// Set user/pass as needed if in options. | ||
client.user = options.user; | ||
client.pass = options.pass; | ||
this.user = options.user; | ||
this.pass = options.pass; | ||
// Set token as needed if in options. | ||
client.token = options.token; | ||
this.token = options.token; | ||
// Authentication - make sure authentication is valid. | ||
if (client.user && client.token) { | ||
if (this.user && this.token) { | ||
throw (new NatsError(BAD_AUTHENTICATION_MSG, BAD_AUTHENTICATION)); | ||
@@ -341,3 +340,3 @@ } | ||
if (Buffer.isEncoding(options.encoding)) { | ||
client.encoding = options.encoding; | ||
this.encoding = options.encoding; | ||
} else { | ||
@@ -347,17 +346,17 @@ throw new NatsError(INVALID_ENCODING_MSG_PREFIX + options.encoding, INVALID_ENCODING); | ||
// For cluster support | ||
client.servers = []; | ||
this.servers = []; | ||
if (Array.isArray(options.servers)) { | ||
options.servers.forEach(function(server) { | ||
client.servers.push(new Server(url.parse(server))); | ||
options.servers.forEach((server) => { | ||
this.servers.push(new Server(url.parse(server))); | ||
}); | ||
// Randomize if needed | ||
if (options.noRandomize !== true) { | ||
shuffle(client.servers); | ||
shuffle(this.servers); | ||
} | ||
// if they gave an URL we should add it if different | ||
if (options.url !== undefined && client.servers.indexOf(options.url) === -1) { | ||
if (options.url !== undefined && this.servers.indexOf(options.url) === -1) { | ||
// Make url first element so it is attempted first | ||
client.servers.unshift(new Server(url.parse(options.url))); | ||
this.servers.unshift(new Server(url.parse(options.url))); | ||
} | ||
@@ -368,3 +367,3 @@ } else { | ||
} | ||
client.servers.push(new Server(url.parse(options.url))); | ||
this.servers.push(new Server(url.parse(options.url))); | ||
} | ||
@@ -374,3 +373,3 @@ // If we are not setup for tls, but were handed a url with a tls:// prefix | ||
if (options.tls === false) { | ||
client.servers.forEach(function(server) { | ||
this.servers.forEach((server) => { | ||
if (server.url.protocol === 'tls' || server.url.protocol === 'tls:') { | ||
@@ -388,3 +387,3 @@ options.tls = true; | ||
} | ||
var u = url.parse(host); | ||
const u = url.parse(host); | ||
if (u.port === null || u.port == '') { | ||
@@ -423,24 +422,23 @@ host += ":" + DEFAULT_PORT; | ||
Client.prototype.selectServer = function() { | ||
var client = this; | ||
var server = client.servers.shift(); | ||
const server = this.servers.shift(); | ||
// Place in client context. | ||
client.currentServer = server; | ||
client.url = server.url; | ||
this.currentServer = server; | ||
this.url = server.url; | ||
if ('auth' in server.url && !!server.url.auth) { | ||
var auth = server.url.auth.split(':'); | ||
const auth = server.url.auth.split(':'); | ||
if (auth.length !== 1) { | ||
if (client.options.user === undefined) { | ||
client.user = auth[0]; | ||
if (this.options.user === undefined) { | ||
this.user = auth[0]; | ||
} | ||
if (client.options.pass === undefined) { | ||
client.pass = auth[1]; | ||
if (this.options.pass === undefined) { | ||
this.pass = auth[1]; | ||
} | ||
} else { | ||
if (client.options.token === undefined) { | ||
client.token = auth[0]; | ||
if (this.options.token === undefined) { | ||
this.token = auth[0]; | ||
} | ||
} | ||
} | ||
client.servers.push(server); | ||
this.servers.push(server); | ||
}; | ||
@@ -490,4 +488,4 @@ | ||
Client.prototype.loadUserJWT = function() { | ||
var contents = fs.readFileSync(this.options.usercreds); | ||
var m = CREDS.exec(contents); // jwt | ||
const contents = fs.readFileSync(this.options.usercreds); | ||
const m = CREDS.exec(contents); // jwt | ||
if (m === null) { | ||
@@ -508,6 +506,6 @@ this.emit('error', new NatsError(NO_USER_JWT_IN_CREDS_MSG, NO_USER_JWT_IN_CREDS)); | ||
Client.prototype.loadKeyAndSignNonce = function(nonce) { | ||
var contents = fs.readFileSync(this.options.usercreds); | ||
var re = new RegExp(CREDS, 'g'); | ||
const contents = fs.readFileSync(this.options.usercreds); | ||
const re = new RegExp(CREDS, 'g'); | ||
re.exec(contents); // jwt | ||
var m = re.exec(contents); // seed | ||
const m = re.exec(contents); // seed | ||
if (m === null) { | ||
@@ -518,3 +516,3 @@ this.emit('error', new NatsError(NO_SEED_IN_CREDS_MSG, NO_SEED_IN_CREDS)); | ||
} | ||
var sk = nkeys.fromSeed(Buffer.from(m[1])); | ||
const sk = nkeys.fromSeed(Buffer.from(m[1])); | ||
return sk.sign(nonce); | ||
@@ -555,3 +553,3 @@ }; | ||
// For now we will not capture an error on file not found etc. | ||
var contents = fs.readFileSync(this.options.usercreds); | ||
const contents = fs.readFileSync(this.options.usercreds); | ||
if (CREDS.exec(contents) === null) { | ||
@@ -563,3 +561,3 @@ this.emit('error', new NatsError(BAD_CREDS_MSG, BAD_CREDS)); | ||
// We have a valid file, set up callback handlers. | ||
var client = this; | ||
const client = this; | ||
this.options.sig = function(nonce) { | ||
@@ -599,4 +597,4 @@ return client.loadKeyAndSignNonce(nonce); | ||
Client.prototype.connectCB = function() { | ||
var wasReconnecting = this.reconnecting; | ||
var event = (wasReconnecting === true) ? 'reconnect' : 'connect'; | ||
const wasReconnecting = this.reconnecting; | ||
const event = (wasReconnecting === true) ? 'reconnect' : 'connect'; | ||
this.reconnecting = false; | ||
@@ -612,3 +610,15 @@ this.reconnects = 0; | ||
/** | ||
* @api private | ||
*/ | ||
Client.prototype.cancelHeartbeat = function() { | ||
if (this.pingTimer) { | ||
clearTimeout(this.pingTimer); | ||
delete this.pingTimer; | ||
} | ||
}; | ||
/** | ||
* @api private | ||
*/ | ||
Client.prototype.scheduleHeartbeat = function() { | ||
@@ -650,4 +660,3 @@ this.pingTimer = setTimeout(function(client) { | ||
Client.prototype.setupHandlers = function() { | ||
var client = this; | ||
var stream = client.stream; | ||
const stream = this.stream; | ||
@@ -658,26 +667,26 @@ if (undefined === stream) { | ||
stream.on('connect', function() { | ||
if (client.pingTimer) { | ||
clearTimeout(client.pingTimer); | ||
delete client.pingTimer; | ||
} | ||
client.connected = true; | ||
client.scheduleHeartbeat(); | ||
stream.on('connect', () => { | ||
this.cancelHeartbeat(); | ||
this.connected = true; | ||
this.scheduleHeartbeat(); | ||
}); | ||
stream.on('close', function(hadError) { | ||
client.closeStream(); | ||
client.emit('disconnect'); | ||
if (client.closed === true || | ||
client.options.reconnect === false || | ||
((client.reconnects >= client.options.maxReconnectAttempts) && client.options.maxReconnectAttempts !== -1)) { | ||
client.emit('close'); | ||
stream.on('close', (hadError) => { | ||
this.closeStream(); | ||
if(stream.bytesRead > 0) { | ||
this.emit('disconnect'); | ||
} | ||
if (this.closed === true || | ||
this.options.reconnect === false || | ||
((this.reconnects >= this.options.maxReconnectAttempts) && this.options.maxReconnectAttempts !== -1)) { | ||
this.cleanupTimers(); | ||
this.emit('close'); | ||
} else { | ||
client.scheduleReconnect(); | ||
this.scheduleReconnect(); | ||
} | ||
}); | ||
stream.on('error', function(exception) { | ||
stream.on('error', (exception) => { | ||
// If we were connected just return, close event will process | ||
if (client.wasConnected === true && client.currentServer.didConnect === true) { | ||
if (this.wasConnected === true && this.currentServer.didConnect === true) { | ||
return; | ||
@@ -688,11 +697,11 @@ } | ||
// general have not connected to any server, remove it from | ||
// this list. Unless overidden | ||
if (client.wasConnected === false && client.currentServer.didConnect === false) { | ||
// this list. Unless overridden | ||
if (this.wasConnected === false && this.currentServer.didConnect === false) { | ||
// We can override this behavior with waitOnFirstConnect, which will | ||
// treat it like a reconnect scenario. | ||
if (client.options.waitOnFirstConnect) { | ||
if (this.options.waitOnFirstConnect) { | ||
// Pretend to move us into a reconnect state. | ||
client.currentServer.didConnect = true; | ||
this.currentServer.didConnect = true; | ||
} else { | ||
client.servers.splice(client.servers.length - 1, 1); | ||
this.servers.splice(this.servers.length - 1, 1); | ||
} | ||
@@ -703,20 +712,20 @@ } | ||
// to the server and we only have one. | ||
if (client.wasConnected === false && client.servers.length === 0) { | ||
client.emit('error', new NatsError(CONN_ERR_MSG_PREFIX + exception, CONN_ERR, exception)); | ||
if (this.wasConnected === false && this.servers.length === 0) { | ||
this.emit('error', new NatsError(CONN_ERR_MSG_PREFIX + exception, CONN_ERR, exception)); | ||
} | ||
client.closeStream(); | ||
this.closeStream(); | ||
}); | ||
stream.on('data', function(data) { | ||
stream.on('data', (data) => { | ||
// If inbound exists, concat them together. We try to avoid this for split | ||
// messages, so this should only really happen for a split control line. | ||
// Long term answer is hand rolled parser and not regexp. | ||
if (client.inbound) { | ||
client.inbound = Buffer.concat([client.inbound, data]); | ||
if (this.inbound) { | ||
this.inbound = Buffer.concat([this.inbound, data]); | ||
} else { | ||
client.inbound = data; | ||
this.inbound = data; | ||
} | ||
// Process the inbound queue. | ||
client.processInbound(); | ||
this.processInbound(); | ||
}); | ||
@@ -733,3 +742,3 @@ }; | ||
// Queue the connect command. | ||
var cs = { | ||
const cs = { | ||
'lang': 'node', | ||
@@ -742,3 +751,3 @@ 'version': VERSION, | ||
if (this.info.nonce !== undefined && this.options.sig !== undefined) { | ||
var sig = this.options.sig(Buffer.from(this.info.nonce)); | ||
const sig = this.options.sig(Buffer.from(this.info.nonce)); | ||
cs.sig = sig.toString('base64'); | ||
@@ -792,13 +801,12 @@ } | ||
// wanted to ensure their messages reach the server. | ||
var pong = []; | ||
var pend = []; | ||
var pSize = 0; | ||
var client = this; | ||
if (client.pending !== null) { | ||
var pongIndex = 0; | ||
client.pending.forEach(function(cmd) { | ||
var cmdLen = Buffer.isBuffer(cmd) ? cmd.length : Buffer.byteLength(cmd); | ||
if (cmd === PING_REQUEST && client.pongs !== null && pongIndex < client.pongs.length) { | ||
const pong = []; | ||
const pend = []; | ||
let pSize = 0; | ||
if (this.pending !== null) { | ||
let pongIndex = 0; | ||
this.pending.forEach((cmd) => { | ||
const cmdLen = Buffer.isBuffer(cmd) ? cmd.length : Buffer.byteLength(cmd); | ||
if (cmd === PING_REQUEST && this.pongs !== null && pongIndex < this.pongs.length) { | ||
// filter out any useless ping requests (no pong callback, nop flush) | ||
var p = client.pongs[pongIndex++]; | ||
const p = this.pongs[pongIndex++]; | ||
if (p !== undefined) { | ||
@@ -865,6 +873,3 @@ pend.push(cmd); | ||
Client.prototype.close = function() { | ||
if (this.pingTimer) { | ||
clearTimeout(this.pingTimer); | ||
delete this.pingTimer; | ||
} | ||
this.cleanupTimers(); | ||
this.closed = true; | ||
@@ -882,2 +887,30 @@ this.removeAllListeners(); | ||
/** | ||
* Cancels all the timers, ping, subs, requests. | ||
* Should only be called on a close. | ||
* @api private | ||
*/ | ||
Client.prototype.cleanupTimers = function() { | ||
this.cancelHeartbeat(); | ||
if(this.respmux && this.respmux.requestMap) { | ||
for (const p in this.respmux.requestMap) { | ||
if (this.respmux.requestMap.hasOwnProperty(p)) { | ||
this.cancelMuxRequest(p); | ||
} | ||
} | ||
} | ||
if(this.subs) { | ||
for (const p in this.subs) { | ||
if (this.subs.hasOwnProperty(p)) { | ||
const sub = this.subs[p]; | ||
if(sub.timeout) { | ||
clearTimeout(sub.timeout); | ||
delete sub.timeout; | ||
} | ||
} | ||
} | ||
} | ||
}; | ||
/** | ||
* Close down the stream and clear state. | ||
@@ -915,7 +948,6 @@ * | ||
var client = this; | ||
var write = function(data) { | ||
client.pending = []; | ||
client.pSize = 0; | ||
return client.stream.write(data); | ||
const write = (data) => { | ||
this.pending = []; | ||
this.pSize = 0; | ||
return this.stream.write(data); | ||
}; | ||
@@ -927,4 +959,4 @@ if (!this.pBufs) { | ||
// We have some or all Buffers. Figure out if we can optimize. | ||
var allBufs = true; | ||
for (var i = 0; i < this.pending.length; i++) { | ||
let allBufs = true; | ||
for (let i = 0; i < this.pending.length; i++) { | ||
if (!Buffer.isBuffer(this.pending[i])) { | ||
@@ -940,7 +972,7 @@ allBufs = false; | ||
// We have a mix, so write each one individually. | ||
var pending = this.pending; | ||
const pending = this.pending; | ||
this.pending = []; | ||
this.pSize = 0; | ||
var result = true; | ||
for (i = 0; i < pending.length; i++) { | ||
let result = true; | ||
for (let i = 0; i < pending.length; i++) { | ||
result = this.stream.write(pending[i]) && result; | ||
@@ -960,6 +992,6 @@ } | ||
Client.prototype.stripPendingSubs = function() { | ||
var pending = this.pending; | ||
const pending = this.pending; | ||
this.pending = []; | ||
this.pSize = 0; | ||
for (var i = 0; i < pending.length; i++) { | ||
for (let i = 0; i < pending.length; i++) { | ||
if (!SUBRE.test(pending[i])) { | ||
@@ -996,3 +1028,3 @@ // Re-queue the command. | ||
if (this.pending.length === 1) { | ||
var self = this; | ||
const self = this; | ||
setImmediate(function() { | ||
@@ -1014,7 +1046,7 @@ self.flushPending(); | ||
Client.prototype.sendSubscriptions = function() { | ||
var protos = ""; | ||
for (var sid in this.subs) { | ||
let protos = ""; | ||
for (const sid in this.subs) { | ||
if (this.subs.hasOwnProperty(sid)) { | ||
var sub = this.subs[sid]; | ||
var proto; | ||
const sub = this.subs[sid]; | ||
let proto; | ||
if (sub.qgroup) { | ||
@@ -1039,11 +1071,9 @@ proto = [SUB, sub.subject, sub.qgroup, sid + CR_LF]; | ||
Client.prototype.processInbound = function() { | ||
var client = this; | ||
// Hold any regex matches. | ||
var m; | ||
let m; | ||
// For optional yield | ||
var start; | ||
let start; | ||
if (!client.stream) { | ||
if (!this.stream) { | ||
// if we are here, the stream was reaped and errors raised | ||
@@ -1055,19 +1085,18 @@ // if we continue. | ||
// FIXME(dlc) client.stream.isPaused() causes 0.10 to fail | ||
client.stream.resume(); | ||
this.stream.resume(); | ||
/* jshint -W083 */ | ||
if (client.options.yieldTime !== undefined) { | ||
if (this.options.yieldTime !== undefined) { | ||
start = Date.now(); | ||
} | ||
while (!client.closed && client.inbound && client.inbound.length > 0) { | ||
switch (client.pstate) { | ||
case AWAITING_CONTROL: | ||
while (!this.closed && this.inbound && this.inbound.length > 0) { | ||
switch (this.pstate) { | ||
case AWAITING_CONTROL: { | ||
// Regex only works on strings, so convert once to be more efficient. | ||
// Long term answer is a hand rolled parser, not regex. | ||
var buf = client.inbound.toString('binary', 0, MAX_CONTROL_LINE_SIZE); | ||
const buf = this.inbound.toString('binary', 0, MAX_CONTROL_LINE_SIZE); | ||
if ((m = MSG.exec(buf)) !== null) { | ||
client.payload = { | ||
this.payload = { | ||
subj: m[1], | ||
@@ -1078,12 +1107,12 @@ sid: parseInt(m[2], 10), | ||
}; | ||
client.payload.psize = client.payload.size + CR_LF_LEN; | ||
client.pstate = AWAITING_MSG_PAYLOAD; | ||
this.payload.psize = this.payload.size + CR_LF_LEN; | ||
this.pstate = AWAITING_MSG_PAYLOAD; | ||
} else if ((m = OK.exec(buf)) !== null) { | ||
// Ignore for now.. | ||
} else if ((m = ERR.exec(buf)) !== null) { | ||
client.processErr(m[1]); | ||
this.processErr(m[1]); | ||
return; | ||
} else if ((m = PONG.exec(buf)) !== null) { | ||
client.pout = 0; | ||
var cb = client.pongs && client.pongs.shift(); | ||
this.pout = 0; | ||
const cb = this.pongs && this.pongs.shift(); | ||
if (cb) { | ||
@@ -1093,15 +1122,15 @@ cb(); | ||
} else if ((m = PING.exec(buf)) !== null) { | ||
client.sendCommand(PONG_RESPONSE); | ||
this.sendCommand(PONG_RESPONSE); | ||
} else if ((m = INFO.exec(buf)) !== null) { | ||
client.info = JSON.parse(m[1]); | ||
this.info = JSON.parse(m[1]); | ||
// Always try to read the connect_urls from info | ||
client.processServerUpdate(); | ||
this.processServerUpdate(); | ||
// Process first INFO | ||
if (client.infoReceived === false) { | ||
if (this.infoReceived === false) { | ||
// Check on TLS mismatch. | ||
if (client.checkTLSMismatch() === true) { | ||
if (this.checkTLSMismatch() === true) { | ||
return; | ||
} | ||
if (client.checkNkeyMismatch() === true) { | ||
if (this.checkNkeyMismatch() === true) { | ||
return; | ||
@@ -1111,34 +1140,34 @@ } | ||
// Switch over to TLS as needed. | ||
if (client.info.tls_required === true) { | ||
var tlsOpts = { | ||
socket: client.stream | ||
if (this.info.tls_required === true) { | ||
const tlsOpts = { | ||
socket: this.stream | ||
}; | ||
if ('object' === typeof client.options.tls) { | ||
for (var key in client.options.tls) { | ||
tlsOpts[key] = client.options.tls[key]; | ||
if ('object' === typeof this.options.tls) { | ||
for (const key in this.options.tls) { | ||
tlsOpts[key] = this.options.tls[key]; | ||
} | ||
} | ||
// if we have a stream, this is from an old connection, reap it | ||
if (client.stream) { | ||
client.stream.removeAllListeners(); | ||
if (this.stream) { | ||
this.stream.removeAllListeners(); | ||
} | ||
client.stream = tls.connect(tlsOpts, function() { | ||
client.flushPending(); | ||
this.stream = tls.connect(tlsOpts, () => { | ||
this.flushPending(); | ||
}); | ||
client.setupHandlers(); | ||
this.setupHandlers(); | ||
} | ||
// Send the connect message and subscriptions immediately | ||
client.sendConnect(); | ||
client.sendSubscriptions(); | ||
this.sendConnect(); | ||
this.sendSubscriptions(); | ||
client.pongs.unshift(function() { | ||
client.connectCB(); | ||
this.pongs.unshift(() => { | ||
this.connectCB(); | ||
}); | ||
client.stream.write(PING_REQUEST); | ||
this.stream.write(PING_REQUEST); | ||
// Mark as received | ||
client.infoReceived = true; | ||
client.stripPendingSubs(); | ||
client.flushPending(); | ||
this.infoReceived = true; | ||
this.stripPendingSubs(); | ||
this.flushPending(); | ||
} | ||
@@ -1151,4 +1180,5 @@ } else { | ||
break; | ||
} | ||
case AWAITING_MSG_PAYLOAD: | ||
case AWAITING_MSG_PAYLOAD: { | ||
@@ -1160,9 +1190,9 @@ // If we do not have the complete message, hold onto the chunks | ||
// simple concat above. | ||
if (client.inbound.length < client.payload.psize) { | ||
if (undefined === client.payload.chunks) { | ||
client.payload.chunks = []; | ||
if (this.inbound.length < this.payload.psize) { | ||
if (undefined === this.payload.chunks) { | ||
this.payload.chunks = []; | ||
} | ||
client.payload.chunks.push(client.inbound); | ||
client.payload.psize -= client.inbound.length; | ||
client.inbound = null; | ||
this.payload.chunks.push(this.inbound); | ||
this.payload.psize -= this.inbound.length; | ||
this.inbound = null; | ||
return; | ||
@@ -1173,12 +1203,12 @@ } | ||
// Check to see if we have existing chunks | ||
if (client.payload.chunks) { | ||
if (this.payload.chunks) { | ||
client.payload.chunks.push(client.inbound.slice(0, client.payload.psize)); | ||
this.payload.chunks.push(this.inbound.slice(0, this.payload.psize)); | ||
// don't append trailing control characters | ||
var mbuf = Buffer.concat(client.payload.chunks, client.payload.size); | ||
const mbuf = Buffer.concat(this.payload.chunks, this.payload.size); | ||
if (client.options.preserveBuffers) { | ||
client.payload.msg = mbuf; | ||
if (this.options.preserveBuffers) { | ||
this.payload.msg = mbuf; | ||
} else { | ||
client.payload.msg = mbuf.toString(client.encoding); | ||
this.payload.msg = mbuf.toString(this.encoding); | ||
} | ||
@@ -1188,6 +1218,6 @@ | ||
if (client.options.preserveBuffers) { | ||
client.payload.msg = client.inbound.slice(0, client.payload.size); | ||
if (this.options.preserveBuffers) { | ||
this.payload.msg = this.inbound.slice(0, this.payload.size); | ||
} else { | ||
client.payload.msg = client.inbound.toString(client.encoding, 0, client.payload.size); | ||
this.payload.msg = this.inbound.toString(this.encoding, 0, this.payload.size); | ||
} | ||
@@ -1198,20 +1228,20 @@ | ||
// Eat the size of the inbound that represents the message. | ||
if (client.inbound.length === client.payload.psize) { | ||
client.inbound = null; | ||
if (this.inbound.length === this.payload.psize) { | ||
this.inbound = null; | ||
} else { | ||
client.inbound = client.inbound.slice(client.payload.psize); | ||
this.inbound = this.inbound.slice(this.payload.psize); | ||
} | ||
// process the message | ||
client.processMsg(); | ||
this.processMsg(); | ||
// Reset | ||
client.pstate = AWAITING_CONTROL; | ||
client.payload = null; | ||
this.pstate = AWAITING_CONTROL; | ||
this.payload = null; | ||
// Check to see if we have an option to yield for other events after yieldTime. | ||
if (start !== undefined) { | ||
if ((Date.now() - start) > client.options.yieldTime) { | ||
client.stream.pause(); | ||
setImmediate(client.processInbound.bind(this)); | ||
if ((Date.now() - start) > this.options.yieldTime) { | ||
this.stream.pause(); | ||
setImmediate(this.processInbound.bind(this)); | ||
return; | ||
@@ -1221,2 +1251,3 @@ } | ||
break; | ||
} | ||
} | ||
@@ -1227,7 +1258,7 @@ | ||
// Chop inbound | ||
var psize = m[0].length; | ||
if (psize >= client.inbound.length) { | ||
client.inbound = null; | ||
const psize = m[0].length; | ||
if (psize >= this.inbound.length) { | ||
this.inbound = null; | ||
} else { | ||
client.inbound = client.inbound.slice(psize); | ||
this.inbound = this.inbound.slice(psize); | ||
} | ||
@@ -1244,9 +1275,8 @@ } | ||
Client.prototype.processServerUpdate = function() { | ||
var client = this; | ||
if (client.info.connect_urls && client.info.connect_urls.length > 0) { | ||
if (this.info.connect_urls && this.info.connect_urls.length > 0) { | ||
// parse the infos | ||
var tmp = {}; | ||
client.info.connect_urls.forEach(function(server) { | ||
var u = 'nats://' + server; | ||
var s = new Server(url.parse(u)); | ||
const tmp = {}; | ||
this.info.connect_urls.forEach((server) => { | ||
const u = 'nats://' + server; | ||
const s = new Server(url.parse(u)); | ||
// implicit servers are ones added via the info connect_urls | ||
@@ -1258,6 +1288,6 @@ s.implicit = true; | ||
// remove implicit servers that are no longer reported | ||
var toDelete = []; | ||
client.servers.forEach(function(s, index) { | ||
var u = s.url.href; | ||
if (s.implicit && client.currentServer.url.href !== u && tmp[u] === undefined) { | ||
const toDelete = []; | ||
this.servers.forEach((s, index) => { | ||
const u = s.url.href; | ||
if (s.implicit && this.currentServer.url.href !== u && tmp[u] === undefined) { | ||
// server was removed | ||
@@ -1272,11 +1302,11 @@ toDelete.push(index); | ||
toDelete.reverse(); | ||
toDelete.forEach(function(index) { | ||
client.servers.splice(index, 1); | ||
toDelete.forEach((index) => { | ||
this.servers.splice(index, 1); | ||
}); | ||
// remaining servers are new | ||
var newURLs = []; | ||
for (var k in tmp) { | ||
const newURLs = []; | ||
for (const k in tmp) { | ||
if (tmp.hasOwnProperty(k)) { | ||
client.servers.push(tmp[k]); | ||
this.servers.push(tmp[k]); | ||
newURLs.push(k); | ||
@@ -1288,5 +1318,5 @@ } | ||
// new reported servers useful for tests | ||
client.emit('serversDiscovered', newURLs); | ||
this.emit('serversDiscovered', newURLs); | ||
// simpler version | ||
client.emit('servers', newURLs); | ||
this.emit('servers', newURLs); | ||
} | ||
@@ -1302,3 +1332,3 @@ } | ||
Client.prototype.processMsg = function() { | ||
var sub = this.subs[this.payload.sid]; | ||
const sub = this.subs[this.payload.sid]; | ||
if (sub !== undefined) { | ||
@@ -1326,3 +1356,3 @@ sub.received += 1; | ||
if (sub.callback) { | ||
var msg = this.payload.msg; | ||
let msg = this.payload.msg; | ||
if (this.options.json) { | ||
@@ -1356,3 +1386,3 @@ try { | ||
// except stale connection and permission errors | ||
var m = s ? s.toLowerCase() : ''; | ||
const m = s ? s.toLowerCase() : ''; | ||
if (m.indexOf(STALE_CONNECTION_ERR) !== -1) { | ||
@@ -1455,3 +1485,3 @@ // closeStream() triggers a reconnect if allowed | ||
// Hold PUB SUB [REPLY] | ||
var psub; | ||
let psub; | ||
if (opt_reply === undefined) { | ||
@@ -1465,3 +1495,3 @@ psub = 'PUB ' + subject + SPC; | ||
if (!Buffer.isBuffer(msg)) { | ||
var str = msg; | ||
let str = msg; | ||
if (this.options.json) { | ||
@@ -1476,4 +1506,4 @@ try { | ||
} else { | ||
var b = Buffer.allocUnsafe(psub.length + msg.length + (2 * CR_LF_LEN) + msg.length.toString().length); | ||
var len = b.write(psub + msg.length + CR_LF); | ||
const b = Buffer.allocUnsafe(psub.length + msg.length + (2 * CR_LF_LEN) + msg.length.toString().length); | ||
const len = b.write(psub + msg.length + CR_LF); | ||
msg.copy(b, len); | ||
@@ -1505,3 +1535,3 @@ b.write(CR_LF, len + msg.length); | ||
} | ||
var qgroup, max; | ||
let qgroup, max; | ||
if (typeof opts === 'function') { | ||
@@ -1522,3 +1552,3 @@ callback = opts; | ||
var proto; | ||
let proto; | ||
if (typeof qgroup === 'string') { | ||
@@ -1562,3 +1592,3 @@ this.subs[this.ssid].qgroup = qgroup; | ||
var proto; | ||
let proto; | ||
if (opt_max) { | ||
@@ -1571,3 +1601,3 @@ proto = [UNSUB, sid, opt_max + CR_LF]; | ||
var sub = this.subs[sid]; | ||
const sub = this.subs[sid]; | ||
if (sub === undefined) { | ||
@@ -1605,6 +1635,6 @@ return; | ||
} | ||
var sub = null; | ||
let sub = null; | ||
// check the sid is not a mux sid - which is always negative | ||
if (sid < 0) { | ||
var conf = this.getMuxRequestConfig(sid); | ||
const conf = this.getMuxRequestConfig(sid); | ||
if (conf && conf.timeout) { | ||
@@ -1621,7 +1651,6 @@ // clear auto-set timeout | ||
sub.expected = expected; | ||
var that = this; | ||
sub.timeout = setTimeout(function() { | ||
sub.timeout = setTimeout(() => { | ||
callback(sid); | ||
// if callback fails unsubscribe will leak | ||
that.unsubscribe(sid); | ||
this.unsubscribe(sid); | ||
}, timeout); | ||
@@ -1668,12 +1697,11 @@ } | ||
opt_options = opt_options || {}; | ||
var conf = this.initMuxRequestDetails(callback, opt_options.max); | ||
const conf = this.initMuxRequestDetails(callback, opt_options.max); | ||
this.publish(subject, opt_msg, conf.inbox); | ||
if (opt_options.timeout) { | ||
var client = this; | ||
conf.timeout = setTimeout(function() { | ||
conf.timeout = setTimeout(() => { | ||
if (conf.callback) { | ||
conf.callback(new NatsError(REQ_TIMEOUT_MSG_PREFIX + conf.id, REQ_TIMEOUT)); | ||
} | ||
client.cancelMuxRequest(conf.token); | ||
this.cancelMuxRequest(conf.token); | ||
}, opt_options.timeout); | ||
@@ -1700,4 +1728,4 @@ } | ||
} | ||
var inbox = createInbox(); | ||
var s = this.subscribe(inbox, opt_options, function(msg, reply) { | ||
const inbox = this.createInbox(); | ||
const s = this.subscribe(inbox, opt_options, function(msg, reply) { | ||
callback(msg, reply); | ||
@@ -1771,8 +1799,7 @@ }); | ||
if (!this.respmux) { | ||
var client = this; | ||
var inbox = createInbox(); | ||
var ginbox = inbox + ".*"; | ||
var sid = this.subscribe(ginbox, function(msg, reply, subject) { | ||
var token = client.extractToken(subject); | ||
var conf = client.getMuxRequestConfig(token); | ||
const inbox = this.createInbox(); | ||
const ginbox = inbox + ".*"; | ||
const sid = this.subscribe(ginbox, (msg, reply, subject) => { | ||
const token = this.extractToken(subject); | ||
const conf = this.getMuxRequestConfig(token); | ||
if (conf) { | ||
@@ -1785,4 +1812,4 @@ if (conf.callback) { | ||
if (conf.received >= conf.expected) { | ||
client.cancelMuxRequest(token); | ||
client.emit('unsubscribe', sid, subject); | ||
this.cancelMuxRequest(token); | ||
this.emit('unsubscribe', sid, subject); | ||
} | ||
@@ -1808,7 +1835,7 @@ } | ||
Client.prototype.initMuxRequestDetails = function(callback, expected) { | ||
var ginbox = this.createResponseMux(); | ||
var token = nuid.next(); | ||
var inbox = ginbox + '.' + token; | ||
const ginbox = this.createResponseMux(); | ||
const token = nuid.next(); | ||
const inbox = ginbox + '.' + token; | ||
var conf = { | ||
const conf = { | ||
token: token, | ||
@@ -1836,6 +1863,6 @@ callback: callback, | ||
if (typeof token === 'number') { | ||
var entry = null; | ||
for (var p in this.respmux.requestMap) { | ||
let entry = null; | ||
for (const p in this.respmux.requestMap) { | ||
if (this.respmux.requestMap.hasOwnProperty(p)) { | ||
var v = this.respmux.requestMap[p]; | ||
const v = this.respmux.requestMap[p]; | ||
if (v.id === token) { | ||
@@ -1860,3 +1887,3 @@ entry = v; | ||
Client.prototype.cancelMuxRequest = function(token) { | ||
var conf = this.getMuxRequestConfig(token); | ||
const conf = this.getMuxRequestConfig(token); | ||
if (conf) { | ||
@@ -1893,3 +1920,3 @@ if (conf.timeout) { | ||
var sid = this.request(subject, opt_msg, opt_options, callback); | ||
const sid = this.request(subject, opt_msg, opt_options, callback); | ||
this.timeout(sid, timeout, 1, function() { | ||
@@ -1933,5 +1960,4 @@ callback(new NatsError(REQ_TIMEOUT_MSG_PREFIX + sid, REQ_TIMEOUT)); | ||
Client.prototype.scheduleReconnect = function() { | ||
var client = this; | ||
// Just return if no more servers | ||
if (client.servers.length === 0) { | ||
if (this.servers.length === 0) { | ||
return; | ||
@@ -1941,13 +1967,13 @@ } | ||
// for the first time. | ||
if (client.wasConnected === true) { | ||
client.reconnecting = true; | ||
if (this.wasConnected === true) { | ||
this.reconnecting = true; | ||
} | ||
// Only stall if we have connected before. | ||
var wait = 0; | ||
if (client.servers[0].didConnect === true) { | ||
let wait = 0; | ||
if (this.servers[0].didConnect === true) { | ||
wait = this.options.reconnectTimeWait; | ||
} | ||
setTimeout(function() { | ||
client.reconnect(); | ||
setTimeout(() => { | ||
this.reconnect(); | ||
}, wait); | ||
}; |
{ | ||
"name": "nats", | ||
"version": "1.2.2", | ||
"version": "1.2.3", | ||
"description": "Node.js client for NATS, a lightweight, high-performance cloud native messaging system", | ||
@@ -33,11 +33,11 @@ "keywords": [ | ||
"depcheck:unused": "dependency-check ./package.json --unused --no-dev lib/*", | ||
"test:unit": "mkdir -p reports/ && NODE_ENV=test multi='spec=- xunit=reports/mocha-xunit.xml' istanbul cover _mocha -- -R mocha-multi --timeout 10000 --slow 750 && istanbul check-coverage", | ||
"test:unit": "mkdir -p reports/ && NODE_ENV=test multi='spec=- xunit=reports/mocha-xunit.xml' nyc mocha --timeout 10000 --slow 750", | ||
"test": "npm run depcheck && npm run depcheck:unused && npm run lint && npm run test:unit", | ||
"coveralls": "npm run cover -- --report lcovonly && cat ./reports/coverage/lcov.info | coveralls", | ||
"cover": "istanbul cover _mocha", | ||
"lint": "if-node-version >=4 eslint ./lib ./examples ./benchmark ./test", | ||
"coveralls": "npm run test && nyc report --reporter=text-lcov | coveralls", | ||
"cover": "nyc report --reporter=html && open coverage/index.html", | ||
"lint": "eslint ./lib ./examples ./benchmark ./test", | ||
"fmt": "js-beautify -n --config crockford.jscsrc -r lib/* test/*.js test/support/*.js examples/* benchmark/*.js" | ||
}, | ||
"engines": { | ||
"node": ">= 4.5.0" | ||
"node": ">= 6.5.0" | ||
}, | ||
@@ -49,2 +49,3 @@ "dependencies": { | ||
"devDependencies": { | ||
"@types/node": "^11.11.6", | ||
"minimist": "^1.2.0", | ||
@@ -54,4 +55,3 @@ "coveralls": "^3.0.2", | ||
"eslint": "^5.10.0", | ||
"if-node-version": "^1.1.1", | ||
"istanbul": "^0.4.5", | ||
"nyc": "^13.3.0", | ||
"js-beautify": "^1.6.12", | ||
@@ -65,3 +65,13 @@ "jshint": "^2.9.6", | ||
}, | ||
"typings": "./index.d.ts" | ||
"typings": "./index.d.ts", | ||
"nyc": { | ||
"include": [ | ||
"lib/**" | ||
], | ||
"exclude": [ | ||
"test/**", | ||
"examples/**", | ||
"benchmark/**" | ||
] | ||
} | ||
} |
@@ -9,2 +9,3 @@ # NATS - Node.js Client | ||
[![npm](https://img.shields.io/npm/v/nats.svg)](https://www.npmjs.com/package/nats) | ||
[![npm](https://img.shields.io/npm/dt/nats.svg)](https://www.npmjs.com/package/nats) | ||
[![npm](https://img.shields.io/npm/dm/nats.svg)](https://www.npmjs.com/package/nats) | ||
@@ -68,2 +69,57 @@ | ||
## JSON | ||
The `json` connect property makes it easier to exchange JSON data with other | ||
clients. | ||
```javascript | ||
var nc = NATS.connect({json: true}); | ||
nc.on('connect', function() { | ||
nc.on('error', function(err) { | ||
console.log(err); | ||
}); | ||
nc.subscribe("greeting", function(msg, reply) { | ||
// msg is a parsed JSON object object | ||
if(msg.name && msg.reply) { | ||
nc.publish(reply, {greeting: "hello " + msg.name}); | ||
} | ||
}); | ||
// As with all inputs from unknown sources, if you don't trust the data | ||
// you should verify it prior to accessing it. While JSON is safe because | ||
// it doesn't export functions, it is still possible for a client to | ||
// cause issues to a downstream consumer that is not written carefully | ||
nc.subscribe("unsafe", function(msg) { | ||
// for example a client could inject a bogus `toString` property | ||
// which could cause your client to crash should you try to | ||
// concatenation with the `+` like this: | ||
// console.log("received", msg + "here"); | ||
// `TypeError: Cannot convert object to primitive value` | ||
// Note that simple `console.log(msg)` is fine. | ||
if (msg.hasOwnProperty('toString')) { | ||
console.log('tricky - trying to crash me:', msg.toString); | ||
return; | ||
} | ||
// of course this is no different than using a value that is | ||
// expected in one format (say a number), but the client provides | ||
// a string: | ||
if (isNaN(msg.amount) === false) { | ||
// do something with the number | ||
} | ||
//... | ||
}); | ||
// the bad guy | ||
nc.publish("unsafe", {toString: "no good"}); | ||
nc.flush(function() { | ||
nc.close(); | ||
}); | ||
}); | ||
``` | ||
## Wildcard Subscriptions | ||
@@ -158,3 +214,3 @@ | ||
// each time that NATS connects. | ||
var nc = NATS.connect('connect.ngs.global', NATS.creds("./myid.creds"); | ||
var nc = NATS.connect('connect.ngs.global', NATS.creds("./myid.creds")); | ||
@@ -161,0 +217,0 @@ // Setting nkey and signing callback directly. |
License Policy Violation
LicenseThis package is not allowed per your license policy. Review the package's license to ensure compliance.
Found 1 instance in 1 package
License Policy Violation
LicenseThis package is not allowed per your license policy. Review the package's license to ensure compliance.
Found 1 instance in 1 package
91992
1907
396