Comparing version 0.7.20 to 0.7.24
import events = require('events'); | ||
import tls = require('tls'); | ||
@@ -9,13 +10,17 @@ export const version: string; | ||
*/ | ||
export const BAD_SUBJECT: string; | ||
export const BAD_AUTHENTICATION: string; | ||
export const BAD_JSON: string; | ||
export const BAD_MSG: string; | ||
export const BAD_REPLY: string; | ||
export const BAD_SUBJECT: string; | ||
export const CLIENT_CERT_REQ: string; | ||
export const CONN_CLOSED: string; | ||
export const BAD_JSON: string; | ||
export const BAD_AUTHENTICATION: string; | ||
export const CONN_ERR: string; | ||
export const INVALID_ENCODING: string; | ||
export const NATS_PROTOCOL_ERR: string; | ||
export const NON_SECURE_CONN_REQ: string; | ||
export const PERMISSIONS_ERR: string; | ||
export const REQ_TIMEOUT: string; | ||
export const SECURE_CONN_REQ: string; | ||
export const NON_SECURE_CONN_REQ: string; | ||
export const CLIENT_CERT_REQ: string; | ||
export const NATS_PROTOCOL_ERR: string; | ||
export const STALE_CONNECTION_ERR: string; | ||
@@ -60,3 +65,3 @@ /** | ||
encoding?: BufferEncoding, | ||
tls?: boolean, | ||
tls?: boolean | tls.TlsOptions, | ||
name?: string, | ||
@@ -63,0 +68,0 @@ yieldTime?: number, |
1698
lib/nats.js
@@ -14,9 +14,8 @@ /*! | ||
*/ | ||
var net = require('net'), | ||
tls = require('tls'), | ||
url = require('url'), | ||
util = require('util'), | ||
var net = require('net'), | ||
tls = require('tls'), | ||
url = require('url'), | ||
util = require('util'), | ||
events = require('events'), | ||
nuid = require('nuid'); | ||
nuid = require('nuid'); | ||
@@ -26,8 +25,7 @@ /** | ||
*/ | ||
var VERSION = '0.7.24', | ||
var VERSION = '0.7.20', | ||
DEFAULT_PORT = 4222, | ||
DEFAULT_PRE = 'nats://localhost:', | ||
DEFAULT_URI = DEFAULT_PRE + DEFAULT_PORT, | ||
DEFAULT_PRE = 'nats://localhost:', | ||
DEFAULT_URI = DEFAULT_PRE + DEFAULT_PORT, | ||
@@ -41,3 +39,3 @@ MAX_CONTROL_LINE_SIZE = 512, | ||
// Reconnect Parameters, 2 sec wait, 10 tries | ||
DEFAULT_RECONNECT_TIME_WAIT = 2*1000, | ||
DEFAULT_RECONNECT_TIME_WAIT = 2 * 1000, | ||
DEFAULT_MAX_RECONNECT_ATTEMPTS = 10, | ||
@@ -48,8 +46,8 @@ | ||
MSG = /^MSG\s+([^\s\r\n]+)\s+([^\s\r\n]+)\s+(([^\s\r\n]+)[^\S\r\n]+)?(\d+)\r\n/i, | ||
OK = /^\+OK\s*\r\n/i, | ||
ERR = /^-ERR\s+('.+')?\r\n/i, | ||
PING = /^PING\r\n/i, | ||
PONG = /^PONG\r\n/i, | ||
INFO = /^INFO\s+([^\r\n]+)\r\n/i, | ||
MSG = /^MSG\s+([^\s\r\n]+)\s+([^\s\r\n]+)\s+(([^\s\r\n]+)[^\S\r\n]+)?(\d+)\r\n/i, | ||
OK = /^\+OK\s*\r\n/i, | ||
ERR = /^-ERR\s+('.+')?\r\n/i, | ||
PING = /^PING\r\n/i, | ||
PONG = /^PONG\r\n/i, | ||
INFO = /^INFO\s+([^\r\n]+)\r\n/i, | ||
SUBRE = /^SUB\s+([^\r\n]+)\r\n/i, | ||
@@ -64,13 +62,15 @@ | ||
//PUB = 'PUB', // TODO: remove / never used | ||
SUB = 'SUB', | ||
UNSUB = 'UNSUB', | ||
SUB = 'SUB', | ||
UNSUB = 'UNSUB', | ||
CONNECT = 'CONNECT', | ||
// Responses | ||
PING_REQUEST = 'PING' + CR_LF, | ||
PING_REQUEST = 'PING' + CR_LF, | ||
PONG_RESPONSE = 'PONG' + CR_LF, | ||
// Errors | ||
BAD_SUBJECT = 'BAD_SUBJECT', | ||
BAD_SUBJECT_MSG = 'Subject must be supplied', | ||
BAD_AUTHENTICATION = 'BAD_AUTHENTICATION', | ||
BAD_AUTHENTICATION_MSG = 'User and Token can not both be provided', | ||
BAD_JSON = 'BAD_JSON', | ||
BAD_JSON_MSG = 'Message should be a JSON object', | ||
BAD_MSG = 'BAD_MSG', | ||
@@ -80,23 +80,21 @@ BAD_MSG_MSG = 'Message can\'t be a function', | ||
BAD_REPLY_MSG = 'Reply can\'t be a function', | ||
BAD_SUBJECT = 'BAD_SUBJECT', | ||
BAD_SUBJECT_MSG = 'Subject must be supplied', | ||
CLIENT_CERT_REQ = 'CLIENT_CERT_REQ', | ||
CLIENT_CERT_REQ_MSG = 'Server requires a client certificate.', | ||
CONN_CLOSED = 'CONN_CLOSED', | ||
CONN_CLOSED_MSG = 'Connection closed', | ||
BAD_JSON = 'BAD_JSON', | ||
BAD_JSON_MSG = 'Message should be a JSON object', | ||
BAD_AUTHENTICATION = 'BAD_AUTHENTICATION', | ||
BAD_AUTHENTICATION_MSG = 'User and Token can not both be provided', | ||
CONN_ERR = 'CONN_ERR', | ||
CONN_ERR_MSG_PREFIX = 'Could not connect to server: ', | ||
INVALID_ENCODING = 'INVALID_ENCODING', | ||
INVALID_ENCODING_MSG_PREFIX = 'Invalid Encoding:', | ||
SECURE_CONN_REQ = 'SECURE_CONN_REQ', | ||
SECURE_CONN_REQ_MSG = 'Server requires a secure connection.', | ||
NATS_PROTOCOL_ERR = 'NATS_PROTOCOL_ERR', | ||
NON_SECURE_CONN_REQ = 'NON_SECURE_CONN_REQ', | ||
NON_SECURE_CONN_REQ_MSG = 'Server does not support a secure connection.', | ||
CLIENT_CERT_REQ = 'CLIENT_CERT_REQ', | ||
CLIENT_CERT_REQ_MSG = 'Server requires a client certificate.', | ||
CONN_ERR = 'CONN_ERR', | ||
CONN_ERR_MSG_PREFIX = 'Could not connect to server: ', | ||
NATS_PROTOCOL_ERR = 'NATS_PROTOCOL_ERR', | ||
PERMISSIONS_ERR = "permissions violation", | ||
REQ_TIMEOUT = 'REQ_TIMEOUT', | ||
REQ_TIMEOUT_MSG_PREFIX = 'The request timed out for subscription id: ', | ||
SECURE_CONN_REQ = 'SECURE_CONN_REQ', | ||
SECURE_CONN_REQ_MSG = 'Server requires a secure connection.', | ||
STALE_CONNECTION_ERR = "stale connection", | ||
PERMISSIONS_ERR = "permissions violation", | ||
@@ -110,3 +108,3 @@ // Pedantic Mode support | ||
function NatsError(message, code, chainedError) { | ||
function NatsError(message, code, chainedError) { | ||
Error.captureStackTrace(this, this.constructor); | ||
@@ -117,6 +115,6 @@ this.name = this.constructor.name; | ||
this.chainedError = chainedError; | ||
} | ||
} | ||
util.inherits(NatsError, Error); | ||
exports.NatsError = NatsError; | ||
util.inherits(NatsError, Error); | ||
exports.NatsError = NatsError; | ||
@@ -149,6 +147,5 @@ /** | ||
* @api public | ||
*/ | ||
*/ | ||
var createInbox = exports.createInbox = function() { | ||
return ("_INBOX." + nuid.next()); | ||
return ("_INBOX." + nuid.next()); | ||
}; | ||
@@ -162,8 +159,7 @@ | ||
*/ | ||
function Client(opts) { | ||
events.EventEmitter.call(this); | ||
this.parseOptions(opts); | ||
this.initState(); | ||
this.createConnection(); | ||
events.EventEmitter.call(this); | ||
this.parseOptions(opts); | ||
this.initState(); | ||
this.createConnection(); | ||
} | ||
@@ -180,5 +176,4 @@ | ||
*/ | ||
exports.connect = function(opts) { | ||
return new Client(opts); | ||
return new Client(opts); | ||
}; | ||
@@ -189,3 +184,2 @@ | ||
*/ | ||
util.inherits(Client, events.EventEmitter); | ||
@@ -198,22 +192,21 @@ | ||
*/ | ||
Client.prototype.createInbox = createInbox; | ||
Client.prototype.assignOption = function(opts, prop, assign) { | ||
if (assign === undefined) { | ||
assign = prop; | ||
} | ||
if (opts[prop] !== undefined) { | ||
this.options[assign] = opts[prop]; | ||
} | ||
if (assign === undefined) { | ||
assign = prop; | ||
} | ||
if (opts[prop] !== undefined) { | ||
this.options[assign] = opts[prop]; | ||
} | ||
}; | ||
function shuffle(array) { | ||
for (var i = array.length - 1; i > 0; i--) { | ||
var j = Math.floor(Math.random() * (i + 1)); | ||
var temp = array[i]; | ||
array[i] = array[j]; | ||
array[j] = temp; | ||
} | ||
return array; | ||
for (var i = array.length - 1; i > 0; i--) { | ||
var j = Math.floor(Math.random() * (i + 1)); | ||
var temp = array[i]; | ||
array[i] = array[j]; | ||
array[j] = temp; | ||
} | ||
return array; | ||
} | ||
@@ -227,91 +220,90 @@ | ||
*/ | ||
Client.prototype.parseOptions = function(opts) { | ||
var options = this.options = { | ||
'verbose' : false, | ||
'pedantic' : false, | ||
'reconnect' : true, | ||
'maxReconnectAttempts' : DEFAULT_MAX_RECONNECT_ATTEMPTS, | ||
'reconnectTimeWait' : DEFAULT_RECONNECT_TIME_WAIT, | ||
'encoding' : 'utf8', | ||
'tls' : false, | ||
'waitOnFirstConnect' : false, | ||
}; | ||
var options = this.options = { | ||
'verbose': false, | ||
'pedantic': false, | ||
'reconnect': true, | ||
'maxReconnectAttempts': DEFAULT_MAX_RECONNECT_ATTEMPTS, | ||
'reconnectTimeWait': DEFAULT_RECONNECT_TIME_WAIT, | ||
'encoding': 'utf8', | ||
'tls': false, | ||
'waitOnFirstConnect': false, | ||
}; | ||
if (undefined === opts) { | ||
options.url = DEFAULT_URI; | ||
} else if ('number' === typeof opts) { | ||
options.url = DEFAULT_PRE + opts; | ||
} else if ('string' === typeof opts) { | ||
options.url = opts; | ||
} else if ('object' === typeof opts) { | ||
if (opts.port !== undefined) { | ||
options.url = DEFAULT_PRE + opts.port; | ||
if (undefined === opts) { | ||
options.url = DEFAULT_URI; | ||
} else if ('number' === typeof opts) { | ||
options.url = DEFAULT_PRE + opts; | ||
} else if ('string' === typeof opts) { | ||
options.url = opts; | ||
} else if ('object' === typeof opts) { | ||
if (opts.port !== undefined) { | ||
options.url = DEFAULT_PRE + opts.port; | ||
} | ||
// Pull out various options here | ||
this.assignOption(opts, 'url'); | ||
this.assignOption(opts, 'uri', 'url'); | ||
this.assignOption(opts, 'user'); | ||
this.assignOption(opts, 'pass'); | ||
this.assignOption(opts, 'token'); | ||
this.assignOption(opts, 'password', 'pass'); | ||
this.assignOption(opts, 'verbose'); | ||
this.assignOption(opts, 'pedantic'); | ||
this.assignOption(opts, 'reconnect'); | ||
this.assignOption(opts, 'maxReconnectAttempts'); | ||
this.assignOption(opts, 'reconnectTimeWait'); | ||
this.assignOption(opts, 'servers'); | ||
this.assignOption(opts, 'urls', 'servers'); | ||
this.assignOption(opts, 'noRandomize'); | ||
this.assignOption(opts, 'NoRandomize', 'noRandomize'); | ||
this.assignOption(opts, 'dontRandomize', 'noRandomize'); | ||
this.assignOption(opts, 'encoding'); | ||
this.assignOption(opts, 'tls'); | ||
this.assignOption(opts, 'secure', 'tls'); | ||
this.assignOption(opts, 'name'); | ||
this.assignOption(opts, 'client', 'name'); | ||
this.assignOption(opts, 'yieldTime'); | ||
this.assignOption(opts, 'waitOnFirstConnect'); | ||
this.assignOption(opts, 'json'); | ||
this.assignOption(opts, 'preserveBuffers'); | ||
} | ||
// Pull out various options here | ||
this.assignOption(opts, 'url'); | ||
this.assignOption(opts, 'uri', 'url'); | ||
this.assignOption(opts, 'user'); | ||
this.assignOption(opts, 'pass'); | ||
this.assignOption(opts, 'token'); | ||
this.assignOption(opts, 'password', 'pass'); | ||
this.assignOption(opts, 'verbose'); | ||
this.assignOption(opts, 'pedantic'); | ||
this.assignOption(opts, 'reconnect'); | ||
this.assignOption(opts, 'maxReconnectAttempts'); | ||
this.assignOption(opts, 'reconnectTimeWait'); | ||
this.assignOption(opts, 'servers'); | ||
this.assignOption(opts, 'urls', 'servers'); | ||
this.assignOption(opts, 'noRandomize'); | ||
this.assignOption(opts, 'NoRandomize', 'noRandomize'); | ||
this.assignOption(opts, 'dontRandomize', 'noRandomize'); | ||
this.assignOption(opts, 'encoding'); | ||
this.assignOption(opts, 'tls'); | ||
this.assignOption(opts, 'secure', 'tls'); | ||
this.assignOption(opts, 'name'); | ||
this.assignOption(opts, 'client', 'name'); | ||
this.assignOption(opts, 'yieldTime'); | ||
this.assignOption(opts, 'waitOnFirstConnect'); | ||
this.assignOption(opts, 'json'); | ||
this.assignOption(opts, 'preserveBuffers'); | ||
} | ||
var client = this; | ||
var client = this; | ||
// Set user/pass as needed if in options. | ||
client.user = options.user; | ||
client.pass = options.pass; | ||
// Set token as needed if in options. | ||
client.token = options.token; | ||
// Set user/pass as needed if in options. | ||
client.user = options.user; | ||
client.pass = options.pass; | ||
// Authentication - make sure authentication is valid. | ||
if (client.user && client.token) { | ||
throw(new NatsError(BAD_AUTHENTICATION_MSG, BAD_AUTHENTICATION)); | ||
} | ||
// Set token as needed if in options. | ||
client.token = options.token; | ||
// Encoding - make sure its valid. | ||
if (Buffer.isEncoding(options.encoding)) { | ||
client.encoding = options.encoding; | ||
} else { | ||
throw new NatsError(INVALID_ENCODING_MSG_PREFIX + options.encoding, INVALID_ENCODING); | ||
} | ||
// For cluster support | ||
client.servers = []; | ||
// Authentication - make sure authentication is valid. | ||
if (client.user && client.token) { | ||
throw (new NatsError(BAD_AUTHENTICATION_MSG, BAD_AUTHENTICATION)); | ||
} | ||
if (Array.isArray(options.servers)) { | ||
options.servers.forEach(function(server) { | ||
client.servers.push(new Server(url.parse(server))); | ||
}); | ||
} else { | ||
if (undefined === options.url) { | ||
options.url = DEFAULT_URI; | ||
// Encoding - make sure its valid. | ||
if (Buffer.isEncoding(options.encoding)) { | ||
client.encoding = options.encoding; | ||
} else { | ||
throw new NatsError(INVALID_ENCODING_MSG_PREFIX + options.encoding, INVALID_ENCODING); | ||
} | ||
client.servers.push(new Server(url.parse(options.url))); | ||
} | ||
// For cluster support | ||
client.servers = []; | ||
// Randomize if needed | ||
if (options.noRandomize !== true) { | ||
shuffle(client.servers); | ||
} | ||
if (Array.isArray(options.servers)) { | ||
options.servers.forEach(function(server) { | ||
client.servers.push(new Server(url.parse(server))); | ||
}); | ||
} else { | ||
if (undefined === options.url) { | ||
options.url = DEFAULT_URI; | ||
} | ||
client.servers.push(new Server(url.parse(options.url))); | ||
} | ||
// Randomize if needed | ||
if (options.noRandomize !== true) { | ||
shuffle(client.servers); | ||
} | ||
}; | ||
@@ -323,8 +315,7 @@ | ||
* @api private | ||
*/ | ||
*/ | ||
function Server(url) { | ||
this.url = url; | ||
this.didConnect = false; | ||
this.reconnects = 0; | ||
this.url = url; | ||
this.didConnect = false; | ||
this.reconnects = 0; | ||
} | ||
@@ -339,27 +330,26 @@ | ||
* @api private | ||
*/ | ||
*/ | ||
Client.prototype.selectServer = function() { | ||
var client = this; | ||
var server = client.servers.shift(); | ||
var client = this; | ||
var server = client.servers.shift(); | ||
// Place in client context. | ||
client.currentServer = server; | ||
client.url = server.url; | ||
if ('auth' in server.url && !!server.url.auth) { | ||
var auth = server.url.auth.split(':'); | ||
if (auth.length !== 1) { | ||
if (client.options.user === undefined) { | ||
client.user = auth[0]; | ||
} | ||
if (client.options.pass === undefined) { | ||
client.pass = auth[1]; | ||
} | ||
} else { | ||
if (client.options.token === undefined) { | ||
client.token = auth[0]; | ||
} | ||
// Place in client context. | ||
client.currentServer = server; | ||
client.url = server.url; | ||
if ('auth' in server.url && !!server.url.auth) { | ||
var auth = server.url.auth.split(':'); | ||
if (auth.length !== 1) { | ||
if (client.options.user === undefined) { | ||
client.user = auth[0]; | ||
} | ||
if (client.options.pass === undefined) { | ||
client.pass = auth[1]; | ||
} | ||
} else { | ||
if (client.options.token === undefined) { | ||
client.token = auth[0]; | ||
} | ||
} | ||
} | ||
} | ||
client.servers.push(server); | ||
client.servers.push(server); | ||
}; | ||
@@ -371,26 +361,25 @@ | ||
* @api private | ||
*/ | ||
*/ | ||
Client.prototype.checkTLSMismatch = function() { | ||
if (this.info.tls_required === true && | ||
this.options.tls === false) { | ||
this.emit('error', new NatsError(SECURE_CONN_REQ_MSG, SECURE_CONN_REQ)); | ||
this.closeStream(); | ||
return true; | ||
} | ||
if (this.info.tls_required === true && | ||
this.options.tls === false) { | ||
this.emit('error', new NatsError(SECURE_CONN_REQ_MSG, SECURE_CONN_REQ)); | ||
this.closeStream(); | ||
return true; | ||
} | ||
if (this.info.tls_required === false && | ||
this.options.tls !== false) { | ||
this.emit('error', new NatsError(NON_SECURE_CONN_REQ_MSG, NON_SECURE_CONN_REQ)); | ||
this.closeStream(); | ||
return true; | ||
} | ||
if (this.info.tls_required === false && | ||
this.options.tls !== false) { | ||
this.emit('error', new NatsError(NON_SECURE_CONN_REQ_MSG, NON_SECURE_CONN_REQ)); | ||
this.closeStream(); | ||
return true; | ||
} | ||
if (this.info.tls_verify === true && | ||
this.options.tls.cert === undefined) { | ||
this.emit('error', new NatsError(CLIENT_CERT_REQ_MSG, CLIENT_CERT_REQ)); | ||
this.closeStream(); | ||
return true; | ||
} | ||
return false; | ||
if (this.info.tls_verify === true && | ||
this.options.tls.cert === undefined) { | ||
this.emit('error', new NatsError(CLIENT_CERT_REQ_MSG, CLIENT_CERT_REQ)); | ||
this.closeStream(); | ||
return true; | ||
} | ||
return false; | ||
}; | ||
@@ -402,15 +391,14 @@ | ||
* @api private | ||
*/ | ||
*/ | ||
Client.prototype.connectCB = function() { | ||
var wasReconnecting = this.reconnecting; | ||
var event = (wasReconnecting === true) ? 'reconnect' : 'connect'; | ||
this.reconnecting = false; | ||
this.reconnects = 0; | ||
this.wasConnected = true; | ||
this.currentServer.didConnect = true; | ||
var wasReconnecting = this.reconnecting; | ||
var event = (wasReconnecting === true) ? 'reconnect' : 'connect'; | ||
this.reconnecting = false; | ||
this.reconnects = 0; | ||
this.wasConnected = true; | ||
this.currentServer.didConnect = true; | ||
this.emit(event, this); | ||
this.emit(event, this); | ||
this.flushPending(); | ||
this.flushPending(); | ||
}; | ||
@@ -423,69 +411,68 @@ | ||
* @api private | ||
*/ | ||
*/ | ||
Client.prototype.setupHandlers = function() { | ||
var client = this; | ||
var stream = client.stream; | ||
var client = this; | ||
var stream = client.stream; | ||
if (undefined === stream) { | ||
return; | ||
} | ||
if (undefined === stream) { | ||
return; | ||
} | ||
stream.on('connect', function() { | ||
client.connected = true; | ||
}); | ||
stream.on('connect', function() { | ||
client.connected = true; | ||
}); | ||
stream.on('close', function(hadError) { | ||
client.closeStream(); | ||
client.emit('disconnect'); | ||
if (client.closed === true || | ||
client.options.reconnect === false || | ||
((client.reconnects >= client.options.maxReconnectAttempts) && client.options.maxReconnectAttempts !== -1)) { | ||
client.emit('close'); | ||
} else { | ||
client.scheduleReconnect(); | ||
} | ||
}); | ||
stream.on('close', function(hadError) { | ||
client.closeStream(); | ||
client.emit('disconnect'); | ||
if (client.closed === true || | ||
client.options.reconnect === false || | ||
((client.reconnects >= client.options.maxReconnectAttempts) && client.options.maxReconnectAttempts !== -1)) { | ||
client.emit('close'); | ||
} else { | ||
client.scheduleReconnect(); | ||
} | ||
}); | ||
stream.on('error', function(exception) { | ||
// If we were connected just return, close event will process | ||
if (client.wasConnected === true && client.currentServer.didConnect === true) { | ||
return; | ||
} | ||
stream.on('error', function(exception) { | ||
// If we were connected just return, close event will process | ||
if (client.wasConnected === true && client.currentServer.didConnect === true) { | ||
return; | ||
} | ||
// if the current server did not connect at all, and we in | ||
// general have not connected to any server, remove it from | ||
// this list. Unless overidden | ||
if (client.wasConnected === false && client.currentServer.didConnect === false) { | ||
// We can override this behavior with waitOnFirstConnect, which will | ||
// treat it like a reconnect scenario. | ||
if (client.options.waitOnFirstConnect) { | ||
// Pretend to move us into a reconnect state. | ||
client.currentServer.didConnect = true; | ||
} else { | ||
client.servers.splice(client.servers.length-1, 1); | ||
} | ||
} | ||
// if the current server did not connect at all, and we in | ||
// general have not connected to any server, remove it from | ||
// this list. Unless overidden | ||
if (client.wasConnected === false && client.currentServer.didConnect === false) { | ||
// We can override this behavior with waitOnFirstConnect, which will | ||
// treat it like a reconnect scenario. | ||
if (client.options.waitOnFirstConnect) { | ||
// Pretend to move us into a reconnect state. | ||
client.currentServer.didConnect = true; | ||
} else { | ||
client.servers.splice(client.servers.length - 1, 1); | ||
} | ||
} | ||
// Only bubble up error if we never had connected | ||
// to the server and we only have one. | ||
if (client.wasConnected === false && client.servers.length === 0) { | ||
client.emit('error', new NatsError(CONN_ERR_MSG_PREFIX + exception, CONN_ERR, exception)); | ||
} | ||
client.closeStream(); | ||
}); | ||
// Only bubble up error if we never had connected | ||
// to the server and we only have one. | ||
if (client.wasConnected === false && client.servers.length === 0) { | ||
client.emit('error', new NatsError(CONN_ERR_MSG_PREFIX + exception, CONN_ERR, exception)); | ||
} | ||
client.closeStream(); | ||
}); | ||
stream.on('data', function (data) { | ||
// If inbound exists, concat them together. We try to avoid this for split | ||
// messages, so this should only really happen for a split control line. | ||
// Long term answer is hand rolled parser and not regexp. | ||
if (client.inbound) { | ||
client.inbound = Buffer.concat([client.inbound, data]); | ||
} else { | ||
client.inbound = data; | ||
} | ||
stream.on('data', function(data) { | ||
// If inbound exists, concat them together. We try to avoid this for split | ||
// messages, so this should only really happen for a split control line. | ||
// Long term answer is hand rolled parser and not regexp. | ||
if (client.inbound) { | ||
client.inbound = Buffer.concat([client.inbound, data]); | ||
} else { | ||
client.inbound = data; | ||
} | ||
// Process the inbound queue. | ||
client.processInbound(); | ||
}); | ||
// Process the inbound queue. | ||
client.processInbound(); | ||
}); | ||
}; | ||
@@ -498,27 +485,26 @@ | ||
* @api private | ||
*/ | ||
*/ | ||
Client.prototype.sendConnect = function() { | ||
// Queue the connect command. | ||
var cs = { | ||
'lang' : 'node', | ||
'version' : VERSION, | ||
'verbose' : this.options.verbose, | ||
'pedantic': this.options.pedantic, | ||
'protocol': 1, | ||
}; | ||
if (this.user !== undefined) { | ||
cs.user = this.user; | ||
cs.pass = this.pass; | ||
} | ||
if (this.token !== undefined) { | ||
cs.auth_token = this.token; | ||
} | ||
if (this.options.name !== undefined) { | ||
cs.name = this.options.name; | ||
} | ||
// If we enqueued requests before we received INFO from the server, or we | ||
// reconnected, there be other data pending, write this immediately instead | ||
// of adding it to the queue. | ||
this.stream.write(CONNECT + SPC + JSON.stringify(cs) + CR_LF); | ||
// Queue the connect command. | ||
var cs = { | ||
'lang': 'node', | ||
'version': VERSION, | ||
'verbose': this.options.verbose, | ||
'pedantic': this.options.pedantic, | ||
'protocol': 1, | ||
}; | ||
if (this.user !== undefined) { | ||
cs.user = this.user; | ||
cs.pass = this.pass; | ||
} | ||
if (this.token !== undefined) { | ||
cs.auth_token = this.token; | ||
} | ||
if (this.options.name !== undefined) { | ||
cs.name = this.options.name; | ||
} | ||
// If we enqueued requests before we received INFO from the server, or we | ||
// reconnected, there be other data pending, write this immediately instead | ||
// of adding it to the queue. | ||
this.stream.write(CONNECT + SPC + JSON.stringify(cs) + CR_LF); | ||
}; | ||
@@ -530,59 +516,58 @@ | ||
* @api private | ||
*/ | ||
*/ | ||
Client.prototype.createConnection = function() { | ||
// Commands may have been queued during reconnect. Discard everything except: | ||
// 1) ping requests with a pong callback | ||
// 2) publish requests | ||
// | ||
// Rationale: CONNECT and SUBs are written directly upon connecting, any PONG | ||
// response is no longer relevant, and any UNSUB will be accounted for when we | ||
// sync our SUBs. Without this, users of the client may miss state transitions | ||
// via callbacks, would have to track the client's internal connection state, | ||
// and may have to double buffer messages (which we are already doing) if they | ||
// wanted to ensure their messages reach the server. | ||
var pong = []; | ||
var pend = []; | ||
var pSize = 0; | ||
var client = this; | ||
if (client.pending !== null) { | ||
var pongIndex = 0; | ||
client.pending.forEach(function(cmd) { | ||
var cmdLen = Buffer.isBuffer(cmd) ? cmd.length : Buffer.byteLength(cmd); | ||
if (cmd === PING_REQUEST && client.pongs !== null && pongIndex < client.pongs.length) { | ||
// filter out any useless ping requests (no pong callback, nop flush) | ||
var p = client.pongs[pongIndex++]; | ||
if (p !== undefined) { | ||
pend.push(cmd); | ||
pSize += cmdLen; | ||
pong.push(p); | ||
} | ||
} else if (cmd.length > 3 && cmd[0] == 'P' && cmd[1] == 'U' && cmd[2] == 'B') { | ||
pend.push(cmd); | ||
pSize += cmdLen; | ||
} | ||
}); | ||
} | ||
this.pongs = pong; | ||
this.pending = pend; | ||
this.pSize = pSize; | ||
// Commands may have been queued during reconnect. Discard everything except: | ||
// 1) ping requests with a pong callback | ||
// 2) publish requests | ||
// | ||
// Rationale: CONNECT and SUBs are written directly upon connecting, any PONG | ||
// response is no longer relevant, and any UNSUB will be accounted for when we | ||
// sync our SUBs. Without this, users of the client may miss state transitions | ||
// via callbacks, would have to track the client's internal connection state, | ||
// and may have to double buffer messages (which we are already doing) if they | ||
// wanted to ensure their messages reach the server. | ||
var pong = []; | ||
var pend = []; | ||
var pSize = 0; | ||
var client = this; | ||
if (client.pending !== null) { | ||
var pongIndex = 0; | ||
client.pending.forEach(function(cmd) { | ||
var cmdLen = Buffer.isBuffer(cmd) ? cmd.length : Buffer.byteLength(cmd); | ||
if (cmd === PING_REQUEST && client.pongs !== null && pongIndex < client.pongs.length) { | ||
// filter out any useless ping requests (no pong callback, nop flush) | ||
var p = client.pongs[pongIndex++]; | ||
if (p !== undefined) { | ||
pend.push(cmd); | ||
pSize += cmdLen; | ||
pong.push(p); | ||
} | ||
} else if (cmd.length > 3 && cmd[0] === 'P' && cmd[1] === 'U' && cmd[2] === 'B') { | ||
pend.push(cmd); | ||
pSize += cmdLen; | ||
} | ||
}); | ||
} | ||
this.pongs = pong; | ||
this.pending = pend; | ||
this.pSize = pSize; | ||
this.pstate = AWAITING_CONTROL; | ||
this.pstate = AWAITING_CONTROL; | ||
// Clear info processing. | ||
this.info = null; | ||
this.infoReceived = false; | ||
// Clear info processing. | ||
this.info = null; | ||
this.infoReceived = false; | ||
// Select a server to connect to. | ||
this.selectServer(); | ||
// Select a server to connect to. | ||
this.selectServer(); | ||
// See #45 if we have a stream release the listeners | ||
// otherwise in addition to the leak events will fire fire | ||
if(this.stream) { | ||
this.stream.removeAllListeners(); | ||
this.stream.end(); | ||
} | ||
// otherwise in addition to the leak events will fire fire | ||
if (this.stream) { | ||
this.stream.removeAllListeners(); | ||
this.stream.end(); | ||
} | ||
// Create the stream | ||
this.stream = net.createConnection(this.url.port, this.url.hostname); | ||
// Setup the proper handlers. | ||
this.setupHandlers(); | ||
// Setup the proper handlers. | ||
this.setupHandlers(); | ||
}; | ||
@@ -595,12 +580,11 @@ | ||
*/ | ||
Client.prototype.initState = function() { | ||
this.ssid = 1; | ||
this.subs = {}; | ||
this.reconnects = 0; | ||
this.connected = false; | ||
this.wasConnected = false; | ||
this.reconnecting = false; | ||
this.server = null; | ||
this.pending = []; | ||
this.ssid = 1; | ||
this.subs = {}; | ||
this.reconnects = 0; | ||
this.connected = false; | ||
this.wasConnected = false; | ||
this.reconnecting = false; | ||
this.server = null; | ||
this.pending = []; | ||
}; | ||
@@ -613,13 +597,12 @@ | ||
*/ | ||
Client.prototype.close = function() { | ||
this.closed = true; | ||
this.removeAllListeners(); | ||
this.closeStream(); | ||
this.ssid = -1; | ||
this.subs = null; | ||
this.pstate = -1; | ||
this.pongs = null; | ||
this.pending = null; | ||
this.pSize = 0; | ||
this.closed = true; | ||
this.removeAllListeners(); | ||
this.closeStream(); | ||
this.ssid = -1; | ||
this.subs = null; | ||
this.pstate = -1; | ||
this.pongs = null; | ||
this.pending = null; | ||
this.pSize = 0; | ||
}; | ||
@@ -632,16 +615,15 @@ | ||
*/ | ||
Client.prototype.closeStream = function() { | ||
if (this.stream !== null) { | ||
this.stream.end(); | ||
this.stream.destroy(); | ||
this.stream = null; | ||
} | ||
if (this.connected === true || this.closed === true) { | ||
this.pongs = null; | ||
this.pending = null; | ||
this.pSize = 0; | ||
this.connected = false; | ||
} | ||
this.inbound = null; | ||
if (this.stream !== null) { | ||
this.stream.end(); | ||
this.stream.destroy(); | ||
this.stream = null; | ||
} | ||
if (this.connected === true || this.closed === true) { | ||
this.pongs = null; | ||
this.pending = null; | ||
this.pSize = 0; | ||
this.connected = false; | ||
} | ||
this.inbound = null; | ||
}; | ||
@@ -654,44 +636,43 @@ | ||
*/ | ||
Client.prototype.flushPending = function() { | ||
if (this.connected === false || | ||
this.pending === null || | ||
this.pending.length === 0 || | ||
this.infoReceived !== true) { | ||
return; | ||
} | ||
if (this.connected === false || | ||
this.pending === null || | ||
this.pending.length === 0 || | ||
this.infoReceived !== true) { | ||
return; | ||
} | ||
var client = this; | ||
var write = function(data) { | ||
client.pending = []; | ||
client.pSize = 0; | ||
return client.stream.write(data); | ||
}; | ||
if (!this.pBufs) { | ||
// All strings, fastest for now. | ||
return write(this.pending.join(EMPTY)); | ||
} else { | ||
// We have some or all Buffers. Figure out if we can optimize. | ||
var allBufs = true; | ||
for (var i=0; i < this.pending.length; i++){ | ||
if (!Buffer.isBuffer(this.pending[i])) { | ||
allBufs = false; | ||
break; | ||
} | ||
} | ||
// If all buffers, concat together and write once. | ||
if (allBufs) { | ||
return write(Buffer.concat(this.pending, this.pSize)); | ||
var client = this; | ||
var write = function(data) { | ||
client.pending = []; | ||
client.pSize = 0; | ||
return client.stream.write(data); | ||
}; | ||
if (!this.pBufs) { | ||
// All strings, fastest for now. | ||
return write(this.pending.join(EMPTY)); | ||
} else { | ||
// We have a mix, so write each one individually. | ||
var pending = this.pending; | ||
this.pending = []; | ||
this.pSize = 0; | ||
var result = true; | ||
for (i=0; i < pending.length; i++){ | ||
result = this.stream.write(pending[i]) && result; | ||
} | ||
return result; | ||
// We have some or all Buffers. Figure out if we can optimize. | ||
var allBufs = true; | ||
for (var i = 0; i < this.pending.length; i++) { | ||
if (!Buffer.isBuffer(this.pending[i])) { | ||
allBufs = false; | ||
break; | ||
} | ||
} | ||
// If all buffers, concat together and write once. | ||
if (allBufs) { | ||
return write(Buffer.concat(this.pending, this.pSize)); | ||
} else { | ||
// We have a mix, so write each one individually. | ||
var pending = this.pending; | ||
this.pending = []; | ||
this.pSize = 0; | ||
var result = true; | ||
for (i = 0; i < pending.length; i++) { | ||
result = this.stream.write(pending[i]) && result; | ||
} | ||
return result; | ||
} | ||
} | ||
} | ||
}; | ||
@@ -705,13 +686,12 @@ | ||
*/ | ||
Client.prototype.stripPendingSubs = function() { | ||
var pending = this.pending; | ||
this.pending = []; | ||
this.pSize = 0; | ||
for (var i=0; i < pending.length; i++){ | ||
if (!SUBRE.test(pending[i])) { | ||
// Re-queue the command. | ||
this.sendCommand(pending[i]); | ||
var pending = this.pending; | ||
this.pending = []; | ||
this.pSize = 0; | ||
for (var i = 0; i < pending.length; i++) { | ||
if (!SUBRE.test(pending[i])) { | ||
// Re-queue the command. | ||
this.sendCommand(pending[i]); | ||
} | ||
} | ||
} | ||
}; | ||
@@ -724,29 +704,30 @@ | ||
*/ | ||
Client.prototype.sendCommand = function(cmd) { | ||
// Buffer to cut down on system calls, increase throughput. | ||
// When receive gets faster, should make this Buffer based.. | ||
// Buffer to cut down on system calls, increase throughput. | ||
// When receive gets faster, should make this Buffer based.. | ||
if (this.closed || this.pending === null) { return; } | ||
if (this.closed || this.pending === null) { | ||
return; | ||
} | ||
this.pending.push(cmd); | ||
if (!Buffer.isBuffer(cmd)) { | ||
this.pSize += Buffer.byteLength(cmd); | ||
} else { | ||
this.pSize += cmd.length; | ||
this.pBufs = true; | ||
} | ||
this.pending.push(cmd); | ||
if (!Buffer.isBuffer(cmd)) { | ||
this.pSize += Buffer.byteLength(cmd); | ||
} else { | ||
this.pSize += cmd.length; | ||
this.pBufs = true; | ||
} | ||
if (this.connected === true) { | ||
// First one let's setup flush.. | ||
if (this.pending.length === 1) { | ||
var self = this; | ||
setImmediate(function() { | ||
self.flushPending(); | ||
}); | ||
} else if (this.pSize > FLUSH_THRESHOLD) { | ||
// Flush in place when threshold reached.. | ||
this.flushPending(); | ||
if (this.connected === true) { | ||
// First one let's setup flush.. | ||
if (this.pending.length === 1) { | ||
var self = this; | ||
setImmediate(function() { | ||
self.flushPending(); | ||
}); | ||
} else if (this.pSize > FLUSH_THRESHOLD) { | ||
// Flush in place when threshold reached.. | ||
this.flushPending(); | ||
} | ||
} | ||
} | ||
}; | ||
@@ -759,20 +740,19 @@ | ||
*/ | ||
Client.prototype.sendSubscriptions = function() { | ||
var protos = ""; | ||
for (var sid in this.subs) { | ||
if (this.subs.hasOwnProperty(sid)) { | ||
var sub = this.subs[sid]; | ||
var proto; | ||
if (sub.qgroup) { | ||
proto = [SUB, sub.subject, sub.qgroup, sid + CR_LF]; | ||
} else { | ||
proto = [SUB, sub.subject, sid + CR_LF]; | ||
} | ||
protos += proto.join(SPC); | ||
var protos = ""; | ||
for (var sid in this.subs) { | ||
if (this.subs.hasOwnProperty(sid)) { | ||
var sub = this.subs[sid]; | ||
var proto; | ||
if (sub.qgroup) { | ||
proto = [SUB, sub.subject, sub.qgroup, sid + CR_LF]; | ||
} else { | ||
proto = [SUB, sub.subject, sid + CR_LF]; | ||
} | ||
protos += proto.join(SPC); | ||
} | ||
} | ||
} | ||
if (protos.length > 0) { | ||
this.stream.write(protos); | ||
} | ||
if (protos.length > 0) { | ||
this.stream.write(protos); | ||
} | ||
}; | ||
@@ -785,205 +765,210 @@ | ||
*/ | ||
Client.prototype.processInbound = function() { | ||
var client = this; | ||
var client = this; | ||
// Hold any regex matches. | ||
var m; | ||
// Hold any regex matches. | ||
var m; | ||
// For optional yield | ||
var start; | ||
// For optional yield | ||
var start; | ||
if(! client.stream) { | ||
// if we are here, the stream was reaped and errors raised | ||
// if we continue. | ||
return; | ||
} | ||
// unpause if needed. | ||
// FIXME(dlc) client.stream.isPaused() causes 0.10 to fail | ||
client.stream.resume(); | ||
if (!client.stream) { | ||
// if we are here, the stream was reaped and errors raised | ||
// if we continue. | ||
return; | ||
} | ||
// unpause if needed. | ||
// FIXME(dlc) client.stream.isPaused() causes 0.10 to fail | ||
client.stream.resume(); | ||
/* jshint -W083 */ | ||
/* jshint -W083 */ | ||
if (client.options.yieldTime !== undefined) { | ||
start = Date.now(); | ||
} | ||
if (client.options.yieldTime !== undefined) { | ||
start = Date.now(); | ||
} | ||
while (!client.closed && client.inbound && client.inbound.length > 0) { | ||
switch (client.pstate) { | ||
while (!client.closed && client.inbound && client.inbound.length > 0) { | ||
switch (client.pstate) { | ||
case AWAITING_CONTROL: | ||
// Regex only works on strings, so convert once to be more efficient. | ||
// Long term answer is a hand rolled parser, not regex. | ||
var buf = client.inbound.toString('binary', 0, MAX_CONTROL_LINE_SIZE); | ||
if ((m = MSG.exec(buf)) !== null) { | ||
client.payload = { | ||
subj : m[1], | ||
sid : parseInt(m[2], 10), | ||
reply : m[4], | ||
size : parseInt(m[5], 10) | ||
}; | ||
client.payload.psize = client.payload.size + CR_LF_LEN; | ||
client.pstate = AWAITING_MSG_PAYLOAD; | ||
} else if ((m = OK.exec(buf)) !== null) { | ||
// Ignore for now.. | ||
} else if ((m = ERR.exec(buf)) !== null) { | ||
client.processErr(m[1]); | ||
return; | ||
} else if ((m = PONG.exec(buf)) !== null) { | ||
var cb = client.pongs && client.pongs.shift(); | ||
if (cb) { cb(); } // FIXME: Should we check for exceptions? | ||
} else if ((m = PING.exec(buf)) !== null) { | ||
client.sendCommand(PONG_RESPONSE); | ||
} else if ((m = INFO.exec(buf)) !== null) { | ||
client.info = JSON.parse(m[1]); | ||
// Check on TLS mismatch. | ||
if (client.checkTLSMismatch() === true) { | ||
return; | ||
} | ||
case AWAITING_CONTROL: | ||
// Regex only works on strings, so convert once to be more efficient. | ||
// Long term answer is a hand rolled parser, not regex. | ||
var buf = client.inbound.toString('binary', 0, MAX_CONTROL_LINE_SIZE); | ||
if ((m = MSG.exec(buf)) !== null) { | ||
client.payload = { | ||
subj: m[1], | ||
sid: parseInt(m[2], 10), | ||
reply: m[4], | ||
size: parseInt(m[5], 10) | ||
}; | ||
client.payload.psize = client.payload.size + CR_LF_LEN; | ||
client.pstate = AWAITING_MSG_PAYLOAD; | ||
} else if ((m = OK.exec(buf)) !== null) { | ||
// Ignore for now.. | ||
} else if ((m = ERR.exec(buf)) !== null) { | ||
client.processErr(m[1]); | ||
return; | ||
} else if ((m = PONG.exec(buf)) !== null) { | ||
var cb = client.pongs && client.pongs.shift(); | ||
if (cb) { | ||
cb(); | ||
} // FIXME: Should we check for exceptions? | ||
} else if ((m = PING.exec(buf)) !== null) { | ||
client.sendCommand(PONG_RESPONSE); | ||
} else if ((m = INFO.exec(buf)) !== null) { | ||
client.info = JSON.parse(m[1]); | ||
// Check on TLS mismatch. | ||
if (client.checkTLSMismatch() === true) { | ||
return; | ||
} | ||
// Always try to read the connect_urls from info | ||
if(client.info.connect_urls && client.info.connect_urls.length > 0) { | ||
// don't add duplicates | ||
var known = []; | ||
client.servers.forEach(function(server) { | ||
known.push(server.url.href); | ||
}); | ||
// add new ones | ||
var toAdd = []; | ||
client.info.connect_urls.forEach(function(server) { | ||
var u = 'nats://' + server; | ||
if(known.indexOf(u) === -1) { | ||
toAdd.push(new Server(url.parse(u))); | ||
} | ||
}); | ||
// Always try to read the connect_urls from info | ||
if (client.info.connect_urls && client.info.connect_urls.length > 0) { | ||
// don't add duplicates | ||
var known = []; | ||
client.servers.forEach(function(server) { | ||
known.push(server.url.href); | ||
}); | ||
// add new ones | ||
var toAdd = []; | ||
client.info.connect_urls.forEach(function(server) { | ||
var u = 'nats://' + server; | ||
if (known.indexOf(u) === -1) { | ||
toAdd.push(new Server(url.parse(u))); | ||
} | ||
}); | ||
if(toAdd.length > 0) { | ||
if(client.options.noRandomize !== true) { | ||
shuffle(toAdd); | ||
} | ||
toAdd.forEach(function(s) { | ||
client.servers.push(s); | ||
}); | ||
} | ||
} | ||
if (toAdd.length > 0) { | ||
if (client.options.noRandomize !== true) { | ||
shuffle(toAdd); | ||
} | ||
toAdd.forEach(function(s) { | ||
client.servers.push(s); | ||
}); | ||
} | ||
} | ||
// Process first INFO | ||
if (client.infoReceived === false) { | ||
// Switch over to TLS as needed. | ||
if (client.options.tls !== false && | ||
client.stream.encrypted !== true) { | ||
var tlsOpts = {socket: client.stream}; | ||
if ('object' === typeof client.options.tls) { | ||
for (var key in client.options.tls) { | ||
tlsOpts[key] = client.options.tls[key]; | ||
} | ||
} | ||
// if we have a stream, this is from an old connection, reap it | ||
if(client.stream) { | ||
client.stream.removeAllListeners(); | ||
client.stream.end(); | ||
} | ||
client.stream = tls.connect(tlsOpts, function() { | ||
client.flushPending(); | ||
}); | ||
client.setupHandlers(); | ||
} | ||
// Process first INFO | ||
if (client.infoReceived === false) { | ||
// Switch over to TLS as needed. | ||
if (client.options.tls !== false && | ||
client.stream.encrypted !== true) { | ||
var tlsOpts = { | ||
socket: client.stream | ||
}; | ||
if ('object' === typeof client.options.tls) { | ||
for (var key in client.options.tls) { | ||
tlsOpts[key] = client.options.tls[key]; | ||
} | ||
} | ||
// if we have a stream, this is from an old connection, reap it | ||
if (client.stream) { | ||
client.stream.removeAllListeners(); | ||
client.stream.end(); | ||
} | ||
client.stream = tls.connect(tlsOpts, function() { | ||
client.flushPending(); | ||
}); | ||
client.setupHandlers(); | ||
} | ||
// Send the connect message and subscriptions immediately | ||
client.sendConnect(); | ||
client.sendSubscriptions(); | ||
// Send the connect message and subscriptions immediately | ||
client.sendConnect(); | ||
client.sendSubscriptions(); | ||
client.pongs.unshift(function() { client.connectCB(); }); | ||
client.stream.write(PING_REQUEST); | ||
client.pongs.unshift(function() { | ||
client.connectCB(); | ||
}); | ||
client.stream.write(PING_REQUEST); | ||
// Mark as received | ||
client.infoReceived = true; | ||
client.stripPendingSubs(); | ||
client.flushPending(); | ||
} | ||
} else { | ||
// FIXME, check line length for something weird. | ||
// Nothing here yet, return | ||
return; | ||
} | ||
break; | ||
// Mark as received | ||
client.infoReceived = true; | ||
client.stripPendingSubs(); | ||
client.flushPending(); | ||
} | ||
} else { | ||
// FIXME, check line length for something weird. | ||
// Nothing here yet, return | ||
return; | ||
} | ||
break; | ||
case AWAITING_MSG_PAYLOAD: | ||
case AWAITING_MSG_PAYLOAD: | ||
// If we do not have the complete message, hold onto the chunks | ||
// and assemble when we have all we need. This optimizes for | ||
// when we parse a large buffer down to a small number of bytes, | ||
// then we receive a large chunk. This avoids a big copy with a | ||
// simple concat above. | ||
if (client.inbound.length < client.payload.psize) { | ||
if (undefined === client.payload.chunks) { | ||
client.payload.chunks = []; | ||
} | ||
client.payload.chunks.push(client.inbound); | ||
client.payload.psize -= client.inbound.length; | ||
client.inbound = null; | ||
return; | ||
} | ||
// If we do not have the complete message, hold onto the chunks | ||
// and assemble when we have all we need. This optimizes for | ||
// when we parse a large buffer down to a small number of bytes, | ||
// then we receive a large chunk. This avoids a big copy with a | ||
// simple concat above. | ||
if (client.inbound.length < client.payload.psize) { | ||
if (undefined === client.payload.chunks) { | ||
client.payload.chunks = []; | ||
} | ||
client.payload.chunks.push(client.inbound); | ||
client.payload.psize -= client.inbound.length; | ||
client.inbound = null; | ||
return; | ||
} | ||
// If we are here we have the complete message. | ||
// Check to see if we have existing chunks | ||
if (client.payload.chunks) { | ||
// If we are here we have the complete message. | ||
// Check to see if we have existing chunks | ||
if (client.payload.chunks) { | ||
client.payload.chunks.push(client.inbound.slice(0, client.payload.psize)); | ||
// don't append trailing control characters | ||
var mbuf = Buffer.concat(client.payload.chunks, client.payload.size); | ||
if (client.options.preserveBuffers) { | ||
client.payload.msg = mbuf; | ||
} else { | ||
client.payload.msg = mbuf.toString(client.encoding); | ||
} | ||
client.payload.chunks.push(client.inbound.slice(0, client.payload.psize)); | ||
// don't append trailing control characters | ||
var mbuf = Buffer.concat(client.payload.chunks, client.payload.size); | ||
} else { | ||
if (client.options.preserveBuffers) { | ||
client.payload.msg = mbuf; | ||
} else { | ||
client.payload.msg = mbuf.toString(client.encoding); | ||
} | ||
if (client.options.preserveBuffers) { | ||
client.payload.msg = client.inbound.slice(0, client.payload.size); | ||
} else { | ||
client.payload.msg = client.inbound.toString(client.encoding, 0, client.payload.size); | ||
} | ||
} else { | ||
} | ||
if (client.options.preserveBuffers) { | ||
client.payload.msg = client.inbound.slice(0, client.payload.size); | ||
} else { | ||
client.payload.msg = client.inbound.toString(client.encoding, 0, client.payload.size); | ||
} | ||
// Eat the size of the inbound that represents the message. | ||
if (client.inbound.length === client.payload.psize) { | ||
client.inbound = null; | ||
} else { | ||
client.inbound = client.inbound.slice(client.payload.psize); | ||
} | ||
} | ||
// process the message | ||
client.processMsg(); | ||
// Eat the size of the inbound that represents the message. | ||
if (client.inbound.length === client.payload.psize) { | ||
client.inbound = null; | ||
} else { | ||
client.inbound = client.inbound.slice(client.payload.psize); | ||
} | ||
// Reset | ||
client.pstate = AWAITING_CONTROL; | ||
client.payload = null; | ||
// process the message | ||
client.processMsg(); | ||
// Check to see if we have an option to yield for other events after yieldTime. | ||
if (start !== undefined) { | ||
if ((Date.now() - start) > client.options.yieldTime) { | ||
client.stream.pause(); | ||
setImmediate(client.processInbound.bind(this)); | ||
return; | ||
} | ||
} | ||
break; | ||
} | ||
// Reset | ||
client.pstate = AWAITING_CONTROL; | ||
client.payload = null; | ||
// This is applicable for a regex match to eat the bytes we used from a control line. | ||
if (m && !this.closed) { | ||
// Chop inbound | ||
var psize = m[0].length; | ||
if (psize >= client.inbound.length) { | ||
client.inbound = null; | ||
} else { | ||
client.inbound = client.inbound.slice(psize); | ||
} | ||
// Check to see if we have an option to yield for other events after yieldTime. | ||
if (start !== undefined) { | ||
if ((Date.now() - start) > client.options.yieldTime) { | ||
client.stream.pause(); | ||
setImmediate(client.processInbound.bind(this)); | ||
return; | ||
} | ||
} | ||
break; | ||
} | ||
// This is applicable for a regex match to eat the bytes we used from a control line. | ||
if (m && !this.closed) { | ||
// Chop inbound | ||
var psize = m[0].length; | ||
if (psize >= client.inbound.length) { | ||
client.inbound = null; | ||
} else { | ||
client.inbound = client.inbound.slice(psize); | ||
} | ||
} | ||
m = null; | ||
} | ||
m = null; | ||
} | ||
}; | ||
@@ -996,41 +981,40 @@ | ||
*/ | ||
Client.prototype.processMsg = function() { | ||
var sub = this.subs[this.payload.sid]; | ||
if (sub !== undefined) { | ||
sub.received += 1; | ||
// Check for a timeout, and cancel if received >= expected | ||
if (sub.timeout) { | ||
if (sub.received >= sub.expected) { | ||
clearTimeout(sub.timeout); | ||
sub.timeout = null; | ||
} | ||
} | ||
// Check for auto-unsubscribe | ||
if (sub.max !== undefined) { | ||
if (sub.received === sub.max) { | ||
delete this.subs[this.payload.sid]; | ||
this.emit('unsubscribe', this.payload.sid, sub.subject); | ||
} else if (sub.received > sub.max) { | ||
this.unsubscribe(this.payload.sid); | ||
sub.callback = null; | ||
} | ||
} | ||
var sub = this.subs[this.payload.sid]; | ||
if (sub !== undefined) { | ||
sub.received += 1; | ||
// Check for a timeout, and cancel if received >= expected | ||
if (sub.timeout) { | ||
if (sub.received >= sub.expected) { | ||
clearTimeout(sub.timeout); | ||
sub.timeout = null; | ||
} | ||
} | ||
// Check for auto-unsubscribe | ||
if (sub.max !== undefined) { | ||
if (sub.received === sub.max) { | ||
delete this.subs[this.payload.sid]; | ||
this.emit('unsubscribe', this.payload.sid, sub.subject); | ||
} else if (sub.received > sub.max) { | ||
this.unsubscribe(this.payload.sid); | ||
sub.callback = null; | ||
} | ||
} | ||
if (sub.callback) { | ||
var msg = this.payload.msg; | ||
if (this.options.json) { | ||
try { | ||
if (this.options.preserveBuffers) { | ||
msg = JSON.parse(this.payload.msg.toString()); | ||
} else { | ||
msg = JSON.parse(this.payload.msg.toString(this.options.encoding)); | ||
} | ||
} catch (e) { | ||
msg = e; | ||
if (sub.callback) { | ||
var msg = this.payload.msg; | ||
if (this.options.json) { | ||
try { | ||
if (this.options.preserveBuffers) { | ||
msg = JSON.parse(this.payload.msg.toString()); | ||
} else { | ||
msg = JSON.parse(this.payload.msg.toString(this.options.encoding)); | ||
} | ||
} catch (e) { | ||
msg = e; | ||
} | ||
} | ||
sub.callback(msg, this.payload.reply, this.payload.subj, this.payload.sid); | ||
} | ||
} | ||
sub.callback(msg, this.payload.reply, this.payload.subj, this.payload.sid); | ||
} | ||
} | ||
}; | ||
@@ -1047,9 +1031,9 @@ | ||
var m = s ? s.toLowerCase() : ''; | ||
if(m.indexOf(STALE_CONNECTION_ERR) !== -1) { | ||
this.scheduleReconnect(); | ||
} else if(m.indexOf(PERMISSIONS_ERR) !== -1) { | ||
this.emit('permission_error', new NatsError(s, NATS_PROTOCOL_ERR)); | ||
if (m.indexOf(STALE_CONNECTION_ERR) !== -1) { | ||
this.scheduleReconnect(); | ||
} else if (m.indexOf(PERMISSIONS_ERR) !== -1) { | ||
this.emit('permission_error', new NatsError(s, NATS_PROTOCOL_ERR)); | ||
} else { | ||
this.emit('error', new NatsError(s, NATS_PROTOCOL_ERR)); | ||
this.closeStream(); | ||
this.emit('error', new NatsError(s, NATS_PROTOCOL_ERR)); | ||
this.closeStream(); | ||
} | ||
@@ -1063,10 +1047,9 @@ }; | ||
* @api public | ||
*/ | ||
*/ | ||
Client.prototype.addServer = function(uri) { | ||
this.servers.push(new Server(url.parse(uri))); | ||
this.servers.push(new Server(url.parse(uri))); | ||
if (this.options.noRandomize !== true) { | ||
shuffle(this.servers); | ||
} | ||
if (this.options.noRandomize !== true) { | ||
shuffle(this.servers); | ||
} | ||
}; | ||
@@ -1081,17 +1064,16 @@ | ||
*/ | ||
Client.prototype.flush = function(opt_callback) { | ||
if (this.closed) { | ||
if (typeof opt_callback === 'function') { | ||
opt_callback(new NatsError(CONN_CLOSED_MSG, CONN_CLOSED)); | ||
return; | ||
} else { | ||
throw(new NatsError(CONN_CLOSED_MSG, CONN_CLOSED)); | ||
if (this.closed) { | ||
if (typeof opt_callback === 'function') { | ||
opt_callback(new NatsError(CONN_CLOSED_MSG, CONN_CLOSED)); | ||
return; | ||
} else { | ||
throw (new NatsError(CONN_CLOSED_MSG, CONN_CLOSED)); | ||
} | ||
} | ||
} | ||
if (this.pongs) { | ||
this.pongs.push(opt_callback); | ||
this.sendCommand(PING_REQUEST); | ||
this.flushPending(); | ||
} | ||
if (this.pongs) { | ||
this.pongs.push(opt_callback); | ||
this.sendCommand(PING_REQUEST); | ||
this.flushPending(); | ||
} | ||
}; | ||
@@ -1108,70 +1090,71 @@ | ||
*/ | ||
Client.prototype.publish = function(subject, msg, opt_reply, opt_callback) { | ||
// They only supplied a callback function. | ||
if (typeof subject === 'function') { | ||
opt_callback = subject; | ||
subject = undefined; | ||
} | ||
if (!msg) { msg = EMPTY; } | ||
if (!subject) { | ||
if (opt_callback) { | ||
opt_callback(new NatsError(BAD_SUBJECT_MSG, BAD_SUBJECT)); | ||
} else { | ||
throw(new NatsError(BAD_SUBJECT_MSG, BAD_SUBJECT)); | ||
// They only supplied a callback function. | ||
if (typeof subject === 'function') { | ||
opt_callback = subject; | ||
subject = undefined; | ||
} | ||
} | ||
if (typeof msg === 'function') { | ||
if (opt_callback || opt_reply) { | ||
opt_callback(new NatsError(BAD_MSG_MSG, BAD_MSG)); | ||
return; | ||
if (!msg) { | ||
msg = EMPTY; | ||
} | ||
opt_callback = msg; | ||
msg = EMPTY; | ||
opt_reply = undefined; | ||
} | ||
if (typeof opt_reply === 'function') { | ||
if (opt_callback) { | ||
opt_callback(new NatsError(BAD_REPLY_MSG, BAD_REPLY)); | ||
return; | ||
if (!subject) { | ||
if (opt_callback) { | ||
opt_callback(new NatsError(BAD_SUBJECT_MSG, BAD_SUBJECT)); | ||
} else { | ||
throw (new NatsError(BAD_SUBJECT_MSG, BAD_SUBJECT)); | ||
} | ||
} | ||
opt_callback = opt_reply; | ||
opt_reply = undefined; | ||
} | ||
if (typeof msg === 'function') { | ||
if (opt_callback || opt_reply) { | ||
opt_callback(new NatsError(BAD_MSG_MSG, BAD_MSG)); | ||
return; | ||
} | ||
opt_callback = msg; | ||
msg = EMPTY; | ||
opt_reply = undefined; | ||
} | ||
if (typeof opt_reply === 'function') { | ||
if (opt_callback) { | ||
opt_callback(new NatsError(BAD_REPLY_MSG, BAD_REPLY)); | ||
return; | ||
} | ||
opt_callback = opt_reply; | ||
opt_reply = undefined; | ||
} | ||
// Hold PUB SUB [REPLY] | ||
var psub; | ||
if (opt_reply === undefined) { | ||
psub = 'PUB ' + subject + SPC; | ||
} else { | ||
psub = 'PUB ' + subject + SPC + opt_reply + SPC; | ||
} | ||
// Hold PUB SUB [REPLY] | ||
var psub; | ||
if (opt_reply === undefined) { | ||
psub = 'PUB ' + subject + SPC; | ||
} else { | ||
psub = 'PUB ' + subject + SPC + opt_reply + SPC; | ||
} | ||
// Need to treat sending buffers different. | ||
if (!Buffer.isBuffer(msg)) { | ||
var str = msg; | ||
if (this.options.json) { | ||
if (typeof msg !== 'object') { | ||
throw(new NatsError(BAD_JSON_MSG, BAD_JSON)); | ||
} | ||
try { | ||
str = JSON.stringify(msg); | ||
} catch (e) { | ||
throw(new NatsError(BAD_JSON_MSG, BAD_JSON)); | ||
} | ||
// Need to treat sending buffers different. | ||
if (!Buffer.isBuffer(msg)) { | ||
var str = msg; | ||
if (this.options.json) { | ||
if (typeof msg !== 'object') { | ||
throw (new NatsError(BAD_JSON_MSG, BAD_JSON)); | ||
} | ||
try { | ||
str = JSON.stringify(msg); | ||
} catch (e) { | ||
throw (new NatsError(BAD_JSON_MSG, BAD_JSON)); | ||
} | ||
} | ||
this.sendCommand(psub + Buffer.byteLength(str) + CR_LF + str + CR_LF); | ||
} else { | ||
var b = new Buffer(psub.length + msg.length + (2 * CR_LF_LEN) + msg.length.toString().length); | ||
var len = b.write(psub + msg.length + CR_LF); | ||
msg.copy(b, len); | ||
b.write(CR_LF, len + msg.length); | ||
this.sendCommand(b); | ||
} | ||
this.sendCommand(psub + Buffer.byteLength(str) + CR_LF + str + CR_LF); | ||
} else { | ||
var b = new Buffer(psub.length + msg.length + (2 * CR_LF_LEN) + msg.length.toString().length); | ||
var len = b.write(psub + msg.length + CR_LF); | ||
msg.copy(b, len); | ||
b.write(CR_LF, len + msg.length); | ||
this.sendCommand(b); | ||
} | ||
if (opt_callback !== undefined) { | ||
this.flush(opt_callback); | ||
} else if (this.closed) { | ||
throw(new NatsError(CONN_CLOSED_MSG, CONN_CLOSED)); | ||
} | ||
if (opt_callback !== undefined) { | ||
this.flush(opt_callback); | ||
} else if (this.closed) { | ||
throw (new NatsError(CONN_CLOSED_MSG, CONN_CLOSED)); | ||
} | ||
}; | ||
@@ -1189,34 +1172,37 @@ | ||
*/ | ||
Client.prototype.subscribe = function(subject, opts, callback) { | ||
if (this.closed) { | ||
throw(new NatsError(CONN_CLOSED_MSG, CONN_CLOSED)); | ||
} | ||
var qgroup, max; | ||
if (typeof opts === 'function') { | ||
callback = opts; | ||
opts = undefined; | ||
} else if (opts && typeof opts === 'object') { | ||
// FIXME, check exists, error otherwise.. | ||
qgroup = opts.queue; | ||
max = opts.max; | ||
} | ||
this.ssid += 1; | ||
this.subs[this.ssid] = { 'subject':subject, 'callback':callback, 'received':0 }; | ||
if (this.closed) { | ||
throw (new NatsError(CONN_CLOSED_MSG, CONN_CLOSED)); | ||
} | ||
var qgroup, max; | ||
if (typeof opts === 'function') { | ||
callback = opts; | ||
opts = undefined; | ||
} else if (opts && typeof opts === 'object') { | ||
// FIXME, check exists, error otherwise.. | ||
qgroup = opts.queue; | ||
max = opts.max; | ||
} | ||
this.ssid += 1; | ||
this.subs[this.ssid] = { | ||
'subject': subject, | ||
'callback': callback, | ||
'received': 0 | ||
}; | ||
var proto; | ||
if (typeof qgroup === 'string') { | ||
this.subs[this.ssid].qgroup = qgroup; | ||
proto = [SUB, subject, qgroup, this.ssid + CR_LF]; | ||
} else { | ||
proto = [SUB, subject, this.ssid + CR_LF]; | ||
} | ||
var proto; | ||
if (typeof qgroup === 'string') { | ||
this.subs[this.ssid].qgroup = qgroup; | ||
proto = [SUB, subject, qgroup, this.ssid + CR_LF]; | ||
} else { | ||
proto = [SUB, subject, this.ssid + CR_LF]; | ||
} | ||
this.sendCommand(proto.join(SPC)); | ||
this.emit('subscribe', this.ssid, subject, opts); | ||
this.sendCommand(proto.join(SPC)); | ||
this.emit('subscribe', this.ssid, subject, opts); | ||
if (max) { | ||
this.unsubscribe(this.ssid, max); | ||
} | ||
return this.ssid; | ||
if (max) { | ||
this.unsubscribe(this.ssid, max); | ||
} | ||
return this.ssid; | ||
}; | ||
@@ -1233,28 +1219,29 @@ | ||
*/ | ||
Client.prototype.unsubscribe = function(sid, opt_max) { | ||
if (!sid || this.closed) { return; } | ||
if (!sid || this.closed) { | ||
return; | ||
} | ||
var proto; | ||
if (opt_max) { | ||
proto = [UNSUB, sid, opt_max + CR_LF]; | ||
} else { | ||
proto = [UNSUB, sid + CR_LF]; | ||
} | ||
this.sendCommand(proto.join(SPC)); | ||
var proto; | ||
if (opt_max) { | ||
proto = [UNSUB, sid, opt_max + CR_LF]; | ||
} else { | ||
proto = [UNSUB, sid + CR_LF]; | ||
} | ||
this.sendCommand(proto.join(SPC)); | ||
var sub = this.subs[sid]; | ||
if (sub === undefined) { | ||
return; | ||
} | ||
sub.max = opt_max; | ||
if (sub.max === undefined || (sub.received >= sub.max)) { | ||
// remove any timeouts that may be pending | ||
if (sub.timeout) { | ||
clearTimeout(sub.timeout); | ||
sub.timeout = null; | ||
var sub = this.subs[sid]; | ||
if (sub === undefined) { | ||
return; | ||
} | ||
delete this.subs[sid]; | ||
this.emit('unsubscribe', sid, sub.subject); | ||
} | ||
sub.max = opt_max; | ||
if (sub.max === undefined || (sub.received >= sub.max)) { | ||
// remove any timeouts that may be pending | ||
if (sub.timeout) { | ||
clearTimeout(sub.timeout); | ||
sub.timeout = null; | ||
} | ||
delete this.subs[sid]; | ||
this.emit('unsubscribe', sid, sub.subject); | ||
} | ||
}; | ||
@@ -1272,12 +1259,16 @@ | ||
Client.prototype.timeout = function(sid, timeout, expected, callback) { | ||
if (!sid) { return; } | ||
var sub = this.subs[sid]; | ||
if (sub === null) { return; } | ||
sub.expected = expected; | ||
var that = this; | ||
sub.timeout = setTimeout(function() { | ||
callback(sid); | ||
// if callback fails unsubscribe will leak | ||
that.unsubscribe(sid); | ||
}, timeout); | ||
if (!sid) { | ||
return; | ||
} | ||
var sub = this.subs[sid]; | ||
if (sub === null) { | ||
return; | ||
} | ||
sub.expected = expected; | ||
var that = this; | ||
sub.timeout = setTimeout(function() { | ||
callback(sid); | ||
// if callback fails unsubscribe will leak | ||
that.unsubscribe(sid); | ||
}, timeout); | ||
}; | ||
@@ -1300,17 +1291,17 @@ | ||
Client.prototype.request = function(subject, opt_msg, opt_options, callback) { | ||
if (typeof opt_msg === 'function') { | ||
callback = opt_msg; | ||
opt_msg = EMPTY; | ||
opt_options = null; | ||
} | ||
if (typeof opt_options === 'function') { | ||
callback = opt_options; | ||
opt_options = null; | ||
} | ||
var inbox = createInbox(); | ||
var s = this.subscribe(inbox, opt_options, function(msg, reply) { | ||
callback(msg, reply); | ||
}); | ||
this.publish(subject, opt_msg, inbox); | ||
return s; | ||
if (typeof opt_msg === 'function') { | ||
callback = opt_msg; | ||
opt_msg = EMPTY; | ||
opt_options = null; | ||
} | ||
if (typeof opt_options === 'function') { | ||
callback = opt_options; | ||
opt_options = null; | ||
} | ||
var inbox = createInbox(); | ||
var s = this.subscribe(inbox, opt_options, function(msg, reply) { | ||
callback(msg, reply); | ||
}); | ||
this.publish(subject, opt_msg, inbox); | ||
return s; | ||
}; | ||
@@ -1337,9 +1328,9 @@ | ||
Client.prototype.requestOne = function(subject, opt_msg, opt_options, timeout, callback) { | ||
opt_options = opt_options || {}; | ||
opt_options.max = 1; | ||
var sid = this.request(subject, opt_msg, opt_options, callback); | ||
this.timeout(sid, timeout, 1, function() { | ||
callback(new NatsError(REQ_TIMEOUT_MSG_PREFIX + sid, REQ_TIMEOUT)); | ||
}); | ||
return sid; | ||
opt_options = opt_options || {}; | ||
opt_options.max = 1; | ||
var sid = this.request(subject, opt_msg, opt_options, callback); | ||
this.timeout(sid, timeout, 1, function() { | ||
callback(new NatsError(REQ_TIMEOUT_MSG_PREFIX + sid, REQ_TIMEOUT)); | ||
}); | ||
return sid; | ||
}; | ||
@@ -1353,5 +1344,4 @@ | ||
*/ | ||
Client.prototype.numSubscriptions = function() { | ||
return Object.keys(this.subs).length; | ||
return Object.keys(this.subs).length; | ||
}; | ||
@@ -1364,10 +1354,11 @@ | ||
*/ | ||
Client.prototype.reconnect = function() { | ||
if (this.closed) { return; } | ||
this.reconnects += 1; | ||
this.createConnection(); | ||
if (this.currentServer.didConnect === true) { | ||
this.emit('reconnecting'); | ||
} | ||
if (this.closed) { | ||
return; | ||
} | ||
this.reconnects += 1; | ||
this.createConnection(); | ||
if (this.currentServer.didConnect === true) { | ||
this.emit('reconnecting'); | ||
} | ||
}; | ||
@@ -1380,20 +1371,21 @@ | ||
*/ | ||
Client.prototype.scheduleReconnect = function() { | ||
var client = this; | ||
// Just return if no more servers | ||
if (client.servers.length === 0) { | ||
return; | ||
} | ||
// Don't set reconnecting state if we are just trying | ||
// for the first time. | ||
if (client.wasConnected === true) { | ||
client.reconnecting = true; | ||
} | ||
// Only stall if we have connected before. | ||
var wait = 0; | ||
if (client.servers[0].didConnect === true) { | ||
wait = this.options.reconnectTimeWait; | ||
} | ||
setTimeout(function() { client.reconnect(); }, wait); | ||
var client = this; | ||
// Just return if no more servers | ||
if (client.servers.length === 0) { | ||
return; | ||
} | ||
// Don't set reconnecting state if we are just trying | ||
// for the first time. | ||
if (client.wasConnected === true) { | ||
client.reconnecting = true; | ||
} | ||
// Only stall if we have connected before. | ||
var wait = 0; | ||
if (client.servers[0].didConnect === true) { | ||
wait = this.options.reconnectTimeWait; | ||
} | ||
setTimeout(function() { | ||
client.reconnect(); | ||
}, wait); | ||
}; |
{ | ||
"name": "nats", | ||
"version": "0.7.20", | ||
"version": "0.7.24", | ||
"description": "Node.js client for NATS, a lightweight, high-performance cloud native messaging system", | ||
@@ -32,3 +32,2 @@ "keywords": [ | ||
"scripts": { | ||
"lint": "jshint --reporter node_modules/jshint-stylish lib test examples", | ||
"depcheck": "dependency-check . lib/*", | ||
@@ -39,3 +38,5 @@ "depcheck:unused": "dependency-check ./package.json --unused --no-dev lib/*", | ||
"coveralls": "npm run cover -- --report lcovonly && cat ./reports/coverage/lcov.info | coveralls", | ||
"cover": "istanbul cover _mocha" | ||
"cover": "istanbul cover _mocha", | ||
"lint": "if-node-version >=4 eslint ./lib ./examples ./benchmark ./test", | ||
"fmt": "js-beautify -n --config crockford.jscsrc -r lib/* test/*.js test/support/*.js examples/* benchmark/*.js" | ||
}, | ||
@@ -51,3 +52,6 @@ "engines": { | ||
"dependency-check": "2.5.x", | ||
"eslint": "^3.19.0", | ||
"if-node-version": "^1.1.1", | ||
"istanbul": "0.4.x", | ||
"js-beautify": "^1.6.12", | ||
"jshint": "2.9.x", | ||
@@ -54,0 +58,0 @@ "jshint-stylish": "2.2.x", |
@@ -222,6 +222,50 @@ # NATS - Node.js Client | ||
// Reconnect Attempts and Time between reconnects | ||
// By default a NATS connection will try to reconnect to a server 10 times | ||
// waiting 2 seconds between reconnect attempts. If the maximum number of | ||
// retries is reached, the client will close the connection. | ||
// To change the default behaviour specify the max number of connection | ||
// attempts in `maxReconnectAttempts` (set to -1 to retry forever), and the | ||
// time in milliseconds between reconnects in `reconnectTimeWait`. | ||
nc = nats.connect({'maxReconnectAttempts': -1, 'reconnectTimeWait': 250}); | ||
nc.on('error', function(err) { | ||
console.log(err); | ||
}); | ||
nc.on('connect', function(nc) { | ||
console.log('connected'); | ||
}); | ||
nc.on('disconnect', function() { | ||
console.log('disconnect'); | ||
}); | ||
nc.on('reconnecting', function() { | ||
console.log('reconnecting'); | ||
}); | ||
nc.on('reconnect', function(nc) { | ||
console.log('reconnect'); | ||
}); | ||
nc.on('close', function() { | ||
console.log('close'); | ||
}); | ||
``` | ||
See examples and benchmarks for more information.. | ||
See examples and benchmarks for more information. | ||
## Supported Node Versions | ||
Support policy for Nodejs versions follows | ||
[Nodejs release support]( https://github.com/nodejs/Release). | ||
We will support and build node-nats on even Nodejs versions that are current | ||
or in maintenance. | ||
## License | ||
@@ -228,0 +272,0 @@ |
Sorry, the diff of this file is not supported yet
License Policy Violation
LicenseThis package is not allowed per your license policy. Review the package's license to ensure compliance.
Found 1 instance in 1 package
License Policy Violation
LicenseThis package is not allowed per your license policy. Review the package's license to ensure compliance.
Found 1 instance in 1 package
56390
1359
294
12