New Case Study:See how Anthropic automated 95% of dependency reviews with Socket.Learn More
Socket
Sign inDemoInstall
Socket

nats

Package Overview
Dependencies
Maintainers
1
Versions
198
Alerts
File Explorer

Advanced tools

Socket logo

Install Socket

Detect and block malicious and high-risk dependencies

Install

nats - npm Package Compare versions

Comparing version 0.5.0 to 0.5.4

examples/nats-sub-test

5

benchmark/pub_perf.js

@@ -17,4 +17,7 @@

var invalid2octet = new Buffer('\xc3\x28', 'binary');
for (var i=0; i<loop; i++) {
nats.publish('test', 'ok');
nats.publish('test', invalid2octet);
//nats.publish('test', 'ok');
if (i % hash === 0) {

@@ -21,0 +24,0 @@ process.stdout.write('+');

130

lib/nats.js

@@ -25,3 +25,3 @@ /*!

var VERSION = '0.5.0',
var VERSION = '0.5.4',

@@ -54,2 +54,3 @@ DEFAULT_PORT = 4222,

CR_LF_LEN = CR_LF.length,
CR_LF_BUF = new Buffer(CR_LF),
EMPTY = '',

@@ -70,2 +71,8 @@ SPC = ' ',

// Errors
BAD_SUBJECT_ERR = new Error('Subject must be supplied'),
BAD_MSG_ERR = new Error('Message can\'t be a function'),
BAD_REPLY_ERR = new Error('Reply can\'t be a function'),
CONN_CLOSED_ERR = new Error('Connection closed'),
// Pedantic Mode support

@@ -332,6 +339,9 @@ //Q_SUB = /^([^\.\*>\s]+|>$|\*)(\.([^\.\*>\s]+|>$|\*))*$/, // TODO: remove / never used

var wasReconnecting = this.reconnecting;
var event = wasReconnecting === true ? 'reconnect' : 'connect';
var event = (wasReconnecting === true) ? 'reconnect' : 'connect';
this.reconnecting = false;
this.reconnects = 0;
this.flushPending();
this.wasConnected = true;
this.currentServer.didConnect = true;
if (wasReconnecting) {

@@ -341,5 +351,2 @@ this.sendSubscriptions();

this.wasConnected = true;
this.currentServer.didConnect = true;
this.emit(event, this);

@@ -448,5 +455,4 @@ };

Client.prototype.createConnection = function() {
// Initialize
this.pongs = [];
this.pending = [];
this.pSize = 0;
this.pstate = AWAITING_CONTROL;

@@ -458,2 +464,8 @@

// Setup pending if needed.
if (this.pending === null) {
this.pending = [];
this.pSize = 0;
}
// Select a server to connect to.

@@ -465,2 +477,4 @@ this.selectServer();

this.setupHandlers();
// Queue up the connect message.
this.sendConnect();
};

@@ -482,2 +496,3 @@

this.server = null;
this.pending = [];
};

@@ -534,7 +549,35 @@

this.pending.length === 0 ||
this.infoReceived !== true) { return; }
this.infoReceived !== true) {
return;
}
var result = true;
this.stream.write(this.pending.join(EMPTY));
if (!this.pBufs) {
// All strings, fastest for now.
result = this.stream.write(this.pending.join(EMPTY));
} else {
// We have some or all Buffers. Figure out if we can optimize.
var allBufs = true;
for (var i=0; i < this.pending.length; i++){
if (!Buffer.isBuffer(this.pending[i])) {
allBufs = false;
break;
}
}
// If all buffers, concat together and write once.
if (allBufs) {
result = this.stream.write(Buffer.concat(this.pending, this.pSize));
} else {
// We have a mix, so write each one individually.
for (i=0; i < this.pending.length; i++){
result = this.stream.write(this.pending[i]) && result;
}
}
}
this.pending = [];
this.pSize = 0;
this.pBufs = undefined;
return result;
};

@@ -555,3 +598,8 @@

this.pending.push(cmd);
this.pSize += Buffer.byteLength(cmd);
if (!Buffer.isBuffer(cmd)) {
this.pSize += Buffer.byteLength(cmd);
} else {
this.pSize += cmd.length;
this.pBufs = true;
}

@@ -609,5 +657,4 @@ if (this.connected === true) {

// unpause if needed.
if (client.stream.isPaused()) {
client.stream.resume();
}
// FIXME(dlc) client.stream.isPaused() causes 0.10 to fail
client.stream.resume();

@@ -641,3 +688,3 @@ /* jshint -W083 */

} else if ((m = PONG.exec(buf)) !== null) {
var cb = client.pongs.shift();
var cb = client.pongs && client.pongs.shift();
if (cb) { cb(); } // FIXME: Should we check for exceptions?

@@ -670,3 +717,3 @@ } else if ((m = PING.exec(buf)) !== null) {

// Queue up the connect message.
client.sendConnect();
// client.sendConnect();

@@ -778,2 +825,3 @@ // Perform a PING/PONG flush to emit connected event.

delete this.subs[this.payload.sid];
this.emit('unsubscribe', this.payload.sid, sub.subject);
} else if (sub.received > sub.max) {

@@ -800,3 +848,11 @@ this.unsubscribe(this.payload.sid);

Client.prototype.flush = function(opt_callback) {
if (typeof opt_callback === 'function') {
if (this.closed) {
if (typeof opt_callback === 'function') {
opt_callback(CONN_CLOSED_ERR);
return;
} else {
throw(CONN_CLOSED_ERR);
}
}
if (this.pongs) {
this.pongs.push(opt_callback);

@@ -812,3 +868,3 @@ this.sendCommand(PING_REQUEST);

* @param {String} subject
* @param {String} msg
* @param {String} opt_msg
* @param {String} opt_reply

@@ -820,6 +876,19 @@ * @param {Function} opt_callback

Client.prototype.publish = function(subject, msg, opt_reply, opt_callback) {
// They only supplied a callback function.
if (typeof subject === 'function') {
opt_callback = subject;
subject = undefined;
}
if (!msg) { msg = EMPTY; }
if (!subject) {
if (opt_callback) {
opt_callback(BAD_SUBJECT_ERR);
} else {
throw(BAD_SUBJECT_ERR);
}
}
if (typeof msg === 'function') {
if (opt_callback || opt_reply) {
throw(new Error('Message can\'t be a function'));
opt_callback(BAD_MSG_ERR);
return;
}

@@ -832,3 +901,4 @@ opt_callback = msg;

if (opt_callback) {
throw(new Error('Reply can\'t be a function'));
opt_callback(BAD_REPLY_ERR);
return;
}

@@ -846,6 +916,18 @@ opt_callback = opt_reply;

}
this.sendCommand(psub + Buffer.byteLength(msg) + CR_LF + msg + CR_LF);
// Need to treat sending buffers different.
if (!Buffer.isBuffer(msg)) {
this.sendCommand(psub + Buffer.byteLength(msg) + CR_LF + msg + CR_LF);
} else {
var b = new Buffer(psub.length + msg.length + (2 * CR_LF_LEN) + msg.length.toString().length);
var len = b.write(psub + msg.length + CR_LF);
msg.copy(b, len);
b.write(CR_LF, len + msg.length);
this.sendCommand(b);
}
if (opt_callback !== undefined) {
this.flush(opt_callback);
} else if (this.closed) {
throw(CONN_CLOSED_ERR);
}

@@ -866,6 +948,9 @@ };

Client.prototype.subscribe = function(subject, opts, callback) {
if (this.closed) {
throw(CONN_CLOSED_ERR);
}
var qgroup, max;
if (typeof opts === 'function') {
callback = opts;
opts = null;
opts = undefined;
} else if (opts && typeof opts === 'object') {

@@ -886,3 +971,5 @@ // FIXME, check exists, error otherwise..

}
this.sendCommand(proto.join(SPC));
this.emit('subscribe', this.ssid, subject, opts);

@@ -921,2 +1008,3 @@ if (max) {

delete this.subs[sid];
this.emit('unsubscribe', sid, sub.subject);
}

@@ -923,0 +1011,0 @@ };

{
"name": "nats",
"version": "0.5.0",
"version": "0.5.4",
"description": "Node.js client for NATS, a lightweight, high-performance cloud native messaging system",

@@ -5,0 +5,0 @@ "keywords": [

@@ -152,2 +152,8 @@ # NATS - Node.js Client

// If you want to make sure NATS yields during the processing
// of messages, you can use an option to specify a yieldTime in ms.
// During the processing of the inbound stream, we will yield if we
// spend more then yieldTime milliseconds processing.
var nc = nats.connect({port: PORT, yieldTime: 10});
// Timeouts for subscriptions

@@ -154,0 +160,0 @@ var sid = nats.subscribe('foo', function() {

@@ -51,2 +51,9 @@ /* jslint node: true */

it('should connect with proper credentials as server url', function(done) {
var nc = NATS.connect({'servers':[authUrl]});
nc.on('connect', function(/*nc*/) {
setTimeout(done, 100);
});
});
});

@@ -154,2 +154,2 @@ /* jslint node: true */

});
});

@@ -62,2 +62,36 @@ /* jslint node: true */

});
it('should still receive publish when some servers are invalid', function(done){
var natsServers = ['nats://localhost:22222', uri, 'nats://localhost:22223'];
var ua = NATS.connect({servers: natsServers});
var ub = NATS.connect({servers: natsServers});
var recvMsg = '';
ua.subscribe('topic1', function(msg, reply, subject){
recvMsg = msg;
});
setTimeout(function(){
ub.publish('topic1', 'hello');
}, 100 * 1);
setTimeout(function(){
recvMsg.should.equal('hello');
done();
}, 100 * 2);
});
it('should still receive publish when some servers[noRandomize] are invalid', function(done){
var natsServers = ['nats://localhost:22222', uri, 'nats://localhost:22223'];
var ua = NATS.connect({servers: natsServers, noRandomize:true});
var ub = NATS.connect({servers: natsServers, noRandomize:true});
var recvMsg = "";
ua.subscribe('topic1', function(msg, reply, subject){
recvMsg = msg;
});
setTimeout(function(){
ub.publish('topic1', 'hello');
}, 100 * 1);
setTimeout(function(){
recvMsg.should.equal('hello');
done();
}, 100 * 2);
});
});

@@ -213,2 +213,24 @@ /* jslint node: true */

});
it('should not crash when sending a publish with a callback after connection loss', function(done) {
var nc = NATS.connect({'port':PORT, 'reconnectTimeWait':WAIT});
var startTime;
should.exist(nc);
nc.on('connect', function() {
server.kill();
startTime = new Date();
});
nc.on('disconnect', function() {
nc.publish('foo', 'bar', 'reply', function() {
// fails to get here, but should not crash
});
server = nsc.start_server(PORT);
});
nc.on('reconnect', function() {
nc.flush(function() {
nc.close();
done();
});
});
});
});

@@ -11,3 +11,3 @@ /* jslint node: true */

var PORT = 1429;
var PORT = 1430;
var server;

@@ -77,2 +77,2 @@

});
});

Sorry, the diff of this file is not supported yet

Sorry, the diff of this file is not supported yet

Sorry, the diff of this file is not supported yet

SocketSocket SOC 2 Logo

Product

  • Package Alerts
  • Integrations
  • Docs
  • Pricing
  • FAQ
  • Roadmap
  • Changelog

Packages

npm

Stay in touch

Get open source security insights delivered straight into your inbox.


  • Terms
  • Privacy
  • Security

Made with ⚡️ by Socket Inc