Socket
Socket
Sign inDemoInstall

amqp-cacoon

Package Overview
Dependencies
29
Maintainers
1
Versions
14
Alerts
File Explorer

Advanced tools

Install Socket

Detect and block malicious and high-risk dependencies

Install

Comparing version 2.0.0 to 3.0.0

README.md

35

build/helpers/message_batching_manager.d.ts

@@ -20,2 +20,12 @@ import { ChannelWrapper, ConsumeMessage, ConsumeBatchMessages } from '../index';

/**
* Returns the number of messages in the unackedMessageList
* Primarily used for automated testing, but safe to use by others.
*/
getMessageCount(): number;
/**
* Returns the size of all the message sin the unackedMessageList
* Primarily used for automated testing, but safe to use by others.
*/
getBufferSize(): number;
/**
* resetMessages

@@ -27,4 +37,2 @@ * Reset message list

*
* @param originalMessage: ConsumeMessage
* @param json: object
*/

@@ -39,3 +47,3 @@ resetMessages(): void;

*
* @param originalMessage: ConsumeMessage
* @param msg
*/

@@ -58,3 +66,3 @@ addMessage(msg: ConsumeMessage): void;

* Do this by...
* 1. Ack the last message using allUpTo argumetn to specify that all messages up to the last should be nacked
* 1. Calling ack on each message.
*

@@ -69,6 +77,7 @@ * @param channel: Channel - Channel

* Do this by...
* 1. Nack the last message using the allUpTo argument to specify that all messages up to the last should be nacked
* 1. Calling Nack on each message.
*
* @param channel: Channel - Channel
* @param messageList: Array<ConsumeMessage> - Messages to be nacked
* @param requeue
*/

@@ -82,4 +91,3 @@ nackMessageList(channel: ChannelWrapper, messageList: Array<ConsumeMessage>, requeue?: boolean): void;

* 2. Call handler function (Assume it nacks or acks)
* 3. Ack unackedMessageList
* 4. If error then nack unackedMessageList. Assume nack never occured if here
* 3. If error then nack unackedMessageList. Assume nack never occurred if here
*

@@ -93,17 +101,18 @@ * @returns void

* The rules are as follows.
* 1. If buffer size > MAX_FILES_SIZE_BYTES then send the message
* 2. If time elapsed since first message in group > MAX_BUFFER_TIME_MS then send grouped messages
* 1. If buffer size > MAX_FILES_SIZE_BYTES then send the buffered message
* 2. If time elapsed since first message in group > MAX_BUFFER_TIME_MS then send buffered messages
*
* Do this by...
* 1. Set channel variable for this object. This way we don't have to do an async call elswhere.
* 1. Set channel variable for this object. This way we don't have to do an async call elsewhere.
* 2. Add message to a buffer
* 3. Check buffer byte length and clear timer and send message if files size > MAX_FILES_SIZE_BYTES
* 4. Else Setup timer if it is not already setup to send buffered messages
* 5. When timer expires, send message
* 5. When timer expires, send buffered messages
*
* @param originalMessage: ConsumeMessage - Used for ack and nack of messages
* @param transformedMessage: object - Sent in buffered message
* @returns void
* @param channel
* @param msg
* @param handler
*/
handleMessageBuffering(channel: ChannelWrapper, msg: ConsumeMessage, handler: (channel: ChannelWrapper, msg: ConsumeBatchMessages) => Promise<void>): Promise<void>;
}

@@ -48,2 +48,16 @@ "use strict";

/**
* Returns the number of messages in the unackedMessageList
* Primarily used for automated testing, but safe to use by others.
*/
MessageBatchingManager.prototype.getMessageCount = function () {
return this.unackedMessageList.length;
};
/**
* Returns the size of all the message sin the unackedMessageList
* Primarily used for automated testing, but safe to use by others.
*/
MessageBatchingManager.prototype.getBufferSize = function () {
return this.bufferSize;
};
/**
* resetMessages

@@ -55,4 +69,2 @@ * Reset message list

*
* @param originalMessage: ConsumeMessage
* @param json: object
*/

@@ -72,3 +84,3 @@ MessageBatchingManager.prototype.resetMessages = function () {

*
* @param originalMessage: ConsumeMessage
* @param msg
*/

@@ -100,3 +112,3 @@ MessageBatchingManager.prototype.addMessage = function (msg) {

* Do this by...
* 1. Ack the last message using allUpTo argumetn to specify that all messages up to the last should be nacked
* 1. Calling ack on each message.
*

@@ -110,4 +122,2 @@ * @param channel: Channel - Channel

}
// 1. Ack the last message using allUpTo argumetn to specify that all messages up to the last should be nacked
//channel.ack(messageList[messageList.length - 1], true);
for (var _i = 0, messageList_1 = messageList; _i < messageList_1.length; _i++) {

@@ -125,6 +135,7 @@ var msg = messageList_1[_i];

* Do this by...
* 1. Nack the last message using the allUpTo argument to specify that all messages up to the last should be nacked
* 1. Calling Nack on each message.
*
* @param channel: Channel - Channel
* @param messageList: Array<ConsumeMessage> - Messages to be nacked
* @param requeue
*/

@@ -135,4 +146,2 @@ MessageBatchingManager.prototype.nackMessageList = function (channel, messageList, requeue) {

}
// 1. Nack the last message using the allUpTo argument to specify that all messages up to the last should be nacked
//channel.nack(messageList[messageList.length - 1], true, requeue);
for (var _i = 0, messageList_2 = messageList; _i < messageList_2.length; _i++) {

@@ -152,4 +161,3 @@ var msg = messageList_2[_i];

* 2. Call handler function (Assume it nacks or acks)
* 3. Ack unackedMessageList
* 4. If error then nack unackedMessageList. Assume nack never occured if here
* 3. If error then nack unackedMessageList. Assume nack never occurred if here
*

@@ -190,3 +198,3 @@ * @returns void

e_1 = _b.sent();
// 4. If error then nack unackedMessageList
// 3. If error then nack unackedMessageList
if (!this.config.skipNackOnFail && unackedMessageList.length > 0) {

@@ -205,15 +213,16 @@ this.nackMessageList(channel, unackedMessageList);

* The rules are as follows.
* 1. If buffer size > MAX_FILES_SIZE_BYTES then send the message
* 2. If time elapsed since first message in group > MAX_BUFFER_TIME_MS then send grouped messages
* 1. If buffer size > MAX_FILES_SIZE_BYTES then send the buffered message
* 2. If time elapsed since first message in group > MAX_BUFFER_TIME_MS then send buffered messages
*
* Do this by...
* 1. Set channel variable for this object. This way we don't have to do an async call elswhere.
* 1. Set channel variable for this object. This way we don't have to do an async call elsewhere.
* 2. Add message to a buffer
* 3. Check buffer byte length and clear timer and send message if files size > MAX_FILES_SIZE_BYTES
* 4. Else Setup timer if it is not already setup to send buffered messages
* 5. When timer expires, send message
* 5. When timer expires, send buffered messages
*
* @param originalMessage: ConsumeMessage - Used for ack and nack of messages
* @param transformedMessage: object - Sent in buffered message
* @returns void
* @param channel
* @param msg
* @param handler
*/

@@ -226,3 +235,3 @@ MessageBatchingManager.prototype.handleMessageBuffering = function (channel, msg, handler) {

case 0:
// 1. Set channel variable for this object. This way we don't have to do an async call elswhere.
// 1. Set channel variable for this object. This way we don't have to do an async call elsewhere.
this.amqpChannel = channel;

@@ -247,3 +256,3 @@ // 2. Add message to a buffer

if (this.config.maxTimeMs) {
// 4. Else Setup timer if it is not alread setup to send buffered messages
// 4. Else Setup timer if it is not already setup to send buffered messages
if (!this.timerHandle) {

@@ -250,0 +259,0 @@ this.timerHandle = setTimeout(function () {

@@ -1,6 +0,6 @@

import amqp, { ChannelWrapper } from 'amqp-connection-manager';
import amqp, { ChannelWrapper, AmqpConnectionManagerOptions } from 'amqp-connection-manager';
import { ConsumeMessage, Channel, ConfirmChannel, Options } from 'amqplib';
import { Logger } from 'log4js';
declare type ConnectCallback = (channel: ConfirmChannel) => Promise<any>;
export { ConsumeMessage, ChannelWrapper, Channel, ConfirmChannel, ConnectCallback };
export { ConsumeMessage, ChannelWrapper, Channel, ConfirmChannel, ConnectCallback, AmqpConnectionManagerOptions };
export interface ConsumeBatchMessages {

@@ -7,0 +7,0 @@ batchingOptions: {

@@ -271,9 +271,9 @@ "use strict";

AmqpCacoon.prototype.registerConsumerBatch = function (queue, handler, options) {
var _a, _b, _c, _d, _e, _f;
var _a, _b;
return __awaiter(this, void 0, void 0, function () {
var messageBatchingHandler;
var _this = this;
return __generator(this, function (_g) {
return __generator(this, function (_c) {
// Set some default options
if (!((_a = options) === null || _a === void 0 ? void 0 : _a.batching)) {
if (!(options === null || options === void 0 ? void 0 : options.batching)) {
options = Object.assign({}, {

@@ -288,5 +288,5 @@ batching: {

providers: { logger: this.logger },
maxSizeBytes: (_c = (_b = options) === null || _b === void 0 ? void 0 : _b.batching) === null || _c === void 0 ? void 0 : _c.maxSizeBytes,
maxTimeMs: (_e = (_d = options) === null || _d === void 0 ? void 0 : _d.batching) === null || _e === void 0 ? void 0 : _e.maxTimeMs,
skipNackOnFail: (_f = options) === null || _f === void 0 ? void 0 : _f.noAck,
maxSizeBytes: (_a = options === null || options === void 0 ? void 0 : options.batching) === null || _a === void 0 ? void 0 : _a.maxSizeBytes,
maxTimeMs: (_b = options === null || options === void 0 ? void 0 : options.batching) === null || _b === void 0 ? void 0 : _b.maxTimeMs,
skipNackOnFail: options === null || options === void 0 ? void 0 : options.noAck,
});

@@ -326,10 +326,10 @@ // Register consumer

channel = _a.sent();
// TODO: Alex: Does the guaranteed publish eliminate the need to handle drain events?
// There's currently a reported bug in node-amqp-connection-manager saying the lib does
// not handle drain events properly...we should fix this.
// not handle drain events properly... requires research.
// See https://github.com/valtech-sd/amqp-cacoon/issues/20
return [4 /*yield*/, channel.publish(exchange, routingKey, msgBuffer, options)];
case 2:
// TODO: Alex: Does the guaranteed publish eliminate the need to handle drain events?
// There's currently a reported bug in node-amqp-connection-manager saying the lib does
// not handle drain events properly...we should fix this.
// not handle drain events properly... requires research.
// See https://github.com/valtech-sd/amqp-cacoon/issues/20
_a.sent();

@@ -336,0 +336,0 @@ return [2 /*return*/];

{
"name": "amqp-cacoon",
"version": "2.0.0",
"version": "3.0.0",
"description": "AmqpCacoon is an abstraction around amqplib that provides a simple interface with flow control included out of the box",

@@ -16,30 +16,30 @@ "main": "build/index.js",

"build": "rimraf ./build && tsc",
"test": "mocha -r ts-node/register ./tests/**/*.test.ts"
"test": "rimraf ./build && tsc && mocha --exit -r ts-node/register ./tests/**/*.test.ts"
},
"dependencies": {
"amqp-connection-manager": "^3.2.1",
"amqplib": "^0.6.0",
"lodash": "^4.17.15",
"log4js": "^6.1.0"
"amqp-connection-manager": "^3.2.3",
"amqplib": "^0.8.0",
"lodash": "^4.17.21",
"log4js": "^6.3.0"
},
"author": "Valtech: Daniel Morris",
"license": "MIT",
"devDependencies": {
"@types/amqp-connection-manager": "^2.0.10",
"@types/amqplib": "^0.5.13",
"@types/chai": "^4.2.7",
"@types/lodash": "^4.14.149",
"@types/mocha": "^5.2.7",
"@types/node": "^13.1.6",
"@types/simple-mock": "^0.8.1",
"chai": "^4.2.0",
"mocha": "^7.1.1",
"nodemon": "^2.0.3",
"rimraf": "^3.0.0",
"@types/amqp-connection-manager": "^2.0.12",
"@types/amqplib": "^0.8.2",
"@types/chai": "^4.2.21",
"@types/lodash": "^4.14.172",
"@types/mocha": "^9.0.0",
"@types/node": "^16.7.1",
"@types/simple-mock": "^0.8.2",
"@types/sinon": "^10.0.2",
"chai": "^4.3.4",
"mocha": "^9.1.0",
"nodemon": "^2.0.12",
"rimraf": "^3.0.2",
"simple-mock": "^0.8.0",
"sinon": "^8.1.0",
"ts-mock-imports": "^1.2.6",
"ts-node": "^8.5.4",
"ts-sinon": "^1.0.24",
"typescript": "^3.7.4"
"sinon": "^11.1.2",
"ts-mock-imports": "^1.3.7",
"ts-node": "^10.2.1",
"ts-sinon": "^2.0.1",
"typescript": "^4.3.5"
},

@@ -46,0 +46,0 @@ "publishConfig": {

Sorry, the diff of this file is not supported yet

Sorry, the diff of this file is not supported yet

SocketSocket SOC 2 Logo

Product

  • Package Alerts
  • Integrations
  • Docs
  • Pricing
  • FAQ
  • Roadmap

Stay in touch

Get open source security insights delivered straight into your inbox.


  • Terms
  • Privacy
  • Security

Made with ⚡️ by Socket Inc