@jetit/publisher
Advanced tools
Comparing version
{ | ||
"name": "@jetit/publisher", | ||
"version": "3.3.5", | ||
"version": "4.0.0", | ||
"type": "commonjs", | ||
@@ -5,0 +5,0 @@ "dependencies": { |
@@ -46,6 +46,7 @@ import { Observable } from 'rxjs'; | ||
*/ | ||
publish<TData = unknown, TName extends string = string>(data: PublishData<TData, TName>, multicast?: boolean): Promise<void>; | ||
publish<TData = unknown, TName extends string = string>(data: PublishData<TData, TName>, multicast?: boolean): Promise<string>; | ||
/** | ||
* Schedules an event to be published at a specified future time. Thee event gets published if the | ||
* differnece between the current time and the scheduled time is less than 500ms. | ||
* differnece between the current time and the scheduled time is less than 500ms. The granularity | ||
* of scheduled publish is 5 seconds. So it doesnt make sense to run anything less than the 5 secs time | ||
* | ||
@@ -124,2 +125,12 @@ * @param scheduledTime - The Date object representing the future time when the event should be published. | ||
private cleanupAcknowledgedMessages; | ||
/** | ||
* This function should be added to Surf Signal to publish periodic diagnostic information | ||
* on the health of the stream | ||
*/ | ||
getUnacknowledgedMessagesForStream(eventName: string): Promise<{ | ||
count: number; | ||
countOnThisConsumer?: number | undefined; | ||
messageIds: string[]; | ||
messages?: unknown[] | undefined; | ||
}>; | ||
} |
@@ -91,2 +91,3 @@ "use strict"; | ||
logger_1.PERFORMANCE_LOGGER.log(`PTIME;${key};${data.eventName};${Date.now()};${elapsedTime}`); | ||
return key; | ||
} | ||
@@ -179,3 +180,10 @@ async scheduledPublish(scheduledTime, eventData, uniquePerInstance = false, repeatInterval = 0, multicast = false) { | ||
const bs = new rxjs_1.BehaviorSubject(null); | ||
const observable = bs.asObservable().pipe((0, rxjs_1.skip)(1)); | ||
const timer = (0, rxjs_1.interval)(10000).subscribe(async () => { | ||
/** Clear earlier unprocessed messages. Runs every 10 seconds */ | ||
await processMessage(this.redisGroups, '0', false); | ||
}); | ||
const observable = bs.asObservable().pipe((0, rxjs_1.skip)(1), (0, rxjs_1.finalize)(() => { | ||
/** Cleanup timer */ | ||
timer.unsubscribe(); | ||
})); | ||
const streamName = `${eventName}:${this.consumerGroupName}`; | ||
@@ -186,3 +194,6 @@ const processMessage = async (redisClient, messageId, multicast = false, processPending = false) => { | ||
try { | ||
const pendingDetails = await redisClient.xpending(streamName, this.consumerGroupName, messageId, messageId, 1, this.instanceId); | ||
/** | ||
* Check if the message is already acquired by another client and is pending. | ||
*/ | ||
const pendingDetails = await redisClient.xpending(streamName, this.consumerGroupName, messageId, messageId, 1); | ||
if (pendingDetails[2] === 0 && multicast === false) { | ||
@@ -198,5 +209,22 @@ logger_1.PUBLISHER_LOGGER.warn(`PUBLISHER: MACK ${messageId} for ${streamName}`); | ||
} | ||
const messages = await redisClient.xrange(streamName, messageId, messageId); | ||
if (messages && messages.length) { | ||
const eventData = JSON.parse(messages[0][1][1]); | ||
let eventData; | ||
/** | ||
* Both multicast messages and pending messages cannot be read by xreadgroup | ||
* Multicast messages should not be claimed by a single consumer. And pending messages | ||
* are usually behind in the stream so XREADGROUP will not read them and hence | ||
* they need to be read using XRANGE. | ||
*/ | ||
if (multicast === true || processPending) { | ||
const messages = await redisClient.xrange(streamName, messageId, messageId); | ||
if (messages && messages.length) { | ||
eventData = JSON.parse(messages[0][1][1]); | ||
} | ||
} | ||
else { | ||
const messages = (await redisClient.xreadgroup('GROUP', this.consumerGroupName, this.instanceId, 'COUNT', 1, 'STREAMS', streamName, '>')); | ||
if (messages && messages.length) { | ||
eventData = JSON.parse(messages[0][1][0][1][1]); | ||
} | ||
} | ||
if (eventData) { | ||
bs.next(eventData); | ||
@@ -207,8 +235,8 @@ await redisClient.xack(streamName, this.consumerGroupName, messageId); | ||
else { | ||
logger_1.PUBLISHER_LOGGER.warn(`PUBLISHER: Message ${messageId} not found for ${streamName}`); | ||
logger_1.PUBLISHER_LOGGER.log(`PUBLISHER: Message ${messageId} not found for ${streamName}`); | ||
} | ||
/** Process Unprocessed Message if this is a main tree, otherwise limit to processing 100 messages that are unacknowledged */ | ||
if (!processPending) { | ||
const unprocessedMessageIds = await (0, utils_1.getUnacknowledgedMessages)(redisClient, this.consumerGroupName, streamName, 25); | ||
if (unprocessedMessageIds.count > 25) { | ||
const unprocessedMessageIds = await (0, utils_1.getUnacknowledgedMessages)(redisClient, this.consumerGroupName, streamName, this.instanceId); | ||
if (unprocessedMessageIds.countOnThisConsumer && unprocessedMessageIds.countOnThisConsumer > 25) { | ||
logger_1.PUBLISHER_LOGGER.error(`PUBLISHER: Too many unprocessed events for ${streamName}: count: ${unprocessedMessageIds.count}`); | ||
@@ -297,3 +325,12 @@ } | ||
} | ||
/** | ||
* This function should be added to Surf Signal to publish periodic diagnostic information | ||
* on the health of the stream | ||
*/ | ||
async getUnacknowledgedMessagesForStream(eventName) { | ||
const streamName = `${eventName}:${this.consumerGroupName}`; | ||
const unprocessedMessageIds = await (0, utils_1.getUnacknowledgedMessages)(this.redisGroups, this.consumerGroupName, streamName, this.instanceId); | ||
return unprocessedMessageIds; | ||
} | ||
} | ||
exports.Streams = Streams; |
import { RedisType } from './registry'; | ||
import { EventData } from './types'; | ||
export declare function getAllConsumerGroups(eventName: string, redisConnection: RedisType): Promise<string[]>; | ||
export declare function getUnacknowledgedMessages(redisClient: RedisType, consumerGroupName: string, streamName: string, count?: number): Promise<{ | ||
export declare function getUnacknowledgedMessages(redisClient: RedisType, consumerGroupName: string, streamName: string, consumerName: string, count?: number): Promise<{ | ||
count: number; | ||
countOnThisConsumer?: number; | ||
messageIds: string[]; | ||
@@ -7,0 +8,0 @@ messages?: unknown[]; |
@@ -10,7 +10,7 @@ "use strict"; | ||
exports.getAllConsumerGroups = getAllConsumerGroups; | ||
async function getUnacknowledgedMessages(redisClient, consumerGroupName, streamName, count = 500) { | ||
async function getUnacknowledgedMessages(redisClient, consumerGroupName, streamName, consumerName, count = 500) { | ||
try { | ||
// Get pending messages summary | ||
const summary = await redisClient.xpending(streamName, consumerGroupName); | ||
if (!summary || summary[1] === 0) { | ||
if (!summary || summary[0] === 0) { | ||
// If count is zero | ||
@@ -20,7 +20,13 @@ return { count: 0, messageIds: [] }; | ||
// Use the smallest and largest IDs to get a detailed range | ||
const pendingMessageCount = summary[1]; | ||
const pendingMessageCount = summary[0]; | ||
// Get detailed information in the range | ||
const pendingMessages = (await redisClient.xpending(streamName, consumerGroupName, '-', '+', count)); | ||
let pendingMessages = (await redisClient.xpending(streamName, consumerGroupName, '-', '+', count, consumerName)); | ||
/** If no pending messages on consumer, fetch messages from other consumers that haven't been claimed for more than 10s */ | ||
if (count > pendingMessages.length && pendingMessages.length === 0) { | ||
await redisClient.xautoclaim(streamName, consumerGroupName, consumerName, 10000, '0-0', 'COUNT', 100); | ||
pendingMessages = (await redisClient.xpending(streamName, consumerGroupName, '-', '+', count, consumerName)); | ||
} | ||
return { | ||
count: pendingMessageCount, | ||
countOnThisConsumer: pendingMessages.length, | ||
messageIds: pendingMessages.map((message) => message[0]), | ||
@@ -27,0 +33,0 @@ messages: pendingMessages, |
50573
6.35%874
6.72%