Huge News!Announcing our $40M Series B led by Abstract Ventures.Learn More
Socket
Sign inDemoInstall
Socket

nats

Package Overview
Dependencies
Maintainers
1
Versions
195
Alerts
File Explorer

Advanced tools

Socket logo

Install Socket

Detect and block malicious and high-risk dependencies

Install

nats - npm Package Compare versions

Comparing version 0.2.9 to 0.3.0

199

lib/nats.js
/*!
* Nats
* Copyright(c) 2012-2015 Apcera Inc. All rights reserved.
* Copyright(c) 2011-2014 Derek Collison (derek.collison@gmail.com)

@@ -20,3 +21,3 @@ * MIT Licensed

var VERSION = '0.2.9',
var VERSION = '0.3.0',

@@ -152,2 +153,12 @@ DEFAULT_PORT = 4222,

function shuffle(array) {
for (var i = array.length - 1; i > 0; i--) {
var j = Math.floor(Math.random() * (i + 1));
var temp = array[i];
array[i] = array[j];
array[j] = temp;
}
return array;
}
/**

@@ -162,3 +173,2 @@ * Parse the conctructor/connect options.

var options = this.options = {
'url' : DEFAULT_URI,
'verbose' : false,

@@ -170,3 +180,6 @@ 'pedantic' : false,

};
if ('number' === typeof opts) {
if (undefined === opts) {
options.url = DEFAULT_URI;
} else if ('number' === typeof opts) {
options.url = DEFAULT_PRE + opts;

@@ -190,18 +203,70 @@ } else if ('string' === typeof opts) {

this.assignOption(opts, 'reconnectTimeWait');
this.assignOption(opts, 'servers');
this.assignOption(opts, 'urls', 'servers');
this.assignOption(opts, 'noRandomize');
this.assignOption(opts, 'NoRandomize', 'noRandomize');
this.assignOption(opts, 'dontRandomize', 'noRandomize');
}
options.uri = options.url;
if (options.url !== undefined) {
// Parse the url
this.url = url.parse(options.url);
if ('auth' in this.url && !!this.url.auth) {
var auth = this.url.auth.split(':');
if (options.user === undefined) {
options.user = auth[0];
}
if (options.pass === undefined) {
options.pass = auth[1];
}
var client = this;
// Set user/pass as needed if in options.
client.user = options.user;
client.pass = options.pass;
// For cluster support
client.servers = [];
if (Array.isArray(options.servers)) {
options.servers.forEach(function(server) {
client.servers.push(new Server(url.parse(server)));
});
} else {
client.servers.push(new Server(url.parse(options.url)));
}
// Randomize if needed
if (options.noRandomize !== true) {
shuffle(client.servers);
}
};
/**
* Create a new server.
*
* @api private
*/
function Server(url) {
this.url = url;
this.didConnect = false;
this.reconnects = 0;
}
/**
* Properly select the next server.
* We rotate the server list as we go,
* we also pull auth from urls as needed, or
* if they were set in options use that as override.
*
* @api private
*/
Client.prototype.selectServer = function() {
var client = this;
var server = client.servers.shift();
// Place in client context.
client.currentServer = server;
client.url = server.url;
if ('auth' in server.url && !!server.url.auth) {
var auth = server.url.auth.split(':');
if (client.options.user === undefined) {
client.user = auth[0];
}
if (client.options.pass === undefined) {
client.pass = auth[1];
}
}
client.servers.push(server);
};

@@ -223,2 +288,4 @@

client.selectServer();
var stream = client.stream = net.createConnection(client.url.port, client.url.hostname);

@@ -230,2 +297,4 @@

client.connected = true;
client.wasConnected = true;
client.currentServer.didConnect = true;
client.reconnecting = false;

@@ -242,5 +311,3 @@ client.reconnects = 0;

// FIXME, use close event?
stream.on('close', function(hadError) {
if (hadError) { return; }
client.closeStream();

@@ -258,17 +325,20 @@ client.emit('disconnect');

stream.on('error', function(exception) {
client.closeStream();
// If we were connected just return, close event will process
if (client.wasConnected === true && client.currentServer.didConnect === true) {
return;
}
if (client.reconnecting === false) {
client.emit('error', exception);
// if the current server did not connect at all, and we in
// general have not connected to any server, remove it from
// this list.
if (client.wasConnected === false && client.currentServer.didConnect === false) {
client.servers.splice(client.servers.length-1, 1);
}
client.emit('disconnect');
if (client.reconnecting === true) {
if (client.closed === true ||
client.reconnects >= client.options.maxReconnectAttempts) {
client.emit('close');
} else {
client.scheduleReconnect();
}
// Only bubble up error if we never had connected
// to the server and we only have one.
if (client.wasConnected === false && client.servers.length === 0) {
client.emit('error', "Could not connect to server: " + exception);
}
client.closeStream();
});

@@ -279,2 +349,4 @@

// This will switch inbound to a string but seems to perform
// better than Buffer.concat.
client.inbound = client.inbound ? client.inbound + data : data;

@@ -284,4 +356,8 @@

switch (client.pstate) {
case AWAITING_CONTROL:
if ((m = MSG.exec(client.inbound)) !== null) {
// Regex only works on strings, so convert once to be more efficient.
// Long term answer is hand rolled parser, not regex.
var buf = client.inbound.toString('binary', 0, MAX_CONTROL_LINE_SIZE);
if ((m = MSG.exec(buf)) !== null) {
client.payload = {

@@ -294,12 +370,12 @@ subj : m[1],

client.pstate = AWAITING_MSG_PAYLOAD;
} else if ((m = OK.exec(client.inbound)) !== null) {
} else if ((m = OK.exec(buf)) !== null) {
// Ignore for now..
} else if ((m = ERR.exec(client.inbound)) !== null) {
} else if ((m = ERR.exec(buf)) !== null) {
client.emit('error', m[1]);
} else if ((m = PONG.exec(client.inbound)) !== null) {
} else if ((m = PONG.exec(buf)) !== null) {
var cb = client.pongs.shift();
if (cb) { cb(); } // FIXME: Should we check for exceptions?
} else if ((m = PING.exec(client.inbound)) !== null) {
} else if ((m = PING.exec(buf)) !== null) {
client.sendCommand(PONG_RESPONSE);
} else if ((m = INFO.exec(client.inbound)) !== null) {
} else if ((m = INFO.exec(buf)) !== null) {
// Ignore for now..

@@ -319,4 +395,4 @@ } else {

// FIXME, may be inefficient.
client.payload.msg = client.inbound.slice(0, client.payload.size).toString();
// FIXME(dlc), may be inefficient.
client.payload.msg = client.inbound.toString('utf8', 0, client.payload.size);

@@ -346,6 +422,11 @@ if (client.inbound.length === client.payload.size + CR_LF_LEN) {

// Queue the connect command.
var cs = { 'verbose':this.options.verbose, 'pedantic':this.options.pedantic };
if (this.options.user !== undefined) {
cs.user = this.options.user;
cs.pass = this.options.pass;
var cs = {
'lang' : 'node',
'version' : VERSION,
'verbose' : this.options.verbose,
'pedantic': this.options.pedantic
};
if (this.user !== undefined) {
cs.user = this.user;
cs.pass = this.pass;
}

@@ -366,3 +447,5 @@ this.sendCommand(CONNECT + SPC + JSON.stringify(cs) + CR_LF);

this.connected = false;
this.wasConnected = false;
this.reconnecting = false;
this.server = null;
};

@@ -406,2 +489,3 @@

}
this.inbound = null;
};

@@ -554,11 +638,11 @@

var proto = [PUB, subject];
var pmsg = [Buffer.byteLength(msg), CR_LF, msg, CR_LF];
if (opt_reply !== undefined) {
proto.push(opt_reply);
// Hold PUB SUB [REPLY]
var psub;
if (opt_reply === undefined) {
psub = "PUB " + subject + SPC;
} else {
psub = "PUB " + subject + SPC + opt_reply + SPC;
}
this.sendCommand(psub + Buffer.byteLength(msg) + CR_LF + msg + CR_LF);
this.sendCommand(proto.concat(pmsg.join(EMPTY)).join(SPC));
if (opt_callback !== undefined) {

@@ -692,3 +776,5 @@ this.flush(opt_callback);

this.createConnection();
this.emit('reconnecting');
if (this.currentServer.didConnect === true) {
this.emit('reconnecting');
}
};

@@ -704,4 +790,17 @@

var client = this;
client.reconnecting = true;
setTimeout(function() { client.reconnect(); }, this.options.reconnectTimeWait);
// Just return if no more servers
if (client.servers.length === 0) {
return;
}
// Don't set reconnecting state if we are just trying
// for the first time.
if (client.wasConnected === true) {
client.reconnecting = true;
}
// Only stall if we have connected before.
var wait = 0;
if (client.servers[0].didConnect === true) {
wait = this.options.reconnectTimeWait;
}
setTimeout(function() { client.reconnect(); }, wait);
};
{
"name": "nats",
"description": "Node.js client for NATS, a lightweight messaging system",
"version": "0.2.9",
"description": "Node.js client for NATS, a lightweight, high-performance messaging system",
"version": "0.3.0",
"repository": {
"type": "git",
"url": "git@github.com:derekcollison/node-nats.git"
"url": "git@github.com:nats-io/node-nats.git"
},
"homepage": "https://nats.io",
"author": "Derek Collison <derek.collison@gmail.com>",
"author": "Derek Collison <derek@apcera.com>",
"keywords": [

@@ -25,4 +25,4 @@ "messaging",

"mocha": "*",
"mocha-lcov-reporter": "^0.0.1",
"should": ">= 3.0.0"
"mocha-lcov-reporter": "^0.0.2",
"should": ">= 7.0.0"
},

@@ -35,4 +35,4 @@ "main": "index",

"engines": {
"node": ">= 0.8.x <= 0.10.x"
"node": ">= 0.10.x <= 0.12.x"
}
}

@@ -5,3 +5,4 @@ # NATS - Node.js Client

[![Build Status](https://secure.travis-ci.org/derekcollison/node-nats.png)](http://travis-ci.org/derekcollison/node-nats) [![npm version](https://badge.fury.io/js/nats.svg)](http://badge.fury.io/js/nats)[![Coverage Status](https://img.shields.io/coveralls/derekcollison/node-nats.svg)](https://coveralls.io/r/derekcollison/node-nats?branch=master)
[![License MIT](https://img.shields.io/npm/l/express.svg)](http://opensource.org/licenses/MIT)
[![Build Status](https://secure.travis-ci.org/nats-io/node-nats.png)](http://travis-ci.org/nats-io/node-nats) [![npm version](https://badge.fury.io/js/nats.svg)](http://badge.fury.io/js/nats)[![Coverage Status](https://coveralls.io/repos/nats-io/node-nats/badge.svg)](https://coveralls.io/r/nats-io/node-nats?branch=master)

@@ -82,3 +83,20 @@ ## Installation

```
## Clustered Usage
```javascript
var nats = require('nats');
var servers = ['nats://nats.io:4222', 'nats://nats.io:5222', 'nats://nats.io:6222'];
// Randomly connect to a server in the cluster group.
var nc = nats.connect({'servers': servers});
// currentServer is the URL of the connected server.
console.log("Connected to " + nc.currentServer.host);
// Preserve order when connecting to servers.
nc = nats.connect({'dontRandomize': true, 'servers':servers});
```
## Advanced Usage

@@ -129,2 +147,3 @@

Copyright (c) 2015 Apcera Inc.<br/>
Copyright (c) 2011-2014 Derek Collison

@@ -131,0 +150,0 @@

Sorry, the diff of this file is not supported yet

Sorry, the diff of this file is not supported yet

SocketSocket SOC 2 Logo

Product

  • Package Alerts
  • Integrations
  • Docs
  • Pricing
  • FAQ
  • Roadmap
  • Changelog

Packages

npm

Stay in touch

Get open source security insights delivered straight into your inbox.


  • Terms
  • Privacy
  • Security

Made with ⚡️ by Socket Inc