amqp-cacoon
Advanced tools
Comparing version 2.0.0 to 3.0.0
@@ -20,2 +20,12 @@ import { ChannelWrapper, ConsumeMessage, ConsumeBatchMessages } from '../index'; | ||
/** | ||
* Returns the number of messages in the unackedMessageList | ||
* Primarily used for automated testing, but safe to use by others. | ||
*/ | ||
getMessageCount(): number; | ||
/** | ||
* Returns the size of all the message sin the unackedMessageList | ||
* Primarily used for automated testing, but safe to use by others. | ||
*/ | ||
getBufferSize(): number; | ||
/** | ||
* resetMessages | ||
@@ -27,4 +37,2 @@ * Reset message list | ||
* | ||
* @param originalMessage: ConsumeMessage | ||
* @param json: object | ||
*/ | ||
@@ -39,3 +47,3 @@ resetMessages(): void; | ||
* | ||
* @param originalMessage: ConsumeMessage | ||
* @param msg | ||
*/ | ||
@@ -58,3 +66,3 @@ addMessage(msg: ConsumeMessage): void; | ||
* Do this by... | ||
* 1. Ack the last message using allUpTo argumetn to specify that all messages up to the last should be nacked | ||
* 1. Calling ack on each message. | ||
* | ||
@@ -69,6 +77,7 @@ * @param channel: Channel - Channel | ||
* Do this by... | ||
* 1. Nack the last message using the allUpTo argument to specify that all messages up to the last should be nacked | ||
* 1. Calling Nack on each message. | ||
* | ||
* @param channel: Channel - Channel | ||
* @param messageList: Array<ConsumeMessage> - Messages to be nacked | ||
* @param requeue | ||
*/ | ||
@@ -82,4 +91,3 @@ nackMessageList(channel: ChannelWrapper, messageList: Array<ConsumeMessage>, requeue?: boolean): void; | ||
* 2. Call handler function (Assume it nacks or acks) | ||
* 3. Ack unackedMessageList | ||
* 4. If error then nack unackedMessageList. Assume nack never occured if here | ||
* 3. If error then nack unackedMessageList. Assume nack never occurred if here | ||
* | ||
@@ -93,17 +101,18 @@ * @returns void | ||
* The rules are as follows. | ||
* 1. If buffer size > MAX_FILES_SIZE_BYTES then send the message | ||
* 2. If time elapsed since first message in group > MAX_BUFFER_TIME_MS then send grouped messages | ||
* 1. If buffer size > MAX_FILES_SIZE_BYTES then send the buffered message | ||
* 2. If time elapsed since first message in group > MAX_BUFFER_TIME_MS then send buffered messages | ||
* | ||
* Do this by... | ||
* 1. Set channel variable for this object. This way we don't have to do an async call elswhere. | ||
* 1. Set channel variable for this object. This way we don't have to do an async call elsewhere. | ||
* 2. Add message to a buffer | ||
* 3. Check buffer byte length and clear timer and send message if files size > MAX_FILES_SIZE_BYTES | ||
* 4. Else Setup timer if it is not already setup to send buffered messages | ||
* 5. When timer expires, send message | ||
* 5. When timer expires, send buffered messages | ||
* | ||
* @param originalMessage: ConsumeMessage - Used for ack and nack of messages | ||
* @param transformedMessage: object - Sent in buffered message | ||
* @returns void | ||
* @param channel | ||
* @param msg | ||
* @param handler | ||
*/ | ||
handleMessageBuffering(channel: ChannelWrapper, msg: ConsumeMessage, handler: (channel: ChannelWrapper, msg: ConsumeBatchMessages) => Promise<void>): Promise<void>; | ||
} |
@@ -48,2 +48,16 @@ "use strict"; | ||
/** | ||
* Returns the number of messages in the unackedMessageList | ||
* Primarily used for automated testing, but safe to use by others. | ||
*/ | ||
MessageBatchingManager.prototype.getMessageCount = function () { | ||
return this.unackedMessageList.length; | ||
}; | ||
/** | ||
* Returns the size of all the message sin the unackedMessageList | ||
* Primarily used for automated testing, but safe to use by others. | ||
*/ | ||
MessageBatchingManager.prototype.getBufferSize = function () { | ||
return this.bufferSize; | ||
}; | ||
/** | ||
* resetMessages | ||
@@ -55,4 +69,2 @@ * Reset message list | ||
* | ||
* @param originalMessage: ConsumeMessage | ||
* @param json: object | ||
*/ | ||
@@ -72,3 +84,3 @@ MessageBatchingManager.prototype.resetMessages = function () { | ||
* | ||
* @param originalMessage: ConsumeMessage | ||
* @param msg | ||
*/ | ||
@@ -100,3 +112,3 @@ MessageBatchingManager.prototype.addMessage = function (msg) { | ||
* Do this by... | ||
* 1. Ack the last message using allUpTo argumetn to specify that all messages up to the last should be nacked | ||
* 1. Calling ack on each message. | ||
* | ||
@@ -110,4 +122,2 @@ * @param channel: Channel - Channel | ||
} | ||
// 1. Ack the last message using allUpTo argumetn to specify that all messages up to the last should be nacked | ||
//channel.ack(messageList[messageList.length - 1], true); | ||
for (var _i = 0, messageList_1 = messageList; _i < messageList_1.length; _i++) { | ||
@@ -125,6 +135,7 @@ var msg = messageList_1[_i]; | ||
* Do this by... | ||
* 1. Nack the last message using the allUpTo argument to specify that all messages up to the last should be nacked | ||
* 1. Calling Nack on each message. | ||
* | ||
* @param channel: Channel - Channel | ||
* @param messageList: Array<ConsumeMessage> - Messages to be nacked | ||
* @param requeue | ||
*/ | ||
@@ -135,4 +146,2 @@ MessageBatchingManager.prototype.nackMessageList = function (channel, messageList, requeue) { | ||
} | ||
// 1. Nack the last message using the allUpTo argument to specify that all messages up to the last should be nacked | ||
//channel.nack(messageList[messageList.length - 1], true, requeue); | ||
for (var _i = 0, messageList_2 = messageList; _i < messageList_2.length; _i++) { | ||
@@ -152,4 +161,3 @@ var msg = messageList_2[_i]; | ||
* 2. Call handler function (Assume it nacks or acks) | ||
* 3. Ack unackedMessageList | ||
* 4. If error then nack unackedMessageList. Assume nack never occured if here | ||
* 3. If error then nack unackedMessageList. Assume nack never occurred if here | ||
* | ||
@@ -190,3 +198,3 @@ * @returns void | ||
e_1 = _b.sent(); | ||
// 4. If error then nack unackedMessageList | ||
// 3. If error then nack unackedMessageList | ||
if (!this.config.skipNackOnFail && unackedMessageList.length > 0) { | ||
@@ -205,15 +213,16 @@ this.nackMessageList(channel, unackedMessageList); | ||
* The rules are as follows. | ||
* 1. If buffer size > MAX_FILES_SIZE_BYTES then send the message | ||
* 2. If time elapsed since first message in group > MAX_BUFFER_TIME_MS then send grouped messages | ||
* 1. If buffer size > MAX_FILES_SIZE_BYTES then send the buffered message | ||
* 2. If time elapsed since first message in group > MAX_BUFFER_TIME_MS then send buffered messages | ||
* | ||
* Do this by... | ||
* 1. Set channel variable for this object. This way we don't have to do an async call elswhere. | ||
* 1. Set channel variable for this object. This way we don't have to do an async call elsewhere. | ||
* 2. Add message to a buffer | ||
* 3. Check buffer byte length and clear timer and send message if files size > MAX_FILES_SIZE_BYTES | ||
* 4. Else Setup timer if it is not already setup to send buffered messages | ||
* 5. When timer expires, send message | ||
* 5. When timer expires, send buffered messages | ||
* | ||
* @param originalMessage: ConsumeMessage - Used for ack and nack of messages | ||
* @param transformedMessage: object - Sent in buffered message | ||
* @returns void | ||
* @param channel | ||
* @param msg | ||
* @param handler | ||
*/ | ||
@@ -226,3 +235,3 @@ MessageBatchingManager.prototype.handleMessageBuffering = function (channel, msg, handler) { | ||
case 0: | ||
// 1. Set channel variable for this object. This way we don't have to do an async call elswhere. | ||
// 1. Set channel variable for this object. This way we don't have to do an async call elsewhere. | ||
this.amqpChannel = channel; | ||
@@ -247,3 +256,3 @@ // 2. Add message to a buffer | ||
if (this.config.maxTimeMs) { | ||
// 4. Else Setup timer if it is not alread setup to send buffered messages | ||
// 4. Else Setup timer if it is not already setup to send buffered messages | ||
if (!this.timerHandle) { | ||
@@ -250,0 +259,0 @@ this.timerHandle = setTimeout(function () { |
@@ -1,6 +0,6 @@ | ||
import amqp, { ChannelWrapper } from 'amqp-connection-manager'; | ||
import amqp, { ChannelWrapper, AmqpConnectionManagerOptions } from 'amqp-connection-manager'; | ||
import { ConsumeMessage, Channel, ConfirmChannel, Options } from 'amqplib'; | ||
import { Logger } from 'log4js'; | ||
declare type ConnectCallback = (channel: ConfirmChannel) => Promise<any>; | ||
export { ConsumeMessage, ChannelWrapper, Channel, ConfirmChannel, ConnectCallback }; | ||
export { ConsumeMessage, ChannelWrapper, Channel, ConfirmChannel, ConnectCallback, AmqpConnectionManagerOptions }; | ||
export interface ConsumeBatchMessages { | ||
@@ -7,0 +7,0 @@ batchingOptions: { |
@@ -271,9 +271,9 @@ "use strict"; | ||
AmqpCacoon.prototype.registerConsumerBatch = function (queue, handler, options) { | ||
var _a, _b, _c, _d, _e, _f; | ||
var _a, _b; | ||
return __awaiter(this, void 0, void 0, function () { | ||
var messageBatchingHandler; | ||
var _this = this; | ||
return __generator(this, function (_g) { | ||
return __generator(this, function (_c) { | ||
// Set some default options | ||
if (!((_a = options) === null || _a === void 0 ? void 0 : _a.batching)) { | ||
if (!(options === null || options === void 0 ? void 0 : options.batching)) { | ||
options = Object.assign({}, { | ||
@@ -288,5 +288,5 @@ batching: { | ||
providers: { logger: this.logger }, | ||
maxSizeBytes: (_c = (_b = options) === null || _b === void 0 ? void 0 : _b.batching) === null || _c === void 0 ? void 0 : _c.maxSizeBytes, | ||
maxTimeMs: (_e = (_d = options) === null || _d === void 0 ? void 0 : _d.batching) === null || _e === void 0 ? void 0 : _e.maxTimeMs, | ||
skipNackOnFail: (_f = options) === null || _f === void 0 ? void 0 : _f.noAck, | ||
maxSizeBytes: (_a = options === null || options === void 0 ? void 0 : options.batching) === null || _a === void 0 ? void 0 : _a.maxSizeBytes, | ||
maxTimeMs: (_b = options === null || options === void 0 ? void 0 : options.batching) === null || _b === void 0 ? void 0 : _b.maxTimeMs, | ||
skipNackOnFail: options === null || options === void 0 ? void 0 : options.noAck, | ||
}); | ||
@@ -326,10 +326,10 @@ // Register consumer | ||
channel = _a.sent(); | ||
// TODO: Alex: Does the guaranteed publish eliminate the need to handle drain events? | ||
// There's currently a reported bug in node-amqp-connection-manager saying the lib does | ||
// not handle drain events properly...we should fix this. | ||
// not handle drain events properly... requires research. | ||
// See https://github.com/valtech-sd/amqp-cacoon/issues/20 | ||
return [4 /*yield*/, channel.publish(exchange, routingKey, msgBuffer, options)]; | ||
case 2: | ||
// TODO: Alex: Does the guaranteed publish eliminate the need to handle drain events? | ||
// There's currently a reported bug in node-amqp-connection-manager saying the lib does | ||
// not handle drain events properly...we should fix this. | ||
// not handle drain events properly... requires research. | ||
// See https://github.com/valtech-sd/amqp-cacoon/issues/20 | ||
_a.sent(); | ||
@@ -336,0 +336,0 @@ return [2 /*return*/]; |
{ | ||
"name": "amqp-cacoon", | ||
"version": "2.0.0", | ||
"version": "3.0.0", | ||
"description": "AmqpCacoon is an abstraction around amqplib that provides a simple interface with flow control included out of the box", | ||
@@ -16,30 +16,30 @@ "main": "build/index.js", | ||
"build": "rimraf ./build && tsc", | ||
"test": "mocha -r ts-node/register ./tests/**/*.test.ts" | ||
"test": "rimraf ./build && tsc && mocha --exit -r ts-node/register ./tests/**/*.test.ts" | ||
}, | ||
"dependencies": { | ||
"amqp-connection-manager": "^3.2.1", | ||
"amqplib": "^0.6.0", | ||
"lodash": "^4.17.15", | ||
"log4js": "^6.1.0" | ||
"amqp-connection-manager": "^3.2.3", | ||
"amqplib": "^0.8.0", | ||
"lodash": "^4.17.21", | ||
"log4js": "^6.3.0" | ||
}, | ||
"author": "Valtech: Daniel Morris", | ||
"license": "MIT", | ||
"devDependencies": { | ||
"@types/amqp-connection-manager": "^2.0.10", | ||
"@types/amqplib": "^0.5.13", | ||
"@types/chai": "^4.2.7", | ||
"@types/lodash": "^4.14.149", | ||
"@types/mocha": "^5.2.7", | ||
"@types/node": "^13.1.6", | ||
"@types/simple-mock": "^0.8.1", | ||
"chai": "^4.2.0", | ||
"mocha": "^7.1.1", | ||
"nodemon": "^2.0.3", | ||
"rimraf": "^3.0.0", | ||
"@types/amqp-connection-manager": "^2.0.12", | ||
"@types/amqplib": "^0.8.2", | ||
"@types/chai": "^4.2.21", | ||
"@types/lodash": "^4.14.172", | ||
"@types/mocha": "^9.0.0", | ||
"@types/node": "^16.7.1", | ||
"@types/simple-mock": "^0.8.2", | ||
"@types/sinon": "^10.0.2", | ||
"chai": "^4.3.4", | ||
"mocha": "^9.1.0", | ||
"nodemon": "^2.0.12", | ||
"rimraf": "^3.0.2", | ||
"simple-mock": "^0.8.0", | ||
"sinon": "^8.1.0", | ||
"ts-mock-imports": "^1.2.6", | ||
"ts-node": "^8.5.4", | ||
"ts-sinon": "^1.0.24", | ||
"typescript": "^3.7.4" | ||
"sinon": "^11.1.2", | ||
"ts-mock-imports": "^1.3.7", | ||
"ts-node": "^10.2.1", | ||
"ts-sinon": "^2.0.1", | ||
"typescript": "^4.3.5" | ||
}, | ||
@@ -46,0 +46,0 @@ "publishConfig": { |
Sorry, the diff of this file is not supported yet
Sorry, the diff of this file is not supported yet
No contributors or author data
MaintenancePackage does not specify a list of contributors or an author in package.json.
Found 1 instance in 1 package
66094
967
317
18
1
+ Addedamqplib@0.8.0(transitive)
+ Addedsafe-buffer@5.2.1(transitive)
+ Addedurl-parse@1.5.10(transitive)
- Removedamqplib@0.6.0(transitive)
- Removedurl-parse@1.4.7(transitive)
Updatedamqplib@^0.8.0
Updatedlodash@^4.17.21
Updatedlog4js@^6.3.0