@jetit/publisher
Comparing version 5.3.2 to 5.4.0
 {
   "name": "@jetit/publisher",
-  "version": "5.3.2",
+  "version": "5.4.0",
   "type": "commonjs",
   "dependencies": {
-    "@jetit/id": "^0.0.12",
+    "@jetit/id": "^0.0.13",
     "ioredis": "^5.3.0",
@@ -8,0 +8,0 @@ "rxjs": "^7.8.0",
@@ -203,2 +203,6 @@ import { Observable } from 'rxjs';
     processStoredEvents(): Promise<void>;
+    /**
+     * Acknowledges a message and updates the last acknowledged message ID.
+     * This is used to track cleanup progress and ensure we don't delete unprocessed messages.
+     */
     acknowledgeMessage(ackKey: string): Promise<void>;
@@ -205,0 +209,0 @@ private frameMessageKey;
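Together with the externalAcknowledgement flag on listenInternals (visible in the implementation below), this lets a consumer defer acknowledgement until its own processing has succeeded. A minimal consumer sketch, assuming an already constructed publisher instance and a hypothetical handleOrder handler; only listenInternals, acknowledgeMessage, and the ackKey field come from this diff:

// Hedged sketch: publisher construction and handleOrder are assumptions.
const subscription = publisher
    .listenInternals('ORDER_CREATED', 'order-worker-1', undefined, 24 * 60 * 60 * 1000, false, true /* externalAcknowledgement */)
    .subscribe(async (event) => {
        try {
            await handleOrder(event); // application-specific work
            // With externalAcknowledgement enabled, the consumer acks explicitly.
            await publisher.acknowledgeMessage(event.ackKey);
        }
        catch (error) {
            // Leaving the message unacknowledged keeps it pending for reprocessing.
            console.error('Processing failed; message left pending:', error);
        }
    });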
@@ -296,7 +296,11 @@ "use strict";
     listenInternals(eventName, subscriptionId, eventFilter, filterKeepAlive = 24 * 60 * 60 * 1000, publishOnceGuarantee = false, externalAcknowledgement = false) {
-        if (!this.subscriptions.has(eventName)) {
-            this.subscriptions.set(eventName, new Map());
+        // Get or create the subscription map for this event
+        const eventSubscriptions = this.subscriptions.get(eventName) || new Map();
+        const isNewSubscription = !this.subscriptions.has(eventName);
+        if (isNewSubscription) {
+            this.subscriptions.set(eventName, eventSubscriptions);
         }
         const bs = new rxjs_1.BehaviorSubject(null);
-        const subscription = {
+        // Make the subscription record immutable
+        const subscription = Object.freeze({
             subject: bs,
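One behavioral note on Object.freeze: the freeze is shallow, so the nested subject keeps working, but in strict-mode CommonJS output (as emitted here) a later assignment to a frozen own property throws a TypeError, which is worth keeping in mind when reading the sub.lastMatchTime writes further down in this diff. A standalone illustration:

'use strict';
const subscription = Object.freeze({
    subject: { next: (value) => console.log('emit', value) },
    lastMatchTime: Date.now(),
});
subscription.subject.next('ok'); // fine: freeze is shallow, nested objects stay mutable
try {
    subscription.lastMatchTime = Date.now(); // throws in strict mode: frozen own property
}
catch (error) {
    console.error(error.message);
}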
@@ -306,14 +310,34 @@ filter: eventFilter,
             keepAlive: filterKeepAlive,
-        };
-        this.subscriptions.get(eventName).set(subscriptionId, subscription);
-        const timer = (0, rxjs_1.interval)(10000).subscribe(async () => {
-            /** Clear earlier unprocessed messages. Runs every 10 seconds */
-            await processMessage(this.redisGroups, '0', new tracker_1.MetricsTracker(), false);
-        });
+        });
+        eventSubscriptions.set(subscriptionId, subscription);
+        // Return early if this is not the first subscription for the event
+        if (!isNewSubscription) {
+            return bs.asObservable().pipe((0, rxjs_1.skip)(1));
+        }
+        const cleanupInterval = 10000; // 10 seconds
+        const timer = (0, rxjs_1.interval)(cleanupInterval).subscribe({
+            next: async () => {
+                try {
+                    await processMessage(this.redisGroups, '0', new tracker_1.MetricsTracker(), false);
+                }
+                catch (error) {
+                    logger_1.PUBLISHER_LOGGER.error('Error running recurring cleanup task:', error);
+                }
+            },
+            error: (error) => {
+                logger_1.PUBLISHER_LOGGER.error('Fatal error in cleanup timer:', error);
+            },
+        });
+        // Create the observable with proper cleanup
+        const observable = bs.asObservable().pipe((0, rxjs_1.skip)(1), (0, rxjs_1.finalize)(() => {
+            /** Clean up the timer */
+            timer.unsubscribe();
+            // Clean up the subscription on completion
+            this.removeSubscription(eventName, subscriptionId);
+        }));
-        const streamName = `${eventName}:${this.consumerGroupName}`;
         const processMessage = async (redisClient, messageId, tracker, multicast = false, processPending = false) => {
+            // Skip processing if the subscription was removed; processing runs independently of the subscription
+            if (!this.subscriptions.has(eventName)) {
+                return;
+            }
+            const streamName = `${eventName}:${this.consumerGroupName}`;
             logger_1.PUBLISHER_LOGGER.log(`PUBLISHER: Processing message ${messageId} for ${streamName}`);
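The reworked method keeps a single cleanup timer per event: only the first subscriber starts it, later subscribers share the BehaviorSubject behind skip(1) so they never observe the seeded null, and finalize tears the timer down. The bare rxjs pattern, as a sketch with generic names rather than the package's:

const { BehaviorSubject, interval, skip, finalize } = require('rxjs');

const bs = new BehaviorSubject(null); // seeded with null, hence skip(1) below
const timer = interval(10000).subscribe(() => {
    // recurring maintenance work runs here every 10 seconds
});
const observable = bs.asObservable().pipe(
    skip(1), // drop the initial null so subscribers only see real events
    finalize(() => timer.unsubscribe()) // runs when a subscriber unsubscribes or the stream ends
);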
@@ -339,31 +363,46 @@ try {
             let eventData;
-            /**
-             * Both multicast messages and pending messages cannot be read by xreadgroup
-             * Multicast messages should not be claimed by a single consumer. And pending messages
-             * are usually behind in the stream so XREADGROUP will not read them and hence
-             * they need to be read using XRANGE.
-             */
             tracker.startRedisOperation();
-            if (multicast || processPending) {
-                const messages = await redisClient.xrange(streamName, messageId, messageId);
-                if (messages && messages.length) {
-                    try {
-                        eventData = JSON.parse(messages[0][1][1]);
-                    }
-                    catch (e) {
-                        console.error(`JSON parsing failed for the following message ${messages[0][1][1]} in the publisher.`);
-                    }
-                }
-            }
-            else {
-                const messages = (await redisClient.xreadgroup('GROUP', this.consumerGroupName, this.instanceId, 'COUNT', 1, 'STREAMS', streamName, '>'));
-                if (messages && messages.length) {
-                    if (messageId === '0') {
-                        messageId = messages[0][1][0][0];
-                        logger_1.PUBLISHER_LOGGER.log(`PUBLISHER: Reprocessing unprocessed message with id: ${messageId}`);
-                    }
-                    eventData = JSON.parse(messages[0][1][0][1][1]);
-                }
-            }
-            tracker.endRedisOperation();
+            try {
+                /**
+                 * Neither multicast messages nor pending messages can be read by XREADGROUP.
+                 * Multicast messages should not be claimed by a single consumer, and pending
+                 * messages are usually behind in the stream, so XREADGROUP will not read them;
+                 * hence they need to be read using XRANGE.
+                 */
+                if (multicast || processPending) {
+                    const messages = await redisClient.xrange(streamName, messageId, messageId);
+                    if (messages?.length) {
+                        try {
+                            eventData = JSON.parse(messages[0][1][1]);
+                        }
+                        catch (error) {
+                            logger_1.PUBLISHER_LOGGER.error(`JSON parsing failed for message: ${messages[0][1][1]}`);
+                            return;
+                        }
+                    }
+                }
+                else {
+                    const messages = (await redisClient.xreadgroup('GROUP', this.consumerGroupName, this.instanceId, 'COUNT', 1, 'STREAMS', streamName, '>'));
+                    if (messages?.length) {
+                        if (messageId === '0') {
+                            messageId = messages[0][1][0][0];
+                            logger_1.PUBLISHER_LOGGER.log(`PUBLISHER: Reprocessing unprocessed message with id: ${messageId}`);
+                        }
+                        try {
+                            eventData = JSON.parse(messages[0][1][0][1][1]);
+                        }
+                        catch (error) {
+                            logger_1.PUBLISHER_LOGGER.error(`JSON parsing failed for message: ${messages[0][1][0][1][1]}`);
+                            return;
+                        }
+                    }
+                }
+            }
+            catch (error) {
+                logger_1.PUBLISHER_LOGGER.error('Error retrieving or parsing event data:', error);
+                return;
+            }
+            finally {
+                tracker.endRedisOperation();
+            }
             tracker.startProcessing();
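The comment above is the crux of the read split: XREADGROUP with the '>' ID only returns entries never delivered to the group, so a pending (already delivered) or multicast entry has to be re-read by its exact ID with XRANGE. A hedged ioredis sketch of the two paths, with placeholder stream, group, and ID values:

const Redis = require('ioredis');

async function readBothWays() {
    const redis = new Redis();
    // New messages: '>' asks for entries never delivered to this consumer group.
    const fresh = await redis.xreadgroup('GROUP', 'my-group', 'consumer-1', 'COUNT', 1, 'STREAMS', 'my-stream', '>');
    // A pending or multicast entry: re-read by exact ID, without claiming it.
    const byId = await redis.xrange('my-stream', '1700000000000-0', '1700000000000-0');
    await redis.quit();
    return { fresh, byId };
}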
@@ -383,29 +422,38 @@ if (eventData) {
                 if (subscriptions) {
+                    const currentTime = Date.now();
                     const subscriptionEntries = Array.from(subscriptions.entries());
-                    for (let i = 0; i < subscriptionEntries.length; i++) {
-                        const [subId, sub] = subscriptionEntries[i];
-                        if (!sub.filter || sub.filter(eventData)) {
-                            sub.subject.next({ ...eventData, ackKey });
-                            sub.lastMatchTime = Date.now();
-                        }
-                        else if (Date.now() - sub.lastMatchTime > sub.keepAlive) {
-                            /**
-                             * Reset the lastMatch time every day by default. For now only
-                             * log the data. Should add functionality to remove the filter
-                             * if its not used at all to gain minor improvements in
-                             * performace
-                             */
-                            logger_1.PUBLISHER_LOGGER.error(`PUBLISHER: No matching events for ${eventName} (Subscription ${subId}) in the last ${sub.keepAlive / 1000 / 60 / 60} hours`);
-                            sub.lastMatchTime = Date.now();
-                        }
-                    }
+                    // Process subscriptions in parallel for better performance
+                    await Promise.all(subscriptionEntries.map(async ([subId, sub]) => {
+                        try {
+                            if (!sub.filter || sub.filter(eventData)) {
+                                sub.subject.next({ ...eventData, ackKey });
+                                sub.lastMatchTime = currentTime;
+                            }
+                            else if (currentTime - sub.lastMatchTime > sub.keepAlive) {
+                                logger_1.PUBLISHER_LOGGER.error(`PUBLISHER: No matching events for ${eventName} (Subscription ${subId}) in the last ${sub.keepAlive / 1000 / 60 / 60} hours`);
+                                sub.lastMatchTime = currentTime;
+                            }
+                        }
+                        catch (error) {
+                            // Log error but don't fail entire processing
+                            logger_1.PUBLISHER_LOGGER.error(`Error processing subscription ${subId}:`, error);
+                        }
+                    }));
                 }
-                if (!externalAcknowledgement)
-                    await this.acknowledgeMessage(ackKey);
+                // Acknowledge message if needed
+                if (!externalAcknowledgement) {
+                    await this.acknowledgeMessage(ackKey);
+                }
+                // Update metrics
+                const currentTime = Date.now();
+                tracker.incrementMessageRate('subscribe', eventData.eventName);
+                const processingTime = currentTime - eventData.createdAt;
+                tracker.addProcessingTime(processingTime);
+                tracker.setConsumerLag(this.consumerGroupName, currentTime - eventData.createdAt);
             }
-            catch (processingError) {
-                logger_1.PUBLISHER_LOGGER.error(`Processing error for message ${messageId}:`, processingError);
+            catch (error) {
+                logger_1.PUBLISHER_LOGGER.error(`Processing error for message ${messageId}:`, error);
                 const dlqEvent = {
                     ...eventData,
-                    failureReason: processingError.message,
+                    failureReason: error.message,
                     retryCount: (eventData.retryCount || 0) + 1,
@@ -417,8 +465,4 @@ originalStream: streamName,
                 await this.dlq.addToDLQ(dlqEvent);
+                // Don't rethrow to prevent message loss
             }
-            tracker.incrementMessageRate('subscribe', eventData.eventName);
-            const processingTime = Date.now() - eventData.createdAt;
-            tracker.addProcessingTime(processingTime);
-            const lag = Date.now() - eventData.createdAt;
-            tracker.setConsumerLag(this.consumerGroupName, lag);
             }
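A failed message is thus preserved as a dead-letter event carrying failureReason, an incremented retryCount, and its originalStream, and the error is deliberately not rethrown. How such events get drained later is not part of this diff; a purely hypothetical sketch using those fields:

const MAX_RETRIES = 5; // assumption: the diff defines no retry cap

// dlq.readAll() and publisher.publish() are hypothetical; only the
// failureReason/retryCount/originalStream fields come from the diff.
async function drainDlq(dlq, publisher) {
    for (const event of await dlq.readAll()) {
        if ((event.retryCount || 0) >= MAX_RETRIES) {
            console.error(`Dropping event from ${event.originalStream}: ${event.failureReason}`);
            continue;
        }
        await publisher.publish(event);
    }
}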
@@ -429,3 +473,3 @@ else {
             tracker.endProcessing();
-            /** Process Unprocessed Message if this is a main tree, otherwise limit to processing 100 messages that are unacknowledged */
+            /** Process Unprocessed Messages with rate limiting */
             if (!processPending) {
@@ -437,6 +481,8 @@ const unprocessedMessageIds = await (0, utils_1.getUnacknowledgedMessages)(redisClient, this.consumerGroupName, streamName, this.instanceId);
                 }
-                for (const id of unprocessedMessageIds.messageIds) {
-                    logger_1.PUBLISHER_LOGGER.log(`PUBLISHER: Reprocessing unprocessed message with id: ${id}`);
+                // Process messages with rate limiting
+                const processWithDelay = async (id, index) => {
+                    await new Promise((resolve) => setTimeout(resolve, index * 20));
                     await processMessage(redisClient, id, new tracker_1.MetricsTracker(), multicast, true);
-                }
+                };
+                unprocessedMessageIds.messageIds.map((id, index) => processWithDelay(id, index));
             }
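The new reprocessing path staggers each unacknowledged message by index * 20 ms rather than awaiting them one by one; note the .map(...) call is fire-and-forget, so completion is never awaited and failures surface only through each call's own logging. The core pattern in isolation, written to return a promise so a caller could await it:

// Stagger concurrent async work: item i starts after i * stepMs milliseconds.
function processStaggered(ids, handler, stepMs = 20) {
    return Promise.all(ids.map(async (id, index) => {
        await new Promise((resolve) => setTimeout(resolve, index * stepMs));
        await handler(id);
    }));
}

// e.g. await processStaggered(messageIds, (id) => processMessage(redis, id, tracker, false, true));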
@@ -541,34 +587,23 @@ }
         const streamName = `${eventName}:${this.consumerGroupName}`;
-        const cleanupThreshold = Date.now() - interval;
-        const CHUNK_SIZE = 10000; // Process messages in chunks to avoid memory issues
+        const lastAckKey = `last_ack:${streamName}`;
+        const oneHourAgo = Date.now() - interval;
         try {
-            // Get pending info for this consumer group once
-            const pendingInfo = await this.redisGroups.xpending(streamName, this.consumerGroupName);
-            const hasPendingMessages = pendingInfo && pendingInfo[0] > 0;
-            let lastId = '-'; // Start from the beginning of the stream
-            while (true) {
-                // Get a chunk of messages
-                const messages = await this.redisGroups.xrange(streamName, lastId, cleanupThreshold.toString(), 'COUNT', CHUNK_SIZE);
-                if (!messages || messages.length === 0)
-                    break;
-                // Update lastId for next iteration (exclusive)
-                lastId = messages[messages.length - 1][0];
-                if (hasPendingMessages) {
-                    // If there are pending messages, we need to check each message
-                    const pendingDetails = (await this.redisGroups.xpending(streamName, this.consumerGroupName, lastId, '+', CHUNK_SIZE, this.instanceId));
-                    const pendingIds = new Set(pendingDetails.map((detail) => detail[0]));
-                    const acknowledgedIds = messages.map((msg) => msg[0]).filter((id) => !pendingIds.has(id));
-                    if (acknowledgedIds.length > 0) {
-                        await this.redisGroups.xdel(streamName, ...acknowledgedIds);
-                    }
-                }
-                else {
-                    // If no pending messages, we can safely delete all messages in this chunk
-                    const messageIds = messages.map((msg) => msg[0]);
-                    await this.redisGroups.xdel(streamName, ...messageIds);
-                }
-                // If we got less than CHUNK_SIZE messages, we've reached the end
-                if (messages.length < CHUNK_SIZE)
-                    break;
+            // Get consumer group info to check if consumers are active
+            const groupInfo = (await this.redisGroups.xinfo('GROUPS', streamName));
+            // If no active consumers, leave stream as is
+            if (!groupInfo || !groupInfo.some((group) => group.consumers > 0)) {
+                logger_1.PUBLISHER_LOGGER.log(`PUBLISHER: No active consumers for ${streamName}, leaving stream as is`);
+                return;
+            }
+            // Get last acknowledged message ID
+            const lastAckId = await this.redisGroups.get(lastAckKey);
+            if (!lastAckId) {
+                logger_1.PUBLISHER_LOGGER.log(`PUBLISHER: No acknowledged messages for ${streamName}`);
+                return;
+            }
+            // Extract timestamp from message ID
+            const [timestamp] = lastAckId.split('-').map(Number);
+            const cleanupThreshold = Math.min(timestamp, oneHourAgo);
+            await this.redisGroups.xtrim(streamName, 'MINID', cleanupThreshold);
+            logger_1.PUBLISHER_LOGGER.log(`PUBLISHER: Cleaned up messages before last acknowledged message ${timestamp} from ${streamName}`);
         }
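The rewritten cleanup replaces the chunked XRANGE/XDEL scan with a single XTRIM MINID call, which drops every entry whose ID falls below the threshold and requires Redis 6.2 or newer. A minimal ioredis sketch following the diff's last_ack:<stream> key convention:

const Redis = require('ioredis');

async function trimStream(streamName, retentionMs) {
    const redis = new Redis();
    const lastAckId = await redis.get(`last_ack:${streamName}`);
    if (lastAckId) {
        const [timestamp] = lastAckId.split('-').map(Number);
        // Never trim past the last acknowledged entry or the retention window.
        const threshold = Math.min(timestamp, Date.now() - retentionMs);
        await redis.xtrim(streamName, 'MINID', threshold); // Redis 6.2+
    }
    await redis.quit();
}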
@@ -689,5 +724,17 @@ catch (error) { | ||
} | ||
/** | ||
* Acknowledges a message and updates the last acknowledged message ID. | ||
* This is used to track cleanup progress and ensure we don't delete unprocessed messages. | ||
*/ | ||
async acknowledgeMessage(ackKey) { | ||
const { streamName, messageId } = this.demergeMessageKey(ackKey); | ||
await this.redisGroups.xack(streamName, this.consumerGroupName, messageId); | ||
const lastAckKey = `last_ack:${streamName}`; | ||
try { | ||
// Update last acknowledged ID and acknowledge message atomically | ||
await Promise.all([this.redisGroups.xack(streamName, this.consumerGroupName, messageId), this.redisGroups.set(lastAckKey, messageId)]); | ||
} | ||
catch (error) { | ||
logger_1.PUBLISHER_LOGGER.error(`Error acknowledging message ${messageId} for ${streamName}:`, error); | ||
throw error; | ||
} | ||
} | ||
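Note that Promise.all issues XACK and SET concurrently, not atomically: a failure between the two can leave last_ack out of step with the true acknowledgement state. If that window mattered, the pair could be batched in a MULTI/EXEC transaction; a hedged ioredis sketch, not the package's implementation:

// Queue XACK and SET so Redis executes them as one MULTI/EXEC transaction.
async function acknowledgeTransactionally(redis, streamName, groupName, messageId) {
    await redis
        .multi()
        .xack(streamName, groupName, messageId)
        .set(`last_ack:${streamName}`, messageId)
        .exec();
}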
@@ -694,0 +741,0 @@ frameMessageKey(streamName, messageId) {