Comparing version 0.8.0 to 0.8.2
116
lib/nats.js
@@ -24,3 +24,3 @@ /*! | ||
*/ | ||
var VERSION = '0.8.0', | ||
var VERSION = '0.8.2', | ||
@@ -106,3 +106,10 @@ DEFAULT_PORT = 4222, | ||
/** | ||
* @param {String} message | ||
* @param {String} code | ||
* @param {Error} [chainedError] | ||
* @constructor | ||
* | ||
* @api private | ||
*/ | ||
function NatsError(message, code, chainedError) { | ||
@@ -1190,3 +1197,3 @@ Error.captureStackTrace(this, this.constructor); | ||
* @param {Function} callback | ||
* @return {Mixed} | ||
* @return {Number} | ||
* @api public | ||
@@ -1236,3 +1243,3 @@ */ | ||
* | ||
* @param {Mixed} sid | ||
* @param {Number} sid | ||
* @param {Number} [opt_max] | ||
@@ -1281,4 +1288,7 @@ * @api public | ||
* expected number of messages is reached or the timeout is reached. | ||
* If this function is called with an SID from a multiplexed | ||
* request call, the original timeout handler associated with the multiplexed | ||
* request is replaced with the one provided to this function. | ||
* | ||
* @param {Mixed} sid | ||
* @param {Number} sid | ||
* @param {Number} timeout | ||
@@ -1293,15 +1303,27 @@ * @param {Number} expected | ||
} | ||
var sub = this.subs[sid]; | ||
if (sub === null) { | ||
return; | ||
var sub = null; | ||
// check the sid is not a mux sid - which is always negative | ||
if(sid < 0) { | ||
var conf = this.getMuxRequestConfig(sid); | ||
if(conf && conf.timeout) { | ||
// clear auto-set timeout | ||
clearTimeout(conf.timeout); | ||
} | ||
sub = conf; | ||
} else { | ||
sub = this.subs[sid]; | ||
} | ||
sub.expected = expected; | ||
var that = this; | ||
sub.timeout = setTimeout(function() { | ||
callback(sid); | ||
// if callback fails unsubscribe will leak | ||
that.unsubscribe(sid); | ||
}, timeout); | ||
if(sub) { | ||
sub.expected = expected; | ||
var that = this; | ||
sub.timeout = setTimeout(function () { | ||
callback(sid); | ||
// if callback fails unsubscribe will leak | ||
that.unsubscribe(sid); | ||
}, timeout); | ||
} | ||
}; | ||
/** | ||
@@ -1324,3 +1346,3 @@ * Publish a message with an implicit inbox listener as the reply. Message is optional. | ||
* @param {Object} [opt_options] | ||
* @param {Function} callback | ||
* @param {Function} [callback] | ||
* @return {Number} | ||
@@ -1349,3 +1371,3 @@ * @api public | ||
var client = this; | ||
conf.timer = setTimeout(function() { | ||
conf.timeout = setTimeout(function() { | ||
if(conf.callback) { | ||
@@ -1409,11 +1431,11 @@ conf.callback(new NatsError(REQ_TIMEOUT_MSG_PREFIX + conf.id, REQ_TIMEOUT)); | ||
if (typeof opt_msg === 'number') { | ||
callback = opt_options; | ||
timeout = opt_msg; | ||
callback = opt_options; | ||
opt_options = null; | ||
opt_msg = EMPTY; | ||
opt_options = null; | ||
} | ||
if (typeof opt_options === 'number') { | ||
callback = timeout; | ||
timeout = opt_options; | ||
callback = timeout; | ||
opt_options = null; | ||
@@ -1454,9 +1476,11 @@ } | ||
if(conf) { | ||
if(conf.hasOwnProperty('max_messages')) { | ||
conf.max_messages--; | ||
if (conf.max_messages <= 0) { | ||
if(conf.hasOwnProperty('expected')) { | ||
conf.received++; | ||
if (conf.received >= conf.expected) { | ||
client.cancelMuxRequest(token); | ||
} | ||
} | ||
conf.callback(msg); | ||
if(conf.callback) { | ||
conf.callback(msg); | ||
} | ||
} | ||
@@ -1479,3 +1503,3 @@ }); | ||
*/ | ||
Client.prototype.initMuxRequestDetails = function(callback, max_messages) { | ||
Client.prototype.initMuxRequestDetails = function(callback, expected) { | ||
var ginbox = this.createResponseMux(); | ||
@@ -1485,5 +1509,5 @@ var token = nuid.next(); | ||
var conf = {token: token, callback: callback, inbox: inbox, id: this.respmux.nextID--}; | ||
if(max_messages > 0) { | ||
conf.max_messages = max_messages; | ||
var conf = {token: token, callback: callback, inbox: inbox, id: this.respmux.nextID--, received: 0}; | ||
if(expected > 0) { | ||
conf.expected = expected; | ||
} | ||
@@ -1495,13 +1519,8 @@ | ||
Client.prototype.getMuxRequestConfig = function(token) { | ||
return this.respmux.requestMap[token]; | ||
}; | ||
/** | ||
* Cancels the mux request | ||
* | ||
* @api private | ||
* Returns the mux request configuration | ||
* @param token | ||
* @returns Object | ||
*/ | ||
Client.prototype.cancelMuxRequest = function(token) { | ||
Client.prototype.getMuxRequestConfig = function(token) { | ||
// if the token is a number, we have a fake sid, find the request | ||
@@ -1523,9 +1542,18 @@ if (typeof token === 'number') { | ||
} | ||
return this.respmux.requestMap[token]; | ||
}; | ||
var conf = this.respmux.requestMap[token]; | ||
/** | ||
* Cancels the mux request | ||
* | ||
* @api private | ||
*/ | ||
Client.prototype.cancelMuxRequest = function(token) { | ||
var conf = this.getMuxRequestConfig(token); | ||
if (conf) { | ||
if(conf.timer) { | ||
clearTimeout(conf.timer); | ||
if(conf.timeout) { | ||
clearTimeout(conf.timeout); | ||
} | ||
delete this.respmux.requestMap[token]; | ||
// the token could be sid, so use the one in the conf | ||
delete this.respmux.requestMap[conf.token]; | ||
} | ||
@@ -1541,11 +1569,11 @@ return conf; | ||
if (typeof opt_msg === 'number') { | ||
callback = opt_options; | ||
timeout = opt_msg; | ||
callback = opt_options; | ||
opt_options = null; | ||
opt_msg = EMPTY; | ||
opt_options = null; | ||
} | ||
if (typeof opt_options === 'number') { | ||
callback = timeout; | ||
timeout = opt_options; | ||
callback = timeout; | ||
opt_options = null; | ||
@@ -1552,0 +1580,0 @@ } |
{ | ||
"name": "nats", | ||
"version": "0.8.0", | ||
"version": "0.8.2", | ||
"description": "Node.js client for NATS, a lightweight, high-performance cloud native messaging system", | ||
@@ -5,0 +5,0 @@ "keywords": [ |
License Policy Violation
LicenseThis package is not allowed per your license policy. Review the package's license to ensure compliance.
Found 1 instance in 1 package
License Policy Violation
LicenseThis package is not allowed per your license policy. Review the package's license to ensure compliance.
Found 1 instance in 1 package
2
65550
6
1613