Huge News!Announcing our $40M Series B led by Abstract Ventures.Learn More
Socket
Sign inDemoInstall
Socket

amqp

Package Overview
Dependencies
Maintainers
2
Versions
18
Alerts
File Explorer

Advanced tools

Socket logo

Install Socket

Detect and block malicious and high-risk dependencies

Install

amqp - npm Package Compare versions

Comparing version 0.2.4 to 0.2.5

test/test-exchange-publish-closed.js

28

lib/connection.js

@@ -356,4 +356,14 @@ 'use strict';

Connection.prototype.publish = function (routingKey, body, options, callback) {
if (!this._defaultExchange) this._defaultExchange = this.exchange();
return this._defaultExchange.publish(routingKey, body, options, callback);
if (!this._defaultExchange) {
this._defaultExchange = this.exchange();
}
var exchange = this._defaultExchange;
if (exchange.state === 'open') {
exchange.publish(routingKey, body, options, callback);
} else {
exchange.once('open', function() {
exchange.publish(routingKey, body, options, callback);
});
}
};

@@ -675,3 +685,3 @@

this.sslConnectionOptions = {};
if (this.options.ssl.pfxFile) {

@@ -687,8 +697,14 @@ this.sslConnectionOptions.pfx = fs.readFileSync(this.options.ssl.pfxFile);

if (this.options.ssl.caFile) {
this.sslConnectionOptions.ca = fs.readFileSync(this.options.ssl.caFile);
if (Array.isArray(this.options.ssl.caFile)) {
this.sslConnectionOptions.ca = this.options.ssl.caFile.map(function(f){
return fs.readFileSync(f);
});
} else {
this.sslConnectionOptions.ca = fs.readFileSync(this.options.ssl.caFile);
}
}
this.sslConnectionOptions.rejectUnauthorized = this.options.ssl.rejectUnauthorized;
this.sslConnectionOptions.passphrase = this.options.ssl.passphrase;
return this.sslConnectionOptions;

@@ -695,0 +711,0 @@ };

@@ -14,3 +14,3 @@ module.exports = {

, _64BIT_FLOAT: 'd'.charCodeAt(0)
, VOID: 'v'.charCodeAt(0)
, VOID: 'V'.charCodeAt(0)
, BYTE_ARRAY: 'x'.charCodeAt(0)

@@ -17,0 +17,0 @@ , ARRAY: 'A'.charCodeAt(0)

@@ -10,2 +10,3 @@ 'use strict';

var Channel = require('./channel');
var debug = require('./debug');

@@ -32,2 +33,5 @@ var Exchange = module.exports = function Exchange (connection, channel, name, options, openCallback) {

// should requeue instead?
// https://www.rabbitmq.com/reliability.html#producer
debug && debug('Exchange error handler triggered, erroring and wiping all unacked publishes');
for (var id in exchange._unAcked) {

@@ -38,4 +42,4 @@ var task = exchange._unAcked[id];

}
}
};
};
}

@@ -52,2 +56,11 @@ Exchange.prototype._onMethod = function (channel, method, args) {

case methods.channelOpenOk:
this._sequence = null;
if (!this._addedExchangeErrorHandler) {
var errorHandler = createExchangeErrorHandlerFor(this);
this.connection.on('error', errorHandler);
this.on('error', errorHandler);
this._addedExchangeErrorHandler = true;
}
// Pre-baked exchanges don't need to be declared

@@ -57,3 +70,3 @@ if (/^$|(amq\.)/.test(this.name)) {

if (this.options.confirm) {
this.connection._sendMethod(channel, methods.confirmSelect, { noWait: false });
this._confirmSelect(channel);
return;

@@ -75,19 +88,3 @@ }

if (this.options.confirm) {
this.connection._sendMethod(channel, methods.confirmSelect,
{ noWait: false });
// We should wait for the confirmSelectOk message to come back
// before we consider the exchange open and fire the open callback.
// the handler for confirmSelectOk initializes this._sequence.
// whose values are used to match published messages with their
// confirmations. If we fired the open callback here, publishes
// could happen before that intialization. If this happens,
// the first message gets a null value set as its "sequence" value
// but the ack comes back with a 1. The 1 doesn't match anything
// in the array of messages awaiting confirmation, and so the
// confirmation doesn't do anything and the callback passed into
// the publish method never fires. I've not full investigated, but
// presumably any message publishes that are performed before
// confirmSelectOk is received would have their confirmation
// behave abnormally.
} else {
this._confirmSelect(channel);
this.state = 'open';

@@ -111,3 +108,3 @@

, durable: !!this.options.durable
, autoDelete: !!this.options.autoDelete
, auto_delete: !!this.options.autoDelete
, internal: !!this.options.internal

@@ -121,7 +118,5 @@ , noWait: false

case methods.exchangeDeclareOk:
case methods.exchangeDeclareOk:
if (this.options.confirm) {
this.connection._sendMethod(channel, methods.confirmSelect,
{ noWait: false });
this._confirmSelect(channel);
} else {

@@ -136,3 +131,2 @@

}
break;

@@ -170,2 +164,3 @@

var sequenceNumber = args.deliveryTag.readUInt32BE(4), tag;
debug && debug("basic-ack, sequence: ", sequenceNumber);

@@ -245,11 +240,17 @@ if (sequenceNumber === 0 && args.multiple === true) {

var self = this;
callback = callback || function() {};
if (this.connection._blocked) {
if (callback) {
return callback(true, new Error('Connection is blocked, server reason: ' + this.connection._blockedReason));
} else {
return;
}
return callback(true, new Error('Connection is blocked, server reason: ' + this.connection._blockedReason));
}
if (this.state !== 'open') {
this._sequence = null;
return callback(true, new Error('Can not publish: exchange is not open'));
}
if (this.options.confirm && !this._readyToPublishWithConfirms()) {
return callback(true, new Error('Not yet ready to publish with confirms'));
}
options = _.extend({}, options || {});

@@ -281,2 +282,4 @@ options.routingKey = routingKey;

if (!this._addedExchangeErrorHandler) {
// if connection fails, we want to ack error all unacked publishes.
this.connection.on('error', createExchangeErrorHandlerFor(this));
this.on('error', createExchangeErrorHandlerFor(this));

@@ -286,2 +289,3 @@ this._addedExchangeErrorHandler = true;

debug && debug('awaiting confirmation for ' + this._sequence);
task.sequence = this._sequence;

@@ -428,1 +432,10 @@ this._unAcked[this._sequence] = task;

};
Exchange.prototype._confirmSelect = function(channel) {
this.connection._sendMethod(channel, methods.confirmSelect, { noWait: false });
};
Exchange.prototype._readyToPublishWithConfirms = function() {
return this._sequence != null;
};

@@ -252,2 +252,5 @@ 'use strict';

case AMQPTypes.VOID:
return null;
default:

@@ -254,0 +257,0 @@ throw new Error("Unknown field value type " + buffer[buffer.read-1]);

{ "name" : "amqp"
, "description" : "AMQP driver for node"
, "keywords" : [ "amqp" ]
, "version" : "0.2.4"
, "version" : "0.2.5"
, "author" : { "name" : "Ryan Dahl" }

@@ -31,3 +31,2 @@ , "contributors" :

, "main" : "./amqp"
, "engines" : { "node" : "0.4 || 0.6 || 0.8 || 0.9 || 0.10 || 0.11 || 0.12" }
, "licenses" :

@@ -34,0 +33,0 @@ [ { "type" : "MIT"

@@ -11,3 +11,3 @@ require('./harness').run();

q.bind("#"); // bind to queue
q.on('queueBindOk', function() { // wait until queue is bound

@@ -24,3 +24,3 @@ q.on('basicConsumeOk', function () { // wait until consumer is registered

});
q.subscribe({ routingKeyInPayload: true },

@@ -27,0 +27,0 @@ function (msg) { // register consumer

@@ -28,3 +28,3 @@ var conn = require('./harness').createConnection();

var thrown = false;
conn.addListener('error', function(e){
exchange.addListener('error', function(e){
thrown = true;

@@ -31,0 +31,0 @@ assert.equal(e.code, 404);

SocketSocket SOC 2 Logo

Product

  • Package Alerts
  • Integrations
  • Docs
  • Pricing
  • FAQ
  • Roadmap
  • Changelog

Packages

npm

Stay in touch

Get open source security insights delivered straight into your inbox.


  • Terms
  • Privacy
  • Security

Made with ⚡️ by Socket Inc