Socket
Socket
Sign inDemoInstall

amqp-cacoon

Package Overview
Dependencies
28
Maintainers
1
Versions
14
Alerts
File Explorer

Advanced tools

Install Socket

Detect and block malicious and high-risk dependencies

Install

Comparing version 1.0.2 to 1.1.0

build/helpers/message_batching_manager.d.ts

50

build/index.d.ts

@@ -5,2 +5,20 @@ import amqp from 'amqplib';

export { ConsumeMessage, Channel };
export interface ConsumeBatchMessages {
batchingOptions: {
maxSizeBytes?: number;
maxTimeMs?: number;
};
totalSizeInBytes: number;
messages: Array<ConsumeMessage>;
ackAll: (allUpTo?: boolean) => {};
nackAll: (allUpTo?: boolean, requeue?: boolean) => {};
}
export interface ConsumerOptions extends Options.Consume {
}
export interface ConsumerBatchOptions extends Options.Consume {
batching?: {
maxSizeBytes?: number;
maxTimeMs?: number;
};
}
export interface IAmqpCacoonConfig {

@@ -64,12 +82,36 @@ protocol?: string;

/**
* registerConsumerPrivate
* registerConsumer and registerConsumerBatch use this function to register consumers
*
* @param queue - Name of the queue
* @param handler: (channel: Channel, msg: object) => Promise<any> - A handler that receives the message
* @param options : ConsumerOptions - Used to pass in consumer options
* @return Promise<void>
**/
private registerConsumerPrivate;
/**
* registerConsumer
* After registering a handler on a queue that handler will
* get called for messages received on the specified queueu
* get called for messages received on the specified queue
*
* @param queue - Name of the queue
* @param handler: (msg: object) => Promise<any> - A handler that receives the message
* @return channel
* @param handler: (channel: Channel, msg: object) => Promise<any> - A handler that receives the message
* @param options : ConsumerOptions - Used to pass in consumer options
* @return Promise<void>
**/
registerConsumer(queue: string, handler: (channel: Channel, msg: ConsumeMessage) => Promise<void>): Promise<void>;
registerConsumer(queue: string, handler: (channel: Channel, msg: ConsumeMessage) => Promise<void>, options?: ConsumerOptions): Promise<void>;
/**
* registerConsumerBatch
* This is very similar to registerConsumer except this enables message batching.
* The following options are configurable
* 1. batching.maxTimeMs - Max time in milliseconds before we return the batch
* 2. batching.maxSizeBytes - Max size in bytes before we return
*
* @param queue - Name of the queue
* @param handler: (channel: Channel, msg: object) => Promise<any> - A handler that receives the message
* @param options : ConsumerOptions - Used to pass in consumer options
* @return Promise<void>
**/
registerConsumerBatch(queue: string, handler: (channel: Channel, msg: ConsumeBatchMessages) => Promise<void>, options?: ConsumerBatchOptions): Promise<void>;
/**
* publish

@@ -76,0 +118,0 @@ * publish to an exchange

120

build/index.js

@@ -43,2 +43,5 @@ "use strict";

var amqplib_1 = __importDefault(require("amqplib"));
var message_batching_manager_1 = __importDefault(require("./helpers/message_batching_manager"));
var DEFAULT_MAX_FILES_SIZE_BYTES = 1024 * 1024 * 2; // 2 MB
var DEFAULT_MAX_BUFFER_TIME_MS = 60 * 1000; // 60 seconds
/**

@@ -179,14 +182,13 @@ * AmqpCacoon

/**
* registerConsumer
* After registering a handler on a queue that handler will
* get called for messages received on the specified queueu
* registerConsumerPrivate
* registerConsumer and registerConsumerBatch use this function to register consumers
*
* @param queue - Name of the queue
* @param handler: (msg: object) => Promise<any> - A handler that receives the message
* @return channel
* @param handler: (channel: Channel, msg: object) => Promise<any> - A handler that receives the message
* @param options : ConsumerOptions - Used to pass in consumer options
* @return Promise<void>
**/
AmqpCacoon.prototype.registerConsumer = function (queue, handler) {
AmqpCacoon.prototype.registerConsumerPrivate = function (queue, consumerHandler, options) {
return __awaiter(this, void 0, void 0, function () {
var channel_1, e_3;
var _this = this;
var channel, e_3;
return __generator(this, function (_a) {

@@ -198,17 +200,5 @@ switch (_a.label) {

case 1:
channel_1 = _a.sent();
channel = _a.sent();
// Register a consume on the current channel
return [4 /*yield*/, channel_1.consume(queue, function (msg) { return __awaiter(_this, void 0, void 0, function () {
return __generator(this, function (_a) {
switch (_a.label) {
case 0:
if (!msg)
return [2 /*return*/]; // We know this will always be true but typescript requires this
return [4 /*yield*/, handler(channel_1, msg)];
case 1:
_a.sent();
return [2 /*return*/];
}
});
}); })];
return [4 /*yield*/, channel.consume(queue, consumerHandler.bind(this, channel), options)];
case 2:

@@ -221,3 +211,3 @@ // Register a consume on the current channel

if (this.logger)
this.logger.error('AMQPCacoon.registerConsumer: Error: ', e_3);
this.logger.error('AMQPCacoon.registerConsumerPrivate: Error: ', e_3);
throw e_3;

@@ -230,2 +220,78 @@ case 4: return [2 /*return*/];

/**
* registerConsumer
* After registering a handler on a queue that handler will
* get called for messages received on the specified queue
*
* @param queue - Name of the queue
* @param handler: (channel: Channel, msg: object) => Promise<any> - A handler that receives the message
* @param options : ConsumerOptions - Used to pass in consumer options
* @return Promise<void>
**/
AmqpCacoon.prototype.registerConsumer = function (queue, handler, options) {
return __awaiter(this, void 0, void 0, function () {
var _this = this;
return __generator(this, function (_a) {
return [2 /*return*/, this.registerConsumerPrivate(queue, function (channel, msg) { return __awaiter(_this, void 0, void 0, function () {
return __generator(this, function (_a) {
switch (_a.label) {
case 0:
if (!msg)
return [2 /*return*/]; // We know this will always be true but typescript requires this
return [4 /*yield*/, handler(channel, msg)];
case 1:
_a.sent();
return [2 /*return*/];
}
});
}); }, options)];
});
});
};
/**
* registerConsumerBatch
* This is very similar to registerConsumer except this enables message batching.
* The following options are configurable
* 1. batching.maxTimeMs - Max time in milliseconds before we return the batch
* 2. batching.maxSizeBytes - Max size in bytes before we return
*
* @param queue - Name of the queue
* @param handler: (channel: Channel, msg: object) => Promise<any> - A handler that receives the message
* @param options : ConsumerOptions - Used to pass in consumer options
* @return Promise<void>
**/
AmqpCacoon.prototype.registerConsumerBatch = function (queue, handler, options) {
var _a, _b, _c, _d, _e, _f;
return __awaiter(this, void 0, void 0, function () {
var messageBatchingHandler;
var _this = this;
return __generator(this, function (_g) {
// Set some default options
if (!((_a = options) === null || _a === void 0 ? void 0 : _a.batching)) {
options = Object.assign({}, {
batching: {
maxTimeMs: DEFAULT_MAX_BUFFER_TIME_MS,
maxSizeBytes: DEFAULT_MAX_FILES_SIZE_BYTES,
},
}, options);
}
messageBatchingHandler = new message_batching_manager_1.default({
providers: { logger: this.logger },
maxSizeBytes: (_c = (_b = options) === null || _b === void 0 ? void 0 : _b.batching) === null || _c === void 0 ? void 0 : _c.maxSizeBytes,
maxTimeMs: (_e = (_d = options) === null || _d === void 0 ? void 0 : _d.batching) === null || _e === void 0 ? void 0 : _e.maxTimeMs,
skipNackOnFail: (_f = options) === null || _f === void 0 ? void 0 : _f.noAck,
});
// Register consumer
return [2 /*return*/, this.registerConsumerPrivate(queue, function (channel, msg) { return __awaiter(_this, void 0, void 0, function () {
return __generator(this, function (_a) {
if (!msg)
return [2 /*return*/]; // We know this will always be true but typescript requires this
// Handle message batching
messageBatchingHandler.handleMessageBuffering(channel, msg, handler);
return [2 /*return*/];
});
}); }, options)];
});
});
};
/**
* publish

@@ -242,3 +308,3 @@ * publish to an exchange

return __awaiter(this, void 0, void 0, function () {
var channel_2, channelReady, e_4;
var channel_1, channelReady, e_4;
var _this = this;

@@ -251,4 +317,4 @@ return __generator(this, function (_a) {

case 1:
channel_2 = _a.sent();
channelReady = channel_2.publish(exchange, routingKey, msgBuffer, options);
channel_1 = _a.sent();
channelReady = channel_1.publish(exchange, routingKey, msgBuffer, options);
if (this.logger) {

@@ -282,3 +348,3 @@ this.logger.trace("AMQPCacoon.publish result: " + channelReady);

: true;
channel_2.once('drain', function () {
channel_1.once('drain', function () {
if (_this.logger) {

@@ -285,0 +351,0 @@ _this.logger.trace('AMQPCacoon.publish: "drain" received.');

{
"name": "amqp-cacoon",
"version": "1.0.2",
"version": "1.1.0",
"description": "AmqpCacoon is an abstraction around amqplib that provides a simple interface with flow control included out of the box",

@@ -5,0 +5,0 @@ "main": "build/index.js",

@@ -7,6 +7,2 @@ # AmqpCacoon

## Caution
WIP: Documetation is very much a work in progress.
## Overview

@@ -20,3 +16,4 @@

- Publish flow control included out of the box (Wait for drain event if we can't publish)
- TODO timeout if drain event does not occure after some amount of time
- timeout if drain event does not occurs after some amount of time when channel is not ready to receive a publish
- Consume single or batch of messages

@@ -45,90 +42,114 @@ ## Requirements to tests

## Usage
## Simple Usage
### Connect and publish
### Connect
This allows you to connect to an amqp server
```javascript
const config = {
messageBus: {
// Protocol should be "amqp" or "amqps"
protocol: 'amqp',
// Username + Password on the RabbitMQ host
username: 'valtech',
password: 'iscool',
// Host
host: 'localhost',
// Port
port: 5672,
// Queue setup
testQueue: 'test-queue',
},
};
const config = {
messageBus: {
// Protocol should be "amqp" or "amqps"
protocol: 'amqp',
// Username + Password on the RabbitMQ host
username: 'valtech',
password: 'iscool',
// Host
host: 'localhost',
// Port
port: 5672,
// Queue setup
testQueue: 'test-queue',
},
};
let amqpCacoon = new AmqpCacoon({
protocol: config.messageBus.protocol,
username: config.messageBus.username,
password: config.messageBus.password,
host: config.messageBus.host,
port: config.messageBus.port,
amqp_opts: {},
providers: {
logger: logger,
},
});
let channel = await amqpCacoon.getPublishChannel(); // Connects and sets up a publish channel
let amqpCacoon = new AmqpCacoon({
protocol: config.messageBus.protocol,
username: config.messageBus.username,
password: config.messageBus.password,
host: config.messageBus.host,
port: config.messageBus.port,
amqp_opts: {},
providers: {
logger: logger,
},
});
```
// Create queue and setup publish channel
channel.assertQueue(config.messageBus.testQueue);
### Publish
// Publish
await amqpCacoon.publish(
'', // Publish directly to queue
config.messageBus.testQueue,
Buffer.from('TestMessage')
);
This allows you to publish a message
```javascript
let channel = await amqpCacoon.getPublishChannel(); // Connects and sets up a publish channel
// Create queue and setup publish channel
channel.assertQueue(config.messageBus.testQueue);
// Publish
await amqpCacoon.publish(
'', // Publish directly to queue
config.messageBus.testQueue,
Buffer.from('TestMessage')
);
```
### Connect and Consume
### Consume Single Message
This will allow use to consume a single message.
```javascript
const config = {
messageBus: {
// Protocol should be "amqp" or "amqps"
protocol: 'amqp',
// Username + Password on the RabbitMQ host
username: 'valtech',
password: 'iscool',
// Host
host: 'localhost',
// Port
port: 5672,
// Queue setup
testQueue: 'test-queue',
},
};
let channel = await amqpCacoon.getConsumerChannel(); // Connects and sets up a subscription channel
let amqpCacoon = new AmqpCacoon({
protocol: config.messageBus.protocol,
username: config.messageBus.username,
password: config.messageBus.password,
host: config.messageBus.host,
port: config.messageBus.port,
amqp_opts: {},
providers: {
logger: logger,
},
});
// Create queue
channel.assertQueue(config.messageBus.testQueue);
let channel = await amqpCacoon.getConsumerChannel(); // Connects and sets up a subscription channel
// Consume single message at a time
amqpCacoon.registerConsumer(
config.messageBus.testQueue,
async (channel: Channel, msg: ConsumeMessage) => {
try {
console.log('Messsage content', msg.content.toString());
// ... Do other processing here
channel.ack(msg) // To ack a messages
} catch (e) {
channel.nack(msg) // To ack a messages
}
}
);
```
// Create queue
channel.assertQueue(config.messageBus.testQueue);
### Consume Message Batch
// Consume
amqpCacoon.registerConsumer(
config.messageBus.testQueue,
async (channel: Channel, msg: ConsumeMessage) => {
console.log("Messsage content", msg.content.toString());
This allows you to wait until either some time has elapsed or a specified message size has been exceeded before the messages are consumed
```javascript
// Consume batch of message at a time. Configuration for time based or size based batching is provided
amqpCacoon.registerConsumerBatch(
config.messageBus.testQueue,
async (channel: Channel, msg: ConsumeBatchMessages) => {
try {
console.log('Messsage content', msg.content.toString());
// ... Do other processing here
msg.ackAll() // To ack all messages
} catch (e) {
msg.nackAll() // To nack all messages
}
);
},
{
batching: {
maxTimeMs: 60000, // Don't provide messages to the callback until at least 60000 ms have passed
maxSizeBytes: 1024 * 1024 * 2, // 1024 * 1024 * 2 = 2 MB -don't provide message to the callback until 2 MB of data have been received
}
}
);
```
## Dealing With Channels
This library expose amqplib channel when you call either `getConsumerChannel` or `getPublishChannel`. The channel is also exposed when registering a consumer. To learn more about that api see documentation for [amqplib](https://www.npmjs.com/package/amqplib). Just a couple thing that you should remember to do.
1. Remember to ack or nack on all messages.
2. An alternative is to pass an option into the `registerConsumer` to not require an ack (noAck). The problem with this is that if your application is reset or errors out, you may loose the message or messages.

Sorry, the diff of this file is not supported yet

SocketSocket SOC 2 Logo

Product

  • Package Alerts
  • Integrations
  • Docs
  • Pricing
  • FAQ
  • Roadmap

Stay in touch

Get open source security insights delivered straight into your inbox.


  • Terms
  • Privacy
  • Security

Made with ⚡️ by Socket Inc