amqp-connection-manager
Advanced tools
Comparing version 2.3.1 to 2.3.2
@@ -0,1 +1,8 @@ | ||
## [2.3.2](https://github.com/benbria/node-amqp-connection-manager/compare/v2.3.1...v2.3.2) (2019-05-21) | ||
### Bug Fixes | ||
* Null delta to get semantic-release to pick up [#65](https://github.com/benbria/node-amqp-connection-manager/issues/65). Fix [#84](https://github.com/benbria/node-amqp-connection-manager/issues/84). ([9737135](https://github.com/benbria/node-amqp-connection-manager/commit/9737135)) | ||
## [2.3.1](https://github.com/benbria/node-amqp-connection-manager/compare/v2.3.0...v2.3.1) (2019-04-01) | ||
@@ -2,0 +9,0 @@ |
@@ -167,3 +167,5 @@ "use strict"; | ||
this._messages = []; // True if the "worker" is busy sending messages. False if we need to | ||
this._messages = []; // Place to store published, but not yet confirmed messages | ||
this._unconfirmedMessages = []; // True if the "worker" is busy sending messages. False if we need to | ||
// start the worker to get stuff done. | ||
@@ -225,2 +227,9 @@ | ||
return; | ||
} | ||
if (this._unconfirmedMessages.length > 0) { | ||
// requeu any messages that were left unconfirmed when connection was lost | ||
while (this._unconfirmedMessages.length) { | ||
this._messages.push(this._unconfirmedMessages.shift()); | ||
} | ||
} // Since we just connected, publish any queued messages | ||
@@ -276,2 +285,7 @@ | ||
if (this._unconfirmedMessages.length !== 0) { | ||
// Reject any unconfirmed messages. | ||
this._unconfirmedMessages.forEach(message => message.reject(new Error('Channel closed'))); | ||
} | ||
this._connectionManager.removeListener('connect', this._onConnect); | ||
@@ -310,3 +324,7 @@ | ||
const channel = this._channel; | ||
const message = this._messages[0]; | ||
const message = this._messages.shift(); | ||
this._unconfirmedMessages.push(message); | ||
Promise.resolve().then(() => { | ||
@@ -344,22 +362,23 @@ const encodedMessage = this._json ? new Buffer.from(JSON.stringify(message.content)) : message.content; | ||
} | ||
})(); | ||
})(); // Send some more! | ||
this._publishQueuedMessages(workerNumber); | ||
return sendPromise; | ||
}).then(result => { | ||
this._messages.shift(); | ||
this._unconfirmedMessages.shift(); | ||
message.resolve(result); // Send some more! | ||
this._publishQueuedMessages(workerNumber); | ||
message.resolve(result); | ||
}, err => { | ||
if (!this._channel) {// Tried to write to a closed channel. Leave the message in the queue and we'll try again when we | ||
if (!this._channel) { | ||
// Tried to write to a closed channel. Leave the message in the queue and we'll try again when we | ||
// reconnect. | ||
this._messages.unshift(this._unconfirmedMessages.shift()); | ||
} else { | ||
// Something went wrong trying to send this message - could be JSON.stringify failed, could be the | ||
// broker rejected the message. Either way, reject it back | ||
this._messages.shift(); | ||
this._unconfirmedMessages.shift(); | ||
message.reject(err); // Send some more! | ||
this._publishQueuedMessages(workerNumber); | ||
message.reject(err); | ||
} | ||
@@ -366,0 +385,0 @@ }).catch( |
{ | ||
"name": "amqp-connection-manager", | ||
"version": "2.3.1", | ||
"version": "2.3.2", | ||
"description": "Auto-reconnect and round robin support for amqplib.", | ||
@@ -32,10 +32,10 @@ "main": "lib/index.js", | ||
"greenkeeper-lockfile": "^1.14.0", | ||
"husky": "^1.0.0-rc.2", | ||
"husky": "^2.0.0", | ||
"istanbul": "^0.4.0", | ||
"mocha": "^6.0.2", | ||
"nyc": "^13.3.0", | ||
"nyc": "^14.0.0", | ||
"promise-tools": "^2.0.0", | ||
"proxyquire": "^2.0.1", | ||
"semantic-release": "^15.13.0", | ||
"sinon": "^7.2.4" | ||
"sinon": "^7.3.2" | ||
}, | ||
@@ -42,0 +42,0 @@ "engines": { |
Sorry, the diff of this file is not supported yet
License Policy Violation
LicenseThis package is not allowed per your license policy. Review the package's license to ensure compliance.
Found 1 instance in 1 package
License Policy Violation
LicenseThis package is not allowed per your license policy. Review the package's license to ensure compliance.
Found 1 instance in 1 package
168032
25
541