Comparing version 0.5.0 to 0.5.4
@@ -17,4 +17,7 @@ | ||
var invalid2octet = new Buffer('\xc3\x28', 'binary'); | ||
for (var i=0; i<loop; i++) { | ||
nats.publish('test', 'ok'); | ||
nats.publish('test', invalid2octet); | ||
//nats.publish('test', 'ok'); | ||
if (i % hash === 0) { | ||
@@ -21,0 +24,0 @@ process.stdout.write('+'); |
130
lib/nats.js
@@ -25,3 +25,3 @@ /*! | ||
var VERSION = '0.5.0', | ||
var VERSION = '0.5.4', | ||
@@ -54,2 +54,3 @@ DEFAULT_PORT = 4222, | ||
CR_LF_LEN = CR_LF.length, | ||
CR_LF_BUF = new Buffer(CR_LF), | ||
EMPTY = '', | ||
@@ -70,2 +71,8 @@ SPC = ' ', | ||
// Errors | ||
BAD_SUBJECT_ERR = new Error('Subject must be supplied'), | ||
BAD_MSG_ERR = new Error('Message can\'t be a function'), | ||
BAD_REPLY_ERR = new Error('Reply can\'t be a function'), | ||
CONN_CLOSED_ERR = new Error('Connection closed'), | ||
// Pedantic Mode support | ||
@@ -332,6 +339,9 @@ //Q_SUB = /^([^\.\*>\s]+|>$|\*)(\.([^\.\*>\s]+|>$|\*))*$/, // TODO: remove / never used | ||
var wasReconnecting = this.reconnecting; | ||
var event = wasReconnecting === true ? 'reconnect' : 'connect'; | ||
var event = (wasReconnecting === true) ? 'reconnect' : 'connect'; | ||
this.reconnecting = false; | ||
this.reconnects = 0; | ||
this.flushPending(); | ||
this.wasConnected = true; | ||
this.currentServer.didConnect = true; | ||
if (wasReconnecting) { | ||
@@ -341,5 +351,2 @@ this.sendSubscriptions(); | ||
this.wasConnected = true; | ||
this.currentServer.didConnect = true; | ||
this.emit(event, this); | ||
@@ -448,5 +455,4 @@ }; | ||
Client.prototype.createConnection = function() { | ||
// Initialize | ||
this.pongs = []; | ||
this.pending = []; | ||
this.pSize = 0; | ||
this.pstate = AWAITING_CONTROL; | ||
@@ -458,2 +464,8 @@ | ||
// Setup pending if needed. | ||
if (this.pending === null) { | ||
this.pending = []; | ||
this.pSize = 0; | ||
} | ||
// Select a server to connect to. | ||
@@ -465,2 +477,4 @@ this.selectServer(); | ||
this.setupHandlers(); | ||
// Queue up the connect message. | ||
this.sendConnect(); | ||
}; | ||
@@ -482,2 +496,3 @@ | ||
this.server = null; | ||
this.pending = []; | ||
}; | ||
@@ -534,7 +549,35 @@ | ||
this.pending.length === 0 || | ||
this.infoReceived !== true) { return; } | ||
this.infoReceived !== true) { | ||
return; | ||
} | ||
var result = true; | ||
this.stream.write(this.pending.join(EMPTY)); | ||
if (!this.pBufs) { | ||
// All strings, fastest for now. | ||
result = this.stream.write(this.pending.join(EMPTY)); | ||
} else { | ||
// We have some or all Buffers. Figure out if we can optimize. | ||
var allBufs = true; | ||
for (var i=0; i < this.pending.length; i++){ | ||
if (!Buffer.isBuffer(this.pending[i])) { | ||
allBufs = false; | ||
break; | ||
} | ||
} | ||
// If all buffers, concat together and write once. | ||
if (allBufs) { | ||
result = this.stream.write(Buffer.concat(this.pending, this.pSize)); | ||
} else { | ||
// We have a mix, so write each one individually. | ||
for (i=0; i < this.pending.length; i++){ | ||
result = this.stream.write(this.pending[i]) && result; | ||
} | ||
} | ||
} | ||
this.pending = []; | ||
this.pSize = 0; | ||
this.pBufs = undefined; | ||
return result; | ||
}; | ||
@@ -555,3 +598,8 @@ | ||
this.pending.push(cmd); | ||
this.pSize += Buffer.byteLength(cmd); | ||
if (!Buffer.isBuffer(cmd)) { | ||
this.pSize += Buffer.byteLength(cmd); | ||
} else { | ||
this.pSize += cmd.length; | ||
this.pBufs = true; | ||
} | ||
@@ -609,5 +657,4 @@ if (this.connected === true) { | ||
// unpause if needed. | ||
if (client.stream.isPaused()) { | ||
client.stream.resume(); | ||
} | ||
// FIXME(dlc) client.stream.isPaused() causes 0.10 to fail | ||
client.stream.resume(); | ||
@@ -641,3 +688,3 @@ /* jshint -W083 */ | ||
} else if ((m = PONG.exec(buf)) !== null) { | ||
var cb = client.pongs.shift(); | ||
var cb = client.pongs && client.pongs.shift(); | ||
if (cb) { cb(); } // FIXME: Should we check for exceptions? | ||
@@ -670,3 +717,3 @@ } else if ((m = PING.exec(buf)) !== null) { | ||
// Queue up the connect message. | ||
client.sendConnect(); | ||
// client.sendConnect(); | ||
@@ -778,2 +825,3 @@ // Perform a PING/PONG flush to emit connected event. | ||
delete this.subs[this.payload.sid]; | ||
this.emit('unsubscribe', this.payload.sid, sub.subject); | ||
} else if (sub.received > sub.max) { | ||
@@ -800,3 +848,11 @@ this.unsubscribe(this.payload.sid); | ||
Client.prototype.flush = function(opt_callback) { | ||
if (typeof opt_callback === 'function') { | ||
if (this.closed) { | ||
if (typeof opt_callback === 'function') { | ||
opt_callback(CONN_CLOSED_ERR); | ||
return; | ||
} else { | ||
throw(CONN_CLOSED_ERR); | ||
} | ||
} | ||
if (this.pongs) { | ||
this.pongs.push(opt_callback); | ||
@@ -812,3 +868,3 @@ this.sendCommand(PING_REQUEST); | ||
* @param {String} subject | ||
* @param {String} msg | ||
* @param {String} opt_msg | ||
* @param {String} opt_reply | ||
@@ -820,6 +876,19 @@ * @param {Function} opt_callback | ||
Client.prototype.publish = function(subject, msg, opt_reply, opt_callback) { | ||
// They only supplied a callback function. | ||
if (typeof subject === 'function') { | ||
opt_callback = subject; | ||
subject = undefined; | ||
} | ||
if (!msg) { msg = EMPTY; } | ||
if (!subject) { | ||
if (opt_callback) { | ||
opt_callback(BAD_SUBJECT_ERR); | ||
} else { | ||
throw(BAD_SUBJECT_ERR); | ||
} | ||
} | ||
if (typeof msg === 'function') { | ||
if (opt_callback || opt_reply) { | ||
throw(new Error('Message can\'t be a function')); | ||
opt_callback(BAD_MSG_ERR); | ||
return; | ||
} | ||
@@ -832,3 +901,4 @@ opt_callback = msg; | ||
if (opt_callback) { | ||
throw(new Error('Reply can\'t be a function')); | ||
opt_callback(BAD_REPLY_ERR); | ||
return; | ||
} | ||
@@ -846,6 +916,18 @@ opt_callback = opt_reply; | ||
} | ||
this.sendCommand(psub + Buffer.byteLength(msg) + CR_LF + msg + CR_LF); | ||
// Need to treat sending buffers different. | ||
if (!Buffer.isBuffer(msg)) { | ||
this.sendCommand(psub + Buffer.byteLength(msg) + CR_LF + msg + CR_LF); | ||
} else { | ||
var b = new Buffer(psub.length + msg.length + (2 * CR_LF_LEN) + msg.length.toString().length); | ||
var len = b.write(psub + msg.length + CR_LF); | ||
msg.copy(b, len); | ||
b.write(CR_LF, len + msg.length); | ||
this.sendCommand(b); | ||
} | ||
if (opt_callback !== undefined) { | ||
this.flush(opt_callback); | ||
} else if (this.closed) { | ||
throw(CONN_CLOSED_ERR); | ||
} | ||
@@ -866,6 +948,9 @@ }; | ||
Client.prototype.subscribe = function(subject, opts, callback) { | ||
if (this.closed) { | ||
throw(CONN_CLOSED_ERR); | ||
} | ||
var qgroup, max; | ||
if (typeof opts === 'function') { | ||
callback = opts; | ||
opts = null; | ||
opts = undefined; | ||
} else if (opts && typeof opts === 'object') { | ||
@@ -886,3 +971,5 @@ // FIXME, check exists, error otherwise.. | ||
} | ||
this.sendCommand(proto.join(SPC)); | ||
this.emit('subscribe', this.ssid, subject, opts); | ||
@@ -921,2 +1008,3 @@ if (max) { | ||
delete this.subs[sid]; | ||
this.emit('unsubscribe', sid, sub.subject); | ||
} | ||
@@ -923,0 +1011,0 @@ }; |
{ | ||
"name": "nats", | ||
"version": "0.5.0", | ||
"version": "0.5.4", | ||
"description": "Node.js client for NATS, a lightweight, high-performance cloud native messaging system", | ||
@@ -5,0 +5,0 @@ "keywords": [ |
@@ -152,2 +152,8 @@ # NATS - Node.js Client | ||
// If you want to make sure NATS yields during the processing | ||
// of messages, you can use an option to specify a yieldTime in ms. | ||
// During the processing of the inbound stream, we will yield if we | ||
// spend more then yieldTime milliseconds processing. | ||
var nc = nats.connect({port: PORT, yieldTime: 10}); | ||
// Timeouts for subscriptions | ||
@@ -154,0 +160,0 @@ var sid = nats.subscribe('foo', function() { |
@@ -51,2 +51,9 @@ /* jslint node: true */ | ||
it('should connect with proper credentials as server url', function(done) { | ||
var nc = NATS.connect({'servers':[authUrl]}); | ||
nc.on('connect', function(/*nc*/) { | ||
setTimeout(done, 100); | ||
}); | ||
}); | ||
}); |
@@ -154,2 +154,2 @@ /* jslint node: true */ | ||
}); | ||
}); |
@@ -62,2 +62,36 @@ /* jslint node: true */ | ||
}); | ||
it('should still receive publish when some servers are invalid', function(done){ | ||
var natsServers = ['nats://localhost:22222', uri, 'nats://localhost:22223']; | ||
var ua = NATS.connect({servers: natsServers}); | ||
var ub = NATS.connect({servers: natsServers}); | ||
var recvMsg = ''; | ||
ua.subscribe('topic1', function(msg, reply, subject){ | ||
recvMsg = msg; | ||
}); | ||
setTimeout(function(){ | ||
ub.publish('topic1', 'hello'); | ||
}, 100 * 1); | ||
setTimeout(function(){ | ||
recvMsg.should.equal('hello'); | ||
done(); | ||
}, 100 * 2); | ||
}); | ||
it('should still receive publish when some servers[noRandomize] are invalid', function(done){ | ||
var natsServers = ['nats://localhost:22222', uri, 'nats://localhost:22223']; | ||
var ua = NATS.connect({servers: natsServers, noRandomize:true}); | ||
var ub = NATS.connect({servers: natsServers, noRandomize:true}); | ||
var recvMsg = ""; | ||
ua.subscribe('topic1', function(msg, reply, subject){ | ||
recvMsg = msg; | ||
}); | ||
setTimeout(function(){ | ||
ub.publish('topic1', 'hello'); | ||
}, 100 * 1); | ||
setTimeout(function(){ | ||
recvMsg.should.equal('hello'); | ||
done(); | ||
}, 100 * 2); | ||
}); | ||
}); |
@@ -213,2 +213,24 @@ /* jslint node: true */ | ||
}); | ||
it('should not crash when sending a publish with a callback after connection loss', function(done) { | ||
var nc = NATS.connect({'port':PORT, 'reconnectTimeWait':WAIT}); | ||
var startTime; | ||
should.exist(nc); | ||
nc.on('connect', function() { | ||
server.kill(); | ||
startTime = new Date(); | ||
}); | ||
nc.on('disconnect', function() { | ||
nc.publish('foo', 'bar', 'reply', function() { | ||
// fails to get here, but should not crash | ||
}); | ||
server = nsc.start_server(PORT); | ||
}); | ||
nc.on('reconnect', function() { | ||
nc.flush(function() { | ||
nc.close(); | ||
done(); | ||
}); | ||
}); | ||
}); | ||
}); |
@@ -11,3 +11,3 @@ /* jslint node: true */ | ||
var PORT = 1429; | ||
var PORT = 1430; | ||
var server; | ||
@@ -77,2 +77,2 @@ | ||
}); | ||
}); |
Sorry, the diff of this file is not supported yet
Sorry, the diff of this file is not supported yet
Sorry, the diff of this file is not supported yet
108367
42
2845
215