Comparing version 0.2.4 to 0.2.5
@@ -9,4 +9,4 @@ | ||
var loop = 100000; | ||
var hash = 2500; | ||
var loop = 50000; | ||
var hash = 1000; | ||
@@ -18,3 +18,2 @@ console.log('Publish/Subscribe Performance Test'); | ||
var received = 0; | ||
var start = new Date(); | ||
@@ -24,2 +23,3 @@ | ||
received += 1; | ||
if (received === loop) { | ||
@@ -34,9 +34,12 @@ var stop = new Date(); | ||
for (var i=0; i<loop; i++) { | ||
nc2.publish('test', 'ok'); | ||
if (i % hash === 0) { | ||
process.stdout.write('+'); | ||
// Make sure sub is registered | ||
nc1.flush(function() { | ||
for (var i=0; i<loop; i++) { | ||
nc2.publish('test', 'ok'); | ||
if (i % hash === 0) { | ||
process.stdout.write('+'); | ||
} | ||
} | ||
} | ||
}); | ||
}); |
@@ -20,3 +20,3 @@ /*! | ||
var VERSION = '0.2.4', | ||
var VERSION = '0.2.5', | ||
@@ -51,7 +51,12 @@ DEFAULT_PORT = 4222, | ||
SPC = ' ', | ||
PUB = 'PUB', | ||
// Protocol | ||
PUB = 'PUB', | ||
SUB = 'SUB', | ||
UNSUB = 'UNSUB', | ||
CONNECT = 'CONNECT', | ||
// Responses | ||
PING_REQUEST = 'PING'+CR_LF, | ||
PONG_RESPONSE = 'PONG'+CR_LF, | ||
PING_REQUEST = 'PING' + CR_LF, | ||
PONG_RESPONSE = 'PONG' + CR_LF, | ||
@@ -61,5 +66,7 @@ EMPTY = '', | ||
// Pedantic Mode support | ||
SUB = /^([^\.\*>\s]+|>$|\*)(\.([^\.\*>\s]+|>$|\*))*$/, | ||
SUB_NO_WC = /^([^\.\*>\s]+)(\.([^\.\*>\s]+))*$/; | ||
Q_SUB = /^([^\.\*>\s]+|>$|\*)(\.([^\.\*>\s]+|>$|\*))*$/, | ||
Q_SUB_NO_WC = /^([^\.\*>\s]+)(\.([^\.\*>\s]+))*$/; | ||
FLUSH_THRESHOLD = 65536, | ||
/** | ||
@@ -211,2 +218,3 @@ * Library Version | ||
client.pending = []; | ||
client.pSize = 0; | ||
client.pstate = AWAITING_CONTROL; | ||
@@ -216,8 +224,6 @@ | ||
//stream.setEncoding('utf8'); // FIXME, Force Strings | ||
stream.on('connect', function() { | ||
var wasReconnecting = client.reconnecting; | ||
var event = wasReconnecting === true ? 'reconnect' : 'connect'; | ||
client.connected = true; | ||
client.connected = true; | ||
client.reconnecting = false; | ||
@@ -337,3 +343,3 @@ client.reconnects = 0; | ||
} | ||
this.sendCommand('CONNECT ' + JSON.stringify(cs) + CR_LF); | ||
this.sendCommand(CONNECT + SPC + JSON.stringify(cs) + CR_LF); | ||
} | ||
@@ -370,2 +376,3 @@ | ||
this.pending = null; | ||
this.pSize = 0; | ||
} | ||
@@ -388,2 +395,3 @@ | ||
this.pending = null; | ||
this.pSize = 0; | ||
this.connected = false; | ||
@@ -400,7 +408,9 @@ } | ||
Client.prototype.flushPending = function() { | ||
if (this.connected === false || this.pending == null) { | ||
return; | ||
} | ||
if (this.connected === false || | ||
this.pending == null || | ||
this.pending.length === 0) { return; } | ||
this.stream.write(this.pending.join(EMPTY)); | ||
this.pending = null; | ||
this.pending = []; | ||
this.pSize = 0; | ||
} | ||
@@ -415,7 +425,23 @@ | ||
Client.prototype.sendCommand = function(cmd) { | ||
// Buffer to cut down on system calls, increase throughput. | ||
// When receive gets faster, should make this Buffer based.. | ||
if (this.closed || this.pending == null) { return; } | ||
this.pending.push(cmd); | ||
this.pSize += Buffer.byteLength(cmd); | ||
if (this.connected === true) { | ||
this.stream.write(cmd); | ||
} else { | ||
this.pending.push(cmd); | ||
// First one let's setup flush.. | ||
if (this.pending.length === 1) { | ||
var self = this; | ||
process.nextTick(function() { | ||
self.flushPending(); | ||
}); | ||
} else if (this.pSize > FLUSH_THRESHOLD) { | ||
// Flush in place when threshold reached.. | ||
this.flushPending(); | ||
} | ||
} | ||
} | ||
@@ -430,9 +456,11 @@ | ||
Client.prototype.sendSubscriptions = function() { | ||
var proto; | ||
for(var sid in this.subs) { | ||
var sub = this.subs[sid]; | ||
if (sub.qgroup) { | ||
this.sendCommand('SUB ' + sub.subject + SPC + sub.qgroup + SPC + sid + CR_LF); | ||
proto = [SUB, sub.subject, sub.qgroup, sid + CR_LF]; | ||
} else { | ||
this.sendCommand('SUB ' + sub.subject + SPC + sid + CR_LF); | ||
proto = [SUB, sub.subject, sid + CR_LF]; | ||
} | ||
this.sendCommand(proto.join(SPC)); | ||
} | ||
@@ -546,9 +574,11 @@ } | ||
// FIXME, Array.join? | ||
var proto; | ||
if (typeof qgroup == 'string') { | ||
this.subs[this.ssid].qgroup = qgroup; | ||
this.sendCommand('SUB ' + subject + SPC + qgroup + SPC + this.ssid + CR_LF); | ||
proto = [SUB, subject, qgroup, this.ssid + CR_LF]; | ||
} else { | ||
this.sendCommand('SUB ' + subject + SPC + this.ssid + CR_LF); | ||
proto = [SUB, subject, this.ssid + CR_LF]; | ||
} | ||
this.sendCommand(proto.join(SPC)); | ||
if (max) { | ||
@@ -570,7 +600,11 @@ this.unsubscribe(this.ssid, max); | ||
if (!sid) { return; } | ||
var proto; | ||
if (opt_max) { | ||
this.sendCommand('UNSUB ' + sid + SPC + opt_max + CR_LF); | ||
proto = [UNSUB, sid, opt_max + CR_LF]; | ||
} else { | ||
this.sendCommand('UNSUB ' + sid + CR_LF); | ||
proto = [UNSUB, sid + CR_LF]; | ||
} | ||
this.sendCommand(proto.join(SPC)); | ||
var sub = this.subs[sid]; | ||
@@ -577,0 +611,0 @@ if (sub == null) { |
{ | ||
"name": "nats", | ||
"description": "Node.js client for NATS, a lightweight messaging system", | ||
"version": "0.2.4", | ||
"version": "0.2.5", | ||
"repository": { | ||
@@ -6,0 +6,0 @@ "type" : "git", |
License Policy Violation
LicenseThis package is not allowed per your license policy. Review the package's license to ensure compliance.
Found 1 instance in 1 package
License Policy Violation
LicenseThis package is not allowed per your license policy. Review the package's license to ensure compliance.
Found 1 instance in 1 package
23894
686