@jetit/publisher - npm Package Compare versions

Comparing version 3.3.3 to 3.3.5

src/lib/redis/logger.d.ts (new file, 8 changes)
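The contents of the new logger module are collapsed in this view. Judging only from the call sites in the rest of the diff (PUBLISHER_LOGGER.log/warn/error, plus PERFORMANCE_LOGGER.log receiving ";"-delimited strings), a plausible shape for the declaration file is sketched below; the actual exports and types are an assumption, not the file's contents.

// Hypothetical reconstruction of src/lib/redis/logger.d.ts, inferred from
// call sites elsewhere in this diff; the real file is not shown here.
export interface PublisherLogger {
    log(...args: unknown[]): void;
    warn(...args: unknown[]): void;
    error(...args: unknown[]): void;
}
export declare const PUBLISHER_LOGGER: PublisherLogger;
// Only .log is ever called on PERFORMANCE_LOGGER in this diff.
export declare const PERFORMANCE_LOGGER: Pick<PublisherLogger, 'log'>;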

package.json
 {
 "name": "@jetit/publisher",
-"version": "3.3.3",
+"version": "3.3.5",
 "type": "commonjs",
 "dependencies": {
-"@jetit/id": "0.0.11",
+"@jetit/id": "^0.0.12",
 "ioredis": "^5.3.0",
 "rxjs": "^7.8.0"
 },
 "peerDependencies": {
 "rxjs": "^7.8.0",
 "tslib": "1.14.1"

@@ -12,0 +10,0 @@ },

@@ -201,2 +201,4 @@ # publisher

 8. Message broadcasting: The publisher can be used to broadcast messages to multiple consumers or subscribers, allowing for efficient and scalable communication in applications with many components or services.
+9. Multicast Publishing: This is the existing PUB/SUB implementation but with the event data being stored into streams for additional processing
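The multicast behaviour added in 3.3.5 is exposed through a new optional flag on publish (see the type-declaration hunks below). A minimal usage sketch, assuming the package exports a Streams class constructed with a service name; the event name and payload shape here are illustrative only:

import { Streams } from '@jetit/publisher'; // export name assumed

async function demo(): Promise<void> {
    const streams = new Streams('order-service'); // constructor args assumed
    // Default publish: within each consumer group, one instance processes the event.
    await streams.publish({ eventName: 'ORDER_CREATED', data: { id: 42 } });
    // Multicast publish: the notification is flagged so that every listening
    // instance processes the message, even if a sibling already acknowledged it.
    await streams.publish({ eventName: 'ORDER_CREATED', data: { id: 42 } }, true);
}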

@@ -5,2 +5,3 @@ "use strict";

 const ioredis_1 = require("ioredis");
+const logger_1 = require("./logger");
 class RedisRegistry {

@@ -29,4 +30,5 @@ static attemptConnection(connectionKey, storeRef = 0) {

 connection.on('error', (error) => {
-console.error(`PUBLISHER: Redis connection error : ${error.message}`);
+logger_1.PUBLISHER_LOGGER.error(`PUBLISHER: Redis connection error : ${error.message}`);
 connection.removeAllListeners();
 connection.disconnect();
+RedisRegistry.attemptConnection(connectionKey, storeRef);

@@ -47,3 +49,3 @@ });

 connection.disconnect(true);
-console.error('PUBLISHER: failed to ping redis, disconnecting and restarting service.');
+logger_1.PUBLISHER_LOGGER.error('PUBLISHER: failed to ping redis, disconnecting and restarting service.');
 process.exit(0);

@@ -50,0 +52,0 @@ }

@@ -6,2 +6,3 @@ "use strict";

 const rxjs_1 = require("rxjs");
+const logger_1 = require("./logger");
 const registry_1 = require("./registry");

@@ -22,3 +23,3 @@ const utils_1 = require("./utils");

 this.scheduledMessagesTimer = (0, rxjs_1.interval)(duration).subscribe(() => {
-console.log('Checking Streams messages at ', new Date().toISOString(), '...');
+logger_1.PUBLISHER_LOGGER.log('Checking Streams messages at ', new Date().toISOString(), '...');
 /** Do not run scheduler if the previous run is not completed */

@@ -29,3 +30,3 @@ if (this.previousTaskCompleted) {

 .catch((error) => {
-console.error('Error while processing scheduled events:', error);
+logger_1.PUBLISHER_LOGGER.error('Error while processing scheduled events:', error);
 })

@@ -37,3 +38,3 @@ .then(() => {

 else {
-console.log('Skipping current scheduler run because previous run is in progress');
+logger_1.PUBLISHER_LOGGER.log('Skipping current scheduler run because previous run is in progress');
 }

@@ -45,3 +46,3 @@ });

 const events = await this.redisPublisher.zrangebyscore('se', 0, currentTime);
-console.log('Events to process:', events.length);
+logger_1.PUBLISHER_LOGGER.log('Events to process:', events.length);
 for (const eventString of events) {

@@ -60,3 +61,3 @@ const eventData = (0, utils_1.decodeScheduledMessage)(eventString);

 const consumerGroups = await (0, utils_1.getAllConsumerGroups)(eventData.eventName, this.redisPublisher);
-console.log('Scheduled Publishing to consumer groups: ', consumerGroups, 'with id ', eventData.eventId, '...');
+logger_1.PUBLISHER_LOGGER.log('Scheduled Publishing to consumer groups: ', consumerGroups, 'with id ', eventData.eventId, '...');
 let key = '*';

@@ -68,3 +69,3 @@ for (const consumerGroup of consumerGroups) {

 .xadd(streamName, '*', 'data', JSON.stringify(eventData))
-.catch((e) => console.error(`PUBLISHER: Publishing event ${eventData.eventName} to consumer groups: ${consumerGroups.join(', ')} failed with data ${JSON.stringify(eventData)}, ${e} `));
+.catch((e) => logger_1.PUBLISHER_LOGGER.error(`PUBLISHER: Publishing event ${eventData.eventName} to consumer groups: ${consumerGroups.join(', ')} failed with data ${JSON.stringify(eventData)}, ${e} `));
 if (key === '*')

@@ -71,0 +72,0 @@ key = generatedKey ?? key;

@@ -46,3 +46,3 @@ import { Observable } from 'rxjs';

 */
-publish<TData = unknown, TName extends string = string>(data: PublishData<TData, TName>): Promise<void>;
+publish<TData = unknown, TName extends string = string>(data: PublishData<TData, TName>, multicast?: boolean): Promise<void>;
 /**

@@ -71,3 +71,3 @@ * Schedules an event to be published at a specified future time. The event gets published if the

 */
-scheduledPublish<TData = unknown, TName extends string = string>(scheduledTime: Date, eventData: PublishData<TData, TName>, uniquePerInstance?: boolean, repeatInterval?: number): Promise<void>;
+scheduledPublish<TData = unknown, TName extends string = string>(scheduledTime: Date, eventData: PublishData<TData, TName>, uniquePerInstance?: boolean, repeatInterval?: number, multicast?: boolean): Promise<void>;
 /**
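Continuing the usage sketch from the README section above (inside the same async context), the widened scheduledPublish signature could be called like this; field names follow the PublishData usage visible in the diff, while the timing values and the repeatInterval unit are assumptions:

// Schedule a multicast event one minute out, repeating every 30 seconds.
const runAt = new Date(Date.now() + 60_000);
await streams.scheduledPublish(
    runAt,
    { eventName: 'CACHE_REFRESH', data: { scope: 'all' } },
    false,  // uniquePerInstance
    30_000, // repeatInterval (milliseconds assumed)
    true    // multicast, new in 3.3.5
);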

@@ -80,3 +80,3 @@ * Listens for events with the given name and returns an Observable that emits an EventData<T> object

 *
-* If an error occurs while subscribing, the method logs the error to the console and throws
+* If an error occurs while subscribing, the method logs the error to the PUBLISHER_LOGGER and throws
 * an error. This is done to prevent the service from continuing without a proper event subscription.

@@ -98,3 +98,3 @@ *

 * orderCreated.subscribe((event) => {
-* console.log('New order created:', event.data);
+* PUBLISHER_LOGGER.log('New order created:', event.data);
 * });

@@ -115,8 +115,8 @@ */

 * async function shutdown(): Promise<void> {
-* console.log('Graceful shutdown initiated.');
+* PUBLISHER_LOGGER.log('Graceful shutdown initiated.');
 * try {
 * await streams.close();
-* console.log('Resources and connections successfully closed.');
+* PUBLISHER_LOGGER.log('Resources and connections successfully closed.');
 * } catch (error) {
-* console.error('Error during graceful shutdown:', error);
+* PUBLISHER_LOGGER.error('Error during graceful shutdown:', error);
 * }

@@ -123,0 +123,0 @@ * process.exit(0);

@@ -6,6 +6,7 @@ "use strict";

 const rxjs_1 = require("rxjs");
+const logger_1 = require("./logger");
 const registry_1 = require("./registry");
 const utils_1 = require("./utils");
 function publisherErrorHandler(error) {
-console.error('PUBLISHER UNHANDLED ERROR: ', JSON.stringify(error));
+logger_1.PUBLISHER_LOGGER.error('PUBLISHER UNHANDLED ERROR: ', JSON.stringify(error));
 }

@@ -42,3 +43,3 @@ class Streams {

 this.consumerGroupName = `cg-${serviceName}`;
-console.log(`PUBLISHER: Instance ID: ${this.instanceId}`);
+logger_1.PUBLISHER_LOGGER.log(`PUBLISHER: Instance ID: ${this.instanceId}`);
 const cleanUpInterval = parseInt(process.env['CLEANUP_INTERVAL'] || `${1000 * 60 * 60}`, 10) ?? 1000 * 60 * 60;

@@ -50,3 +51,3 @@ this.cleanUpTimer = setInterval(() => {

 async runClear(cleanUpInterval) {
-console.log('PUBLISHER: Running Clearance', this.eventsListened);
+logger_1.PUBLISHER_LOGGER.log('PUBLISHER: Running Clearance', this.eventsListened);
 for (const eventName of this.eventsListened) {

@@ -56,7 +57,8 @@ process.nextTick(async () => {

 await this.cleanupAcknowledgedMessages(eventName, cleanUpInterval).catch(publisherErrorHandler);
-console.log(`Cleanup process for Acknowledged messages completed for ${eventName}`);
+logger_1.PUBLISHER_LOGGER.log(`Cleanup process for Acknowledged messages completed for ${eventName}`);
 });
 }
 }
-async publish(data) {
+async publish(data, multicast = false) {
+const publishStartTime = process.hrtime();
 if (data.eventId)

@@ -70,3 +72,3 @@ data.republishEvent = data.eventId;

 if (consumerGroups.length > 0) {
-console.log(`PUBLISHER: Publishing event ${data.eventName} to consumer groups: ${consumerGroups.join(', ')}`);
+logger_1.PUBLISHER_LOGGER.log(`PUBLISHER: Publishing event ${data.eventName} to consumer groups: ${consumerGroups.join(', ')}`);
 try {
try {

@@ -78,10 +80,10 @@ for (const consumerGroup of consumerGroups) {

 .xadd(streamName, key, 'data', JSON.stringify(data))
-.catch((e) => console.error(`PUBLISHER: Publishing event ${data.eventName} to consumer groups: ${consumerGroups.join(', ')} failed with data ${JSON.stringify(data)}, ${e} `));
+.catch((e) => logger_1.PUBLISHER_LOGGER.error(`PUBLISHER: Publishing event ${data.eventName} to consumer groups: ${consumerGroups.join(', ')} failed with data ${JSON.stringify(data)}, ${e} `));
 if (key === '*')
 key = generatedKey ?? key;
 }
-await (0, utils_1.notifySubscribers)(this.redisPublisher, data.eventName, key);
+await (0, utils_1.notifySubscribers)(this.redisPublisher, data.eventName, key, multicast);
 }
 catch (error) {
-console.error(`PUBLISHER: Error while publishing event for service ${this.consumerGroupName} with instance ${this.instanceId}: `, error);
+logger_1.PUBLISHER_LOGGER.error(`PUBLISHER: Error while publishing event for service ${this.consumerGroupName} with instance ${this.instanceId}: `, error);
 throw new Error('Publisher Error');

@@ -91,5 +93,8 @@ }

 else
-console.log(`PUBLISHER: Event publish failed for event ${data.eventName}, reason: no consumers ${consumerGroups}`);
+logger_1.PUBLISHER_LOGGER.log(`PUBLISHER: Event publish failed for event ${data.eventName}, reason: no consumers ${consumerGroups}`);
+const publishEndTime = process.hrtime(publishStartTime);
+const elapsedTime = publishEndTime[0] * 1000 + publishEndTime[1] / 1000000;
+logger_1.PERFORMANCE_LOGGER.log(`PTIME;${key};${data.eventName};${Date.now()};${elapsedTime}`);
 }
-async scheduledPublish(scheduledTime, eventData, uniquePerInstance = false, repeatInterval = 0) {
+async scheduledPublish(scheduledTime, eventData, uniquePerInstance = false, repeatInterval = 0, multicast = false) {
 const currentTime = new Date();
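The new PERFORMANCE_LOGGER lines here and on the subscribe path below emit flat ";"-separated records: PTIME;key;eventName;timestamp;elapsedMs on publish and STIME;messageId;eventName;timestamp;elapsedMs on receive. (Note that on the subscribe path the template interpolates data.eventName where data is still the raw JSON string, so that field will likely log as "undefined".) A minimal parser for pulling these records back out of a log stream, assuming one record per line:

interface PerfRecord {
    kind: 'PTIME' | 'STIME';
    id: string;        // stream key (PTIME) or message ID (STIME)
    eventName: string;
    timestamp: number; // Date.now() at log time
    elapsedMs: number; // hrtime-derived duration in milliseconds
}

function parsePerfLine(line: string): PerfRecord | null {
    const parts = line.split(';');
    if (parts.length !== 5) return null;
    const [kind, id, eventName, timestamp, elapsed] = parts;
    if (kind === 'PTIME' || kind === 'STIME') {
        return { kind, id, eventName, timestamp: Number(timestamp), elapsedMs: Number(elapsed) };
    }
    return null;
}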

@@ -104,3 +109,3 @@ delete eventData.repeatInterval;

 else if (Math.abs(scheduledTime.getTime() - currentTime.getTime()) <= 500) {
-await this.publish(eventData);
+await this.publish(eventData, multicast);
 }

@@ -112,3 +117,3 @@ else {

 if (existingJob) {
-console.log(`PUBLISHER: Job with data '${eventData}' already exists. Skipping.`);
+logger_1.PUBLISHER_LOGGER.log(`PUBLISHER: Job with data '${eventData}' already exists. Skipping.`);
 return;

@@ -127,3 +132,3 @@ }

 *
-* If an error occurs while subscribing, the method logs the error to the console and throws
+* If an error occurs while subscribing, the method logs the error to the PUBLISHER_LOGGER and throws
 * an error. This is done to prevent the service from continuing without a proper event subscription.

@@ -145,3 +150,3 @@ *

 * orderCreated.subscribe((event) => {
-* console.log('New order created:', event.data);
+* PUBLISHER_LOGGER.log('New order created:', event.data);
 * });

@@ -154,7 +159,7 @@ */

 const delay = initialDelay * Math.pow(2, retryAttempt);
-console.error(`PUBLISHER: Error in listen: ${error.message}. Retrying in ${delay}ms (attempt ${retryAttempt + 1})`);
+logger_1.PUBLISHER_LOGGER.error(`PUBLISHER: Error in listen: ${error.message}. Retrying in ${delay}ms (attempt ${retryAttempt + 1})`);
 return (0, rxjs_1.timer)(delay);
 },
 }), (0, rxjs_1.catchError)((error) => {
-console.error(`PUBLISHER: Error in listen after ${maxRetries} retries: ${error.message}`);
+logger_1.PUBLISHER_LOGGER.error(`PUBLISHER: Error in listen after ${maxRetries} retries: ${error.message}`);
 return (0, rxjs_1.throwError)(() => new Error(error.message));

@@ -171,6 +176,6 @@ }));

 .then(() => {
-console.log(`Group created created for ${JSON.stringify({ streamName, cgn: this.consumerGroupName })}`);
+logger_1.PUBLISHER_LOGGER.log(`Group created created for ${JSON.stringify({ streamName, cgn: this.consumerGroupName })}`);
 })
 .catch((e) => {
-console.error(`PUBLISHER: Group creation failed with error ${e.message} for ${JSON.stringify({ streamName, cgn: this.consumerGroupName })}`);
+logger_1.PUBLISHER_LOGGER.error(`PUBLISHER: Group creation failed with error ${e.message} for ${JSON.stringify({ streamName, cgn: this.consumerGroupName })}`);
 });
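Group creation is expected to fail harmlessly when the group already exists; with ioredis (a dependency of this package) that typically surfaces as a BUSYGROUP error that callers can treat as benign. A sketch of that common pattern, not this package's exact code:

import Redis from 'ioredis';

// Idempotent consumer-group creation: BUSYGROUP means the group already exists.
async function ensureGroup(redis: Redis, streamName: string, groupName: string): Promise<void> {
    try {
        // MKSTREAM also creates the stream if it does not exist yet.
        await redis.xgroup('CREATE', streamName, groupName, '$', 'MKSTREAM');
    } catch (e) {
        if (e instanceof Error && e.message.includes('BUSYGROUP')) return;
        throw e;
    }
}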

@@ -181,3 +186,3 @@ const createConsumerStatus = (await this.redisGroups.xgroup('CREATECONSUMER', streamName, this.consumerGroupName, this.instanceId));

 const addToFlushSet = await this.redisGroups.set(setKeyForK8sHandling, this.consumerGroupName);
-console.log(`PUBLISHER: Consumer Registered and created with ${this.instanceId} under ${this.consumerGroupName} with ${createConsumerStatus} consumers and with the following status ${JSON.stringify({ addToCGSet, addToFlushSet })}`);
+logger_1.PUBLISHER_LOGGER.log(`PUBLISHER: Consumer Registered and created with ${this.instanceId} under ${this.consumerGroupName} with ${createConsumerStatus} consumers and with the following status ${JSON.stringify({ addToCGSet, addToFlushSet })}`);
 return createConsumerStatus === 0 || createConsumerStatus === 1;

@@ -189,9 +194,9 @@ }

 const streamName = `${eventName}:${this.consumerGroupName}`;
-const processMessage = async (redisClient, messageId, processPending = false) => {
-console.log(`PUBLISHER: Processing message ${messageId} for ${streamName}`);
+const processMessage = async (redisClient, messageId, multicast = false, processPending = false) => {
+logger_1.PUBLISHER_LOGGER.log(`PUBLISHER: Processing message ${messageId} for ${streamName}`);
 try {
 try {
 const pendingDetails = await redisClient.xpending(streamName, this.consumerGroupName, messageId, messageId, 1, this.instanceId);
-if (pendingDetails[2] === 0) {
-console.warn(`PUBLISHER: Message ${messageId} for ${streamName} already acknowledged.`);
+if (pendingDetails[2] === 0 && multicast === false) {
+logger_1.PUBLISHER_LOGGER.warn(`PUBLISHER: MACK ${messageId} for ${streamName}`);
 return;

@@ -202,4 +207,4 @@ }

 // Ignore the xpending error and continue
-console.error('XPENDING ERROR: To be handled');
-console.warn(JSON.stringify(e));
+logger_1.PUBLISHER_LOGGER.error('XPENDING ERROR: To be handled');
+logger_1.PUBLISHER_LOGGER.warn(JSON.stringify(e));
 }

@@ -214,3 +219,3 @@ const messages = await redisClient.xrange(streamName, messageId, messageId);

 else {
-console.warn(`PUBLISHER: Message ${messageId} not found for ${streamName}`);
+logger_1.PUBLISHER_LOGGER.warn(`PUBLISHER: Message ${messageId} not found for ${streamName}`);
 }

@@ -221,7 +226,7 @@ /** Process Unprocessed Message if this is a main tree, otherwise limit to processing 100 messages that are unacknowledged */

 if (unprocessedMessageIds.count > 25) {
-console.error(`PUBLISHER: Too many unprocessed events for ${streamName}: count: ${unprocessedMessageIds.count}`);
+logger_1.PUBLISHER_LOGGER.error(`PUBLISHER: Too many unprocessed events for ${streamName}: count: ${unprocessedMessageIds.count}`);
 }
 for (const id of unprocessedMessageIds.messageIds) {
-console.log(`PUBLISHER: Reporcessing unprocessed message with id: ${id}`);
-await processMessage(redisClient, id, true);
+logger_1.PUBLISHER_LOGGER.log(`PUBLISHER: Reporcessing unprocessed message with id: ${id}`);
+await processMessage(redisClient, id, multicast, true);
 }

@@ -231,3 +236,3 @@ }

catch (e) {
console.error(`PUBLISHER: Error processing message ${messageId} for ${streamName}`, e);
logger_1.PUBLISHER_LOGGER.error(`PUBLISHER: Error processing message ${messageId} for ${streamName}`, e);
}

@@ -242,11 +247,16 @@ };

 eventStreamClient.subscribe(eventName).then(() => {
-console.log(`PUBLISHER: Redis Subscription connection initiated for ${eventName}`);
+logger_1.PUBLISHER_LOGGER.log(`PUBLISHER: Redis Subscription connection initiated for ${eventName}`);
 });
-eventStreamClient.on('message', async (channel, messageId) => {
-console.log(`PUBLISHER: Stream Notification Received for event ${eventName} with message ID ${messageId}`);
-await processMessage(this.redisGroups, messageId);
+eventStreamClient.on('message', async (channel, data) => {
+const subscribeStartTime = process.hrtime();
+const { messageId, multicast } = JSON.parse(data);
+logger_1.PUBLISHER_LOGGER.log(`PUBLISHER: Stream Notification Received for event ${eventName} with message ID ${messageId}`);
+await processMessage(this.redisGroups, messageId, multicast);
+const subscribendTime = process.hrtime(subscribeStartTime);
+const elapsedTime = subscribendTime[0] * 1000 + subscribendTime[1] / 1000000;
+logger_1.PERFORMANCE_LOGGER.log(`STIME;${messageId};${data.eventName};${Date.now()};${elapsedTime}`);
 });
 })
 .catch((e) => {
-console.error(`PUBLISHER: Error during consumer registration for ${eventName}`, e);
+logger_1.PUBLISHER_LOGGER.error(`PUBLISHER: Error during consumer registration for ${eventName}`, e);
 });

@@ -265,8 +275,8 @@ return observable;

 * async function shutdown(): Promise<void> {
-* console.log('Graceful shutdown initiated.');
+* PUBLISHER_LOGGER.log('Graceful shutdown initiated.');
 * try {
 * await streams.close();
-* console.log('Resources and connections successfully closed.');
+* PUBLISHER_LOGGER.log('Resources and connections successfully closed.');
 * } catch (error) {
-* console.error('Error during graceful shutdown:', error);
+* PUBLISHER_LOGGER.error('Error during graceful shutdown:', error);
 * }

@@ -273,0 +283,0 @@ * process.exit(0);

@@ -13,3 +13,3 @@ import { RedisType } from './registry';

 }>;
-export declare function notifySubscribers(redisClient: RedisType, eventName: string, messageId: string): Promise<void>;
+export declare function notifySubscribers(redisClient: RedisType, eventName: string, messageId: string, multicast?: boolean): Promise<void>;
 export declare function removedScheduledJob(redisClient: RedisType, eventString: string): Promise<void>;

@@ -16,0 +16,0 @@ export declare function encodeScheduledMessage<TData, TName extends string>(data: EventData<TData, TName>): string;

"use strict";
Object.defineProperty(exports, "__esModule", { value: true });
exports.UTILS = exports.decodeScheduledMessage = exports.encodeScheduledMessage = exports.removedScheduledJob = exports.notifySubscribers = exports.getMessageStatesCount = exports.getUnacknowledgedMessages = exports.getAllConsumerGroups = void 0;
const logger_1 = require("./logger");
async function getAllConsumerGroups(eventName, redisConnection) {

@@ -28,3 +29,3 @@ const consumerGroups = await redisConnection.smembers(`${eventName}`);

 catch (error) {
-console.error(`PUBLISHER: Error fetching unacknowledged messages for ${streamName}`, error);
+logger_1.PUBLISHER_LOGGER.error(`PUBLISHER: Error fetching unacknowledged messages for ${streamName}`, error);
 return { count: 0, messageIds: [] };

@@ -44,3 +45,3 @@ }

 catch (error) {
-console.error(`PUBLISHER: Error fetching message states count for ${streamName}`, error);
+logger_1.PUBLISHER_LOGGER.error(`PUBLISHER: Error fetching message states count for ${streamName}`, error);
 return { acknowledged: 0, unacknowledged: 0 };

@@ -50,4 +51,4 @@ }

 exports.getMessageStatesCount = getMessageStatesCount;
-async function notifySubscribers(redisClient, eventName, messageId) {
-await redisClient.publish(eventName, messageId);
+async function notifySubscribers(redisClient, eventName, messageId, multicast = false) {
+await redisClient.publish(eventName, JSON.stringify({ messageId, multicast }));
 }
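This is the wire-format change that carries the multicast flag: the PUB/SUB payload goes from a bare message ID to a JSON envelope, which the reworked on('message') handler above now parses. Note that this is breaking across mixed versions: a 3.3.3 subscriber cannot use the JSON envelope, and a 3.3.5 subscriber will likely throw on a bare ID. A defensive decoder that accepts both formats, purely as an illustration:

interface StreamNotification {
    messageId: string;
    multicast: boolean;
}

// Accepts both the pre-3.3.5 bare-ID payload and the 3.3.5 JSON envelope.
function decodeNotification(payload: string): StreamNotification {
    try {
        const parsed = JSON.parse(payload);
        if (parsed && typeof parsed.messageId === 'string') {
            return { messageId: parsed.messageId, multicast: Boolean(parsed.multicast) };
        }
    } catch {
        // Not JSON: fall through and treat the payload as a bare message ID.
    }
    return { messageId: payload, multicast: false };
}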

@@ -58,6 +59,6 @@ exports.notifySubscribers = notifySubscribers;

 const events = await redisClient.zrangebyscore('se', 0, currentTime);
-console.log(`Total Events in scheduled queue: ${events.length}`);
+logger_1.PUBLISHER_LOGGER.log(`Total Events in scheduled queue: ${events.length}`);
 await redisClient.zrem('se', eventString);
 const eventsLater = await redisClient.zrangebyscore('se', 0, currentTime);
-console.log(`Total Events in scheduled queue: ${eventsLater.length}`);
+logger_1.PUBLISHER_LOGGER.log(`Total Events in scheduled queue: ${eventsLater.length}`);
 }
}

@@ -64,0 +65,0 @@ exports.removedScheduledJob = removedScheduledJob;