@jetit/publisher
Advanced tools
Comparing version
{ | ||
"name": "@jetit/publisher", | ||
"version": "3.3.3", | ||
"version": "3.3.5", | ||
"type": "commonjs", | ||
"dependencies": { | ||
"@jetit/id": "0.0.11", | ||
"@jetit/id": "^0.0.12", | ||
"ioredis": "^5.3.0", | ||
"rxjs": "^7.8.0" | ||
}, | ||
"peerDependencies": { | ||
"rxjs": "^7.8.0", | ||
"tslib": "1.14.1" | ||
@@ -12,0 +10,0 @@ }, |
@@ -201,2 +201,4 @@ # publisher | ||
8. Message broadcasting: The publisher can be used to broadcast messages to multiple consumers or subscribers, allowing for efficient and scalable communication in applications with many components or services. | ||
8. Message broadcasting: The publisher can be used to broadcast messages to multiple consumers or subscribers, allowing for efficient and scalable communication in applications with many components or services. | ||
9. Multicast Publishing: This is the existing PUB/SUB implementation but with the event data being stored into streams for additional processing |
@@ -5,2 +5,3 @@ "use strict"; | ||
const ioredis_1 = require("ioredis"); | ||
const logger_1 = require("./logger"); | ||
class RedisRegistry { | ||
@@ -29,4 +30,5 @@ static attemptConnection(connectionKey, storeRef = 0) { | ||
connection.on('error', (error) => { | ||
console.error(`PUBLISHER: Redis connection error : ${error.message}`); | ||
logger_1.PUBLISHER_LOGGER.error(`PUBLISHER: Redis connection error : ${error.message}`); | ||
connection.removeAllListeners(); | ||
connection.disconnect(); | ||
RedisRegistry.attemptConnection(connectionKey, storeRef); | ||
@@ -47,3 +49,3 @@ }); | ||
connection.disconnect(true); | ||
console.error('PUBLISHER: failed to ping redis, disconnecting and restarting service.'); | ||
logger_1.PUBLISHER_LOGGER.error('PUBLISHER: failed to ping redis, disconnecting and restarting service.'); | ||
process.exit(0); | ||
@@ -50,0 +52,0 @@ } |
@@ -6,2 +6,3 @@ "use strict"; | ||
const rxjs_1 = require("rxjs"); | ||
const logger_1 = require("./logger"); | ||
const registry_1 = require("./registry"); | ||
@@ -22,3 +23,3 @@ const utils_1 = require("./utils"); | ||
this.scheduledMessagesTimer = (0, rxjs_1.interval)(duration).subscribe(() => { | ||
console.log('Checking Streams messages at ', new Date().toISOString(), '...'); | ||
logger_1.PUBLISHER_LOGGER.log('Checking Streams messages at ', new Date().toISOString(), '...'); | ||
/** Do not run scheduler if the previous run is not completed */ | ||
@@ -29,3 +30,3 @@ if (this.previousTaskCompleted) { | ||
.catch((error) => { | ||
console.error('Error while processing scheduled events:', error); | ||
logger_1.PUBLISHER_LOGGER.error('Error while processing scheduled events:', error); | ||
}) | ||
@@ -37,3 +38,3 @@ .then(() => { | ||
else { | ||
console.log('Skipping current scheduler run because previous run is in progress'); | ||
logger_1.PUBLISHER_LOGGER.log('Skipping current scheduler run because previous run is in progress'); | ||
} | ||
@@ -45,3 +46,3 @@ }); | ||
const events = await this.redisPublisher.zrangebyscore('se', 0, currentTime); | ||
console.log('Events to process:', events.length); | ||
logger_1.PUBLISHER_LOGGER.log('Events to process:', events.length); | ||
for (const eventString of events) { | ||
@@ -60,3 +61,3 @@ const eventData = (0, utils_1.decodeScheduledMessage)(eventString); | ||
const consumerGroups = await (0, utils_1.getAllConsumerGroups)(eventData.eventName, this.redisPublisher); | ||
console.log('Scheduled Publishing to consumer groups: ', consumerGroups, 'with id ', eventData.eventId, '...'); | ||
logger_1.PUBLISHER_LOGGER.log('Scheduled Publishing to consumer groups: ', consumerGroups, 'with id ', eventData.eventId, '...'); | ||
let key = '*'; | ||
@@ -68,3 +69,3 @@ for (const consumerGroup of consumerGroups) { | ||
.xadd(streamName, '*', 'data', JSON.stringify(eventData)) | ||
.catch((e) => console.error(`PUBLISHER: Publishing event ${eventData.eventName} to consumer groups: ${consumerGroups.join(', ')} failed with data ${JSON.stringify(eventData)}, ${e} `)); | ||
.catch((e) => logger_1.PUBLISHER_LOGGER.error(`PUBLISHER: Publishing event ${eventData.eventName} to consumer groups: ${consumerGroups.join(', ')} failed with data ${JSON.stringify(eventData)}, ${e} `)); | ||
if (key === '*') | ||
@@ -71,0 +72,0 @@ key = generatedKey ?? key; |
@@ -46,3 +46,3 @@ import { Observable } from 'rxjs'; | ||
*/ | ||
publish<TData = unknown, TName extends string = string>(data: PublishData<TData, TName>): Promise<void>; | ||
publish<TData = unknown, TName extends string = string>(data: PublishData<TData, TName>, multicast?: boolean): Promise<void>; | ||
/** | ||
@@ -71,3 +71,3 @@ * Schedules an event to be published at a specified future time. Thee event gets published if the | ||
*/ | ||
scheduledPublish<TData = unknown, TName extends string = string>(scheduledTime: Date, eventData: PublishData<TData, TName>, uniquePerInstance?: boolean, repeatInterval?: number): Promise<void>; | ||
scheduledPublish<TData = unknown, TName extends string = string>(scheduledTime: Date, eventData: PublishData<TData, TName>, uniquePerInstance?: boolean, repeatInterval?: number, multicast?: boolean): Promise<void>; | ||
/** | ||
@@ -80,3 +80,3 @@ * Listens for events with the given name and returns an Observable that emits an EventData<T> object | ||
* | ||
* If an error occurs while subscribing, the method logs the error to the console and throws | ||
* If an error occurs while subscribing, the method logs the error to the PUBLISHER_LOGGER and throws | ||
* an error. This is done to prevent the service from continuing without a proper event subscription. | ||
@@ -98,3 +98,3 @@ * | ||
* orderCreated.subscribe((event) => { | ||
* console.log('New order created:', event.data); | ||
* PUBLISHER_LOGGER.log('New order created:', event.data); | ||
* }); | ||
@@ -115,8 +115,8 @@ */ | ||
* async function shutdown(): Promise<void> { | ||
* console.log('Graceful shutdown initiated.'); | ||
* PUBLISHER_LOGGER.log('Graceful shutdown initiated.'); | ||
* try { | ||
* await streams.close(); | ||
* console.log('Resources and connections successfully closed.'); | ||
* PUBLISHER_LOGGER.log('Resources and connections successfully closed.'); | ||
* } catch (error) { | ||
* console.error('Error during graceful shutdown:', error); | ||
* PUBLISHER_LOGGER.error('Error during graceful shutdown:', error); | ||
* } | ||
@@ -123,0 +123,0 @@ * process.exit(0); |
@@ -6,6 +6,7 @@ "use strict"; | ||
const rxjs_1 = require("rxjs"); | ||
const logger_1 = require("./logger"); | ||
const registry_1 = require("./registry"); | ||
const utils_1 = require("./utils"); | ||
function publisherErrorHandler(error) { | ||
console.error('PUBLISHER UNHANDLED ERROR: ', JSON.stringify(error)); | ||
logger_1.PUBLISHER_LOGGER.error('PUBLISHER UNHANDLED ERROR: ', JSON.stringify(error)); | ||
} | ||
@@ -42,3 +43,3 @@ class Streams { | ||
this.consumerGroupName = `cg-${serviceName}`; | ||
console.log(`PUBLISHER: Instance ID: ${this.instanceId}`); | ||
logger_1.PUBLISHER_LOGGER.log(`PUBLISHER: Instance ID: ${this.instanceId}`); | ||
const cleanUpInterval = parseInt(process.env['CLEANUP_INTERVAL'] || `${1000 * 60 * 60}`, 10) ?? 1000 * 60 * 60; | ||
@@ -50,3 +51,3 @@ this.cleanUpTimer = setInterval(() => { | ||
async runClear(cleanUpInterval) { | ||
console.log('PUBLISHER: Running Clearance', this.eventsListened); | ||
logger_1.PUBLISHER_LOGGER.log('PUBLISHER: Running Clearance', this.eventsListened); | ||
for (const eventName of this.eventsListened) { | ||
@@ -56,7 +57,8 @@ process.nextTick(async () => { | ||
await this.cleanupAcknowledgedMessages(eventName, cleanUpInterval).catch(publisherErrorHandler); | ||
console.log(`Cleanup process for Acknowledged messages completed for ${eventName}`); | ||
logger_1.PUBLISHER_LOGGER.log(`Cleanup process for Acknowledged messages completed for ${eventName}`); | ||
}); | ||
} | ||
} | ||
async publish(data) { | ||
async publish(data, multicast = false) { | ||
const publishStartTime = process.hrtime(); | ||
if (data.eventId) | ||
@@ -70,3 +72,3 @@ data.republishEvent = data.eventId; | ||
if (consumerGroups.length > 0) { | ||
console.log(`PUBLISHER: Publishing event ${data.eventName} to consumer groups: ${consumerGroups.join(', ')}`); | ||
logger_1.PUBLISHER_LOGGER.log(`PUBLISHER: Publishing event ${data.eventName} to consumer groups: ${consumerGroups.join(', ')}`); | ||
try { | ||
@@ -78,10 +80,10 @@ for (const consumerGroup of consumerGroups) { | ||
.xadd(streamName, key, 'data', JSON.stringify(data)) | ||
.catch((e) => console.error(`PUBLISHER: Publishing event ${data.eventName} to consumer groups: ${consumerGroups.join(', ')} failed with data ${JSON.stringify(data)}, ${e} `)); | ||
.catch((e) => logger_1.PUBLISHER_LOGGER.error(`PUBLISHER: Publishing event ${data.eventName} to consumer groups: ${consumerGroups.join(', ')} failed with data ${JSON.stringify(data)}, ${e} `)); | ||
if (key === '*') | ||
key = generatedKey ?? key; | ||
} | ||
await (0, utils_1.notifySubscribers)(this.redisPublisher, data.eventName, key); | ||
await (0, utils_1.notifySubscribers)(this.redisPublisher, data.eventName, key, multicast); | ||
} | ||
catch (error) { | ||
console.error(`PUBLISHER: Error while publishing event for service ${this.consumerGroupName} with instance ${this.instanceId}: `, error); | ||
logger_1.PUBLISHER_LOGGER.error(`PUBLISHER: Error while publishing event for service ${this.consumerGroupName} with instance ${this.instanceId}: `, error); | ||
throw new Error('Publisher Error'); | ||
@@ -91,5 +93,8 @@ } | ||
else | ||
console.log(`PUBLISHER: Event publish failed for event ${data.eventName}, reason: no consumers ${consumerGroups}`); | ||
logger_1.PUBLISHER_LOGGER.log(`PUBLISHER: Event publish failed for event ${data.eventName}, reason: no consumers ${consumerGroups}`); | ||
const publishEndTime = process.hrtime(publishStartTime); | ||
const elapsedTime = publishEndTime[0] * 1000 + publishEndTime[1] / 1000000; | ||
logger_1.PERFORMANCE_LOGGER.log(`PTIME;${key};${data.eventName};${Date.now()};${elapsedTime}`); | ||
} | ||
async scheduledPublish(scheduledTime, eventData, uniquePerInstance = false, repeatInterval = 0) { | ||
async scheduledPublish(scheduledTime, eventData, uniquePerInstance = false, repeatInterval = 0, multicast = false) { | ||
const currentTime = new Date(); | ||
@@ -104,3 +109,3 @@ delete eventData.repeatInterval; | ||
else if (Math.abs(scheduledTime.getTime() - currentTime.getTime()) <= 500) { | ||
await this.publish(eventData); | ||
await this.publish(eventData, multicast); | ||
} | ||
@@ -112,3 +117,3 @@ else { | ||
if (existingJob) { | ||
console.log(`PUBLISHER: Job with data '${eventData}' already exists. Skipping.`); | ||
logger_1.PUBLISHER_LOGGER.log(`PUBLISHER: Job with data '${eventData}' already exists. Skipping.`); | ||
return; | ||
@@ -127,3 +132,3 @@ } | ||
* | ||
* If an error occurs while subscribing, the method logs the error to the console and throws | ||
* If an error occurs while subscribing, the method logs the error to the PUBLISHER_LOGGER and throws | ||
* an error. This is done to prevent the service from continuing without a proper event subscription. | ||
@@ -145,3 +150,3 @@ * | ||
* orderCreated.subscribe((event) => { | ||
* console.log('New order created:', event.data); | ||
* PUBLISHER_LOGGER.log('New order created:', event.data); | ||
* }); | ||
@@ -154,7 +159,7 @@ */ | ||
const delay = initialDelay * Math.pow(2, retryAttempt); | ||
console.error(`PUBLISHER: Error in listen: ${error.message}. Retrying in ${delay}ms (attempt ${retryAttempt + 1})`); | ||
logger_1.PUBLISHER_LOGGER.error(`PUBLISHER: Error in listen: ${error.message}. Retrying in ${delay}ms (attempt ${retryAttempt + 1})`); | ||
return (0, rxjs_1.timer)(delay); | ||
}, | ||
}), (0, rxjs_1.catchError)((error) => { | ||
console.error(`PUBLISHER: Error in listen after ${maxRetries} retries: ${error.message}`); | ||
logger_1.PUBLISHER_LOGGER.error(`PUBLISHER: Error in listen after ${maxRetries} retries: ${error.message}`); | ||
return (0, rxjs_1.throwError)(() => new Error(error.message)); | ||
@@ -171,6 +176,6 @@ })); | ||
.then(() => { | ||
console.log(`Group created created for ${JSON.stringify({ streamName, cgn: this.consumerGroupName })}`); | ||
logger_1.PUBLISHER_LOGGER.log(`Group created created for ${JSON.stringify({ streamName, cgn: this.consumerGroupName })}`); | ||
}) | ||
.catch((e) => { | ||
console.error(`PUBLISHER: Group creation failed with error ${e.message} for ${JSON.stringify({ streamName, cgn: this.consumerGroupName })}`); | ||
logger_1.PUBLISHER_LOGGER.error(`PUBLISHER: Group creation failed with error ${e.message} for ${JSON.stringify({ streamName, cgn: this.consumerGroupName })}`); | ||
}); | ||
@@ -181,3 +186,3 @@ const createConsumerStatus = (await this.redisGroups.xgroup('CREATECONSUMER', streamName, this.consumerGroupName, this.instanceId)); | ||
const addToFlushSet = await this.redisGroups.set(setKeyForK8sHandling, this.consumerGroupName); | ||
console.log(`PUBLISHER: Consumer Registered and created with ${this.instanceId} under ${this.consumerGroupName} with ${createConsumerStatus} consumers and with the following status ${JSON.stringify({ addToCGSet, addToFlushSet })}`); | ||
logger_1.PUBLISHER_LOGGER.log(`PUBLISHER: Consumer Registered and created with ${this.instanceId} under ${this.consumerGroupName} with ${createConsumerStatus} consumers and with the following status ${JSON.stringify({ addToCGSet, addToFlushSet })}`); | ||
return createConsumerStatus === 0 || createConsumerStatus === 1; | ||
@@ -189,9 +194,9 @@ } | ||
const streamName = `${eventName}:${this.consumerGroupName}`; | ||
const processMessage = async (redisClient, messageId, processPending = false) => { | ||
console.log(`PUBLISHER: Processing message ${messageId} for ${streamName}`); | ||
const processMessage = async (redisClient, messageId, multicast = false, processPending = false) => { | ||
logger_1.PUBLISHER_LOGGER.log(`PUBLISHER: Processing message ${messageId} for ${streamName}`); | ||
try { | ||
try { | ||
const pendingDetails = await redisClient.xpending(streamName, this.consumerGroupName, messageId, messageId, 1, this.instanceId); | ||
if (pendingDetails[2] === 0) { | ||
console.warn(`PUBLISHER: Message ${messageId} for ${streamName} already acknowledged.`); | ||
if (pendingDetails[2] === 0 && multicast === false) { | ||
logger_1.PUBLISHER_LOGGER.warn(`PUBLISHER: MACK ${messageId} for ${streamName}`); | ||
return; | ||
@@ -202,4 +207,4 @@ } | ||
// Ignore the xpending error and continue | ||
console.error('XPENDING ERROR: To be handled'); | ||
console.warn(JSON.stringify(e)); | ||
logger_1.PUBLISHER_LOGGER.error('XPENDING ERROR: To be handled'); | ||
logger_1.PUBLISHER_LOGGER.warn(JSON.stringify(e)); | ||
} | ||
@@ -214,3 +219,3 @@ const messages = await redisClient.xrange(streamName, messageId, messageId); | ||
else { | ||
console.warn(`PUBLISHER: Message ${messageId} not found for ${streamName}`); | ||
logger_1.PUBLISHER_LOGGER.warn(`PUBLISHER: Message ${messageId} not found for ${streamName}`); | ||
} | ||
@@ -221,7 +226,7 @@ /** Process Unprocessed Message if this is a main tree, otherwise limit to processing 100 messages that are unacknowledged */ | ||
if (unprocessedMessageIds.count > 25) { | ||
console.error(`PUBLISHER: Too many unprocessed events for ${streamName}: count: ${unprocessedMessageIds.count}`); | ||
logger_1.PUBLISHER_LOGGER.error(`PUBLISHER: Too many unprocessed events for ${streamName}: count: ${unprocessedMessageIds.count}`); | ||
} | ||
for (const id of unprocessedMessageIds.messageIds) { | ||
console.log(`PUBLISHER: Reporcessing unprocessed message with id: ${id}`); | ||
await processMessage(redisClient, id, true); | ||
logger_1.PUBLISHER_LOGGER.log(`PUBLISHER: Reporcessing unprocessed message with id: ${id}`); | ||
await processMessage(redisClient, id, multicast, true); | ||
} | ||
@@ -231,3 +236,3 @@ } | ||
catch (e) { | ||
console.error(`PUBLISHER: Error processing message ${messageId} for ${streamName}`, e); | ||
logger_1.PUBLISHER_LOGGER.error(`PUBLISHER: Error processing message ${messageId} for ${streamName}`, e); | ||
} | ||
@@ -242,11 +247,16 @@ }; | ||
eventStreamClient.subscribe(eventName).then(() => { | ||
console.log(`PUBLISHER: Redis Subscription connection initiated for ${eventName}`); | ||
logger_1.PUBLISHER_LOGGER.log(`PUBLISHER: Redis Subscription connection initiated for ${eventName}`); | ||
}); | ||
eventStreamClient.on('message', async (channel, messageId) => { | ||
console.log(`PUBLISHER: Stream Notification Received for event ${eventName} with message ID ${messageId}`); | ||
await processMessage(this.redisGroups, messageId); | ||
eventStreamClient.on('message', async (channel, data) => { | ||
const subscribeStartTime = process.hrtime(); | ||
const { messageId, multicast } = JSON.parse(data); | ||
logger_1.PUBLISHER_LOGGER.log(`PUBLISHER: Stream Notification Received for event ${eventName} with message ID ${messageId}`); | ||
await processMessage(this.redisGroups, messageId, multicast); | ||
const subscribendTime = process.hrtime(subscribeStartTime); | ||
const elapsedTime = subscribendTime[0] * 1000 + subscribendTime[1] / 1000000; | ||
logger_1.PERFORMANCE_LOGGER.log(`STIME;${messageId};${data.eventName};${Date.now()};${elapsedTime}`); | ||
}); | ||
}) | ||
.catch((e) => { | ||
console.error(`PUBLISHER: Error during consumer registration for ${eventName}`, e); | ||
logger_1.PUBLISHER_LOGGER.error(`PUBLISHER: Error during consumer registration for ${eventName}`, e); | ||
}); | ||
@@ -265,8 +275,8 @@ return observable; | ||
* async function shutdown(): Promise<void> { | ||
* console.log('Graceful shutdown initiated.'); | ||
* PUBLISHER_LOGGER.log('Graceful shutdown initiated.'); | ||
* try { | ||
* await streams.close(); | ||
* console.log('Resources and connections successfully closed.'); | ||
* PUBLISHER_LOGGER.log('Resources and connections successfully closed.'); | ||
* } catch (error) { | ||
* console.error('Error during graceful shutdown:', error); | ||
* PUBLISHER_LOGGER.error('Error during graceful shutdown:', error); | ||
* } | ||
@@ -273,0 +283,0 @@ * process.exit(0); |
@@ -13,3 +13,3 @@ import { RedisType } from './registry'; | ||
}>; | ||
export declare function notifySubscribers(redisClient: RedisType, eventName: string, messageId: string): Promise<void>; | ||
export declare function notifySubscribers(redisClient: RedisType, eventName: string, messageId: string, multicast?: boolean): Promise<void>; | ||
export declare function removedScheduledJob(redisClient: RedisType, eventString: string): Promise<void>; | ||
@@ -16,0 +16,0 @@ export declare function encodeScheduledMessage<TData, TName extends string>(data: EventData<TData, TName>): string; |
"use strict"; | ||
Object.defineProperty(exports, "__esModule", { value: true }); | ||
exports.UTILS = exports.decodeScheduledMessage = exports.encodeScheduledMessage = exports.removedScheduledJob = exports.notifySubscribers = exports.getMessageStatesCount = exports.getUnacknowledgedMessages = exports.getAllConsumerGroups = void 0; | ||
const logger_1 = require("./logger"); | ||
async function getAllConsumerGroups(eventName, redisConnection) { | ||
@@ -28,3 +29,3 @@ const consumerGroups = await redisConnection.smembers(`${eventName}`); | ||
catch (error) { | ||
console.error(`PUBLISHER: Error fetching unacknowledged messages for ${streamName}`, error); | ||
logger_1.PUBLISHER_LOGGER.error(`PUBLISHER: Error fetching unacknowledged messages for ${streamName}`, error); | ||
return { count: 0, messageIds: [] }; | ||
@@ -44,3 +45,3 @@ } | ||
catch (error) { | ||
console.error(`PUBLISHER: Error fetching message states count for ${streamName}`, error); | ||
logger_1.PUBLISHER_LOGGER.error(`PUBLISHER: Error fetching message states count for ${streamName}`, error); | ||
return { acknowledged: 0, unacknowledged: 0 }; | ||
@@ -50,4 +51,4 @@ } | ||
exports.getMessageStatesCount = getMessageStatesCount; | ||
async function notifySubscribers(redisClient, eventName, messageId) { | ||
await redisClient.publish(eventName, messageId); | ||
async function notifySubscribers(redisClient, eventName, messageId, multicast = false) { | ||
await redisClient.publish(eventName, JSON.stringify({ messageId, multicast })); | ||
} | ||
@@ -58,6 +59,6 @@ exports.notifySubscribers = notifySubscribers; | ||
const events = await redisClient.zrangebyscore('se', 0, currentTime); | ||
console.log(`Total Events in scheduled queue: ${events.length}`); | ||
logger_1.PUBLISHER_LOGGER.log(`Total Events in scheduled queue: ${events.length}`); | ||
await redisClient.zrem('se', eventString); | ||
const eventsLater = await redisClient.zrangebyscore('se', 0, currentTime); | ||
console.log(`Total Events in scheduled queue: ${eventsLater.length}`); | ||
logger_1.PUBLISHER_LOGGER.log(`Total Events in scheduled queue: ${eventsLater.length}`); | ||
} | ||
@@ -64,0 +65,0 @@ exports.removedScheduledJob = removedScheduledJob; |
47553
6.39%18
12.5%819
5.81%203
1%+ Added
+ Added
+ Added
- Removed
- Removed
- Removed
Updated