@qrvey/event-broker
Version: 1.0.0
![coverage](https://img.shields.io/badge/unit_test_coverage-0%25-brightgreen)
The @qrvey/event-broker package provides a unified interface for working with AWS SQS, AWS EventBridge, and RabbitMQ, selected according to the configuration of the service.
Installation
You can install the package using npm or yarn:
npm install @qrvey/event-broker
Or with yarn:
yarn add @qrvey/event-broker
Note: If the application does not already include @aws-sdk/client-sqs, @aws-sdk/client-eventbridge, @smithy/node-http-handler, and @smithy/middleware-retry, they must be installed manually to work with AWS SQS and AWS EventBridge. Likewise, amqplib must be installed manually to work with RabbitMQ.
Required environment variables
AWS_ACCOUNT_ID;
AWS_DEFAULT_REGION;
AWS_ACCESS_KEY_ID;
AWS_SECRET_ACCESS_KEY;
RABBITMQ_HOST;
RABBITMQ_USER;
RABBITMQ_PASSWORD;
RABBITMQ_PORT;
PLATFORM_TYPE;
Optional environment variables
AWS_EVENT_BUS_NAME;
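It can help to verify the environment once at startup. The following is a minimal sketch and not part of the package: `REQUIRED_ENV_VARS` and `missingEnvVars` are hypothetical names, and the list simply mirrors the required variables above.

```javascript
// Names copied from the list of required environment variables above.
const REQUIRED_ENV_VARS = [
    'AWS_ACCOUNT_ID',
    'AWS_DEFAULT_REGION',
    'AWS_ACCESS_KEY_ID',
    'AWS_SECRET_ACCESS_KEY',
    'RABBITMQ_HOST',
    'RABBITMQ_USER',
    'RABBITMQ_PASSWORD',
    'RABBITMQ_PORT',
    'PLATFORM_TYPE',
];

// Hypothetical helper: returns the required variables that are unset or empty.
function missingEnvVars(env = process.env) {
    return REQUIRED_ENV_VARS.filter((name) => !env[name]);
}

const missing = missingEnvVars();
if (missing.length > 0) {
    console.warn(`Missing environment variables: ${missing.join(', ')}`);
}
```

The helper accepts the environment as a parameter so it can be exercised with a plain object in tests.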
Conditions for AWS SQS or RabbitMQ
The QueueService class of @qrvey/event-broker can deliver messages to either AWS SQS or RabbitMQ. The target is determined by the PLATFORM_TYPE environment variable:
- AWS SQS: if PLATFORM_TYPE is not set or has a value other than 'CONTAINER'.
- RabbitMQ: if PLATFORM_TYPE is set to 'CONTAINER'.
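The rule above can be sketched as a small function. This is an illustration only: `resolveQueueBackend` is a hypothetical name, the package's internal logic may differ, and the case-sensitive comparison is an assumption.

```javascript
// Hypothetical helper mirroring the documented rule.
// Assumption: the comparison with 'CONTAINER' is exact and case-sensitive.
function resolveQueueBackend(env = process.env) {
    return env.PLATFORM_TYPE === 'CONTAINER' ? 'RabbitMQ' : 'AWS SQS';
}

console.log(resolveQueueBackend({ PLATFORM_TYPE: 'CONTAINER' })); // prints "RabbitMQ"
console.log(resolveQueueBackend({})); // prints "AWS SQS"
```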
Usage Example
const { QueueService } = require('@qrvey/event-broker');

const queueSchema = {
    name: 'MY_SAMPLE_QUEUE', // queue name in AWS
    alias: 'MY_SAMPLE_QUEUE', // queue name in RabbitMQ
};
const queueMessage = {
    company: 'Qrvey',
    name: 'John Doe',
};
const options = {
    delaySeconds: 3,
    messageGroupId: 'GROUP1',
    headers: {
        'x-retry-count': 0,
    },
};

// Wrapped in an async function because CommonJS modules do not support top-level await.
async function main() {
    const queueService = new QueueService();
    let connected = false;
    try {
        await queueService.connect();
        connected = true;
        const res = await queueService.sendMessage(queueSchema, queueMessage, options);
        console.log('res: ', res);
    } catch (error) {
        console.log(error);
    } finally {
        // Disconnect exactly once, and only if the connection was established.
        if (connected) await queueService.disconnect();
    }
}

main();
Conditions for AWS EventBridge or RabbitMQ
The EventBusService class of @qrvey/event-broker can deliver events to either AWS EventBridge or RabbitMQ. The target is determined by the PLATFORM_TYPE environment variable:
- AWS EventBridge: if PLATFORM_TYPE is not set or has a value other than 'CONTAINER'.
- RabbitMQ: if PLATFORM_TYPE is set to 'CONTAINER'.
Usage Example
const { EventBusService } = require('@qrvey/event-broker');

const queueSchema = {
    name: 'MY_SAMPLE_BUS', // bus name in AWS EventBridge
    alias: 'MY_SAMPLE_QUEUE', // queue name in RabbitMQ
};
const eventBridgeMessage = {
    EventBusName: 'my-event-bus',
    Detail: { key1: 'value1', key2: 'value2' },
    DetailType: 'MyEventType',
    Source: 'my.application',
    Time: '2024-06-26T12:00:00Z',
    Resources: ['arn:aws:s3:::my-bucket'],
    AccountId: '123456789012',
};
const options = {
    headers: {
        DelaySeconds: 3,
    },
};

// Wrapped in an async function because CommonJS modules do not support top-level await.
async function main() {
    const eventBusService = new EventBusService();
    let connected = false;
    try {
        await eventBusService.connect();
        connected = true;
        const res = await eventBusService.sendMessage(queueSchema, eventBridgeMessage, options);
        console.log('res: ', res);
    } catch (error) {
        console.log(error);
    } finally {
        // Disconnect exactly once, and only if the connection was established.
        if (connected) await eventBusService.disconnect();
    }
}

main();
API
Class QueueService
Functions
connect(): Promise<any>
sendMessage(queue: IQueueSchema, message: object, options: IFunctionMapping, options: Options.Publish): Promise<any>
- queue: Object with the queue information.
  - queue.name: The name of the queue in AWS.
  - queue.alias: The name of the queue in RabbitMQ.
- message: The body of the message to be sent to the queue.
- options: An object with additional configuration options.
  - options.delaySeconds (optional): Number of seconds to wait before the message is processed.
  - options.messageGroupId (optional): Creates dynamic queues named after the original queue name plus the messageGroupId. Defaults: a concurrency of 1, and a queue timeout of 1 minute after no new messages are received.
  - options.headers (optional): Headers of the message.
- Returns: A promise that resolves with the result of the invoked function or rejects with an error if the invocation fails.
deleteMessage(queue: string, receiptHandle: string): Promise<any>
- queue: The name of the queue.
- receiptHandle:
disconnect(): Promise<any>
checkQueue(queue: IQueueSchema): Promise<any>
- queue: Object with the queue information.
  - queue.name: The name of the queue in AWS.
  - queue.alias: The name of the queue in RabbitMQ.
- Returns:
  - status: true if the queue exists.
  - consumerCount: Number of consumers of the queue.
  - messageCount: Number of messages in the queue.
  - queue: Name of the queue.
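As an illustration of how the checkQueue result might be used, the sketch below reports whether a queue exists and has at least one consumer. `hasActiveConsumers` is a hypothetical helper, not part of the package. It assumes an already connected service and relies only on the return fields documented above.

```javascript
// Hypothetical helper: `queueService` is any object exposing checkQueue(queue)
// with the documented return shape (status, consumerCount, messageCount, queue).
async function hasActiveConsumers(queueService, queueSchema) {
    const { status, consumerCount } = await queueService.checkQueue(queueSchema);
    return Boolean(status) && consumerCount > 0;
}

// Usage with a stand-in object instead of a real QueueService instance.
const fakeService = {
    checkQueue: async () => ({
        status: true,
        consumerCount: 2,
        messageCount: 0,
        queue: 'MY_SAMPLE_QUEUE',
    }),
};

hasActiveConsumers(fakeService, { name: 'MY_SAMPLE_QUEUE', alias: 'MY_SAMPLE_QUEUE' })
    .then((ready) => console.log('ready:', ready)); // prints "ready: true"
```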
confirmMessage(channelId: string, messageId: string, ack: boolean): Promise<any>
- channelId: ID of the channel that consumed the message.
- messageId: ID of the message to be confirmed.
- ack: Acknowledgement status. true means the message was processed successfully and is removed from the queue; false means the message failed to be processed and requires a retry or is sent to the DLQ.
- Returns: HTTP response.
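One possible way to tie the ack flag to the outcome of message processing is sketched below. `processAndConfirm` is a hypothetical wrapper, not part of the package. It assumes the caller already holds the channelId and messageId, however the consumer receives them, and it uses only the confirmMessage signature documented above.

```javascript
// Hypothetical wrapper: runs `handler`, then confirms the message with
// ack = true on success or ack = false if the handler throws.
async function processAndConfirm(queueService, channelId, messageId, handler) {
    let ack = true;
    try {
        await handler();
    } catch (error) {
        // Processing failed: report a negative acknowledgement.
        ack = false;
    }
    await queueService.confirmMessage(channelId, messageId, ack);
    return ack;
}
```

Swallowing the handler error keeps the example short; a real consumer would likely log it before confirming.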
Class EventBusService
Functions
connect(): Promise<any>
sendMessage(queue: IQueueSchema, message: IEventBridgeMessage): Promise<any>
- queue (IQueueSchema): Object with the queue information.
  - queue.name: The name of the bus in AWS EventBridge.
  - queue.alias: The name of the queue in RabbitMQ.
- message: The body of the message to be sent to the queue.
  - message.EventBusName: The name of the event bus in AWS EventBridge. Optional.
  - message.Detail: A JSON string containing detailed information about the event. Required.
  - message.DetailType: An arbitrary identifier for the type of the event. Required.
  - message.Source: The source of the event, typically an identifier of the application or service generating the event. Required.
  - message.Time: The time the event was generated. Optional.
  - message.Resources: A list of related resources, each represented as a string containing the ARN of an AWS resource. Optional.
  - message.AccountId: The AWS account ID where the event was generated. Optional. If not specified, the current context's account ID is used.
- Returns: A promise that resolves with the result of the invoked function or rejects with an error if the invocation fails.
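Since Detail, DetailType, and Source are marked as required, a message can be checked before it is sent. The following is a minimal sketch: `REQUIRED_EVENT_FIELDS` and `missingEventFields` are hypothetical names, and whether the package performs its own validation is not stated here.

```javascript
// Fields marked "Required" in the message description above.
const REQUIRED_EVENT_FIELDS = ['Detail', 'DetailType', 'Source'];

// Hypothetical helper: returns the required fields that are absent or empty.
function missingEventFields(message) {
    return REQUIRED_EVENT_FIELDS.filter((field) => {
        const value = message[field];
        return value === undefined || value === null || value === '';
    });
}

console.log(missingEventFields({ DetailType: 'MyEventType', Source: 'my.application' }));
// prints [ 'Detail' ]
```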
disconnect(): Promise<any>