Comparing version 0.2.9 to 0.3.0
199
lib/nats.js
/*! | ||
* Nats | ||
* Copyright(c) 2012-2015 Apcera Inc. All rights reserved. | ||
* Copyright(c) 2011-2014 Derek Collison (derek.collison@gmail.com) | ||
@@ -20,3 +21,3 @@ * MIT Licensed | ||
var VERSION = '0.2.9', | ||
var VERSION = '0.3.0', | ||
@@ -152,2 +153,12 @@ DEFAULT_PORT = 4222, | ||
function shuffle(array) { | ||
for (var i = array.length - 1; i > 0; i--) { | ||
var j = Math.floor(Math.random() * (i + 1)); | ||
var temp = array[i]; | ||
array[i] = array[j]; | ||
array[j] = temp; | ||
} | ||
return array; | ||
} | ||
/** | ||
@@ -162,3 +173,2 @@ * Parse the conctructor/connect options. | ||
var options = this.options = { | ||
'url' : DEFAULT_URI, | ||
'verbose' : false, | ||
@@ -170,3 +180,6 @@ 'pedantic' : false, | ||
}; | ||
if ('number' === typeof opts) { | ||
if (undefined === opts) { | ||
options.url = DEFAULT_URI; | ||
} else if ('number' === typeof opts) { | ||
options.url = DEFAULT_PRE + opts; | ||
@@ -190,18 +203,70 @@ } else if ('string' === typeof opts) { | ||
this.assignOption(opts, 'reconnectTimeWait'); | ||
this.assignOption(opts, 'servers'); | ||
this.assignOption(opts, 'urls', 'servers'); | ||
this.assignOption(opts, 'noRandomize'); | ||
this.assignOption(opts, 'NoRandomize', 'noRandomize'); | ||
this.assignOption(opts, 'dontRandomize', 'noRandomize'); | ||
} | ||
options.uri = options.url; | ||
if (options.url !== undefined) { | ||
// Parse the url | ||
this.url = url.parse(options.url); | ||
if ('auth' in this.url && !!this.url.auth) { | ||
var auth = this.url.auth.split(':'); | ||
if (options.user === undefined) { | ||
options.user = auth[0]; | ||
} | ||
if (options.pass === undefined) { | ||
options.pass = auth[1]; | ||
} | ||
var client = this; | ||
// Set user/pass as needed if in options. | ||
client.user = options.user; | ||
client.pass = options.pass; | ||
// For cluster support | ||
client.servers = []; | ||
if (Array.isArray(options.servers)) { | ||
options.servers.forEach(function(server) { | ||
client.servers.push(new Server(url.parse(server))); | ||
}); | ||
} else { | ||
client.servers.push(new Server(url.parse(options.url))); | ||
} | ||
// Randomize if needed | ||
if (options.noRandomize !== true) { | ||
shuffle(client.servers); | ||
} | ||
}; | ||
/** | ||
* Create a new server. | ||
* | ||
* @api private | ||
*/ | ||
function Server(url) { | ||
this.url = url; | ||
this.didConnect = false; | ||
this.reconnects = 0; | ||
} | ||
/** | ||
* Properly select the next server. | ||
* We rotate the server list as we go, | ||
* we also pull auth from urls as needed, or | ||
* if they were set in options use that as override. | ||
* | ||
* @api private | ||
*/ | ||
Client.prototype.selectServer = function() { | ||
var client = this; | ||
var server = client.servers.shift(); | ||
// Place in client context. | ||
client.currentServer = server; | ||
client.url = server.url; | ||
if ('auth' in server.url && !!server.url.auth) { | ||
var auth = server.url.auth.split(':'); | ||
if (client.options.user === undefined) { | ||
client.user = auth[0]; | ||
} | ||
if (client.options.pass === undefined) { | ||
client.pass = auth[1]; | ||
} | ||
} | ||
client.servers.push(server); | ||
}; | ||
@@ -223,2 +288,4 @@ | ||
client.selectServer(); | ||
var stream = client.stream = net.createConnection(client.url.port, client.url.hostname); | ||
@@ -230,2 +297,4 @@ | ||
client.connected = true; | ||
client.wasConnected = true; | ||
client.currentServer.didConnect = true; | ||
client.reconnecting = false; | ||
@@ -242,5 +311,3 @@ client.reconnects = 0; | ||
// FIXME, use close event? | ||
stream.on('close', function(hadError) { | ||
if (hadError) { return; } | ||
client.closeStream(); | ||
@@ -258,17 +325,20 @@ client.emit('disconnect'); | ||
stream.on('error', function(exception) { | ||
client.closeStream(); | ||
// If we were connected just return, close event will process | ||
if (client.wasConnected === true && client.currentServer.didConnect === true) { | ||
return; | ||
} | ||
if (client.reconnecting === false) { | ||
client.emit('error', exception); | ||
// if the current server did not connect at all, and we in | ||
// general have not connected to any server, remove it from | ||
// this list. | ||
if (client.wasConnected === false && client.currentServer.didConnect === false) { | ||
client.servers.splice(client.servers.length-1, 1); | ||
} | ||
client.emit('disconnect'); | ||
if (client.reconnecting === true) { | ||
if (client.closed === true || | ||
client.reconnects >= client.options.maxReconnectAttempts) { | ||
client.emit('close'); | ||
} else { | ||
client.scheduleReconnect(); | ||
} | ||
// Only bubble up error if we never had connected | ||
// to the server and we only have one. | ||
if (client.wasConnected === false && client.servers.length === 0) { | ||
client.emit('error', "Could not connect to server: " + exception); | ||
} | ||
client.closeStream(); | ||
}); | ||
@@ -279,2 +349,4 @@ | ||
// This will switch inbound to a string but seems to perform | ||
// better than Buffer.concat. | ||
client.inbound = client.inbound ? client.inbound + data : data; | ||
@@ -284,4 +356,8 @@ | ||
switch (client.pstate) { | ||
case AWAITING_CONTROL: | ||
if ((m = MSG.exec(client.inbound)) !== null) { | ||
// Regex only works on strings, so convert once to be more efficient. | ||
// Long term answer is hand rolled parser, not regex. | ||
var buf = client.inbound.toString('binary', 0, MAX_CONTROL_LINE_SIZE); | ||
if ((m = MSG.exec(buf)) !== null) { | ||
client.payload = { | ||
@@ -294,12 +370,12 @@ subj : m[1], | ||
client.pstate = AWAITING_MSG_PAYLOAD; | ||
} else if ((m = OK.exec(client.inbound)) !== null) { | ||
} else if ((m = OK.exec(buf)) !== null) { | ||
// Ignore for now.. | ||
} else if ((m = ERR.exec(client.inbound)) !== null) { | ||
} else if ((m = ERR.exec(buf)) !== null) { | ||
client.emit('error', m[1]); | ||
} else if ((m = PONG.exec(client.inbound)) !== null) { | ||
} else if ((m = PONG.exec(buf)) !== null) { | ||
var cb = client.pongs.shift(); | ||
if (cb) { cb(); } // FIXME: Should we check for exceptions? | ||
} else if ((m = PING.exec(client.inbound)) !== null) { | ||
} else if ((m = PING.exec(buf)) !== null) { | ||
client.sendCommand(PONG_RESPONSE); | ||
} else if ((m = INFO.exec(client.inbound)) !== null) { | ||
} else if ((m = INFO.exec(buf)) !== null) { | ||
// Ignore for now.. | ||
@@ -319,4 +395,4 @@ } else { | ||
// FIXME, may be inefficient. | ||
client.payload.msg = client.inbound.slice(0, client.payload.size).toString(); | ||
// FIXME(dlc), may be inefficient. | ||
client.payload.msg = client.inbound.toString('utf8', 0, client.payload.size); | ||
@@ -346,6 +422,11 @@ if (client.inbound.length === client.payload.size + CR_LF_LEN) { | ||
// Queue the connect command. | ||
var cs = { 'verbose':this.options.verbose, 'pedantic':this.options.pedantic }; | ||
if (this.options.user !== undefined) { | ||
cs.user = this.options.user; | ||
cs.pass = this.options.pass; | ||
var cs = { | ||
'lang' : 'node', | ||
'version' : VERSION, | ||
'verbose' : this.options.verbose, | ||
'pedantic': this.options.pedantic | ||
}; | ||
if (this.user !== undefined) { | ||
cs.user = this.user; | ||
cs.pass = this.pass; | ||
} | ||
@@ -366,3 +447,5 @@ this.sendCommand(CONNECT + SPC + JSON.stringify(cs) + CR_LF); | ||
this.connected = false; | ||
this.wasConnected = false; | ||
this.reconnecting = false; | ||
this.server = null; | ||
}; | ||
@@ -406,2 +489,3 @@ | ||
} | ||
this.inbound = null; | ||
}; | ||
@@ -554,11 +638,11 @@ | ||
var proto = [PUB, subject]; | ||
var pmsg = [Buffer.byteLength(msg), CR_LF, msg, CR_LF]; | ||
if (opt_reply !== undefined) { | ||
proto.push(opt_reply); | ||
// Hold PUB SUB [REPLY] | ||
var psub; | ||
if (opt_reply === undefined) { | ||
psub = "PUB " + subject + SPC; | ||
} else { | ||
psub = "PUB " + subject + SPC + opt_reply + SPC; | ||
} | ||
this.sendCommand(psub + Buffer.byteLength(msg) + CR_LF + msg + CR_LF); | ||
this.sendCommand(proto.concat(pmsg.join(EMPTY)).join(SPC)); | ||
if (opt_callback !== undefined) { | ||
@@ -692,3 +776,5 @@ this.flush(opt_callback); | ||
this.createConnection(); | ||
this.emit('reconnecting'); | ||
if (this.currentServer.didConnect === true) { | ||
this.emit('reconnecting'); | ||
} | ||
}; | ||
@@ -704,4 +790,17 @@ | ||
var client = this; | ||
client.reconnecting = true; | ||
setTimeout(function() { client.reconnect(); }, this.options.reconnectTimeWait); | ||
// Just return if no more servers | ||
if (client.servers.length === 0) { | ||
return; | ||
} | ||
// Don't set reconnecting state if we are just trying | ||
// for the first time. | ||
if (client.wasConnected === true) { | ||
client.reconnecting = true; | ||
} | ||
// Only stall if we have connected before. | ||
var wait = 0; | ||
if (client.servers[0].didConnect === true) { | ||
wait = this.options.reconnectTimeWait; | ||
} | ||
setTimeout(function() { client.reconnect(); }, wait); | ||
}; |
{ | ||
"name": "nats", | ||
"description": "Node.js client for NATS, a lightweight messaging system", | ||
"version": "0.2.9", | ||
"description": "Node.js client for NATS, a lightweight, high-performance messaging system", | ||
"version": "0.3.0", | ||
"repository": { | ||
"type": "git", | ||
"url": "git@github.com:derekcollison/node-nats.git" | ||
"url": "git@github.com:nats-io/node-nats.git" | ||
}, | ||
"homepage": "https://nats.io", | ||
"author": "Derek Collison <derek.collison@gmail.com>", | ||
"author": "Derek Collison <derek@apcera.com>", | ||
"keywords": [ | ||
@@ -25,4 +25,4 @@ "messaging", | ||
"mocha": "*", | ||
"mocha-lcov-reporter": "^0.0.1", | ||
"should": ">= 3.0.0" | ||
"mocha-lcov-reporter": "^0.0.2", | ||
"should": ">= 7.0.0" | ||
}, | ||
@@ -35,4 +35,4 @@ "main": "index", | ||
"engines": { | ||
"node": ">= 0.8.x <= 0.10.x" | ||
"node": ">= 0.10.x <= 0.12.x" | ||
} | ||
} |
@@ -5,3 +5,4 @@ # NATS - Node.js Client | ||
[![Build Status](https://secure.travis-ci.org/derekcollison/node-nats.png)](http://travis-ci.org/derekcollison/node-nats) [![npm version](https://badge.fury.io/js/nats.svg)](http://badge.fury.io/js/nats)[![Coverage Status](https://img.shields.io/coveralls/derekcollison/node-nats.svg)](https://coveralls.io/r/derekcollison/node-nats?branch=master) | ||
[![License MIT](https://img.shields.io/npm/l/express.svg)](http://opensource.org/licenses/MIT) | ||
[![Build Status](https://secure.travis-ci.org/nats-io/node-nats.png)](http://travis-ci.org/nats-io/node-nats) [![npm version](https://badge.fury.io/js/nats.svg)](http://badge.fury.io/js/nats)[![Coverage Status](https://coveralls.io/repos/nats-io/node-nats/badge.svg)](https://coveralls.io/r/nats-io/node-nats?branch=master) | ||
@@ -82,3 +83,20 @@ ## Installation | ||
``` | ||
## Clustered Usage | ||
```javascript | ||
var nats = require('nats'); | ||
var servers = ['nats://nats.io:4222', 'nats://nats.io:5222', 'nats://nats.io:6222']; | ||
// Randomly connect to a server in the cluster group. | ||
var nc = nats.connect({'servers': servers}); | ||
// currentServer is the URL of the connected server. | ||
console.log("Connected to " + nc.currentServer.host); | ||
// Preserve order when connecting to servers. | ||
nc = nats.connect({'dontRandomize': true, 'servers':servers}); | ||
``` | ||
## Advanced Usage | ||
@@ -129,2 +147,3 @@ | ||
Copyright (c) 2015 Apcera Inc.<br/> | ||
Copyright (c) 2011-2014 Derek Collison | ||
@@ -131,0 +150,0 @@ |
Sorry, the diff of this file is not supported yet
Sorry, the diff of this file is not supported yet
License Policy Violation
LicenseThis package is not allowed per your license policy. Review the package's license to ensure compliance.
Found 1 instance in 1 package
License Policy Violation
LicenseThis package is not allowed per your license policy. Review the package's license to ensure compliance.
Found 1 instance in 1 package
28570
787
166