New Case Study:See how Anthropic automated 95% of dependency reviews with Socket.Learn More
Socket
Sign inDemoInstall
Socket

@rewaa/event-broker

Package Overview
Dependencies
Maintainers
1
Versions
49
Alerts
File Explorer

Advanced tools

Socket logo

Install Socket

Detect and block malicious and high-risk dependencies

Install

@rewaa/event-broker - npm Package Compare versions

Comparing version 6.0.1 to 6.1.0

1

dist/constants.d.ts

@@ -15,1 +15,2 @@ export declare const DEFAULT_MESSAGE_DELAY = 0;

export declare const TOPIC_SUBSCRIBE_CHUNK_SIZE = 50;
export declare const PAYLOAD_STRUCTURE_VERSION_V2 = "v2";

3

dist/constants.js
"use strict";
Object.defineProperty(exports, "__esModule", { value: true });
exports.TOPIC_SUBSCRIBE_CHUNK_SIZE = exports.DEFAULT_QUEUE_NAME_STANDARD = exports.DEFAULT_QUEUE_NAME_FIFO = exports.CUSTOM_HANDLER_NAME = exports.SOURCE_QUEUE_PREFIX = exports.DLQ_PREFIX = exports.DEFAULT_MAX_RETRIES = exports.DEFAULT_DLQ_MESSAGE_RETENTION_PERIOD = exports.DEFAULT_MESSAGE_RETENTION_PERIOD = exports.DEFAULT_VISIBILITY_TIMEOUT = exports.DEFAULT_BATCH_SIZE = exports.DEFAULT_MAX_PROCESSING_TIME = exports.DEFAULT_RETRY_COUNT = exports.DEFAULT_MESSAGE_DELAY = void 0;
exports.PAYLOAD_STRUCTURE_VERSION_V2 = exports.TOPIC_SUBSCRIBE_CHUNK_SIZE = exports.DEFAULT_QUEUE_NAME_STANDARD = exports.DEFAULT_QUEUE_NAME_FIFO = exports.CUSTOM_HANDLER_NAME = exports.SOURCE_QUEUE_PREFIX = exports.DLQ_PREFIX = exports.DEFAULT_MAX_RETRIES = exports.DEFAULT_DLQ_MESSAGE_RETENTION_PERIOD = exports.DEFAULT_MESSAGE_RETENTION_PERIOD = exports.DEFAULT_VISIBILITY_TIMEOUT = exports.DEFAULT_BATCH_SIZE = exports.DEFAULT_MAX_PROCESSING_TIME = exports.DEFAULT_RETRY_COUNT = exports.DEFAULT_MESSAGE_DELAY = void 0;
exports.DEFAULT_MESSAGE_DELAY = 0;

@@ -18,2 +18,3 @@ exports.DEFAULT_RETRY_COUNT = 0;

exports.TOPIC_SUBSCRIBE_CHUNK_SIZE = 50;
exports.PAYLOAD_STRUCTURE_VERSION_V2 = 'v2';
//# sourceMappingURL=constants.js.map

@@ -10,3 +10,3 @@ import { Message } from "@aws-sdk/client-sqs";

bootstrap(topics?: Topic[]): Promise<void>;
emit(eventName: string, options?: IEmitOptions, ...args: any[]): Promise<boolean>;
emit(eventName: string, options?: IEmitOptions, payload?: any): Promise<boolean>;
emitBatch(eventName: string, messages: IBatchMessage[], options?: IBatchEmitOptions): Promise<IFailedEmitBatchMessage[]>;

@@ -13,0 +13,0 @@ emitLocal(eventName: string, ...args: any[]): boolean;

@@ -22,5 +22,5 @@ "use strict";

}
async emit(eventName, options, ...args) {
async emit(eventName, options, payload) {
if (this.options.useExternalBroker) {
return await this.emitter.emit(eventName, options, ...args);
return await this.emitter.emit(eventName, options, payload);
}

@@ -27,0 +27,0 @@ return false;

@@ -32,6 +32,6 @@ import { IEmitterOptions, IEmitter, Queue, Topic, IEmitOptions, EventListener, ConsumeOptions, ProcessMessageOptions, IBatchMessage, IBatchEmitOptions, IFailedEmitBatchMessage, IFailedConsumerMessages, Logger, IMessage, EmitPayload, EmitBatchPayload } from "../types";

private logFailedEvent;
emitToTopic(topic: Topic, options?: IEmitOptions, ...args: any[]): Promise<boolean>;
emitToQueue(topic: Topic, options?: IEmitOptions, ...args: any[]): Promise<boolean>;
getEmitPayload(eventName: string, options?: IEmitOptions, ...args: any[]): EmitPayload;
emit(eventName: string, options?: IEmitOptions, ...args: any[]): Promise<boolean>;
emitToTopic(topic: Topic, options?: IEmitOptions, payload?: any): Promise<boolean>;
emitToQueue(topic: Topic, options?: IEmitOptions, payload?: any): Promise<boolean>;
getEmitPayload(eventName: string, options?: IEmitOptions, payload?: any): EmitPayload;
emit(eventName: string, options?: IEmitOptions, payload?: any): Promise<boolean>;
emitBatchToTopic(topic: Topic, messages: IBatchMessage[]): Promise<IFailedEmitBatchMessage[]>;

@@ -38,0 +38,0 @@ emitBatchToQueue(topic: Topic, messages: IBatchMessage[]): Promise<IFailedEmitBatchMessage[]>;

@@ -235,3 +235,3 @@ "use strict";

const topicArn = this.getTopicArn(this.getTopicName(topic));
subscriptionPromises.push(this.snsProducer.subscribeToTopic(topicArn, queueArn, topic.filterPolicy, topic.deliverRawMessage));
subscriptionPromises.push(this.snsProducer.subscribeToTopic(topicArn, queueArn, topic.filterPolicy, true));
}

@@ -244,3 +244,3 @@ await Promise.all(subscriptionPromises);

}
async emitToTopic(topic, options, ...args) {
async emitToTopic(topic, options, payload) {
const topicArn = this.getTopicArn(this.getTopicName(topic));

@@ -252,7 +252,7 @@ await this.snsProducer.send(topicArn, {

deduplicationId: options === null || options === void 0 ? void 0 : options.deduplicationId,
data: args,
data: payload,
});
return true;
}
async emitToQueue(topic, options, ...args) {
async emitToQueue(topic, options, payload) {
const queueUrl = this.getQueueUrl(this.getQueueName(topic));

@@ -262,3 +262,3 @@ await this.sqsProducer.send(queueUrl, {

eventName: topic.name,
data: args,
data: payload,
messageAttributes: options === null || options === void 0 ? void 0 : options.MessageAttributes,

@@ -271,3 +271,3 @@ deduplicationId: options === null || options === void 0 ? void 0 : options.deduplicationId,

}
getEmitPayload(eventName, options, ...args) {
getEmitPayload(eventName, options, payload) {
const topic = {

@@ -282,3 +282,3 @@ name: eventName,

eventName: topic.name,
data: args,
data: payload,
messageAttributes: options === null || options === void 0 ? void 0 : options.MessageAttributes,

@@ -298,4 +298,5 @@ deduplicationId: options === null || options === void 0 ? void 0 : options.deduplicationId,

}
async emit(eventName, options, ...args) {
async emit(eventName, options, payload) {
var _a, _b, _c, _d;
let modifiedArgs;
try {

@@ -309,10 +310,10 @@ const topic = {

let response = false;
const modifiedArgs = (await ((_b = (_a = this.options.hooks) === null || _a === void 0 ? void 0 : _a.beforeEmit) === null || _b === void 0 ? void 0 : _b.call(_a, eventName, args))) || args;
modifiedArgs = (await ((_b = (_a = this.options.hooks) === null || _a === void 0 ? void 0 : _a.beforeEmit) === null || _b === void 0 ? void 0 : _b.call(_a, eventName, payload))) || payload;
if (topic.exchangeType === types_1.ExchangeType.Queue) {
response = await this.emitToQueue(topic, options, ...modifiedArgs);
response = await this.emitToQueue(topic, options, modifiedArgs);
}
else {
response = await this.emitToTopic(topic, options, ...args);
response = await this.emitToTopic(topic, options, modifiedArgs);
}
await ((_d = (_c = this.options.hooks) === null || _c === void 0 ? void 0 : _c.afterEmit) === null || _d === void 0 ? void 0 : _d.call(_c, eventName, args));
await ((_d = (_c = this.options.hooks) === null || _c === void 0 ? void 0 : _c.afterEmit) === null || _d === void 0 ? void 0 : _d.call(_c, eventName, modifiedArgs));
return response;

@@ -325,6 +326,6 @@ }

topic: eventName,
event: args,
event: modifiedArgs !== null && modifiedArgs !== void 0 ? modifiedArgs : payload,
error: error,
});
return false;
throw error;
}

@@ -517,3 +518,3 @@ }

async onMessageReceived(receivedMessage, queueUrl, executionContext) {
var _a, _b, _c, _d;
var _a, _b, _c, _d, _e, _f, _g, _h;
let message;

@@ -534,2 +535,7 @@ try {

}
const payloadStructureVersion = ((_b = (_a = message.messageAttributes) === null || _a === void 0 ? void 0 : _a.PayloadVersion) === null || _b === void 0 ? void 0 : _b.StringValue) ||
((_d = (_c = message.messageAttributes) === null || _c === void 0 ? void 0 : _c.PayloadVersion) === null || _d === void 0 ? void 0 : _d.stringValue);
if (payloadStructureVersion !== constants_1.PAYLOAD_STRUCTURE_VERSION_V2) {
message.data = message.data[0];
}
const listeners = this.getTopicListeners(message.eventName, this.getQueueNameFromUrl(queueUrl));

@@ -548,3 +554,3 @@ if (!listeners) {

try {
const data = await ((_b = (_a = this.options.hooks) === null || _a === void 0 ? void 0 : _a.beforeConsume) === null || _b === void 0 ? void 0 : _b.call(_a, message.eventName, message.data));
const data = await ((_f = (_e = this.options.hooks) === null || _e === void 0 ? void 0 : _e.beforeConsume) === null || _f === void 0 ? void 0 : _f.call(_e, message.eventName, message.data));
for (const listener of listeners) {

@@ -557,3 +563,3 @@ await listener(data || message.data, {

}
await ((_d = (_c = this.options.hooks) === null || _c === void 0 ? void 0 : _c.afterConsume) === null || _d === void 0 ? void 0 : _d.call(_c, message.eventName, message.data));
await ((_h = (_g = this.options.hooks) === null || _g === void 0 ? void 0 : _g.afterConsume) === null || _h === void 0 ? void 0 : _h.call(_g, message.eventName, message.data));
}

@@ -582,2 +588,7 @@ catch (error) {

}
// TODO: Remove message.messageAttributes in future release
message.messageAttributes =
receivedMessage.MessageAttributes ||
receivedMessage.messageAttributes ||
message.messageAttributes;
return message;

@@ -584,0 +595,0 @@ }

@@ -10,2 +10,3 @@ import { SNS, SNSClientConfig, PublishResponse, PublishInput, SubscribeResponse, PublishBatchResponse, PublishBatchInput } from "@aws-sdk/client-sns";

getPublishInput(topicArn: string, message: ISNSMessage): PublishInput;
private getMessageAttributesForPublish;
sendBatch: (topicArn: string, messages: ISNSMessage[]) => Promise<PublishBatchResponse>;

@@ -12,0 +13,0 @@ getBatchPublishInput(topicArn: string, messages: ISNSMessage[]): PublishBatchInput;

@@ -6,2 +6,3 @@ "use strict";

const uuid_1 = require("uuid");
const constants_1 = require("../constants");
class SNSProducer {

@@ -74,5 +75,11 @@ constructor(logger, config) {

}
params.MessageAttributes = message.messageAttributes;
params.MessageAttributes = this.getMessageAttributesForPublish(message.messageAttributes);
return params;
}
getMessageAttributesForPublish(messageAttributes) {
return Object.assign(Object.assign({}, messageAttributes), { PayloadVersion: {
DataType: "String",
StringValue: constants_1.PAYLOAD_STRUCTURE_VERSION_V2,
} });
}
getBatchPublishInput(topicArn, messages) {

@@ -83,3 +90,3 @@ const isFifo = this.isFifoTopic(topicArn);

PublishBatchRequestEntries: messages.map((message) => {
return Object.assign({ Id: message.id, Message: JSON.stringify(message), MessageAttributes: message.messageAttributes }, (isFifo && {
return Object.assign({ Id: message.id, Message: JSON.stringify(message), MessageAttributes: this.getMessageAttributesForPublish(message.messageAttributes) }, (isFifo && {
MessageDeduplicationId: message.deduplicationId || (0, uuid_1.v4)(),

@@ -86,0 +93,0 @@ MessageGroupId: message.messageGroupId,

@@ -204,2 +204,5 @@ "use strict";

StringValue: queueUrl,
}, PayloadVersion: {
DataType: "String",
StringValue: constants_1.PAYLOAD_STRUCTURE_VERSION_V2,
} });

@@ -206,0 +209,0 @@ }

@@ -230,11 +230,2 @@ /// <reference types="node" />

contentBasedDeduplication?: boolean;
/**
* For Fanout based Topics, when this is set to true, the message is delivered
* in the format that SQS expects, as is
* When set to false, the message will be delivered with SNS Metadata as well
* The broker will parse both messages for the body
*
* Refer: https://docs.aws.amazon.com/sns/latest/dg/sns-large-payload-raw-message-delivery.html
*/
deliverRawMessage?: boolean;
}

@@ -369,3 +360,3 @@ export interface Hooks {

}
export type EventListener<T> = (args: T[], metadata?: MessageMetaData) => Promise<void>;
export type EventListener<T> = (args: T, metadata?: MessageMetaData) => Promise<void>;
export interface IEmitter {

@@ -378,3 +369,3 @@ /**

bootstrap(topics?: Topic[]): Promise<void>;
emit(eventName: string, options?: IEmitOptions, ...args: any[]): Promise<boolean>;
emit(eventName: string, options?: IEmitOptions, payload?: any): Promise<boolean>;
/**

@@ -388,3 +379,3 @@ * @param eventName Name of the topic/event to emit in batch

removeAllListener(): void;
removeListener(eventName: string, listener: EventListener<any>): void;
removeListener(eventName: string, listener: EventListener<any>, consumeOptions?: ConsumeOptions): void;
/**

@@ -460,3 +451,3 @@ * Use this method to when you need to consume messages by yourself

*/
getEmitPayload(eventName: string, options?: IEmitOptions, ...args: any[]): EmitPayload;
getEmitPayload(eventName: string, options?: IEmitOptions, payload?: any): EmitPayload;
/**

@@ -484,2 +475,5 @@ * @return Returns an exact copy of batch payload handed to aws client for sending.

UnsubscribeURL: string;
MessageAttributes: {
[key: string]: MessageAttributeValue;
};
}

@@ -486,0 +480,0 @@ export type QueueEmitPayload = SendMessageRequest;

{
"name": "@rewaa/event-broker",
"version": "6.0.1",
"version": "6.1.0",
"description": "A broker for all the events that Rewaa will ever produce or consume",

@@ -5,0 +5,0 @@ "main": "dist/index.js",

Sorry, the diff of this file is not supported yet

Sorry, the diff of this file is not supported yet

Sorry, the diff of this file is not supported yet

Sorry, the diff of this file is not supported yet

Sorry, the diff of this file is not supported yet

Sorry, the diff of this file is not supported yet

Sorry, the diff of this file is not supported yet

SocketSocket SOC 2 Logo

Product

  • Package Alerts
  • Integrations
  • Docs
  • Pricing
  • FAQ
  • Roadmap
  • Changelog

Packages

npm

Stay in touch

Get open source security insights delivered straight into your inbox.


  • Terms
  • Privacy
  • Security

Made with ⚡️ by Socket Inc