Comparing version 0.2.2 to 0.2.4
@@ -11,4 +11,3 @@ /*! | ||
var sys = require('sys'), | ||
net = require('net'), | ||
var net = require('net'), | ||
url = require('url'), | ||
@@ -22,3 +21,3 @@ util = require('util'), | ||
var VERSION = '0.2.2', | ||
var VERSION = '0.2.4', | ||
@@ -53,2 +52,3 @@ DEFAULT_PORT = 4222, | ||
SPC = ' ', | ||
PUB = 'PUB', | ||
@@ -214,4 +214,5 @@ // Responses | ||
var stream = client.stream = net.createConnection(client.url.port, client.url.hostname); | ||
stream.setEncoding('utf8'); // FIXME, Force Strings | ||
//stream.setEncoding('utf8'); // FIXME, Force Strings | ||
stream.on('connect', function() { | ||
@@ -255,3 +256,4 @@ var wasReconnecting = client.reconnecting; | ||
if (client.reconnecting === true) { | ||
if (client.closed === true || client.reconnects >= client.options.maxReconnectAttempts) { | ||
if (client.closed === true || | ||
client.reconnects >= client.options.maxReconnectAttempts) { | ||
client.emit('close'); | ||
@@ -266,2 +268,3 @@ } else { | ||
var m; | ||
client.inbound = client.inbound ? client.inbound + data : data; | ||
@@ -273,3 +276,8 @@ | ||
if (m = MSG.exec(client.inbound)) { | ||
client.payload = {subj : m[1], sid : m[2], reply : m[4], size : parseInt(m[5], 10)} | ||
client.payload = { | ||
subj : m[1], | ||
sid : m[2], | ||
reply : m[4], | ||
size : parseInt(m[5], 10) | ||
} | ||
client.pstate = AWAITING_MSG_PAYLOAD; | ||
@@ -295,6 +303,10 @@ } else if (m = OK.exec(client.inbound)) { | ||
case AWAITING_MSG_PAYLOAD: | ||
if (client.inbound.length < client.payload.size + CR_LF_LEN) { return; } | ||
client.payload.msg = client.inbound.slice(0, client.payload.size); | ||
if (client.inbound.length < client.payload.size + CR_LF_LEN) { | ||
return; | ||
} | ||
// FIXME, may be inefficient. | ||
client.payload.msg = client.inbound.slice(0, client.payload.size).toString(); | ||
if (client.inbound.length == client.payload.size + CR_LF_LEN) { | ||
@@ -391,6 +403,3 @@ client.inbound = null; | ||
} | ||
var l = this.pending.length; | ||
for (var i=0; i < l; i++) { | ||
this.stream.write(this.pending[i]); | ||
} | ||
this.stream.write(this.pending.join(EMPTY)); | ||
this.pending = null; | ||
@@ -489,3 +498,3 @@ } | ||
msg = EMPTY; | ||
opt_reply = null; | ||
opt_reply = undefined; | ||
} | ||
@@ -495,15 +504,15 @@ if (typeof opt_reply == 'function') { | ||
opt_callback = opt_reply; | ||
opt_reply = null; | ||
opt_reply = undefined; | ||
} | ||
// FIXME Array.join? | ||
if (opt_reply) { | ||
this.sendCommand('PUB ' + subject + SPC + opt_reply + SPC + msg.length + CR_LF); | ||
} else { | ||
this.sendCommand('PUB ' + subject + SPC + msg.length + CR_LF); | ||
var proto = [PUB, subject]; | ||
var msg = [Buffer.byteLength(msg), CR_LF, msg, CR_LF]; | ||
if (opt_reply !== undefined) { | ||
proto.push(opt_reply); | ||
} | ||
// Optimize? | ||
this.sendCommand(msg); | ||
this.sendCommand(CR_LF); | ||
if (opt_callback) { | ||
this.sendCommand(proto.concat(msg.join(EMPTY)).join(SPC)); | ||
if (opt_callback !== undefined) { | ||
this.flush(opt_callback); | ||
@@ -510,0 +519,0 @@ } |
{ | ||
"name": "nats", | ||
"description": "Node.js client for NATS, a lightweight messaging system", | ||
"version": "0.2.2", | ||
"version": "0.2.4", | ||
"repository": { | ||
@@ -6,0 +6,0 @@ "type" : "git", |
@@ -91,3 +91,5 @@ # Node_Nats | ||
// Flush connection to server, callback fires when all messages have been processed. | ||
nats.flush(function() { }); | ||
nats.flush(function() { | ||
console.log('All clear!'); | ||
}); | ||
@@ -101,3 +103,3 @@ // Timeouts for subscriptions | ||
nats.timeout(sid, timeout_ms, expected, function() { | ||
timeout_recvd = true; | ||
timeout = true; | ||
}); | ||
@@ -104,0 +106,0 @@ |
Sorry, the diff of this file is not supported yet
License Policy Violation
LicenseThis package is not allowed per your license policy. Review the package's license to ensure compliance.
Found 1 instance in 1 package
License Policy Violation
LicenseThis package is not allowed per your license policy. Review the package's license to ensure compliance.
Found 1 instance in 1 package
Shell access
Supply chain riskThis module accesses the system shell. Accessing the system shell increases the risk of executing arbitrary code.
Found 1 instance in 1 package
145
1
23145
12
658