Comparing version 0.2.4 to 0.2.5
@@ -356,4 +356,14 @@ 'use strict'; | ||
Connection.prototype.publish = function (routingKey, body, options, callback) { | ||
if (!this._defaultExchange) this._defaultExchange = this.exchange(); | ||
return this._defaultExchange.publish(routingKey, body, options, callback); | ||
if (!this._defaultExchange) { | ||
this._defaultExchange = this.exchange(); | ||
} | ||
var exchange = this._defaultExchange; | ||
if (exchange.state === 'open') { | ||
exchange.publish(routingKey, body, options, callback); | ||
} else { | ||
exchange.once('open', function() { | ||
exchange.publish(routingKey, body, options, callback); | ||
}); | ||
} | ||
}; | ||
@@ -675,3 +685,3 @@ | ||
this.sslConnectionOptions = {}; | ||
if (this.options.ssl.pfxFile) { | ||
@@ -687,8 +697,14 @@ this.sslConnectionOptions.pfx = fs.readFileSync(this.options.ssl.pfxFile); | ||
if (this.options.ssl.caFile) { | ||
this.sslConnectionOptions.ca = fs.readFileSync(this.options.ssl.caFile); | ||
if (Array.isArray(this.options.ssl.caFile)) { | ||
this.sslConnectionOptions.ca = this.options.ssl.caFile.map(function(f){ | ||
return fs.readFileSync(f); | ||
}); | ||
} else { | ||
this.sslConnectionOptions.ca = fs.readFileSync(this.options.ssl.caFile); | ||
} | ||
} | ||
this.sslConnectionOptions.rejectUnauthorized = this.options.ssl.rejectUnauthorized; | ||
this.sslConnectionOptions.passphrase = this.options.ssl.passphrase; | ||
return this.sslConnectionOptions; | ||
@@ -695,0 +711,0 @@ }; |
@@ -14,3 +14,3 @@ module.exports = { | ||
, _64BIT_FLOAT: 'd'.charCodeAt(0) | ||
, VOID: 'v'.charCodeAt(0) | ||
, VOID: 'V'.charCodeAt(0) | ||
, BYTE_ARRAY: 'x'.charCodeAt(0) | ||
@@ -17,0 +17,0 @@ , ARRAY: 'A'.charCodeAt(0) |
@@ -10,2 +10,3 @@ 'use strict'; | ||
var Channel = require('./channel'); | ||
var debug = require('./debug'); | ||
@@ -32,2 +33,5 @@ var Exchange = module.exports = function Exchange (connection, channel, name, options, openCallback) { | ||
// should requeue instead? | ||
// https://www.rabbitmq.com/reliability.html#producer | ||
debug && debug('Exchange error handler triggered, erroring and wiping all unacked publishes'); | ||
for (var id in exchange._unAcked) { | ||
@@ -38,4 +42,4 @@ var task = exchange._unAcked[id]; | ||
} | ||
} | ||
}; | ||
}; | ||
} | ||
@@ -52,2 +56,11 @@ Exchange.prototype._onMethod = function (channel, method, args) { | ||
case methods.channelOpenOk: | ||
this._sequence = null; | ||
if (!this._addedExchangeErrorHandler) { | ||
var errorHandler = createExchangeErrorHandlerFor(this); | ||
this.connection.on('error', errorHandler); | ||
this.on('error', errorHandler); | ||
this._addedExchangeErrorHandler = true; | ||
} | ||
// Pre-baked exchanges don't need to be declared | ||
@@ -57,3 +70,3 @@ if (/^$|(amq\.)/.test(this.name)) { | ||
if (this.options.confirm) { | ||
this.connection._sendMethod(channel, methods.confirmSelect, { noWait: false }); | ||
this._confirmSelect(channel); | ||
return; | ||
@@ -75,19 +88,3 @@ } | ||
if (this.options.confirm) { | ||
this.connection._sendMethod(channel, methods.confirmSelect, | ||
{ noWait: false }); | ||
// We should wait for the confirmSelectOk message to come back | ||
// before we consider the exchange open and fire the open callback. | ||
// the handler for confirmSelectOk initializes this._sequence. | ||
// whose values are used to match published messages with their | ||
// confirmations. If we fired the open callback here, publishes | ||
// could happen before that intialization. If this happens, | ||
// the first message gets a null value set as its "sequence" value | ||
// but the ack comes back with a 1. The 1 doesn't match anything | ||
// in the array of messages awaiting confirmation, and so the | ||
// confirmation doesn't do anything and the callback passed into | ||
// the publish method never fires. I've not full investigated, but | ||
// presumably any message publishes that are performed before | ||
// confirmSelectOk is received would have their confirmation | ||
// behave abnormally. | ||
} else { | ||
this._confirmSelect(channel); | ||
this.state = 'open'; | ||
@@ -111,3 +108,3 @@ | ||
, durable: !!this.options.durable | ||
, autoDelete: !!this.options.autoDelete | ||
, auto_delete: !!this.options.autoDelete | ||
, internal: !!this.options.internal | ||
@@ -121,7 +118,5 @@ , noWait: false | ||
case methods.exchangeDeclareOk: | ||
case methods.exchangeDeclareOk: | ||
if (this.options.confirm) { | ||
this.connection._sendMethod(channel, methods.confirmSelect, | ||
{ noWait: false }); | ||
this._confirmSelect(channel); | ||
} else { | ||
@@ -136,3 +131,2 @@ | ||
} | ||
break; | ||
@@ -170,2 +164,3 @@ | ||
var sequenceNumber = args.deliveryTag.readUInt32BE(4), tag; | ||
debug && debug("basic-ack, sequence: ", sequenceNumber); | ||
@@ -245,11 +240,17 @@ if (sequenceNumber === 0 && args.multiple === true) { | ||
var self = this; | ||
callback = callback || function() {}; | ||
if (this.connection._blocked) { | ||
if (callback) { | ||
return callback(true, new Error('Connection is blocked, server reason: ' + this.connection._blockedReason)); | ||
} else { | ||
return; | ||
} | ||
return callback(true, new Error('Connection is blocked, server reason: ' + this.connection._blockedReason)); | ||
} | ||
if (this.state !== 'open') { | ||
this._sequence = null; | ||
return callback(true, new Error('Can not publish: exchange is not open')); | ||
} | ||
if (this.options.confirm && !this._readyToPublishWithConfirms()) { | ||
return callback(true, new Error('Not yet ready to publish with confirms')); | ||
} | ||
options = _.extend({}, options || {}); | ||
@@ -281,2 +282,4 @@ options.routingKey = routingKey; | ||
if (!this._addedExchangeErrorHandler) { | ||
// if connection fails, we want to ack error all unacked publishes. | ||
this.connection.on('error', createExchangeErrorHandlerFor(this)); | ||
this.on('error', createExchangeErrorHandlerFor(this)); | ||
@@ -286,2 +289,3 @@ this._addedExchangeErrorHandler = true; | ||
debug && debug('awaiting confirmation for ' + this._sequence); | ||
task.sequence = this._sequence; | ||
@@ -428,1 +432,10 @@ this._unAcked[this._sequence] = task; | ||
}; | ||
Exchange.prototype._confirmSelect = function(channel) { | ||
this.connection._sendMethod(channel, methods.confirmSelect, { noWait: false }); | ||
}; | ||
Exchange.prototype._readyToPublishWithConfirms = function() { | ||
return this._sequence != null; | ||
}; | ||
@@ -252,2 +252,5 @@ 'use strict'; | ||
case AMQPTypes.VOID: | ||
return null; | ||
default: | ||
@@ -254,0 +257,0 @@ throw new Error("Unknown field value type " + buffer[buffer.read-1]); |
{ "name" : "amqp" | ||
, "description" : "AMQP driver for node" | ||
, "keywords" : [ "amqp" ] | ||
, "version" : "0.2.4" | ||
, "version" : "0.2.5" | ||
, "author" : { "name" : "Ryan Dahl" } | ||
@@ -31,3 +31,2 @@ , "contributors" : | ||
, "main" : "./amqp" | ||
, "engines" : { "node" : "0.4 || 0.6 || 0.8 || 0.9 || 0.10 || 0.11 || 0.12" } | ||
, "licenses" : | ||
@@ -34,0 +33,0 @@ [ { "type" : "MIT" |
@@ -11,3 +11,3 @@ require('./harness').run(); | ||
q.bind("#"); // bind to queue | ||
q.on('queueBindOk', function() { // wait until queue is bound | ||
@@ -24,3 +24,3 @@ q.on('basicConsumeOk', function () { // wait until consumer is registered | ||
}); | ||
q.subscribe({ routingKeyInPayload: true }, | ||
@@ -27,0 +27,0 @@ function (msg) { // register consumer |
@@ -28,3 +28,3 @@ var conn = require('./harness').createConnection(); | ||
var thrown = false; | ||
conn.addListener('error', function(e){ | ||
exchange.addListener('error', function(e){ | ||
thrown = true; | ||
@@ -31,0 +31,0 @@ assert.equal(e.code, 404); |
License Policy Violation
LicenseThis package is not allowed per your license policy. Review the package's license to ensure compliance.
Found 1 instance in 1 package
License Policy Violation
LicenseThis package is not allowed per your license policy. Review the package's license to ensure compliance.
Found 1 instance in 1 package
500287
89
6120
13