@rewaa/event-broker
Advanced tools
Comparing version 6.0.1 to 6.1.0
@@ -15,1 +15,2 @@ export declare const DEFAULT_MESSAGE_DELAY = 0; | ||
export declare const TOPIC_SUBSCRIBE_CHUNK_SIZE = 50; | ||
export declare const PAYLOAD_STRUCTURE_VERSION_V2 = "v2"; |
"use strict"; | ||
Object.defineProperty(exports, "__esModule", { value: true }); | ||
exports.TOPIC_SUBSCRIBE_CHUNK_SIZE = exports.DEFAULT_QUEUE_NAME_STANDARD = exports.DEFAULT_QUEUE_NAME_FIFO = exports.CUSTOM_HANDLER_NAME = exports.SOURCE_QUEUE_PREFIX = exports.DLQ_PREFIX = exports.DEFAULT_MAX_RETRIES = exports.DEFAULT_DLQ_MESSAGE_RETENTION_PERIOD = exports.DEFAULT_MESSAGE_RETENTION_PERIOD = exports.DEFAULT_VISIBILITY_TIMEOUT = exports.DEFAULT_BATCH_SIZE = exports.DEFAULT_MAX_PROCESSING_TIME = exports.DEFAULT_RETRY_COUNT = exports.DEFAULT_MESSAGE_DELAY = void 0; | ||
exports.PAYLOAD_STRUCTURE_VERSION_V2 = exports.TOPIC_SUBSCRIBE_CHUNK_SIZE = exports.DEFAULT_QUEUE_NAME_STANDARD = exports.DEFAULT_QUEUE_NAME_FIFO = exports.CUSTOM_HANDLER_NAME = exports.SOURCE_QUEUE_PREFIX = exports.DLQ_PREFIX = exports.DEFAULT_MAX_RETRIES = exports.DEFAULT_DLQ_MESSAGE_RETENTION_PERIOD = exports.DEFAULT_MESSAGE_RETENTION_PERIOD = exports.DEFAULT_VISIBILITY_TIMEOUT = exports.DEFAULT_BATCH_SIZE = exports.DEFAULT_MAX_PROCESSING_TIME = exports.DEFAULT_RETRY_COUNT = exports.DEFAULT_MESSAGE_DELAY = void 0; | ||
exports.DEFAULT_MESSAGE_DELAY = 0; | ||
@@ -18,2 +18,3 @@ exports.DEFAULT_RETRY_COUNT = 0; | ||
exports.TOPIC_SUBSCRIBE_CHUNK_SIZE = 50; | ||
exports.PAYLOAD_STRUCTURE_VERSION_V2 = 'v2'; | ||
//# sourceMappingURL=constants.js.map |
@@ -10,3 +10,3 @@ import { Message } from "@aws-sdk/client-sqs"; | ||
bootstrap(topics?: Topic[]): Promise<void>; | ||
emit(eventName: string, options?: IEmitOptions, ...args: any[]): Promise<boolean>; | ||
emit(eventName: string, options?: IEmitOptions, payload?: any): Promise<boolean>; | ||
emitBatch(eventName: string, messages: IBatchMessage[], options?: IBatchEmitOptions): Promise<IFailedEmitBatchMessage[]>; | ||
@@ -13,0 +13,0 @@ emitLocal(eventName: string, ...args: any[]): boolean; |
@@ -22,5 +22,5 @@ "use strict"; | ||
} | ||
async emit(eventName, options, ...args) { | ||
async emit(eventName, options, payload) { | ||
if (this.options.useExternalBroker) { | ||
return await this.emitter.emit(eventName, options, ...args); | ||
return await this.emitter.emit(eventName, options, payload); | ||
} | ||
@@ -27,0 +27,0 @@ return false; |
@@ -32,6 +32,6 @@ import { IEmitterOptions, IEmitter, Queue, Topic, IEmitOptions, EventListener, ConsumeOptions, ProcessMessageOptions, IBatchMessage, IBatchEmitOptions, IFailedEmitBatchMessage, IFailedConsumerMessages, Logger, IMessage, EmitPayload, EmitBatchPayload } from "../types"; | ||
private logFailedEvent; | ||
emitToTopic(topic: Topic, options?: IEmitOptions, ...args: any[]): Promise<boolean>; | ||
emitToQueue(topic: Topic, options?: IEmitOptions, ...args: any[]): Promise<boolean>; | ||
getEmitPayload(eventName: string, options?: IEmitOptions, ...args: any[]): EmitPayload; | ||
emit(eventName: string, options?: IEmitOptions, ...args: any[]): Promise<boolean>; | ||
emitToTopic(topic: Topic, options?: IEmitOptions, payload?: any): Promise<boolean>; | ||
emitToQueue(topic: Topic, options?: IEmitOptions, payload?: any): Promise<boolean>; | ||
getEmitPayload(eventName: string, options?: IEmitOptions, payload?: any): EmitPayload; | ||
emit(eventName: string, options?: IEmitOptions, payload?: any): Promise<boolean>; | ||
emitBatchToTopic(topic: Topic, messages: IBatchMessage[]): Promise<IFailedEmitBatchMessage[]>; | ||
@@ -38,0 +38,0 @@ emitBatchToQueue(topic: Topic, messages: IBatchMessage[]): Promise<IFailedEmitBatchMessage[]>; |
@@ -235,3 +235,3 @@ "use strict"; | ||
const topicArn = this.getTopicArn(this.getTopicName(topic)); | ||
subscriptionPromises.push(this.snsProducer.subscribeToTopic(topicArn, queueArn, topic.filterPolicy, topic.deliverRawMessage)); | ||
subscriptionPromises.push(this.snsProducer.subscribeToTopic(topicArn, queueArn, topic.filterPolicy, true)); | ||
} | ||
@@ -244,3 +244,3 @@ await Promise.all(subscriptionPromises); | ||
} | ||
async emitToTopic(topic, options, ...args) { | ||
async emitToTopic(topic, options, payload) { | ||
const topicArn = this.getTopicArn(this.getTopicName(topic)); | ||
@@ -252,7 +252,7 @@ await this.snsProducer.send(topicArn, { | ||
deduplicationId: options === null || options === void 0 ? void 0 : options.deduplicationId, | ||
data: args, | ||
data: payload, | ||
}); | ||
return true; | ||
} | ||
async emitToQueue(topic, options, ...args) { | ||
async emitToQueue(topic, options, payload) { | ||
const queueUrl = this.getQueueUrl(this.getQueueName(topic)); | ||
@@ -262,3 +262,3 @@ await this.sqsProducer.send(queueUrl, { | ||
eventName: topic.name, | ||
data: args, | ||
data: payload, | ||
messageAttributes: options === null || options === void 0 ? void 0 : options.MessageAttributes, | ||
@@ -271,3 +271,3 @@ deduplicationId: options === null || options === void 0 ? void 0 : options.deduplicationId, | ||
} | ||
getEmitPayload(eventName, options, ...args) { | ||
getEmitPayload(eventName, options, payload) { | ||
const topic = { | ||
@@ -282,3 +282,3 @@ name: eventName, | ||
eventName: topic.name, | ||
data: args, | ||
data: payload, | ||
messageAttributes: options === null || options === void 0 ? void 0 : options.MessageAttributes, | ||
@@ -298,4 +298,5 @@ deduplicationId: options === null || options === void 0 ? void 0 : options.deduplicationId, | ||
} | ||
async emit(eventName, options, ...args) { | ||
async emit(eventName, options, payload) { | ||
var _a, _b, _c, _d; | ||
let modifiedArgs; | ||
try { | ||
@@ -309,10 +310,10 @@ const topic = { | ||
let response = false; | ||
const modifiedArgs = (await ((_b = (_a = this.options.hooks) === null || _a === void 0 ? void 0 : _a.beforeEmit) === null || _b === void 0 ? void 0 : _b.call(_a, eventName, args))) || args; | ||
modifiedArgs = (await ((_b = (_a = this.options.hooks) === null || _a === void 0 ? void 0 : _a.beforeEmit) === null || _b === void 0 ? void 0 : _b.call(_a, eventName, payload))) || payload; | ||
if (topic.exchangeType === types_1.ExchangeType.Queue) { | ||
response = await this.emitToQueue(topic, options, ...modifiedArgs); | ||
response = await this.emitToQueue(topic, options, modifiedArgs); | ||
} | ||
else { | ||
response = await this.emitToTopic(topic, options, ...args); | ||
response = await this.emitToTopic(topic, options, modifiedArgs); | ||
} | ||
await ((_d = (_c = this.options.hooks) === null || _c === void 0 ? void 0 : _c.afterEmit) === null || _d === void 0 ? void 0 : _d.call(_c, eventName, args)); | ||
await ((_d = (_c = this.options.hooks) === null || _c === void 0 ? void 0 : _c.afterEmit) === null || _d === void 0 ? void 0 : _d.call(_c, eventName, modifiedArgs)); | ||
return response; | ||
@@ -325,6 +326,6 @@ } | ||
topic: eventName, | ||
event: args, | ||
event: modifiedArgs !== null && modifiedArgs !== void 0 ? modifiedArgs : payload, | ||
error: error, | ||
}); | ||
return false; | ||
throw error; | ||
} | ||
@@ -517,3 +518,3 @@ } | ||
async onMessageReceived(receivedMessage, queueUrl, executionContext) { | ||
var _a, _b, _c, _d; | ||
var _a, _b, _c, _d, _e, _f, _g, _h; | ||
let message; | ||
@@ -534,2 +535,7 @@ try { | ||
} | ||
const payloadStructureVersion = ((_b = (_a = message.messageAttributes) === null || _a === void 0 ? void 0 : _a.PayloadVersion) === null || _b === void 0 ? void 0 : _b.StringValue) || | ||
((_d = (_c = message.messageAttributes) === null || _c === void 0 ? void 0 : _c.PayloadVersion) === null || _d === void 0 ? void 0 : _d.stringValue); | ||
if (payloadStructureVersion !== constants_1.PAYLOAD_STRUCTURE_VERSION_V2) { | ||
message.data = message.data[0]; | ||
} | ||
const listeners = this.getTopicListeners(message.eventName, this.getQueueNameFromUrl(queueUrl)); | ||
@@ -548,3 +554,3 @@ if (!listeners) { | ||
try { | ||
const data = await ((_b = (_a = this.options.hooks) === null || _a === void 0 ? void 0 : _a.beforeConsume) === null || _b === void 0 ? void 0 : _b.call(_a, message.eventName, message.data)); | ||
const data = await ((_f = (_e = this.options.hooks) === null || _e === void 0 ? void 0 : _e.beforeConsume) === null || _f === void 0 ? void 0 : _f.call(_e, message.eventName, message.data)); | ||
for (const listener of listeners) { | ||
@@ -557,3 +563,3 @@ await listener(data || message.data, { | ||
} | ||
await ((_d = (_c = this.options.hooks) === null || _c === void 0 ? void 0 : _c.afterConsume) === null || _d === void 0 ? void 0 : _d.call(_c, message.eventName, message.data)); | ||
await ((_h = (_g = this.options.hooks) === null || _g === void 0 ? void 0 : _g.afterConsume) === null || _h === void 0 ? void 0 : _h.call(_g, message.eventName, message.data)); | ||
} | ||
@@ -582,2 +588,7 @@ catch (error) { | ||
} | ||
// TODO: Remove message.messageAttributes in future release | ||
message.messageAttributes = | ||
receivedMessage.MessageAttributes || | ||
receivedMessage.messageAttributes || | ||
message.messageAttributes; | ||
return message; | ||
@@ -584,0 +595,0 @@ } |
@@ -10,2 +10,3 @@ import { SNS, SNSClientConfig, PublishResponse, PublishInput, SubscribeResponse, PublishBatchResponse, PublishBatchInput } from "@aws-sdk/client-sns"; | ||
getPublishInput(topicArn: string, message: ISNSMessage): PublishInput; | ||
private getMessageAttributesForPublish; | ||
sendBatch: (topicArn: string, messages: ISNSMessage[]) => Promise<PublishBatchResponse>; | ||
@@ -12,0 +13,0 @@ getBatchPublishInput(topicArn: string, messages: ISNSMessage[]): PublishBatchInput; |
@@ -6,2 +6,3 @@ "use strict"; | ||
const uuid_1 = require("uuid"); | ||
const constants_1 = require("../constants"); | ||
class SNSProducer { | ||
@@ -74,5 +75,11 @@ constructor(logger, config) { | ||
} | ||
params.MessageAttributes = message.messageAttributes; | ||
params.MessageAttributes = this.getMessageAttributesForPublish(message.messageAttributes); | ||
return params; | ||
} | ||
getMessageAttributesForPublish(messageAttributes) { | ||
return Object.assign(Object.assign({}, messageAttributes), { PayloadVersion: { | ||
DataType: "String", | ||
StringValue: constants_1.PAYLOAD_STRUCTURE_VERSION_V2, | ||
} }); | ||
} | ||
getBatchPublishInput(topicArn, messages) { | ||
@@ -83,3 +90,3 @@ const isFifo = this.isFifoTopic(topicArn); | ||
PublishBatchRequestEntries: messages.map((message) => { | ||
return Object.assign({ Id: message.id, Message: JSON.stringify(message), MessageAttributes: message.messageAttributes }, (isFifo && { | ||
return Object.assign({ Id: message.id, Message: JSON.stringify(message), MessageAttributes: this.getMessageAttributesForPublish(message.messageAttributes) }, (isFifo && { | ||
MessageDeduplicationId: message.deduplicationId || (0, uuid_1.v4)(), | ||
@@ -86,0 +93,0 @@ MessageGroupId: message.messageGroupId, |
@@ -204,2 +204,5 @@ "use strict"; | ||
StringValue: queueUrl, | ||
}, PayloadVersion: { | ||
DataType: "String", | ||
StringValue: constants_1.PAYLOAD_STRUCTURE_VERSION_V2, | ||
} }); | ||
@@ -206,0 +209,0 @@ } |
@@ -230,11 +230,2 @@ /// <reference types="node" /> | ||
contentBasedDeduplication?: boolean; | ||
/** | ||
* For Fanout based Topics, when this is set to true, the message is delivered | ||
* in the format that SQS expects, as is | ||
* When set to false, the message will be delivered with SNS Metadata as well | ||
* The broker will parse both messages for the body | ||
* | ||
* Refer: https://docs.aws.amazon.com/sns/latest/dg/sns-large-payload-raw-message-delivery.html | ||
*/ | ||
deliverRawMessage?: boolean; | ||
} | ||
@@ -369,3 +360,3 @@ export interface Hooks { | ||
} | ||
export type EventListener<T> = (args: T[], metadata?: MessageMetaData) => Promise<void>; | ||
export type EventListener<T> = (args: T, metadata?: MessageMetaData) => Promise<void>; | ||
export interface IEmitter { | ||
@@ -378,3 +369,3 @@ /** | ||
bootstrap(topics?: Topic[]): Promise<void>; | ||
emit(eventName: string, options?: IEmitOptions, ...args: any[]): Promise<boolean>; | ||
emit(eventName: string, options?: IEmitOptions, payload?: any): Promise<boolean>; | ||
/** | ||
@@ -388,3 +379,3 @@ * @param eventName Name of the topic/event to emit in batch | ||
removeAllListener(): void; | ||
removeListener(eventName: string, listener: EventListener<any>): void; | ||
removeListener(eventName: string, listener: EventListener<any>, consumeOptions?: ConsumeOptions): void; | ||
/** | ||
@@ -460,3 +451,3 @@ * Use this method to when you need to consume messages by yourself | ||
*/ | ||
getEmitPayload(eventName: string, options?: IEmitOptions, ...args: any[]): EmitPayload; | ||
getEmitPayload(eventName: string, options?: IEmitOptions, payload?: any): EmitPayload; | ||
/** | ||
@@ -484,2 +475,5 @@ * @return Returns an exact copy of batch payload handed to aws client for sending. | ||
UnsubscribeURL: string; | ||
MessageAttributes: { | ||
[key: string]: MessageAttributeValue; | ||
}; | ||
} | ||
@@ -486,0 +480,0 @@ export type QueueEmitPayload = SendMessageRequest; |
{ | ||
"name": "@rewaa/event-broker", | ||
"version": "6.0.1", | ||
"version": "6.1.0", | ||
"description": "A broker for all the events that Rewaa will ever produce or consume", | ||
@@ -5,0 +5,0 @@ "main": "dist/index.js", |
Sorry, the diff of this file is not supported yet
Sorry, the diff of this file is not supported yet
Sorry, the diff of this file is not supported yet
Sorry, the diff of this file is not supported yet
Sorry, the diff of this file is not supported yet
Sorry, the diff of this file is not supported yet
Sorry, the diff of this file is not supported yet
236219
1965