Comparing version
199
lib/nats.js
/*! | ||
* Nats | ||
* Copyright(c) 2012-2015 Apcera Inc. All rights reserved. | ||
* Copyright(c) 2011-2014 Derek Collison (derek.collison@gmail.com) | ||
@@ -20,3 +21,3 @@ * MIT Licensed | ||
var VERSION = '0.2.9', | ||
var VERSION = '0.3.0', | ||
@@ -152,2 +153,12 @@ DEFAULT_PORT = 4222, | ||
function shuffle(array) { | ||
for (var i = array.length - 1; i > 0; i--) { | ||
var j = Math.floor(Math.random() * (i + 1)); | ||
var temp = array[i]; | ||
array[i] = array[j]; | ||
array[j] = temp; | ||
} | ||
return array; | ||
} | ||
/** | ||
@@ -162,3 +173,2 @@ * Parse the conctructor/connect options. | ||
var options = this.options = { | ||
'url' : DEFAULT_URI, | ||
'verbose' : false, | ||
@@ -170,3 +180,6 @@ 'pedantic' : false, | ||
}; | ||
if ('number' === typeof opts) { | ||
if (undefined === opts) { | ||
options.url = DEFAULT_URI; | ||
} else if ('number' === typeof opts) { | ||
options.url = DEFAULT_PRE + opts; | ||
@@ -190,18 +203,70 @@ } else if ('string' === typeof opts) { | ||
this.assignOption(opts, 'reconnectTimeWait'); | ||
this.assignOption(opts, 'servers'); | ||
this.assignOption(opts, 'urls', 'servers'); | ||
this.assignOption(opts, 'noRandomize'); | ||
this.assignOption(opts, 'NoRandomize', 'noRandomize'); | ||
this.assignOption(opts, 'dontRandomize', 'noRandomize'); | ||
} | ||
options.uri = options.url; | ||
if (options.url !== undefined) { | ||
// Parse the url | ||
this.url = url.parse(options.url); | ||
if ('auth' in this.url && !!this.url.auth) { | ||
var auth = this.url.auth.split(':'); | ||
if (options.user === undefined) { | ||
options.user = auth[0]; | ||
} | ||
if (options.pass === undefined) { | ||
options.pass = auth[1]; | ||
} | ||
var client = this; | ||
// Set user/pass as needed if in options. | ||
client.user = options.user; | ||
client.pass = options.pass; | ||
// For cluster support | ||
client.servers = []; | ||
if (Array.isArray(options.servers)) { | ||
options.servers.forEach(function(server) { | ||
client.servers.push(new Server(url.parse(server))); | ||
}); | ||
} else { | ||
client.servers.push(new Server(url.parse(options.url))); | ||
} | ||
// Randomize if needed | ||
if (options.noRandomize !== true) { | ||
shuffle(client.servers); | ||
} | ||
}; | ||
/** | ||
* Create a new server. | ||
* | ||
* @api private | ||
*/ | ||
function Server(url) { | ||
this.url = url; | ||
this.didConnect = false; | ||
this.reconnects = 0; | ||
} | ||
/** | ||
* Properly select the next server. | ||
* We rotate the server list as we go, | ||
* we also pull auth from urls as needed, or | ||
* if they were set in options use that as override. | ||
* | ||
* @api private | ||
*/ | ||
Client.prototype.selectServer = function() { | ||
var client = this; | ||
var server = client.servers.shift(); | ||
// Place in client context. | ||
client.currentServer = server; | ||
client.url = server.url; | ||
if ('auth' in server.url && !!server.url.auth) { | ||
var auth = server.url.auth.split(':'); | ||
if (client.options.user === undefined) { | ||
client.user = auth[0]; | ||
} | ||
if (client.options.pass === undefined) { | ||
client.pass = auth[1]; | ||
} | ||
} | ||
client.servers.push(server); | ||
}; | ||
@@ -223,2 +288,4 @@ | ||
client.selectServer(); | ||
var stream = client.stream = net.createConnection(client.url.port, client.url.hostname); | ||
@@ -230,2 +297,4 @@ | ||
client.connected = true; | ||
client.wasConnected = true; | ||
client.currentServer.didConnect = true; | ||
client.reconnecting = false; | ||
@@ -242,5 +311,3 @@ client.reconnects = 0; | ||
// FIXME, use close event? | ||
stream.on('close', function(hadError) { | ||
if (hadError) { return; } | ||
client.closeStream(); | ||
@@ -258,17 +325,20 @@ client.emit('disconnect'); | ||
stream.on('error', function(exception) { | ||
client.closeStream(); | ||
// If we were connected just return, close event will process | ||
if (client.wasConnected === true && client.currentServer.didConnect === true) { | ||
return; | ||
} | ||
if (client.reconnecting === false) { | ||
client.emit('error', exception); | ||
// if the current server did not connect at all, and we in | ||
// general have not connected to any server, remove it from | ||
// this list. | ||
if (client.wasConnected === false && client.currentServer.didConnect === false) { | ||
client.servers.splice(client.servers.length-1, 1); | ||
} | ||
client.emit('disconnect'); | ||
if (client.reconnecting === true) { | ||
if (client.closed === true || | ||
client.reconnects >= client.options.maxReconnectAttempts) { | ||
client.emit('close'); | ||
} else { | ||
client.scheduleReconnect(); | ||
} | ||
// Only bubble up error if we never had connected | ||
// to the server and we only have one. | ||
if (client.wasConnected === false && client.servers.length === 0) { | ||
client.emit('error', "Could not connect to server: " + exception); | ||
} | ||
client.closeStream(); | ||
}); | ||
@@ -279,2 +349,4 @@ | ||
// This will switch inbound to a string but seems to perform | ||
// better than Buffer.concat. | ||
client.inbound = client.inbound ? client.inbound + data : data; | ||
@@ -284,4 +356,8 @@ | ||
switch (client.pstate) { | ||
case AWAITING_CONTROL: | ||
if ((m = MSG.exec(client.inbound)) !== null) { | ||
// Regex only works on strings, so convert once to be more efficient. | ||
// Long term answer is hand rolled parser, not regex. | ||
var buf = client.inbound.toString('binary', 0, MAX_CONTROL_LINE_SIZE); | ||
if ((m = MSG.exec(buf)) !== null) { | ||
client.payload = { | ||
@@ -294,12 +370,12 @@ subj : m[1], | ||
client.pstate = AWAITING_MSG_PAYLOAD; | ||
} else if ((m = OK.exec(client.inbound)) !== null) { | ||
} else if ((m = OK.exec(buf)) !== null) { | ||
// Ignore for now.. | ||
} else if ((m = ERR.exec(client.inbound)) !== null) { | ||
} else if ((m = ERR.exec(buf)) !== null) { | ||
client.emit('error', m[1]); | ||
} else if ((m = PONG.exec(client.inbound)) !== null) { | ||
} else if ((m = PONG.exec(buf)) !== null) { | ||
var cb = client.pongs.shift(); | ||
if (cb) { cb(); } // FIXME: Should we check for exceptions? | ||
} else if ((m = PING.exec(client.inbound)) !== null) { | ||
} else if ((m = PING.exec(buf)) !== null) { | ||
client.sendCommand(PONG_RESPONSE); | ||
} else if ((m = INFO.exec(client.inbound)) !== null) { | ||
} else if ((m = INFO.exec(buf)) !== null) { | ||
// Ignore for now.. | ||
@@ -319,4 +395,4 @@ } else { | ||
// FIXME, may be inefficient. | ||
client.payload.msg = client.inbound.slice(0, client.payload.size).toString(); | ||
// FIXME(dlc), may be inefficient. | ||
client.payload.msg = client.inbound.toString('utf8', 0, client.payload.size); | ||
@@ -346,6 +422,11 @@ if (client.inbound.length === client.payload.size + CR_LF_LEN) { | ||
// Queue the connect command. | ||
var cs = { 'verbose':this.options.verbose, 'pedantic':this.options.pedantic }; | ||
if (this.options.user !== undefined) { | ||
cs.user = this.options.user; | ||
cs.pass = this.options.pass; | ||
var cs = { | ||
'lang' : 'node', | ||
'version' : VERSION, | ||
'verbose' : this.options.verbose, | ||
'pedantic': this.options.pedantic | ||
}; | ||
if (this.user !== undefined) { | ||
cs.user = this.user; | ||
cs.pass = this.pass; | ||
} | ||
@@ -366,3 +447,5 @@ this.sendCommand(CONNECT + SPC + JSON.stringify(cs) + CR_LF); | ||
this.connected = false; | ||
this.wasConnected = false; | ||
this.reconnecting = false; | ||
this.server = null; | ||
}; | ||
@@ -406,2 +489,3 @@ | ||
} | ||
this.inbound = null; | ||
}; | ||
@@ -554,11 +638,11 @@ | ||
var proto = [PUB, subject]; | ||
var pmsg = [Buffer.byteLength(msg), CR_LF, msg, CR_LF]; | ||
if (opt_reply !== undefined) { | ||
proto.push(opt_reply); | ||
// Hold PUB SUB [REPLY] | ||
var psub; | ||
if (opt_reply === undefined) { | ||
psub = "PUB " + subject + SPC; | ||
} else { | ||
psub = "PUB " + subject + SPC + opt_reply + SPC; | ||
} | ||
this.sendCommand(psub + Buffer.byteLength(msg) + CR_LF + msg + CR_LF); | ||
this.sendCommand(proto.concat(pmsg.join(EMPTY)).join(SPC)); | ||
if (opt_callback !== undefined) { | ||
@@ -692,3 +776,5 @@ this.flush(opt_callback); | ||
this.createConnection(); | ||
this.emit('reconnecting'); | ||
if (this.currentServer.didConnect === true) { | ||
this.emit('reconnecting'); | ||
} | ||
}; | ||
@@ -704,4 +790,17 @@ | ||
var client = this; | ||
client.reconnecting = true; | ||
setTimeout(function() { client.reconnect(); }, this.options.reconnectTimeWait); | ||
// Just return if no more servers | ||
if (client.servers.length === 0) { | ||
return; | ||
} | ||
// Don't set reconnecting state if we are just trying | ||
// for the first time. | ||
if (client.wasConnected === true) { | ||
client.reconnecting = true; | ||
} | ||
// Only stall if we have connected before. | ||
var wait = 0; | ||
if (client.servers[0].didConnect === true) { | ||
wait = this.options.reconnectTimeWait; | ||
} | ||
setTimeout(function() { client.reconnect(); }, wait); | ||
}; |
{ | ||
"name": "nats", | ||
"description": "Node.js client for NATS, a lightweight messaging system", | ||
"version": "0.2.9", | ||
"description": "Node.js client for NATS, a lightweight, high-performance messaging system", | ||
"version": "0.3.0", | ||
"repository": { | ||
"type": "git", | ||
"url": "git@github.com:derekcollison/node-nats.git" | ||
"url": "git@github.com:nats-io/node-nats.git" | ||
}, | ||
"homepage": "https://nats.io", | ||
"author": "Derek Collison <derek.collison@gmail.com>", | ||
"author": "Derek Collison <derek@apcera.com>", | ||
"keywords": [ | ||
@@ -25,4 +25,4 @@ "messaging", | ||
"mocha": "*", | ||
"mocha-lcov-reporter": "^0.0.1", | ||
"should": ">= 3.0.0" | ||
"mocha-lcov-reporter": "^0.0.2", | ||
"should": ">= 7.0.0" | ||
}, | ||
@@ -35,4 +35,4 @@ "main": "index", | ||
"engines": { | ||
"node": ">= 0.8.x <= 0.10.x" | ||
"node": ">= 0.10.x <= 0.12.x" | ||
} | ||
} |
@@ -5,3 +5,4 @@ # NATS - Node.js Client | ||
[](http://travis-ci.org/derekcollison/node-nats) [](http://badge.fury.io/js/nats)[](https://coveralls.io/r/derekcollison/node-nats?branch=master) | ||
[](http://opensource.org/licenses/MIT) | ||
[](http://travis-ci.org/nats-io/node-nats) [](http://badge.fury.io/js/nats)[](https://coveralls.io/r/nats-io/node-nats?branch=master) | ||
@@ -82,3 +83,20 @@ ## Installation | ||
``` | ||
## Clustered Usage | ||
```javascript | ||
var nats = require('nats'); | ||
var servers = ['nats://nats.io:4222', 'nats://nats.io:5222', 'nats://nats.io:6222']; | ||
// Randomly connect to a server in the cluster group. | ||
var nc = nats.connect({'servers': servers}); | ||
// currentServer is the URL of the connected server. | ||
console.log("Connected to " + nc.currentServer.host); | ||
// Preserve order when connecting to servers. | ||
nc = nats.connect({'dontRandomize': true, 'servers':servers}); | ||
``` | ||
## Advanced Usage | ||
@@ -129,2 +147,3 @@ | ||
Copyright (c) 2015 Apcera Inc.<br/> | ||
Copyright (c) 2011-2014 Derek Collison | ||
@@ -131,0 +150,0 @@ |
Sorry, the diff of this file is not supported yet
Sorry, the diff of this file is not supported yet
28570
13.02%787
12.59%166
12.93%