Comparing version 0.2.6 to 0.2.7
@@ -532,2 +532,3 @@ 'use strict'; | ||
this.socket.end(); | ||
this.socket.destroy(); | ||
break; | ||
@@ -711,2 +712,9 @@ | ||
if (this.options.ssl.ciphers) { | ||
this.sslConnectionOptions.ciphers = this.options.ssl.ciphers; | ||
} | ||
if (this.options.ssl.secureProtocol) { | ||
this.sslConnectionOptions.secureProtocol = this.options.ssl.secureProtocol; | ||
} | ||
return this.sslConnectionOptions; | ||
@@ -713,0 +721,0 @@ }; |
@@ -84,11 +84,13 @@ 'use strict'; | ||
this._confirmSelect(channel); | ||
this.state = 'open'; | ||
return; | ||
} | ||
if (this._openCallback) { | ||
this._openCallback(this); | ||
this._openCallback = null; | ||
} | ||
this.state = 'open'; | ||
this.emit('open'); | ||
if (this._openCallback) { | ||
this._openCallback(this); | ||
this._openCallback = null; | ||
} | ||
this.emit('open'); | ||
} else { | ||
@@ -95,0 +97,0 @@ this.connection._sendMethod(channel, methods.exchangeDeclare, |
@@ -37,3 +37,3 @@ 'use strict'; | ||
Queue.prototype.subscribeRaw = function (options, messageListener) { | ||
Queue.prototype.subscribeRaw = function (options, messageListener, oldConsumerTag) { | ||
var self = this; | ||
@@ -43,2 +43,3 @@ | ||
if (typeof options === "function") { | ||
oldConsumerTag = messageListener; | ||
messageListener = options; | ||
@@ -48,3 +49,8 @@ options = {}; | ||
var consumerTag = 'node-amqp-' + process.pid + '-' + Math.random(); | ||
var consumerTag; | ||
if (options.consumerTag !== undefined) { | ||
consumerTag = options.consumerTag + '-' + Math.random(); | ||
} else { | ||
consumerTag = 'node-amqp-' + process.pid + '-' + Math.random(); | ||
} | ||
this.consumerTagListeners[consumerTag] = messageListener; | ||
@@ -64,2 +70,8 @@ | ||
// If this is a reconnection, we should probably tell folks their tag has changed | ||
if (oldConsumerTag) { | ||
debug && debug('Existing consumer tag changed', util.inspect({ oldConsumerTag: oldConsumerTag, consumerTag: consumerTag })); | ||
self.connection.emit('tag.change', { oldConsumerTag: oldConsumerTag, consumerTag: consumerTag }); | ||
} | ||
return this._taskPush(methods.basicConsumeOk, function () { | ||
@@ -123,2 +135,6 @@ self.connection._sendMethod(self.channel, methods.basicConsume, | ||
if (options.consumerTag) { | ||
rawOptions['consumerTag'] = options.consumerTag; | ||
} | ||
return this.subscribeRaw(rawOptions, function (m) { | ||
@@ -274,11 +290,13 @@ var contentType = m.contentType; | ||
// Decrement binding count. | ||
this._bindings[exchangeName][routingKey]--; | ||
if (!this._bindings[exchangeName][routingKey]) { | ||
delete this._bindings[exchangeName][routingKey]; | ||
} | ||
if(this._bindings[exchangeName]) { | ||
// Decrement binding count. | ||
this._bindings[exchangeName][routingKey]--; | ||
if (!this._bindings[exchangeName][routingKey]) { | ||
delete this._bindings[exchangeName][routingKey]; | ||
} | ||
// If there are no more bindings to this exchange, delete the key for the exchange. | ||
if (!_.keys(this._bindings[exchangeName]).length){ | ||
delete this._bindings[exchangeName]; | ||
// If there are no more bindings to this exchange, delete the key for the exchange. | ||
if (!_.keys(this._bindings[exchangeName]).length){ | ||
delete this._bindings[exchangeName]; | ||
} | ||
} | ||
@@ -453,3 +471,3 @@ | ||
if (this.consumerTagOptions[consumerTags[index]]['state'] === 'closed') { | ||
this.subscribeRaw(this.consumerTagOptions[consumerTags[index]], this.consumerTagListeners[consumerTags[index]]); | ||
this.subscribeRaw(this.consumerTagOptions[consumerTags[index]], this.consumerTagListeners[consumerTags[index]], consumerTags[index]); | ||
// Having called subscribeRaw, we are now a new consumer with a new consumerTag. | ||
@@ -456,0 +474,0 @@ delete this.consumerTagListeners[consumerTags[index]]; |
{ "name" : "amqp" | ||
, "description" : "AMQP driver for node" | ||
, "keywords" : [ "amqp" ] | ||
, "version" : "0.2.6" | ||
, "version" : "0.2.7" | ||
, "author" : { "name" : "Ryan Dahl" } | ||
@@ -6,0 +6,0 @@ , "contributors" : |
@@ -16,2 +16,3 @@ [![build status](https://secure.travis-ci.org/postwait/node-amqp.png)](http://travis-ci.org/postwait/node-amqp) | ||
- [connection.disconnect()](#connectiondisconnect) | ||
- [connection.on('tag.change', callback)](#connectionontagchange-callback) | ||
- [Queue](#queue) | ||
@@ -234,4 +235,79 @@ - [connection.queue(name[, options][, openCallback])](#connectionqueuename-options-opencallback) | ||
### connection.on('tag.change', callback) | ||
Fired when an existing consumer tag has changed. Use this event to update your consumer tag references. | ||
When an error or reconnection occurs, any existing consumers will be automatically replaced with new ones. | ||
If your application is holding onto a reference to a consumer tag (e.g. to unsubscribe later) and reconnects, | ||
the held tag will no longer be valid, preventing the application from gracefully unsubscribing. | ||
The `callback` function takes one parameter, `event`, which contains two properties: `oldConsumerTag` and `consumerTag`. | ||
```javascript | ||
var connection = amqp.createConnection({ host: 'dev.rabbitmq.com' }); | ||
// Local references to the exchange, queue and consumer tag | ||
var _exchange = null; | ||
var _queue = null; | ||
var _consumerTag = null; | ||
// Report errors | ||
connection.on('error', function(err) { | ||
console.error('Connection error', err); | ||
}); | ||
// Update our stored tag when it changes | ||
connection.on('tag.change', function(event) { | ||
if (_consumerTag === event.oldConsumerTag) { | ||
_consumerTag = event.consumerTag; | ||
// Consider unsubscribing from the old tag just in case it lingers | ||
_queue.unsubscribe(event.oldConsumerTag); | ||
} | ||
}); | ||
// Initialize the exchange, queue and subscription | ||
connection.on('ready', function() { | ||
connection.exchange('exchange-name', function(exchange) { | ||
_exchange = exchange; | ||
connection.queue('queue-name', function(queue) { | ||
_queue = queue; | ||
// Bind to the exchange | ||
queue.bind('exchange-name', 'routing-key'); | ||
// Subscribe to the queue | ||
queue | ||
.subscribe(function(message) { | ||
// Handle message here | ||
console.log('Got message', message); | ||
queue.shift(false, false); | ||
}) | ||
.addCallback(function(res) { | ||
// Hold on to the consumer tag so we can unsubscribe later | ||
_consumerTag = res.consumerTag; | ||
}) | ||
; | ||
}); | ||
}); | ||
}); | ||
// Some time in the future, you'll want to unsubscribe or shutdown | ||
setTimeout(function() { | ||
if (_queue) { | ||
_queue | ||
.unsubscribe(_consumerTag) | ||
.addCallback(function() { | ||
// unsubscribed | ||
}) | ||
; | ||
} else { | ||
// unsubscribed | ||
} | ||
}, 60000); | ||
``` | ||
## Queue | ||
@@ -238,0 +314,0 @@ |
Sorry, the diff of this file is not supported yet
License Policy Violation
LicenseThis package is not allowed per your license policy. Review the package's license to ensure compliance.
Found 1 instance in 1 package
License Policy Violation
LicenseThis package is not allowed per your license policy. Review the package's license to ensure compliance.
Found 1 instance in 1 package
506202
92
6207
633