amqp-cacoon
Advanced tools
Comparing version 1.0.2 to 1.1.0
@@ -5,2 +5,20 @@ import amqp from 'amqplib'; | ||
export { ConsumeMessage, Channel }; | ||
export interface ConsumeBatchMessages { | ||
batchingOptions: { | ||
maxSizeBytes?: number; | ||
maxTimeMs?: number; | ||
}; | ||
totalSizeInBytes: number; | ||
messages: Array<ConsumeMessage>; | ||
ackAll: (allUpTo?: boolean) => {}; | ||
nackAll: (allUpTo?: boolean, requeue?: boolean) => {}; | ||
} | ||
export interface ConsumerOptions extends Options.Consume { | ||
} | ||
export interface ConsumerBatchOptions extends Options.Consume { | ||
batching?: { | ||
maxSizeBytes?: number; | ||
maxTimeMs?: number; | ||
}; | ||
} | ||
export interface IAmqpCacoonConfig { | ||
@@ -64,12 +82,36 @@ protocol?: string; | ||
/** | ||
* registerConsumerPrivate | ||
* registerConsumer and registerConsumerBatch use this function to register consumers | ||
* | ||
* @param queue - Name of the queue | ||
* @param handler: (channel: Channel, msg: object) => Promise<any> - A handler that receives the message | ||
* @param options : ConsumerOptions - Used to pass in consumer options | ||
* @return Promise<void> | ||
**/ | ||
private registerConsumerPrivate; | ||
/** | ||
* registerConsumer | ||
* After registering a handler on a queue that handler will | ||
* get called for messages received on the specified queueu | ||
* get called for messages received on the specified queue | ||
* | ||
* @param queue - Name of the queue | ||
* @param handler: (msg: object) => Promise<any> - A handler that receives the message | ||
* @return channel | ||
* @param handler: (channel: Channel, msg: object) => Promise<any> - A handler that receives the message | ||
* @param options : ConsumerOptions - Used to pass in consumer options | ||
* @return Promise<void> | ||
**/ | ||
registerConsumer(queue: string, handler: (channel: Channel, msg: ConsumeMessage) => Promise<void>): Promise<void>; | ||
registerConsumer(queue: string, handler: (channel: Channel, msg: ConsumeMessage) => Promise<void>, options?: ConsumerOptions): Promise<void>; | ||
/** | ||
* registerConsumerBatch | ||
* This is very similar to registerConsumer except this enables message batching. | ||
* The following options are configurable | ||
* 1. batching.maxTimeMs - Max time in milliseconds before we return the batch | ||
* 2. batching.maxSizeBytes - Max size in bytes before we return | ||
* | ||
* @param queue - Name of the queue | ||
* @param handler: (channel: Channel, msg: object) => Promise<any> - A handler that receives the message | ||
* @param options : ConsumerOptions - Used to pass in consumer options | ||
* @return Promise<void> | ||
**/ | ||
registerConsumerBatch(queue: string, handler: (channel: Channel, msg: ConsumeBatchMessages) => Promise<void>, options?: ConsumerBatchOptions): Promise<void>; | ||
/** | ||
* publish | ||
@@ -76,0 +118,0 @@ * publish to an exchange |
@@ -43,2 +43,5 @@ "use strict"; | ||
var amqplib_1 = __importDefault(require("amqplib")); | ||
var message_batching_manager_1 = __importDefault(require("./helpers/message_batching_manager")); | ||
var DEFAULT_MAX_FILES_SIZE_BYTES = 1024 * 1024 * 2; // 2 MB | ||
var DEFAULT_MAX_BUFFER_TIME_MS = 60 * 1000; // 60 seconds | ||
/** | ||
@@ -179,14 +182,13 @@ * AmqpCacoon | ||
/** | ||
* registerConsumer | ||
* After registering a handler on a queue that handler will | ||
* get called for messages received on the specified queueu | ||
* registerConsumerPrivate | ||
* registerConsumer and registerConsumerBatch use this function to register consumers | ||
* | ||
* @param queue - Name of the queue | ||
* @param handler: (msg: object) => Promise<any> - A handler that receives the message | ||
* @return channel | ||
* @param handler: (channel: Channel, msg: object) => Promise<any> - A handler that receives the message | ||
* @param options : ConsumerOptions - Used to pass in consumer options | ||
* @return Promise<void> | ||
**/ | ||
AmqpCacoon.prototype.registerConsumer = function (queue, handler) { | ||
AmqpCacoon.prototype.registerConsumerPrivate = function (queue, consumerHandler, options) { | ||
return __awaiter(this, void 0, void 0, function () { | ||
var channel_1, e_3; | ||
var _this = this; | ||
var channel, e_3; | ||
return __generator(this, function (_a) { | ||
@@ -198,17 +200,5 @@ switch (_a.label) { | ||
case 1: | ||
channel_1 = _a.sent(); | ||
channel = _a.sent(); | ||
// Register a consume on the current channel | ||
return [4 /*yield*/, channel_1.consume(queue, function (msg) { return __awaiter(_this, void 0, void 0, function () { | ||
return __generator(this, function (_a) { | ||
switch (_a.label) { | ||
case 0: | ||
if (!msg) | ||
return [2 /*return*/]; // We know this will always be true but typescript requires this | ||
return [4 /*yield*/, handler(channel_1, msg)]; | ||
case 1: | ||
_a.sent(); | ||
return [2 /*return*/]; | ||
} | ||
}); | ||
}); })]; | ||
return [4 /*yield*/, channel.consume(queue, consumerHandler.bind(this, channel), options)]; | ||
case 2: | ||
@@ -221,3 +211,3 @@ // Register a consume on the current channel | ||
if (this.logger) | ||
this.logger.error('AMQPCacoon.registerConsumer: Error: ', e_3); | ||
this.logger.error('AMQPCacoon.registerConsumerPrivate: Error: ', e_3); | ||
throw e_3; | ||
@@ -230,2 +220,78 @@ case 4: return [2 /*return*/]; | ||
/** | ||
* registerConsumer | ||
* After registering a handler on a queue that handler will | ||
* get called for messages received on the specified queue | ||
* | ||
* @param queue - Name of the queue | ||
* @param handler: (channel: Channel, msg: object) => Promise<any> - A handler that receives the message | ||
* @param options : ConsumerOptions - Used to pass in consumer options | ||
* @return Promise<void> | ||
**/ | ||
AmqpCacoon.prototype.registerConsumer = function (queue, handler, options) { | ||
return __awaiter(this, void 0, void 0, function () { | ||
var _this = this; | ||
return __generator(this, function (_a) { | ||
return [2 /*return*/, this.registerConsumerPrivate(queue, function (channel, msg) { return __awaiter(_this, void 0, void 0, function () { | ||
return __generator(this, function (_a) { | ||
switch (_a.label) { | ||
case 0: | ||
if (!msg) | ||
return [2 /*return*/]; // We know this will always be true but typescript requires this | ||
return [4 /*yield*/, handler(channel, msg)]; | ||
case 1: | ||
_a.sent(); | ||
return [2 /*return*/]; | ||
} | ||
}); | ||
}); }, options)]; | ||
}); | ||
}); | ||
}; | ||
/** | ||
* registerConsumerBatch | ||
* This is very similar to registerConsumer except this enables message batching. | ||
* The following options are configurable | ||
* 1. batching.maxTimeMs - Max time in milliseconds before we return the batch | ||
* 2. batching.maxSizeBytes - Max size in bytes before we return | ||
* | ||
* @param queue - Name of the queue | ||
* @param handler: (channel: Channel, msg: object) => Promise<any> - A handler that receives the message | ||
* @param options : ConsumerOptions - Used to pass in consumer options | ||
* @return Promise<void> | ||
**/ | ||
AmqpCacoon.prototype.registerConsumerBatch = function (queue, handler, options) { | ||
var _a, _b, _c, _d, _e, _f; | ||
return __awaiter(this, void 0, void 0, function () { | ||
var messageBatchingHandler; | ||
var _this = this; | ||
return __generator(this, function (_g) { | ||
// Set some default options | ||
if (!((_a = options) === null || _a === void 0 ? void 0 : _a.batching)) { | ||
options = Object.assign({}, { | ||
batching: { | ||
maxTimeMs: DEFAULT_MAX_BUFFER_TIME_MS, | ||
maxSizeBytes: DEFAULT_MAX_FILES_SIZE_BYTES, | ||
}, | ||
}, options); | ||
} | ||
messageBatchingHandler = new message_batching_manager_1.default({ | ||
providers: { logger: this.logger }, | ||
maxSizeBytes: (_c = (_b = options) === null || _b === void 0 ? void 0 : _b.batching) === null || _c === void 0 ? void 0 : _c.maxSizeBytes, | ||
maxTimeMs: (_e = (_d = options) === null || _d === void 0 ? void 0 : _d.batching) === null || _e === void 0 ? void 0 : _e.maxTimeMs, | ||
skipNackOnFail: (_f = options) === null || _f === void 0 ? void 0 : _f.noAck, | ||
}); | ||
// Register consumer | ||
return [2 /*return*/, this.registerConsumerPrivate(queue, function (channel, msg) { return __awaiter(_this, void 0, void 0, function () { | ||
return __generator(this, function (_a) { | ||
if (!msg) | ||
return [2 /*return*/]; // We know this will always be true but typescript requires this | ||
// Handle message batching | ||
messageBatchingHandler.handleMessageBuffering(channel, msg, handler); | ||
return [2 /*return*/]; | ||
}); | ||
}); }, options)]; | ||
}); | ||
}); | ||
}; | ||
/** | ||
* publish | ||
@@ -242,3 +308,3 @@ * publish to an exchange | ||
return __awaiter(this, void 0, void 0, function () { | ||
var channel_2, channelReady, e_4; | ||
var channel_1, channelReady, e_4; | ||
var _this = this; | ||
@@ -251,4 +317,4 @@ return __generator(this, function (_a) { | ||
case 1: | ||
channel_2 = _a.sent(); | ||
channelReady = channel_2.publish(exchange, routingKey, msgBuffer, options); | ||
channel_1 = _a.sent(); | ||
channelReady = channel_1.publish(exchange, routingKey, msgBuffer, options); | ||
if (this.logger) { | ||
@@ -282,3 +348,3 @@ this.logger.trace("AMQPCacoon.publish result: " + channelReady); | ||
: true; | ||
channel_2.once('drain', function () { | ||
channel_1.once('drain', function () { | ||
if (_this.logger) { | ||
@@ -285,0 +351,0 @@ _this.logger.trace('AMQPCacoon.publish: "drain" received.'); |
{ | ||
"name": "amqp-cacoon", | ||
"version": "1.0.2", | ||
"version": "1.1.0", | ||
"description": "AmqpCacoon is an abstraction around amqplib that provides a simple interface with flow control included out of the box", | ||
@@ -5,0 +5,0 @@ "main": "build/index.js", |
177
readme.md
@@ -7,6 +7,2 @@ # AmqpCacoon | ||
## Caution | ||
WIP: Documetation is very much a work in progress. | ||
## Overview | ||
@@ -20,3 +16,4 @@ | ||
- Publish flow control included out of the box (Wait for drain event if we can't publish) | ||
- TODO timeout if drain event does not occure after some amount of time | ||
- timeout if drain event does not occurs after some amount of time when channel is not ready to receive a publish | ||
- Consume single or batch of messages | ||
@@ -45,90 +42,114 @@ ## Requirements to tests | ||
## Usage | ||
## Simple Usage | ||
### Connect and publish | ||
### Connect | ||
This allows you to connect to an amqp server | ||
```javascript | ||
const config = { | ||
messageBus: { | ||
// Protocol should be "amqp" or "amqps" | ||
protocol: 'amqp', | ||
// Username + Password on the RabbitMQ host | ||
username: 'valtech', | ||
password: 'iscool', | ||
// Host | ||
host: 'localhost', | ||
// Port | ||
port: 5672, | ||
// Queue setup | ||
testQueue: 'test-queue', | ||
}, | ||
}; | ||
const config = { | ||
messageBus: { | ||
// Protocol should be "amqp" or "amqps" | ||
protocol: 'amqp', | ||
// Username + Password on the RabbitMQ host | ||
username: 'valtech', | ||
password: 'iscool', | ||
// Host | ||
host: 'localhost', | ||
// Port | ||
port: 5672, | ||
// Queue setup | ||
testQueue: 'test-queue', | ||
}, | ||
}; | ||
let amqpCacoon = new AmqpCacoon({ | ||
protocol: config.messageBus.protocol, | ||
username: config.messageBus.username, | ||
password: config.messageBus.password, | ||
host: config.messageBus.host, | ||
port: config.messageBus.port, | ||
amqp_opts: {}, | ||
providers: { | ||
logger: logger, | ||
}, | ||
}); | ||
let channel = await amqpCacoon.getPublishChannel(); // Connects and sets up a publish channel | ||
let amqpCacoon = new AmqpCacoon({ | ||
protocol: config.messageBus.protocol, | ||
username: config.messageBus.username, | ||
password: config.messageBus.password, | ||
host: config.messageBus.host, | ||
port: config.messageBus.port, | ||
amqp_opts: {}, | ||
providers: { | ||
logger: logger, | ||
}, | ||
}); | ||
``` | ||
// Create queue and setup publish channel | ||
channel.assertQueue(config.messageBus.testQueue); | ||
### Publish | ||
// Publish | ||
await amqpCacoon.publish( | ||
'', // Publish directly to queue | ||
config.messageBus.testQueue, | ||
Buffer.from('TestMessage') | ||
); | ||
This allows you to publish a message | ||
```javascript | ||
let channel = await amqpCacoon.getPublishChannel(); // Connects and sets up a publish channel | ||
// Create queue and setup publish channel | ||
channel.assertQueue(config.messageBus.testQueue); | ||
// Publish | ||
await amqpCacoon.publish( | ||
'', // Publish directly to queue | ||
config.messageBus.testQueue, | ||
Buffer.from('TestMessage') | ||
); | ||
``` | ||
### Connect and Consume | ||
### Consume Single Message | ||
This will allow use to consume a single message. | ||
```javascript | ||
const config = { | ||
messageBus: { | ||
// Protocol should be "amqp" or "amqps" | ||
protocol: 'amqp', | ||
// Username + Password on the RabbitMQ host | ||
username: 'valtech', | ||
password: 'iscool', | ||
// Host | ||
host: 'localhost', | ||
// Port | ||
port: 5672, | ||
// Queue setup | ||
testQueue: 'test-queue', | ||
}, | ||
}; | ||
let channel = await amqpCacoon.getConsumerChannel(); // Connects and sets up a subscription channel | ||
let amqpCacoon = new AmqpCacoon({ | ||
protocol: config.messageBus.protocol, | ||
username: config.messageBus.username, | ||
password: config.messageBus.password, | ||
host: config.messageBus.host, | ||
port: config.messageBus.port, | ||
amqp_opts: {}, | ||
providers: { | ||
logger: logger, | ||
}, | ||
}); | ||
// Create queue | ||
channel.assertQueue(config.messageBus.testQueue); | ||
let channel = await amqpCacoon.getConsumerChannel(); // Connects and sets up a subscription channel | ||
// Consume single message at a time | ||
amqpCacoon.registerConsumer( | ||
config.messageBus.testQueue, | ||
async (channel: Channel, msg: ConsumeMessage) => { | ||
try { | ||
console.log('Messsage content', msg.content.toString()); | ||
// ... Do other processing here | ||
channel.ack(msg) // To ack a messages | ||
} catch (e) { | ||
channel.nack(msg) // To ack a messages | ||
} | ||
} | ||
); | ||
``` | ||
// Create queue | ||
channel.assertQueue(config.messageBus.testQueue); | ||
### Consume Message Batch | ||
// Consume | ||
amqpCacoon.registerConsumer( | ||
config.messageBus.testQueue, | ||
async (channel: Channel, msg: ConsumeMessage) => { | ||
console.log("Messsage content", msg.content.toString()); | ||
This allows you to wait until either some time has elapsed or a specified message size has been exceeded before the messages are consumed | ||
```javascript | ||
// Consume batch of message at a time. Configuration for time based or size based batching is provided | ||
amqpCacoon.registerConsumerBatch( | ||
config.messageBus.testQueue, | ||
async (channel: Channel, msg: ConsumeBatchMessages) => { | ||
try { | ||
console.log('Messsage content', msg.content.toString()); | ||
// ... Do other processing here | ||
msg.ackAll() // To ack all messages | ||
} catch (e) { | ||
msg.nackAll() // To nack all messages | ||
} | ||
); | ||
}, | ||
{ | ||
batching: { | ||
maxTimeMs: 60000, // Don't provide messages to the callback until at least 60000 ms have passed | ||
maxSizeBytes: 1024 * 1024 * 2, // 1024 * 1024 * 2 = 2 MB -don't provide message to the callback until 2 MB of data have been received | ||
} | ||
} | ||
); | ||
``` | ||
## Dealing With Channels | ||
This library expose amqplib channel when you call either `getConsumerChannel` or `getPublishChannel`. The channel is also exposed when registering a consumer. To learn more about that api see documentation for [amqplib](https://www.npmjs.com/package/amqplib). Just a couple thing that you should remember to do. | ||
1. Remember to ack or nack on all messages. | ||
2. An alternative is to pass an option into the `registerConsumer` to not require an ack (noAck). The problem with this is that if your application is reset or errors out, you may loose the message or messages. | ||
Sorry, the diff of this file is not supported yet
55150
8
912
153