antenna-amqp
Advanced tools
Comparing version 0.1.2 to 0.1.3
@@ -213,2 +213,3 @@ /** | ||
/** | ||
@@ -294,3 +295,2 @@ * Subscribe to messages of `topic` broadcast on the bus. | ||
m.bus = self; | ||
self.emit('message', m); | ||
@@ -302,2 +302,6 @@ }).addCallback(function(ok) { | ||
q.removeListener('error', onError); | ||
// Store the consumerTag so that this queue may also be unsubscribed from. | ||
self._consumerTag = ok.consumerTag; | ||
return cb(); | ||
@@ -321,6 +325,70 @@ }).addErrback(function(err) { | ||
/** | ||
* Desubscribe to messages of `topic` broadcast on the bus. | ||
* | ||
* | ||
* Examples: | ||
* | ||
* bus.unsubscribe('events/on'); | ||
* | ||
* @param {String} queue | ||
* @param {Object} options | ||
* @param {Function} cb | ||
* @api public | ||
*/ | ||
Bus.prototype.unsubscribe = function(topic, options, cb) { | ||
if (typeof options == 'function') { | ||
cb = options; | ||
options = undefined; | ||
} | ||
options = options || {}; | ||
if (!this._queue) { return cb(new NoQueueError('Bus not in listening mode')); } | ||
// AMQP uses period ('.') separators rather than slash ('/') | ||
topic = topic.replace(/\//g, '.'); | ||
var q = this._queue | ||
, exchange = this._exchange.name; | ||
var onError = function(err) { | ||
return cb(err); | ||
}; | ||
debug('unbind %s %s %s', q.name, exchange, topic); | ||
q.unbind(exchange, topic) | ||
.on('queueUnbindOk', function(){ | ||
return cb(); | ||
}); | ||
} | ||
/** | ||
* Unsubscribe to an internal queue. | ||
* | ||
* @param {Object} options | ||
* @param {Function} cb | ||
* @api private | ||
*/ | ||
Bus.prototype._unsubscribe = function(options, cb) { | ||
if (typeof options == 'function') { | ||
cb = options; | ||
options = undefined; | ||
} | ||
options = options || {}; | ||
var self = this | ||
, q = this._queue; | ||
var onError = function(err) { | ||
return cb(err); | ||
}; | ||
debug('unsubscribe %s', q.name); | ||
q.unsubscribe(self._consumerTag); | ||
} | ||
/** | ||
* Expose `Bus`. | ||
*/ | ||
module.exports = Bus; |
{ | ||
"name": "antenna-amqp", | ||
"version": "0.1.2", | ||
"version": "0.1.3", | ||
"description": "AMQP adapter for Antenna.", | ||
@@ -5,0 +5,0 @@ "keywords": [ |
License Policy Violation
LicenseThis package is not allowed per your license policy. Review the package's license to ensure compliance.
Found 1 instance in 1 package
License Policy Violation
LicenseThis package is not allowed per your license policy. Review the package's license to ensure compliance.
Found 1 instance in 1 package
15642
398