Comparing version
@@ -532,2 +532,3 @@ 'use strict'; | ||
this.socket.end(); | ||
this.socket.destroy(); | ||
break; | ||
@@ -711,2 +712,9 @@ | ||
if (this.options.ssl.ciphers) { | ||
this.sslConnectionOptions.ciphers = this.options.ssl.ciphers; | ||
} | ||
if (this.options.ssl.secureProtocol) { | ||
this.sslConnectionOptions.secureProtocol = this.options.ssl.secureProtocol; | ||
} | ||
return this.sslConnectionOptions; | ||
@@ -713,0 +721,0 @@ }; |
@@ -84,11 +84,13 @@ 'use strict'; | ||
this._confirmSelect(channel); | ||
this.state = 'open'; | ||
return; | ||
} | ||
if (this._openCallback) { | ||
this._openCallback(this); | ||
this._openCallback = null; | ||
} | ||
this.state = 'open'; | ||
this.emit('open'); | ||
if (this._openCallback) { | ||
this._openCallback(this); | ||
this._openCallback = null; | ||
} | ||
this.emit('open'); | ||
} else { | ||
@@ -95,0 +97,0 @@ this.connection._sendMethod(channel, methods.exchangeDeclare, |
@@ -37,3 +37,3 @@ 'use strict'; | ||
Queue.prototype.subscribeRaw = function (options, messageListener) { | ||
Queue.prototype.subscribeRaw = function (options, messageListener, oldConsumerTag) { | ||
var self = this; | ||
@@ -43,2 +43,3 @@ | ||
if (typeof options === "function") { | ||
oldConsumerTag = messageListener; | ||
messageListener = options; | ||
@@ -48,3 +49,8 @@ options = {}; | ||
var consumerTag = 'node-amqp-' + process.pid + '-' + Math.random(); | ||
var consumerTag; | ||
if (options.consumerTag !== undefined) { | ||
consumerTag = options.consumerTag + '-' + Math.random(); | ||
} else { | ||
consumerTag = 'node-amqp-' + process.pid + '-' + Math.random(); | ||
} | ||
this.consumerTagListeners[consumerTag] = messageListener; | ||
@@ -64,2 +70,8 @@ | ||
// If this is a reconnection, we should probably tell folks their tag has changed | ||
if (oldConsumerTag) { | ||
debug && debug('Existing consumer tag changed', util.inspect({ oldConsumerTag: oldConsumerTag, consumerTag: consumerTag })); | ||
self.connection.emit('tag.change', { oldConsumerTag: oldConsumerTag, consumerTag: consumerTag }); | ||
} | ||
return this._taskPush(methods.basicConsumeOk, function () { | ||
@@ -123,2 +135,6 @@ self.connection._sendMethod(self.channel, methods.basicConsume, | ||
if (options.consumerTag) { | ||
rawOptions['consumerTag'] = options.consumerTag; | ||
} | ||
return this.subscribeRaw(rawOptions, function (m) { | ||
@@ -274,11 +290,13 @@ var contentType = m.contentType; | ||
// Decrement binding count. | ||
this._bindings[exchangeName][routingKey]--; | ||
if (!this._bindings[exchangeName][routingKey]) { | ||
delete this._bindings[exchangeName][routingKey]; | ||
} | ||
if(this._bindings[exchangeName]) { | ||
// Decrement binding count. | ||
this._bindings[exchangeName][routingKey]--; | ||
if (!this._bindings[exchangeName][routingKey]) { | ||
delete this._bindings[exchangeName][routingKey]; | ||
} | ||
// If there are no more bindings to this exchange, delete the key for the exchange. | ||
if (!_.keys(this._bindings[exchangeName]).length){ | ||
delete this._bindings[exchangeName]; | ||
// If there are no more bindings to this exchange, delete the key for the exchange. | ||
if (!_.keys(this._bindings[exchangeName]).length){ | ||
delete this._bindings[exchangeName]; | ||
} | ||
} | ||
@@ -453,3 +471,3 @@ | ||
if (this.consumerTagOptions[consumerTags[index]]['state'] === 'closed') { | ||
this.subscribeRaw(this.consumerTagOptions[consumerTags[index]], this.consumerTagListeners[consumerTags[index]]); | ||
this.subscribeRaw(this.consumerTagOptions[consumerTags[index]], this.consumerTagListeners[consumerTags[index]], consumerTags[index]); | ||
// Having called subscribeRaw, we are now a new consumer with a new consumerTag. | ||
@@ -456,0 +474,0 @@ delete this.consumerTagListeners[consumerTags[index]]; |
{ "name" : "amqp" | ||
, "description" : "AMQP driver for node" | ||
, "keywords" : [ "amqp" ] | ||
, "version" : "0.2.6" | ||
, "version" : "0.2.7" | ||
, "author" : { "name" : "Ryan Dahl" } | ||
@@ -6,0 +6,0 @@ , "contributors" : |
@@ -16,2 +16,3 @@ [](http://travis-ci.org/postwait/node-amqp) | ||
- [connection.disconnect()](#connectiondisconnect) | ||
- [connection.on('tag.change', callback)](#connectionontagchange-callback) | ||
- [Queue](#queue) | ||
@@ -234,4 +235,79 @@ - [connection.queue(name[, options][, openCallback])](#connectionqueuename-options-opencallback) | ||
### connection.on('tag.change', callback) | ||
Fired when an existing consumer tag has changed. Use this event to update your consumer tag references. | ||
When an error or reconnection occurs, any existing consumers will be automatically replaced with new ones. | ||
If your application is holding onto a reference to a consumer tag (e.g. to unsubscribe later) and reconnects, | ||
the held tag will no longer be valid, preventing the application from gracefully unsubscribing. | ||
The `callback` function takes one parameter, `event`, which contains two properties: `oldConsumerTag` and `consumerTag`. | ||
```javascript | ||
var connection = amqp.createConnection({ host: 'dev.rabbitmq.com' }); | ||
// Local references to the exchange, queue and consumer tag | ||
var _exchange = null; | ||
var _queue = null; | ||
var _consumerTag = null; | ||
// Report errors | ||
connection.on('error', function(err) { | ||
console.error('Connection error', err); | ||
}); | ||
// Update our stored tag when it changes | ||
connection.on('tag.change', function(event) { | ||
if (_consumerTag === event.oldConsumerTag) { | ||
_consumerTag = event.consumerTag; | ||
// Consider unsubscribing from the old tag just in case it lingers | ||
_queue.unsubscribe(event.oldConsumerTag); | ||
} | ||
}); | ||
// Initialize the exchange, queue and subscription | ||
connection.on('ready', function() { | ||
connection.exchange('exchange-name', function(exchange) { | ||
_exchange = exchange; | ||
connection.queue('queue-name', function(queue) { | ||
_queue = queue; | ||
// Bind to the exchange | ||
queue.bind('exchange-name', 'routing-key'); | ||
// Subscribe to the queue | ||
queue | ||
.subscribe(function(message) { | ||
// Handle message here | ||
console.log('Got message', message); | ||
queue.shift(false, false); | ||
}) | ||
.addCallback(function(res) { | ||
// Hold on to the consumer tag so we can unsubscribe later | ||
_consumerTag = res.consumerTag; | ||
}) | ||
; | ||
}); | ||
}); | ||
}); | ||
// Some time in the future, you'll want to unsubscribe or shutdown | ||
setTimeout(function() { | ||
if (_queue) { | ||
_queue | ||
.unsubscribe(_consumerTag) | ||
.addCallback(function() { | ||
// unsubscribed | ||
}) | ||
; | ||
} else { | ||
// unsubscribed | ||
} | ||
}, 60000); | ||
``` | ||
## Queue | ||
@@ -238,0 +314,0 @@ |
Sorry, the diff of this file is not supported yet
URL strings
Supply chain riskPackage contains fragments of external URLs or IP addresses, which the package may be accessing at runtime.
Found 1 instance in 1 package
URL strings
Supply chain riskPackage contains fragments of external URLs or IP addresses, which the package may be accessing at runtime.
Found 1 instance in 1 package
506202
0.98%92
3.37%6398
1.11%633
13.64%