Huge News!Announcing our $40M Series B led by Abstract Ventures.Learn More
Socket
Sign inDemoInstall
Socket

nats

Package Overview
Dependencies
Maintainers
2
Versions
195
Alerts
File Explorer

Advanced tools

Socket logo

Install Socket

Detect and block malicious and high-risk dependencies

Install

nats - npm Package Compare versions

Comparing version 0.7.20 to 0.7.24

19

index.d.ts
import events = require('events');
import tls = require('tls');

@@ -9,13 +10,17 @@ export const version: string;

*/
export const BAD_SUBJECT: string;
export const BAD_AUTHENTICATION: string;
export const BAD_JSON: string;
export const BAD_MSG: string;
export const BAD_REPLY: string;
export const BAD_SUBJECT: string;
export const CLIENT_CERT_REQ: string;
export const CONN_CLOSED: string;
export const BAD_JSON: string;
export const BAD_AUTHENTICATION: string;
export const CONN_ERR: string;
export const INVALID_ENCODING: string;
export const NATS_PROTOCOL_ERR: string;
export const NON_SECURE_CONN_REQ: string;
export const PERMISSIONS_ERR: string;
export const REQ_TIMEOUT: string;
export const SECURE_CONN_REQ: string;
export const NON_SECURE_CONN_REQ: string;
export const CLIENT_CERT_REQ: string;
export const NATS_PROTOCOL_ERR: string;
export const STALE_CONNECTION_ERR: string;

@@ -60,3 +65,3 @@ /**

encoding?: BufferEncoding,
tls?: boolean,
tls?: boolean | tls.TlsOptions,
name?: string,

@@ -63,0 +68,0 @@ yieldTime?: number,

@@ -14,9 +14,8 @@ /*!

*/
var net = require('net'),
tls = require('tls'),
url = require('url'),
util = require('util'),
var net = require('net'),
tls = require('tls'),
url = require('url'),
util = require('util'),
events = require('events'),
nuid = require('nuid');
nuid = require('nuid');

@@ -26,8 +25,7 @@ /**

*/
var VERSION = '0.7.24',
var VERSION = '0.7.20',
DEFAULT_PORT = 4222,
DEFAULT_PRE = 'nats://localhost:',
DEFAULT_URI = DEFAULT_PRE + DEFAULT_PORT,
DEFAULT_PRE = 'nats://localhost:',
DEFAULT_URI = DEFAULT_PRE + DEFAULT_PORT,

@@ -41,3 +39,3 @@ MAX_CONTROL_LINE_SIZE = 512,

// Reconnect Parameters, 2 sec wait, 10 tries
DEFAULT_RECONNECT_TIME_WAIT = 2*1000,
DEFAULT_RECONNECT_TIME_WAIT = 2 * 1000,
DEFAULT_MAX_RECONNECT_ATTEMPTS = 10,

@@ -48,8 +46,8 @@

MSG = /^MSG\s+([^\s\r\n]+)\s+([^\s\r\n]+)\s+(([^\s\r\n]+)[^\S\r\n]+)?(\d+)\r\n/i,
OK = /^\+OK\s*\r\n/i,
ERR = /^-ERR\s+('.+')?\r\n/i,
PING = /^PING\r\n/i,
PONG = /^PONG\r\n/i,
INFO = /^INFO\s+([^\r\n]+)\r\n/i,
MSG = /^MSG\s+([^\s\r\n]+)\s+([^\s\r\n]+)\s+(([^\s\r\n]+)[^\S\r\n]+)?(\d+)\r\n/i,
OK = /^\+OK\s*\r\n/i,
ERR = /^-ERR\s+('.+')?\r\n/i,
PING = /^PING\r\n/i,
PONG = /^PONG\r\n/i,
INFO = /^INFO\s+([^\r\n]+)\r\n/i,
SUBRE = /^SUB\s+([^\r\n]+)\r\n/i,

@@ -64,13 +62,15 @@

//PUB = 'PUB', // TODO: remove / never used
SUB = 'SUB',
UNSUB = 'UNSUB',
SUB = 'SUB',
UNSUB = 'UNSUB',
CONNECT = 'CONNECT',
// Responses
PING_REQUEST = 'PING' + CR_LF,
PING_REQUEST = 'PING' + CR_LF,
PONG_RESPONSE = 'PONG' + CR_LF,
// Errors
BAD_SUBJECT = 'BAD_SUBJECT',
BAD_SUBJECT_MSG = 'Subject must be supplied',
BAD_AUTHENTICATION = 'BAD_AUTHENTICATION',
BAD_AUTHENTICATION_MSG = 'User and Token can not both be provided',
BAD_JSON = 'BAD_JSON',
BAD_JSON_MSG = 'Message should be a JSON object',
BAD_MSG = 'BAD_MSG',

@@ -80,23 +80,21 @@ BAD_MSG_MSG = 'Message can\'t be a function',

BAD_REPLY_MSG = 'Reply can\'t be a function',
BAD_SUBJECT = 'BAD_SUBJECT',
BAD_SUBJECT_MSG = 'Subject must be supplied',
CLIENT_CERT_REQ = 'CLIENT_CERT_REQ',
CLIENT_CERT_REQ_MSG = 'Server requires a client certificate.',
CONN_CLOSED = 'CONN_CLOSED',
CONN_CLOSED_MSG = 'Connection closed',
BAD_JSON = 'BAD_JSON',
BAD_JSON_MSG = 'Message should be a JSON object',
BAD_AUTHENTICATION = 'BAD_AUTHENTICATION',
BAD_AUTHENTICATION_MSG = 'User and Token can not both be provided',
CONN_ERR = 'CONN_ERR',
CONN_ERR_MSG_PREFIX = 'Could not connect to server: ',
INVALID_ENCODING = 'INVALID_ENCODING',
INVALID_ENCODING_MSG_PREFIX = 'Invalid Encoding:',
SECURE_CONN_REQ = 'SECURE_CONN_REQ',
SECURE_CONN_REQ_MSG = 'Server requires a secure connection.',
NATS_PROTOCOL_ERR = 'NATS_PROTOCOL_ERR',
NON_SECURE_CONN_REQ = 'NON_SECURE_CONN_REQ',
NON_SECURE_CONN_REQ_MSG = 'Server does not support a secure connection.',
CLIENT_CERT_REQ = 'CLIENT_CERT_REQ',
CLIENT_CERT_REQ_MSG = 'Server requires a client certificate.',
CONN_ERR = 'CONN_ERR',
CONN_ERR_MSG_PREFIX = 'Could not connect to server: ',
NATS_PROTOCOL_ERR = 'NATS_PROTOCOL_ERR',
PERMISSIONS_ERR = "permissions violation",
REQ_TIMEOUT = 'REQ_TIMEOUT',
REQ_TIMEOUT_MSG_PREFIX = 'The request timed out for subscription id: ',
SECURE_CONN_REQ = 'SECURE_CONN_REQ',
SECURE_CONN_REQ_MSG = 'Server requires a secure connection.',
STALE_CONNECTION_ERR = "stale connection",
PERMISSIONS_ERR = "permissions violation",

@@ -110,3 +108,3 @@ // Pedantic Mode support

function NatsError(message, code, chainedError) {
function NatsError(message, code, chainedError) {
Error.captureStackTrace(this, this.constructor);

@@ -117,6 +115,6 @@ this.name = this.constructor.name;

this.chainedError = chainedError;
}
}
util.inherits(NatsError, Error);
exports.NatsError = NatsError;
util.inherits(NatsError, Error);
exports.NatsError = NatsError;

@@ -149,6 +147,5 @@ /**

* @api public
*/
*/
var createInbox = exports.createInbox = function() {
return ("_INBOX." + nuid.next());
return ("_INBOX." + nuid.next());
};

@@ -162,8 +159,7 @@

*/
function Client(opts) {
events.EventEmitter.call(this);
this.parseOptions(opts);
this.initState();
this.createConnection();
events.EventEmitter.call(this);
this.parseOptions(opts);
this.initState();
this.createConnection();
}

@@ -180,5 +176,4 @@

*/
exports.connect = function(opts) {
return new Client(opts);
return new Client(opts);
};

@@ -189,3 +184,2 @@

*/
util.inherits(Client, events.EventEmitter);

@@ -198,22 +192,21 @@

*/
Client.prototype.createInbox = createInbox;
Client.prototype.assignOption = function(opts, prop, assign) {
if (assign === undefined) {
assign = prop;
}
if (opts[prop] !== undefined) {
this.options[assign] = opts[prop];
}
if (assign === undefined) {
assign = prop;
}
if (opts[prop] !== undefined) {
this.options[assign] = opts[prop];
}
};
function shuffle(array) {
for (var i = array.length - 1; i > 0; i--) {
var j = Math.floor(Math.random() * (i + 1));
var temp = array[i];
array[i] = array[j];
array[j] = temp;
}
return array;
for (var i = array.length - 1; i > 0; i--) {
var j = Math.floor(Math.random() * (i + 1));
var temp = array[i];
array[i] = array[j];
array[j] = temp;
}
return array;
}

@@ -227,91 +220,90 @@

*/
Client.prototype.parseOptions = function(opts) {
var options = this.options = {
'verbose' : false,
'pedantic' : false,
'reconnect' : true,
'maxReconnectAttempts' : DEFAULT_MAX_RECONNECT_ATTEMPTS,
'reconnectTimeWait' : DEFAULT_RECONNECT_TIME_WAIT,
'encoding' : 'utf8',
'tls' : false,
'waitOnFirstConnect' : false,
};
var options = this.options = {
'verbose': false,
'pedantic': false,
'reconnect': true,
'maxReconnectAttempts': DEFAULT_MAX_RECONNECT_ATTEMPTS,
'reconnectTimeWait': DEFAULT_RECONNECT_TIME_WAIT,
'encoding': 'utf8',
'tls': false,
'waitOnFirstConnect': false,
};
if (undefined === opts) {
options.url = DEFAULT_URI;
} else if ('number' === typeof opts) {
options.url = DEFAULT_PRE + opts;
} else if ('string' === typeof opts) {
options.url = opts;
} else if ('object' === typeof opts) {
if (opts.port !== undefined) {
options.url = DEFAULT_PRE + opts.port;
if (undefined === opts) {
options.url = DEFAULT_URI;
} else if ('number' === typeof opts) {
options.url = DEFAULT_PRE + opts;
} else if ('string' === typeof opts) {
options.url = opts;
} else if ('object' === typeof opts) {
if (opts.port !== undefined) {
options.url = DEFAULT_PRE + opts.port;
}
// Pull out various options here
this.assignOption(opts, 'url');
this.assignOption(opts, 'uri', 'url');
this.assignOption(opts, 'user');
this.assignOption(opts, 'pass');
this.assignOption(opts, 'token');
this.assignOption(opts, 'password', 'pass');
this.assignOption(opts, 'verbose');
this.assignOption(opts, 'pedantic');
this.assignOption(opts, 'reconnect');
this.assignOption(opts, 'maxReconnectAttempts');
this.assignOption(opts, 'reconnectTimeWait');
this.assignOption(opts, 'servers');
this.assignOption(opts, 'urls', 'servers');
this.assignOption(opts, 'noRandomize');
this.assignOption(opts, 'NoRandomize', 'noRandomize');
this.assignOption(opts, 'dontRandomize', 'noRandomize');
this.assignOption(opts, 'encoding');
this.assignOption(opts, 'tls');
this.assignOption(opts, 'secure', 'tls');
this.assignOption(opts, 'name');
this.assignOption(opts, 'client', 'name');
this.assignOption(opts, 'yieldTime');
this.assignOption(opts, 'waitOnFirstConnect');
this.assignOption(opts, 'json');
this.assignOption(opts, 'preserveBuffers');
}
// Pull out various options here
this.assignOption(opts, 'url');
this.assignOption(opts, 'uri', 'url');
this.assignOption(opts, 'user');
this.assignOption(opts, 'pass');
this.assignOption(opts, 'token');
this.assignOption(opts, 'password', 'pass');
this.assignOption(opts, 'verbose');
this.assignOption(opts, 'pedantic');
this.assignOption(opts, 'reconnect');
this.assignOption(opts, 'maxReconnectAttempts');
this.assignOption(opts, 'reconnectTimeWait');
this.assignOption(opts, 'servers');
this.assignOption(opts, 'urls', 'servers');
this.assignOption(opts, 'noRandomize');
this.assignOption(opts, 'NoRandomize', 'noRandomize');
this.assignOption(opts, 'dontRandomize', 'noRandomize');
this.assignOption(opts, 'encoding');
this.assignOption(opts, 'tls');
this.assignOption(opts, 'secure', 'tls');
this.assignOption(opts, 'name');
this.assignOption(opts, 'client', 'name');
this.assignOption(opts, 'yieldTime');
this.assignOption(opts, 'waitOnFirstConnect');
this.assignOption(opts, 'json');
this.assignOption(opts, 'preserveBuffers');
}
var client = this;
var client = this;
// Set user/pass as needed if in options.
client.user = options.user;
client.pass = options.pass;
// Set token as needed if in options.
client.token = options.token;
// Set user/pass as needed if in options.
client.user = options.user;
client.pass = options.pass;
// Authentication - make sure authentication is valid.
if (client.user && client.token) {
throw(new NatsError(BAD_AUTHENTICATION_MSG, BAD_AUTHENTICATION));
}
// Set token as needed if in options.
client.token = options.token;
// Encoding - make sure its valid.
if (Buffer.isEncoding(options.encoding)) {
client.encoding = options.encoding;
} else {
throw new NatsError(INVALID_ENCODING_MSG_PREFIX + options.encoding, INVALID_ENCODING);
}
// For cluster support
client.servers = [];
// Authentication - make sure authentication is valid.
if (client.user && client.token) {
throw (new NatsError(BAD_AUTHENTICATION_MSG, BAD_AUTHENTICATION));
}
if (Array.isArray(options.servers)) {
options.servers.forEach(function(server) {
client.servers.push(new Server(url.parse(server)));
});
} else {
if (undefined === options.url) {
options.url = DEFAULT_URI;
// Encoding - make sure its valid.
if (Buffer.isEncoding(options.encoding)) {
client.encoding = options.encoding;
} else {
throw new NatsError(INVALID_ENCODING_MSG_PREFIX + options.encoding, INVALID_ENCODING);
}
client.servers.push(new Server(url.parse(options.url)));
}
// For cluster support
client.servers = [];
// Randomize if needed
if (options.noRandomize !== true) {
shuffle(client.servers);
}
if (Array.isArray(options.servers)) {
options.servers.forEach(function(server) {
client.servers.push(new Server(url.parse(server)));
});
} else {
if (undefined === options.url) {
options.url = DEFAULT_URI;
}
client.servers.push(new Server(url.parse(options.url)));
}
// Randomize if needed
if (options.noRandomize !== true) {
shuffle(client.servers);
}
};

@@ -323,8 +315,7 @@

* @api private
*/
*/
function Server(url) {
this.url = url;
this.didConnect = false;
this.reconnects = 0;
this.url = url;
this.didConnect = false;
this.reconnects = 0;
}

@@ -339,27 +330,26 @@

* @api private
*/
*/
Client.prototype.selectServer = function() {
var client = this;
var server = client.servers.shift();
var client = this;
var server = client.servers.shift();
// Place in client context.
client.currentServer = server;
client.url = server.url;
if ('auth' in server.url && !!server.url.auth) {
var auth = server.url.auth.split(':');
if (auth.length !== 1) {
if (client.options.user === undefined) {
client.user = auth[0];
}
if (client.options.pass === undefined) {
client.pass = auth[1];
}
} else {
if (client.options.token === undefined) {
client.token = auth[0];
}
// Place in client context.
client.currentServer = server;
client.url = server.url;
if ('auth' in server.url && !!server.url.auth) {
var auth = server.url.auth.split(':');
if (auth.length !== 1) {
if (client.options.user === undefined) {
client.user = auth[0];
}
if (client.options.pass === undefined) {
client.pass = auth[1];
}
} else {
if (client.options.token === undefined) {
client.token = auth[0];
}
}
}
}
client.servers.push(server);
client.servers.push(server);
};

@@ -371,26 +361,25 @@

* @api private
*/
*/
Client.prototype.checkTLSMismatch = function() {
if (this.info.tls_required === true &&
this.options.tls === false) {
this.emit('error', new NatsError(SECURE_CONN_REQ_MSG, SECURE_CONN_REQ));
this.closeStream();
return true;
}
if (this.info.tls_required === true &&
this.options.tls === false) {
this.emit('error', new NatsError(SECURE_CONN_REQ_MSG, SECURE_CONN_REQ));
this.closeStream();
return true;
}
if (this.info.tls_required === false &&
this.options.tls !== false) {
this.emit('error', new NatsError(NON_SECURE_CONN_REQ_MSG, NON_SECURE_CONN_REQ));
this.closeStream();
return true;
}
if (this.info.tls_required === false &&
this.options.tls !== false) {
this.emit('error', new NatsError(NON_SECURE_CONN_REQ_MSG, NON_SECURE_CONN_REQ));
this.closeStream();
return true;
}
if (this.info.tls_verify === true &&
this.options.tls.cert === undefined) {
this.emit('error', new NatsError(CLIENT_CERT_REQ_MSG, CLIENT_CERT_REQ));
this.closeStream();
return true;
}
return false;
if (this.info.tls_verify === true &&
this.options.tls.cert === undefined) {
this.emit('error', new NatsError(CLIENT_CERT_REQ_MSG, CLIENT_CERT_REQ));
this.closeStream();
return true;
}
return false;
};

@@ -402,15 +391,14 @@

* @api private
*/
*/
Client.prototype.connectCB = function() {
var wasReconnecting = this.reconnecting;
var event = (wasReconnecting === true) ? 'reconnect' : 'connect';
this.reconnecting = false;
this.reconnects = 0;
this.wasConnected = true;
this.currentServer.didConnect = true;
var wasReconnecting = this.reconnecting;
var event = (wasReconnecting === true) ? 'reconnect' : 'connect';
this.reconnecting = false;
this.reconnects = 0;
this.wasConnected = true;
this.currentServer.didConnect = true;
this.emit(event, this);
this.emit(event, this);
this.flushPending();
this.flushPending();
};

@@ -423,69 +411,68 @@

* @api private
*/
*/
Client.prototype.setupHandlers = function() {
var client = this;
var stream = client.stream;
var client = this;
var stream = client.stream;
if (undefined === stream) {
return;
}
if (undefined === stream) {
return;
}
stream.on('connect', function() {
client.connected = true;
});
stream.on('connect', function() {
client.connected = true;
});
stream.on('close', function(hadError) {
client.closeStream();
client.emit('disconnect');
if (client.closed === true ||
client.options.reconnect === false ||
((client.reconnects >= client.options.maxReconnectAttempts) && client.options.maxReconnectAttempts !== -1)) {
client.emit('close');
} else {
client.scheduleReconnect();
}
});
stream.on('close', function(hadError) {
client.closeStream();
client.emit('disconnect');
if (client.closed === true ||
client.options.reconnect === false ||
((client.reconnects >= client.options.maxReconnectAttempts) && client.options.maxReconnectAttempts !== -1)) {
client.emit('close');
} else {
client.scheduleReconnect();
}
});
stream.on('error', function(exception) {
// If we were connected just return, close event will process
if (client.wasConnected === true && client.currentServer.didConnect === true) {
return;
}
stream.on('error', function(exception) {
// If we were connected just return, close event will process
if (client.wasConnected === true && client.currentServer.didConnect === true) {
return;
}
// if the current server did not connect at all, and we in
// general have not connected to any server, remove it from
// this list. Unless overidden
if (client.wasConnected === false && client.currentServer.didConnect === false) {
// We can override this behavior with waitOnFirstConnect, which will
// treat it like a reconnect scenario.
if (client.options.waitOnFirstConnect) {
// Pretend to move us into a reconnect state.
client.currentServer.didConnect = true;
} else {
client.servers.splice(client.servers.length-1, 1);
}
}
// if the current server did not connect at all, and we in
// general have not connected to any server, remove it from
// this list. Unless overidden
if (client.wasConnected === false && client.currentServer.didConnect === false) {
// We can override this behavior with waitOnFirstConnect, which will
// treat it like a reconnect scenario.
if (client.options.waitOnFirstConnect) {
// Pretend to move us into a reconnect state.
client.currentServer.didConnect = true;
} else {
client.servers.splice(client.servers.length - 1, 1);
}
}
// Only bubble up error if we never had connected
// to the server and we only have one.
if (client.wasConnected === false && client.servers.length === 0) {
client.emit('error', new NatsError(CONN_ERR_MSG_PREFIX + exception, CONN_ERR, exception));
}
client.closeStream();
});
// Only bubble up error if we never had connected
// to the server and we only have one.
if (client.wasConnected === false && client.servers.length === 0) {
client.emit('error', new NatsError(CONN_ERR_MSG_PREFIX + exception, CONN_ERR, exception));
}
client.closeStream();
});
stream.on('data', function (data) {
// If inbound exists, concat them together. We try to avoid this for split
// messages, so this should only really happen for a split control line.
// Long term answer is hand rolled parser and not regexp.
if (client.inbound) {
client.inbound = Buffer.concat([client.inbound, data]);
} else {
client.inbound = data;
}
stream.on('data', function(data) {
// If inbound exists, concat them together. We try to avoid this for split
// messages, so this should only really happen for a split control line.
// Long term answer is hand rolled parser and not regexp.
if (client.inbound) {
client.inbound = Buffer.concat([client.inbound, data]);
} else {
client.inbound = data;
}
// Process the inbound queue.
client.processInbound();
});
// Process the inbound queue.
client.processInbound();
});
};

@@ -498,27 +485,26 @@

* @api private
*/
*/
Client.prototype.sendConnect = function() {
// Queue the connect command.
var cs = {
'lang' : 'node',
'version' : VERSION,
'verbose' : this.options.verbose,
'pedantic': this.options.pedantic,
'protocol': 1,
};
if (this.user !== undefined) {
cs.user = this.user;
cs.pass = this.pass;
}
if (this.token !== undefined) {
cs.auth_token = this.token;
}
if (this.options.name !== undefined) {
cs.name = this.options.name;
}
// If we enqueued requests before we received INFO from the server, or we
// reconnected, there be other data pending, write this immediately instead
// of adding it to the queue.
this.stream.write(CONNECT + SPC + JSON.stringify(cs) + CR_LF);
// Queue the connect command.
var cs = {
'lang': 'node',
'version': VERSION,
'verbose': this.options.verbose,
'pedantic': this.options.pedantic,
'protocol': 1,
};
if (this.user !== undefined) {
cs.user = this.user;
cs.pass = this.pass;
}
if (this.token !== undefined) {
cs.auth_token = this.token;
}
if (this.options.name !== undefined) {
cs.name = this.options.name;
}
// If we enqueued requests before we received INFO from the server, or we
// reconnected, there be other data pending, write this immediately instead
// of adding it to the queue.
this.stream.write(CONNECT + SPC + JSON.stringify(cs) + CR_LF);
};

@@ -530,59 +516,58 @@

* @api private
*/
*/
Client.prototype.createConnection = function() {
// Commands may have been queued during reconnect. Discard everything except:
// 1) ping requests with a pong callback
// 2) publish requests
//
// Rationale: CONNECT and SUBs are written directly upon connecting, any PONG
// response is no longer relevant, and any UNSUB will be accounted for when we
// sync our SUBs. Without this, users of the client may miss state transitions
// via callbacks, would have to track the client's internal connection state,
// and may have to double buffer messages (which we are already doing) if they
// wanted to ensure their messages reach the server.
var pong = [];
var pend = [];
var pSize = 0;
var client = this;
if (client.pending !== null) {
var pongIndex = 0;
client.pending.forEach(function(cmd) {
var cmdLen = Buffer.isBuffer(cmd) ? cmd.length : Buffer.byteLength(cmd);
if (cmd === PING_REQUEST && client.pongs !== null && pongIndex < client.pongs.length) {
// filter out any useless ping requests (no pong callback, nop flush)
var p = client.pongs[pongIndex++];
if (p !== undefined) {
pend.push(cmd);
pSize += cmdLen;
pong.push(p);
}
} else if (cmd.length > 3 && cmd[0] == 'P' && cmd[1] == 'U' && cmd[2] == 'B') {
pend.push(cmd);
pSize += cmdLen;
}
});
}
this.pongs = pong;
this.pending = pend;
this.pSize = pSize;
// Commands may have been queued during reconnect. Discard everything except:
// 1) ping requests with a pong callback
// 2) publish requests
//
// Rationale: CONNECT and SUBs are written directly upon connecting, any PONG
// response is no longer relevant, and any UNSUB will be accounted for when we
// sync our SUBs. Without this, users of the client may miss state transitions
// via callbacks, would have to track the client's internal connection state,
// and may have to double buffer messages (which we are already doing) if they
// wanted to ensure their messages reach the server.
var pong = [];
var pend = [];
var pSize = 0;
var client = this;
if (client.pending !== null) {
var pongIndex = 0;
client.pending.forEach(function(cmd) {
var cmdLen = Buffer.isBuffer(cmd) ? cmd.length : Buffer.byteLength(cmd);
if (cmd === PING_REQUEST && client.pongs !== null && pongIndex < client.pongs.length) {
// filter out any useless ping requests (no pong callback, nop flush)
var p = client.pongs[pongIndex++];
if (p !== undefined) {
pend.push(cmd);
pSize += cmdLen;
pong.push(p);
}
} else if (cmd.length > 3 && cmd[0] === 'P' && cmd[1] === 'U' && cmd[2] === 'B') {
pend.push(cmd);
pSize += cmdLen;
}
});
}
this.pongs = pong;
this.pending = pend;
this.pSize = pSize;
this.pstate = AWAITING_CONTROL;
this.pstate = AWAITING_CONTROL;
// Clear info processing.
this.info = null;
this.infoReceived = false;
// Clear info processing.
this.info = null;
this.infoReceived = false;
// Select a server to connect to.
this.selectServer();
// Select a server to connect to.
this.selectServer();
// See #45 if we have a stream release the listeners
// otherwise in addition to the leak events will fire fire
if(this.stream) {
this.stream.removeAllListeners();
this.stream.end();
}
// otherwise in addition to the leak events will fire fire
if (this.stream) {
this.stream.removeAllListeners();
this.stream.end();
}
// Create the stream
this.stream = net.createConnection(this.url.port, this.url.hostname);
// Setup the proper handlers.
this.setupHandlers();
// Setup the proper handlers.
this.setupHandlers();
};

@@ -595,12 +580,11 @@

*/
Client.prototype.initState = function() {
this.ssid = 1;
this.subs = {};
this.reconnects = 0;
this.connected = false;
this.wasConnected = false;
this.reconnecting = false;
this.server = null;
this.pending = [];
this.ssid = 1;
this.subs = {};
this.reconnects = 0;
this.connected = false;
this.wasConnected = false;
this.reconnecting = false;
this.server = null;
this.pending = [];
};

@@ -613,13 +597,12 @@

*/
Client.prototype.close = function() {
this.closed = true;
this.removeAllListeners();
this.closeStream();
this.ssid = -1;
this.subs = null;
this.pstate = -1;
this.pongs = null;
this.pending = null;
this.pSize = 0;
this.closed = true;
this.removeAllListeners();
this.closeStream();
this.ssid = -1;
this.subs = null;
this.pstate = -1;
this.pongs = null;
this.pending = null;
this.pSize = 0;
};

@@ -632,16 +615,15 @@

*/
Client.prototype.closeStream = function() {
if (this.stream !== null) {
this.stream.end();
this.stream.destroy();
this.stream = null;
}
if (this.connected === true || this.closed === true) {
this.pongs = null;
this.pending = null;
this.pSize = 0;
this.connected = false;
}
this.inbound = null;
if (this.stream !== null) {
this.stream.end();
this.stream.destroy();
this.stream = null;
}
if (this.connected === true || this.closed === true) {
this.pongs = null;
this.pending = null;
this.pSize = 0;
this.connected = false;
}
this.inbound = null;
};

@@ -654,44 +636,43 @@

*/
Client.prototype.flushPending = function() {
if (this.connected === false ||
this.pending === null ||
this.pending.length === 0 ||
this.infoReceived !== true) {
return;
}
if (this.connected === false ||
this.pending === null ||
this.pending.length === 0 ||
this.infoReceived !== true) {
return;
}
var client = this;
var write = function(data) {
client.pending = [];
client.pSize = 0;
return client.stream.write(data);
};
if (!this.pBufs) {
// All strings, fastest for now.
return write(this.pending.join(EMPTY));
} else {
// We have some or all Buffers. Figure out if we can optimize.
var allBufs = true;
for (var i=0; i < this.pending.length; i++){
if (!Buffer.isBuffer(this.pending[i])) {
allBufs = false;
break;
}
}
// If all buffers, concat together and write once.
if (allBufs) {
return write(Buffer.concat(this.pending, this.pSize));
var client = this;
var write = function(data) {
client.pending = [];
client.pSize = 0;
return client.stream.write(data);
};
if (!this.pBufs) {
// All strings, fastest for now.
return write(this.pending.join(EMPTY));
} else {
// We have a mix, so write each one individually.
var pending = this.pending;
this.pending = [];
this.pSize = 0;
var result = true;
for (i=0; i < pending.length; i++){
result = this.stream.write(pending[i]) && result;
}
return result;
// We have some or all Buffers. Figure out if we can optimize.
var allBufs = true;
for (var i = 0; i < this.pending.length; i++) {
if (!Buffer.isBuffer(this.pending[i])) {
allBufs = false;
break;
}
}
// If all buffers, concat together and write once.
if (allBufs) {
return write(Buffer.concat(this.pending, this.pSize));
} else {
// We have a mix, so write each one individually.
var pending = this.pending;
this.pending = [];
this.pSize = 0;
var result = true;
for (i = 0; i < pending.length; i++) {
result = this.stream.write(pending[i]) && result;
}
return result;
}
}
}
};

@@ -705,13 +686,12 @@

*/
Client.prototype.stripPendingSubs = function() {
var pending = this.pending;
this.pending = [];
this.pSize = 0;
for (var i=0; i < pending.length; i++){
if (!SUBRE.test(pending[i])) {
// Re-queue the command.
this.sendCommand(pending[i]);
var pending = this.pending;
this.pending = [];
this.pSize = 0;
for (var i = 0; i < pending.length; i++) {
if (!SUBRE.test(pending[i])) {
// Re-queue the command.
this.sendCommand(pending[i]);
}
}
}
};

@@ -724,29 +704,30 @@

*/
Client.prototype.sendCommand = function(cmd) {
// Buffer to cut down on system calls, increase throughput.
// When receive gets faster, should make this Buffer based..
// Buffer to cut down on system calls, increase throughput.
// When receive gets faster, should make this Buffer based..
if (this.closed || this.pending === null) { return; }
if (this.closed || this.pending === null) {
return;
}
this.pending.push(cmd);
if (!Buffer.isBuffer(cmd)) {
this.pSize += Buffer.byteLength(cmd);
} else {
this.pSize += cmd.length;
this.pBufs = true;
}
this.pending.push(cmd);
if (!Buffer.isBuffer(cmd)) {
this.pSize += Buffer.byteLength(cmd);
} else {
this.pSize += cmd.length;
this.pBufs = true;
}
if (this.connected === true) {
// First one let's setup flush..
if (this.pending.length === 1) {
var self = this;
setImmediate(function() {
self.flushPending();
});
} else if (this.pSize > FLUSH_THRESHOLD) {
// Flush in place when threshold reached..
this.flushPending();
if (this.connected === true) {
// First one let's setup flush..
if (this.pending.length === 1) {
var self = this;
setImmediate(function() {
self.flushPending();
});
} else if (this.pSize > FLUSH_THRESHOLD) {
// Flush in place when threshold reached..
this.flushPending();
}
}
}
};

@@ -759,20 +740,19 @@

*/
Client.prototype.sendSubscriptions = function() {
var protos = "";
for (var sid in this.subs) {
if (this.subs.hasOwnProperty(sid)) {
var sub = this.subs[sid];
var proto;
if (sub.qgroup) {
proto = [SUB, sub.subject, sub.qgroup, sid + CR_LF];
} else {
proto = [SUB, sub.subject, sid + CR_LF];
}
protos += proto.join(SPC);
var protos = "";
for (var sid in this.subs) {
if (this.subs.hasOwnProperty(sid)) {
var sub = this.subs[sid];
var proto;
if (sub.qgroup) {
proto = [SUB, sub.subject, sub.qgroup, sid + CR_LF];
} else {
proto = [SUB, sub.subject, sid + CR_LF];
}
protos += proto.join(SPC);
}
}
}
if (protos.length > 0) {
this.stream.write(protos);
}
if (protos.length > 0) {
this.stream.write(protos);
}
};

@@ -785,205 +765,210 @@

*/
Client.prototype.processInbound = function() {
var client = this;
var client = this;
// Hold any regex matches.
var m;
// Hold any regex matches.
var m;
// For optional yield
var start;
// For optional yield
var start;
if(! client.stream) {
// if we are here, the stream was reaped and errors raised
// if we continue.
return;
}
// unpause if needed.
// FIXME(dlc) client.stream.isPaused() causes 0.10 to fail
client.stream.resume();
if (!client.stream) {
// if we are here, the stream was reaped and errors raised
// if we continue.
return;
}
// unpause if needed.
// FIXME(dlc) client.stream.isPaused() causes 0.10 to fail
client.stream.resume();
/* jshint -W083 */
/* jshint -W083 */
if (client.options.yieldTime !== undefined) {
start = Date.now();
}
if (client.options.yieldTime !== undefined) {
start = Date.now();
}
while (!client.closed && client.inbound && client.inbound.length > 0) {
switch (client.pstate) {
while (!client.closed && client.inbound && client.inbound.length > 0) {
switch (client.pstate) {
case AWAITING_CONTROL:
// Regex only works on strings, so convert once to be more efficient.
// Long term answer is a hand rolled parser, not regex.
var buf = client.inbound.toString('binary', 0, MAX_CONTROL_LINE_SIZE);
if ((m = MSG.exec(buf)) !== null) {
client.payload = {
subj : m[1],
sid : parseInt(m[2], 10),
reply : m[4],
size : parseInt(m[5], 10)
};
client.payload.psize = client.payload.size + CR_LF_LEN;
client.pstate = AWAITING_MSG_PAYLOAD;
} else if ((m = OK.exec(buf)) !== null) {
// Ignore for now..
} else if ((m = ERR.exec(buf)) !== null) {
client.processErr(m[1]);
return;
} else if ((m = PONG.exec(buf)) !== null) {
var cb = client.pongs && client.pongs.shift();
if (cb) { cb(); } // FIXME: Should we check for exceptions?
} else if ((m = PING.exec(buf)) !== null) {
client.sendCommand(PONG_RESPONSE);
} else if ((m = INFO.exec(buf)) !== null) {
client.info = JSON.parse(m[1]);
// Check on TLS mismatch.
if (client.checkTLSMismatch() === true) {
return;
}
case AWAITING_CONTROL:
// Regex only works on strings, so convert once to be more efficient.
// Long term answer is a hand rolled parser, not regex.
var buf = client.inbound.toString('binary', 0, MAX_CONTROL_LINE_SIZE);
if ((m = MSG.exec(buf)) !== null) {
client.payload = {
subj: m[1],
sid: parseInt(m[2], 10),
reply: m[4],
size: parseInt(m[5], 10)
};
client.payload.psize = client.payload.size + CR_LF_LEN;
client.pstate = AWAITING_MSG_PAYLOAD;
} else if ((m = OK.exec(buf)) !== null) {
// Ignore for now..
} else if ((m = ERR.exec(buf)) !== null) {
client.processErr(m[1]);
return;
} else if ((m = PONG.exec(buf)) !== null) {
var cb = client.pongs && client.pongs.shift();
if (cb) {
cb();
} // FIXME: Should we check for exceptions?
} else if ((m = PING.exec(buf)) !== null) {
client.sendCommand(PONG_RESPONSE);
} else if ((m = INFO.exec(buf)) !== null) {
client.info = JSON.parse(m[1]);
// Check on TLS mismatch.
if (client.checkTLSMismatch() === true) {
return;
}
// Always try to read the connect_urls from info
if(client.info.connect_urls && client.info.connect_urls.length > 0) {
// don't add duplicates
var known = [];
client.servers.forEach(function(server) {
known.push(server.url.href);
});
// add new ones
var toAdd = [];
client.info.connect_urls.forEach(function(server) {
var u = 'nats://' + server;
if(known.indexOf(u) === -1) {
toAdd.push(new Server(url.parse(u)));
}
});
// Always try to read the connect_urls from info
if (client.info.connect_urls && client.info.connect_urls.length > 0) {
// don't add duplicates
var known = [];
client.servers.forEach(function(server) {
known.push(server.url.href);
});
// add new ones
var toAdd = [];
client.info.connect_urls.forEach(function(server) {
var u = 'nats://' + server;
if (known.indexOf(u) === -1) {
toAdd.push(new Server(url.parse(u)));
}
});
if(toAdd.length > 0) {
if(client.options.noRandomize !== true) {
shuffle(toAdd);
}
toAdd.forEach(function(s) {
client.servers.push(s);
});
}
}
if (toAdd.length > 0) {
if (client.options.noRandomize !== true) {
shuffle(toAdd);
}
toAdd.forEach(function(s) {
client.servers.push(s);
});
}
}
// Process first INFO
if (client.infoReceived === false) {
// Switch over to TLS as needed.
if (client.options.tls !== false &&
client.stream.encrypted !== true) {
var tlsOpts = {socket: client.stream};
if ('object' === typeof client.options.tls) {
for (var key in client.options.tls) {
tlsOpts[key] = client.options.tls[key];
}
}
// if we have a stream, this is from an old connection, reap it
if(client.stream) {
client.stream.removeAllListeners();
client.stream.end();
}
client.stream = tls.connect(tlsOpts, function() {
client.flushPending();
});
client.setupHandlers();
}
// Process first INFO
if (client.infoReceived === false) {
// Switch over to TLS as needed.
if (client.options.tls !== false &&
client.stream.encrypted !== true) {
var tlsOpts = {
socket: client.stream
};
if ('object' === typeof client.options.tls) {
for (var key in client.options.tls) {
tlsOpts[key] = client.options.tls[key];
}
}
// if we have a stream, this is from an old connection, reap it
if (client.stream) {
client.stream.removeAllListeners();
client.stream.end();
}
client.stream = tls.connect(tlsOpts, function() {
client.flushPending();
});
client.setupHandlers();
}
// Send the connect message and subscriptions immediately
client.sendConnect();
client.sendSubscriptions();
// Send the connect message and subscriptions immediately
client.sendConnect();
client.sendSubscriptions();
client.pongs.unshift(function() { client.connectCB(); });
client.stream.write(PING_REQUEST);
client.pongs.unshift(function() {
client.connectCB();
});
client.stream.write(PING_REQUEST);
// Mark as received
client.infoReceived = true;
client.stripPendingSubs();
client.flushPending();
}
} else {
// FIXME, check line length for something weird.
// Nothing here yet, return
return;
}
break;
// Mark as received
client.infoReceived = true;
client.stripPendingSubs();
client.flushPending();
}
} else {
// FIXME, check line length for something weird.
// Nothing here yet, return
return;
}
break;
case AWAITING_MSG_PAYLOAD:
case AWAITING_MSG_PAYLOAD:
// If we do not have the complete message, hold onto the chunks
// and assemble when we have all we need. This optimizes for
// when we parse a large buffer down to a small number of bytes,
// then we receive a large chunk. This avoids a big copy with a
// simple concat above.
if (client.inbound.length < client.payload.psize) {
if (undefined === client.payload.chunks) {
client.payload.chunks = [];
}
client.payload.chunks.push(client.inbound);
client.payload.psize -= client.inbound.length;
client.inbound = null;
return;
}
// If we do not have the complete message, hold onto the chunks
// and assemble when we have all we need. This optimizes for
// when we parse a large buffer down to a small number of bytes,
// then we receive a large chunk. This avoids a big copy with a
// simple concat above.
if (client.inbound.length < client.payload.psize) {
if (undefined === client.payload.chunks) {
client.payload.chunks = [];
}
client.payload.chunks.push(client.inbound);
client.payload.psize -= client.inbound.length;
client.inbound = null;
return;
}
// If we are here we have the complete message.
// Check to see if we have existing chunks
if (client.payload.chunks) {
// If we are here we have the complete message.
// Check to see if we have existing chunks
if (client.payload.chunks) {
client.payload.chunks.push(client.inbound.slice(0, client.payload.psize));
// don't append trailing control characters
var mbuf = Buffer.concat(client.payload.chunks, client.payload.size);
if (client.options.preserveBuffers) {
client.payload.msg = mbuf;
} else {
client.payload.msg = mbuf.toString(client.encoding);
}
client.payload.chunks.push(client.inbound.slice(0, client.payload.psize));
// don't append trailing control characters
var mbuf = Buffer.concat(client.payload.chunks, client.payload.size);
} else {
if (client.options.preserveBuffers) {
client.payload.msg = mbuf;
} else {
client.payload.msg = mbuf.toString(client.encoding);
}
if (client.options.preserveBuffers) {
client.payload.msg = client.inbound.slice(0, client.payload.size);
} else {
client.payload.msg = client.inbound.toString(client.encoding, 0, client.payload.size);
}
} else {
}
if (client.options.preserveBuffers) {
client.payload.msg = client.inbound.slice(0, client.payload.size);
} else {
client.payload.msg = client.inbound.toString(client.encoding, 0, client.payload.size);
}
// Eat the size of the inbound that represents the message.
if (client.inbound.length === client.payload.psize) {
client.inbound = null;
} else {
client.inbound = client.inbound.slice(client.payload.psize);
}
}
// process the message
client.processMsg();
// Eat the size of the inbound that represents the message.
if (client.inbound.length === client.payload.psize) {
client.inbound = null;
} else {
client.inbound = client.inbound.slice(client.payload.psize);
}
// Reset
client.pstate = AWAITING_CONTROL;
client.payload = null;
// process the message
client.processMsg();
// Check to see if we have an option to yield for other events after yieldTime.
if (start !== undefined) {
if ((Date.now() - start) > client.options.yieldTime) {
client.stream.pause();
setImmediate(client.processInbound.bind(this));
return;
}
}
break;
}
// Reset
client.pstate = AWAITING_CONTROL;
client.payload = null;
// This is applicable for a regex match to eat the bytes we used from a control line.
if (m && !this.closed) {
// Chop inbound
var psize = m[0].length;
if (psize >= client.inbound.length) {
client.inbound = null;
} else {
client.inbound = client.inbound.slice(psize);
}
// Check to see if we have an option to yield for other events after yieldTime.
if (start !== undefined) {
if ((Date.now() - start) > client.options.yieldTime) {
client.stream.pause();
setImmediate(client.processInbound.bind(this));
return;
}
}
break;
}
// This is applicable for a regex match to eat the bytes we used from a control line.
if (m && !this.closed) {
// Chop inbound
var psize = m[0].length;
if (psize >= client.inbound.length) {
client.inbound = null;
} else {
client.inbound = client.inbound.slice(psize);
}
}
m = null;
}
m = null;
}
};

@@ -996,41 +981,40 @@

*/
Client.prototype.processMsg = function() {
var sub = this.subs[this.payload.sid];
if (sub !== undefined) {
sub.received += 1;
// Check for a timeout, and cancel if received >= expected
if (sub.timeout) {
if (sub.received >= sub.expected) {
clearTimeout(sub.timeout);
sub.timeout = null;
}
}
// Check for auto-unsubscribe
if (sub.max !== undefined) {
if (sub.received === sub.max) {
delete this.subs[this.payload.sid];
this.emit('unsubscribe', this.payload.sid, sub.subject);
} else if (sub.received > sub.max) {
this.unsubscribe(this.payload.sid);
sub.callback = null;
}
}
var sub = this.subs[this.payload.sid];
if (sub !== undefined) {
sub.received += 1;
// Check for a timeout, and cancel if received >= expected
if (sub.timeout) {
if (sub.received >= sub.expected) {
clearTimeout(sub.timeout);
sub.timeout = null;
}
}
// Check for auto-unsubscribe
if (sub.max !== undefined) {
if (sub.received === sub.max) {
delete this.subs[this.payload.sid];
this.emit('unsubscribe', this.payload.sid, sub.subject);
} else if (sub.received > sub.max) {
this.unsubscribe(this.payload.sid);
sub.callback = null;
}
}
if (sub.callback) {
var msg = this.payload.msg;
if (this.options.json) {
try {
if (this.options.preserveBuffers) {
msg = JSON.parse(this.payload.msg.toString());
} else {
msg = JSON.parse(this.payload.msg.toString(this.options.encoding));
}
} catch (e) {
msg = e;
if (sub.callback) {
var msg = this.payload.msg;
if (this.options.json) {
try {
if (this.options.preserveBuffers) {
msg = JSON.parse(this.payload.msg.toString());
} else {
msg = JSON.parse(this.payload.msg.toString(this.options.encoding));
}
} catch (e) {
msg = e;
}
}
sub.callback(msg, this.payload.reply, this.payload.subj, this.payload.sid);
}
}
sub.callback(msg, this.payload.reply, this.payload.subj, this.payload.sid);
}
}
};

@@ -1047,9 +1031,9 @@

var m = s ? s.toLowerCase() : '';
if(m.indexOf(STALE_CONNECTION_ERR) !== -1) {
this.scheduleReconnect();
} else if(m.indexOf(PERMISSIONS_ERR) !== -1) {
this.emit('permission_error', new NatsError(s, NATS_PROTOCOL_ERR));
if (m.indexOf(STALE_CONNECTION_ERR) !== -1) {
this.scheduleReconnect();
} else if (m.indexOf(PERMISSIONS_ERR) !== -1) {
this.emit('permission_error', new NatsError(s, NATS_PROTOCOL_ERR));
} else {
this.emit('error', new NatsError(s, NATS_PROTOCOL_ERR));
this.closeStream();
this.emit('error', new NatsError(s, NATS_PROTOCOL_ERR));
this.closeStream();
}

@@ -1063,10 +1047,9 @@ };

* @api public
*/
*/
Client.prototype.addServer = function(uri) {
this.servers.push(new Server(url.parse(uri)));
this.servers.push(new Server(url.parse(uri)));
if (this.options.noRandomize !== true) {
shuffle(this.servers);
}
if (this.options.noRandomize !== true) {
shuffle(this.servers);
}
};

@@ -1081,17 +1064,16 @@

*/
Client.prototype.flush = function(opt_callback) {
if (this.closed) {
if (typeof opt_callback === 'function') {
opt_callback(new NatsError(CONN_CLOSED_MSG, CONN_CLOSED));
return;
} else {
throw(new NatsError(CONN_CLOSED_MSG, CONN_CLOSED));
if (this.closed) {
if (typeof opt_callback === 'function') {
opt_callback(new NatsError(CONN_CLOSED_MSG, CONN_CLOSED));
return;
} else {
throw (new NatsError(CONN_CLOSED_MSG, CONN_CLOSED));
}
}
}
if (this.pongs) {
this.pongs.push(opt_callback);
this.sendCommand(PING_REQUEST);
this.flushPending();
}
if (this.pongs) {
this.pongs.push(opt_callback);
this.sendCommand(PING_REQUEST);
this.flushPending();
}
};

@@ -1108,70 +1090,71 @@

*/
Client.prototype.publish = function(subject, msg, opt_reply, opt_callback) {
// They only supplied a callback function.
if (typeof subject === 'function') {
opt_callback = subject;
subject = undefined;
}
if (!msg) { msg = EMPTY; }
if (!subject) {
if (opt_callback) {
opt_callback(new NatsError(BAD_SUBJECT_MSG, BAD_SUBJECT));
} else {
throw(new NatsError(BAD_SUBJECT_MSG, BAD_SUBJECT));
// They only supplied a callback function.
if (typeof subject === 'function') {
opt_callback = subject;
subject = undefined;
}
}
if (typeof msg === 'function') {
if (opt_callback || opt_reply) {
opt_callback(new NatsError(BAD_MSG_MSG, BAD_MSG));
return;
if (!msg) {
msg = EMPTY;
}
opt_callback = msg;
msg = EMPTY;
opt_reply = undefined;
}
if (typeof opt_reply === 'function') {
if (opt_callback) {
opt_callback(new NatsError(BAD_REPLY_MSG, BAD_REPLY));
return;
if (!subject) {
if (opt_callback) {
opt_callback(new NatsError(BAD_SUBJECT_MSG, BAD_SUBJECT));
} else {
throw (new NatsError(BAD_SUBJECT_MSG, BAD_SUBJECT));
}
}
opt_callback = opt_reply;
opt_reply = undefined;
}
if (typeof msg === 'function') {
if (opt_callback || opt_reply) {
opt_callback(new NatsError(BAD_MSG_MSG, BAD_MSG));
return;
}
opt_callback = msg;
msg = EMPTY;
opt_reply = undefined;
}
if (typeof opt_reply === 'function') {
if (opt_callback) {
opt_callback(new NatsError(BAD_REPLY_MSG, BAD_REPLY));
return;
}
opt_callback = opt_reply;
opt_reply = undefined;
}
// Hold PUB SUB [REPLY]
var psub;
if (opt_reply === undefined) {
psub = 'PUB ' + subject + SPC;
} else {
psub = 'PUB ' + subject + SPC + opt_reply + SPC;
}
// Hold PUB SUB [REPLY]
var psub;
if (opt_reply === undefined) {
psub = 'PUB ' + subject + SPC;
} else {
psub = 'PUB ' + subject + SPC + opt_reply + SPC;
}
// Need to treat sending buffers different.
if (!Buffer.isBuffer(msg)) {
var str = msg;
if (this.options.json) {
if (typeof msg !== 'object') {
throw(new NatsError(BAD_JSON_MSG, BAD_JSON));
}
try {
str = JSON.stringify(msg);
} catch (e) {
throw(new NatsError(BAD_JSON_MSG, BAD_JSON));
}
// Need to treat sending buffers different.
if (!Buffer.isBuffer(msg)) {
var str = msg;
if (this.options.json) {
if (typeof msg !== 'object') {
throw (new NatsError(BAD_JSON_MSG, BAD_JSON));
}
try {
str = JSON.stringify(msg);
} catch (e) {
throw (new NatsError(BAD_JSON_MSG, BAD_JSON));
}
}
this.sendCommand(psub + Buffer.byteLength(str) + CR_LF + str + CR_LF);
} else {
var b = new Buffer(psub.length + msg.length + (2 * CR_LF_LEN) + msg.length.toString().length);
var len = b.write(psub + msg.length + CR_LF);
msg.copy(b, len);
b.write(CR_LF, len + msg.length);
this.sendCommand(b);
}
this.sendCommand(psub + Buffer.byteLength(str) + CR_LF + str + CR_LF);
} else {
var b = new Buffer(psub.length + msg.length + (2 * CR_LF_LEN) + msg.length.toString().length);
var len = b.write(psub + msg.length + CR_LF);
msg.copy(b, len);
b.write(CR_LF, len + msg.length);
this.sendCommand(b);
}
if (opt_callback !== undefined) {
this.flush(opt_callback);
} else if (this.closed) {
throw(new NatsError(CONN_CLOSED_MSG, CONN_CLOSED));
}
if (opt_callback !== undefined) {
this.flush(opt_callback);
} else if (this.closed) {
throw (new NatsError(CONN_CLOSED_MSG, CONN_CLOSED));
}
};

@@ -1189,34 +1172,37 @@

*/
Client.prototype.subscribe = function(subject, opts, callback) {
if (this.closed) {
throw(new NatsError(CONN_CLOSED_MSG, CONN_CLOSED));
}
var qgroup, max;
if (typeof opts === 'function') {
callback = opts;
opts = undefined;
} else if (opts && typeof opts === 'object') {
// FIXME, check exists, error otherwise..
qgroup = opts.queue;
max = opts.max;
}
this.ssid += 1;
this.subs[this.ssid] = { 'subject':subject, 'callback':callback, 'received':0 };
if (this.closed) {
throw (new NatsError(CONN_CLOSED_MSG, CONN_CLOSED));
}
var qgroup, max;
if (typeof opts === 'function') {
callback = opts;
opts = undefined;
} else if (opts && typeof opts === 'object') {
// FIXME, check exists, error otherwise..
qgroup = opts.queue;
max = opts.max;
}
this.ssid += 1;
this.subs[this.ssid] = {
'subject': subject,
'callback': callback,
'received': 0
};
var proto;
if (typeof qgroup === 'string') {
this.subs[this.ssid].qgroup = qgroup;
proto = [SUB, subject, qgroup, this.ssid + CR_LF];
} else {
proto = [SUB, subject, this.ssid + CR_LF];
}
var proto;
if (typeof qgroup === 'string') {
this.subs[this.ssid].qgroup = qgroup;
proto = [SUB, subject, qgroup, this.ssid + CR_LF];
} else {
proto = [SUB, subject, this.ssid + CR_LF];
}
this.sendCommand(proto.join(SPC));
this.emit('subscribe', this.ssid, subject, opts);
this.sendCommand(proto.join(SPC));
this.emit('subscribe', this.ssid, subject, opts);
if (max) {
this.unsubscribe(this.ssid, max);
}
return this.ssid;
if (max) {
this.unsubscribe(this.ssid, max);
}
return this.ssid;
};

@@ -1233,28 +1219,29 @@

*/
Client.prototype.unsubscribe = function(sid, opt_max) {
if (!sid || this.closed) { return; }
if (!sid || this.closed) {
return;
}
var proto;
if (opt_max) {
proto = [UNSUB, sid, opt_max + CR_LF];
} else {
proto = [UNSUB, sid + CR_LF];
}
this.sendCommand(proto.join(SPC));
var proto;
if (opt_max) {
proto = [UNSUB, sid, opt_max + CR_LF];
} else {
proto = [UNSUB, sid + CR_LF];
}
this.sendCommand(proto.join(SPC));
var sub = this.subs[sid];
if (sub === undefined) {
return;
}
sub.max = opt_max;
if (sub.max === undefined || (sub.received >= sub.max)) {
// remove any timeouts that may be pending
if (sub.timeout) {
clearTimeout(sub.timeout);
sub.timeout = null;
var sub = this.subs[sid];
if (sub === undefined) {
return;
}
delete this.subs[sid];
this.emit('unsubscribe', sid, sub.subject);
}
sub.max = opt_max;
if (sub.max === undefined || (sub.received >= sub.max)) {
// remove any timeouts that may be pending
if (sub.timeout) {
clearTimeout(sub.timeout);
sub.timeout = null;
}
delete this.subs[sid];
this.emit('unsubscribe', sid, sub.subject);
}
};

@@ -1272,12 +1259,16 @@

Client.prototype.timeout = function(sid, timeout, expected, callback) {
if (!sid) { return; }
var sub = this.subs[sid];
if (sub === null) { return; }
sub.expected = expected;
var that = this;
sub.timeout = setTimeout(function() {
callback(sid);
// if callback fails unsubscribe will leak
that.unsubscribe(sid);
}, timeout);
if (!sid) {
return;
}
var sub = this.subs[sid];
if (sub === null) {
return;
}
sub.expected = expected;
var that = this;
sub.timeout = setTimeout(function() {
callback(sid);
// if callback fails unsubscribe will leak
that.unsubscribe(sid);
}, timeout);
};

@@ -1300,17 +1291,17 @@

Client.prototype.request = function(subject, opt_msg, opt_options, callback) {
if (typeof opt_msg === 'function') {
callback = opt_msg;
opt_msg = EMPTY;
opt_options = null;
}
if (typeof opt_options === 'function') {
callback = opt_options;
opt_options = null;
}
var inbox = createInbox();
var s = this.subscribe(inbox, opt_options, function(msg, reply) {
callback(msg, reply);
});
this.publish(subject, opt_msg, inbox);
return s;
if (typeof opt_msg === 'function') {
callback = opt_msg;
opt_msg = EMPTY;
opt_options = null;
}
if (typeof opt_options === 'function') {
callback = opt_options;
opt_options = null;
}
var inbox = createInbox();
var s = this.subscribe(inbox, opt_options, function(msg, reply) {
callback(msg, reply);
});
this.publish(subject, opt_msg, inbox);
return s;
};

@@ -1337,9 +1328,9 @@

Client.prototype.requestOne = function(subject, opt_msg, opt_options, timeout, callback) {
opt_options = opt_options || {};
opt_options.max = 1;
var sid = this.request(subject, opt_msg, opt_options, callback);
this.timeout(sid, timeout, 1, function() {
callback(new NatsError(REQ_TIMEOUT_MSG_PREFIX + sid, REQ_TIMEOUT));
});
return sid;
opt_options = opt_options || {};
opt_options.max = 1;
var sid = this.request(subject, opt_msg, opt_options, callback);
this.timeout(sid, timeout, 1, function() {
callback(new NatsError(REQ_TIMEOUT_MSG_PREFIX + sid, REQ_TIMEOUT));
});
return sid;
};

@@ -1353,5 +1344,4 @@

*/
Client.prototype.numSubscriptions = function() {
return Object.keys(this.subs).length;
return Object.keys(this.subs).length;
};

@@ -1364,10 +1354,11 @@

*/
Client.prototype.reconnect = function() {
if (this.closed) { return; }
this.reconnects += 1;
this.createConnection();
if (this.currentServer.didConnect === true) {
this.emit('reconnecting');
}
if (this.closed) {
return;
}
this.reconnects += 1;
this.createConnection();
if (this.currentServer.didConnect === true) {
this.emit('reconnecting');
}
};

@@ -1380,20 +1371,21 @@

*/
Client.prototype.scheduleReconnect = function() {
var client = this;
// Just return if no more servers
if (client.servers.length === 0) {
return;
}
// Don't set reconnecting state if we are just trying
// for the first time.
if (client.wasConnected === true) {
client.reconnecting = true;
}
// Only stall if we have connected before.
var wait = 0;
if (client.servers[0].didConnect === true) {
wait = this.options.reconnectTimeWait;
}
setTimeout(function() { client.reconnect(); }, wait);
var client = this;
// Just return if no more servers
if (client.servers.length === 0) {
return;
}
// Don't set reconnecting state if we are just trying
// for the first time.
if (client.wasConnected === true) {
client.reconnecting = true;
}
// Only stall if we have connected before.
var wait = 0;
if (client.servers[0].didConnect === true) {
wait = this.options.reconnectTimeWait;
}
setTimeout(function() {
client.reconnect();
}, wait);
};
{
"name": "nats",
"version": "0.7.20",
"version": "0.7.24",
"description": "Node.js client for NATS, a lightweight, high-performance cloud native messaging system",

@@ -32,3 +32,2 @@ "keywords": [

"scripts": {
"lint": "jshint --reporter node_modules/jshint-stylish lib test examples",
"depcheck": "dependency-check . lib/*",

@@ -39,3 +38,5 @@ "depcheck:unused": "dependency-check ./package.json --unused --no-dev lib/*",

"coveralls": "npm run cover -- --report lcovonly && cat ./reports/coverage/lcov.info | coveralls",
"cover": "istanbul cover _mocha"
"cover": "istanbul cover _mocha",
"lint": "if-node-version >=4 eslint ./lib ./examples ./benchmark ./test",
"fmt": "js-beautify -n --config crockford.jscsrc -r lib/* test/*.js test/support/*.js examples/* benchmark/*.js"
},

@@ -51,3 +52,6 @@ "engines": {

"dependency-check": "2.5.x",
"eslint": "^3.19.0",
"if-node-version": "^1.1.1",
"istanbul": "0.4.x",
"js-beautify": "^1.6.12",
"jshint": "2.9.x",

@@ -54,0 +58,0 @@ "jshint-stylish": "2.2.x",

@@ -222,6 +222,50 @@ # NATS - Node.js Client

// Reconnect Attempts and Time between reconnects
// By default a NATS connection will try to reconnect to a server 10 times
// waiting 2 seconds between reconnect attempts. If the maximum number of
// retries is reached, the client will close the connection.
// To change the default behaviour specify the max number of connection
// attempts in `maxReconnectAttempts` (set to -1 to retry forever), and the
// time in milliseconds between reconnects in `reconnectTimeWait`.
nc = nats.connect({'maxReconnectAttempts': -1, 'reconnectTimeWait': 250});
nc.on('error', function(err) {
console.log(err);
});
nc.on('connect', function(nc) {
console.log('connected');
});
nc.on('disconnect', function() {
console.log('disconnect');
});
nc.on('reconnecting', function() {
console.log('reconnecting');
});
nc.on('reconnect', function(nc) {
console.log('reconnect');
});
nc.on('close', function() {
console.log('close');
});
```
See examples and benchmarks for more information..
See examples and benchmarks for more information.
## Supported Node Versions
Support policy for Nodejs versions follows
[Nodejs release support]( https://github.com/nodejs/Release).
We will support and build node-nats on even Nodejs versions that are current
or in maintenance.
## License

@@ -228,0 +272,0 @@

Sorry, the diff of this file is not supported yet

SocketSocket SOC 2 Logo

Product

  • Package Alerts
  • Integrations
  • Docs
  • Pricing
  • FAQ
  • Roadmap
  • Changelog

Packages

npm

Stay in touch

Get open source security insights delivered straight into your inbox.


  • Terms
  • Privacy
  • Security

Made with ⚡️ by Socket Inc