Huge News!Announcing our $40M Series B led by Abstract Ventures.Learn More
Socket
Sign inDemoInstall
Socket

amqp

Package Overview
Dependencies
Maintainers
2
Versions
18
Alerts
File Explorer

Advanced tools

Socket logo

Install Socket

Detect and block malicious and high-risk dependencies

Install

amqp - npm Package Compare versions

Comparing version 0.2.6 to 0.2.7

test/test-consumer-tag.js

8

lib/connection.js

@@ -532,2 +532,3 @@ 'use strict';

this.socket.end();
this.socket.destroy();
break;

@@ -711,2 +712,9 @@

if (this.options.ssl.ciphers) {
this.sslConnectionOptions.ciphers = this.options.ssl.ciphers;
}
if (this.options.ssl.secureProtocol) {
this.sslConnectionOptions.secureProtocol = this.options.ssl.secureProtocol;
}
return this.sslConnectionOptions;

@@ -713,0 +721,0 @@ };

14

lib/exchange.js

@@ -84,11 +84,13 @@ 'use strict';

this._confirmSelect(channel);
this.state = 'open';
return;
}
if (this._openCallback) {
this._openCallback(this);
this._openCallback = null;
}
this.state = 'open';
this.emit('open');
if (this._openCallback) {
this._openCallback(this);
this._openCallback = null;
}
this.emit('open');
} else {

@@ -95,0 +97,0 @@ this.connection._sendMethod(channel, methods.exchangeDeclare,

@@ -37,3 +37,3 @@ 'use strict';

Queue.prototype.subscribeRaw = function (options, messageListener) {
Queue.prototype.subscribeRaw = function (options, messageListener, oldConsumerTag) {
var self = this;

@@ -43,2 +43,3 @@

if (typeof options === "function") {
oldConsumerTag = messageListener;
messageListener = options;

@@ -48,3 +49,8 @@ options = {};

var consumerTag = 'node-amqp-' + process.pid + '-' + Math.random();
var consumerTag;
if (options.consumerTag !== undefined) {
consumerTag = options.consumerTag + '-' + Math.random();
} else {
consumerTag = 'node-amqp-' + process.pid + '-' + Math.random();
}
this.consumerTagListeners[consumerTag] = messageListener;

@@ -64,2 +70,8 @@

// If this is a reconnection, we should probably tell folks their tag has changed
if (oldConsumerTag) {
debug && debug('Existing consumer tag changed', util.inspect({ oldConsumerTag: oldConsumerTag, consumerTag: consumerTag }));
self.connection.emit('tag.change', { oldConsumerTag: oldConsumerTag, consumerTag: consumerTag });
}
return this._taskPush(methods.basicConsumeOk, function () {

@@ -123,2 +135,6 @@ self.connection._sendMethod(self.channel, methods.basicConsume,

if (options.consumerTag) {
rawOptions['consumerTag'] = options.consumerTag;
}
return this.subscribeRaw(rawOptions, function (m) {

@@ -274,11 +290,13 @@ var contentType = m.contentType;

// Decrement binding count.
this._bindings[exchangeName][routingKey]--;
if (!this._bindings[exchangeName][routingKey]) {
delete this._bindings[exchangeName][routingKey];
}
if(this._bindings[exchangeName]) {
// Decrement binding count.
this._bindings[exchangeName][routingKey]--;
if (!this._bindings[exchangeName][routingKey]) {
delete this._bindings[exchangeName][routingKey];
}
// If there are no more bindings to this exchange, delete the key for the exchange.
if (!_.keys(this._bindings[exchangeName]).length){
delete this._bindings[exchangeName];
// If there are no more bindings to this exchange, delete the key for the exchange.
if (!_.keys(this._bindings[exchangeName]).length){
delete this._bindings[exchangeName];
}
}

@@ -453,3 +471,3 @@

if (this.consumerTagOptions[consumerTags[index]]['state'] === 'closed') {
this.subscribeRaw(this.consumerTagOptions[consumerTags[index]], this.consumerTagListeners[consumerTags[index]]);
this.subscribeRaw(this.consumerTagOptions[consumerTags[index]], this.consumerTagListeners[consumerTags[index]], consumerTags[index]);
// Having called subscribeRaw, we are now a new consumer with a new consumerTag.

@@ -456,0 +474,0 @@ delete this.consumerTagListeners[consumerTags[index]];

{ "name" : "amqp"
, "description" : "AMQP driver for node"
, "keywords" : [ "amqp" ]
, "version" : "0.2.6"
, "version" : "0.2.7"
, "author" : { "name" : "Ryan Dahl" }

@@ -6,0 +6,0 @@ , "contributors" :

@@ -16,2 +16,3 @@ [![build status](https://secure.travis-ci.org/postwait/node-amqp.png)](http://travis-ci.org/postwait/node-amqp)

- [connection.disconnect()](#connectiondisconnect)
- [connection.on('tag.change', callback)](#connectionontagchange-callback)
- [Queue](#queue)

@@ -234,4 +235,79 @@ - [connection.queue(name[, options][, openCallback])](#connectionqueuename-options-opencallback)

### connection.on('tag.change', callback)
Fired when an existing consumer tag has changed. Use this event to update your consumer tag references.
When an error or reconnection occurs, any existing consumers will be automatically replaced with new ones.
If your application is holding onto a reference to a consumer tag (e.g. to unsubscribe later) and reconnects,
the held tag will no longer be valid, preventing the application from gracefully unsubscribing.
The `callback` function takes one parameter, `event`, which contains two properties: `oldConsumerTag` and `consumerTag`.
```javascript
var connection = amqp.createConnection({ host: 'dev.rabbitmq.com' });
// Local references to the exchange, queue and consumer tag
var _exchange = null;
var _queue = null;
var _consumerTag = null;
// Report errors
connection.on('error', function(err) {
console.error('Connection error', err);
});
// Update our stored tag when it changes
connection.on('tag.change', function(event) {
if (_consumerTag === event.oldConsumerTag) {
_consumerTag = event.consumerTag;
// Consider unsubscribing from the old tag just in case it lingers
_queue.unsubscribe(event.oldConsumerTag);
}
});
// Initialize the exchange, queue and subscription
connection.on('ready', function() {
connection.exchange('exchange-name', function(exchange) {
_exchange = exchange;
connection.queue('queue-name', function(queue) {
_queue = queue;
// Bind to the exchange
queue.bind('exchange-name', 'routing-key');
// Subscribe to the queue
queue
.subscribe(function(message) {
// Handle message here
console.log('Got message', message);
queue.shift(false, false);
})
.addCallback(function(res) {
// Hold on to the consumer tag so we can unsubscribe later
_consumerTag = res.consumerTag;
})
;
});
});
});
// Some time in the future, you'll want to unsubscribe or shutdown
setTimeout(function() {
if (_queue) {
_queue
.unsubscribe(_consumerTag)
.addCallback(function() {
// unsubscribed
})
;
} else {
// unsubscribed
}
}, 60000);
```
## Queue

@@ -238,0 +314,0 @@

Sorry, the diff of this file is not supported yet

SocketSocket SOC 2 Logo

Product

  • Package Alerts
  • Integrations
  • Docs
  • Pricing
  • FAQ
  • Roadmap
  • Changelog

Packages

npm

Stay in touch

Get open source security insights delivered straight into your inbox.


  • Terms
  • Privacy
  • Security

Made with ⚡️ by Socket Inc