🚀 Big News: Socket Acquires Coana to Bring Reachability Analysis to Every Appsec Team. Learn more →

@jetit/publisher

Advanced tools

Socket logo

Install Socket

Detect and block malicious and high-risk dependencies

Install

@jetit/publisher - npm Package Compare versions

Comparing version

to
4.0.0

{
"name": "@jetit/publisher",
"version": "3.3.5",
"version": "4.0.0",
"type": "commonjs",

@@ -5,0 +5,0 @@ "dependencies": {

@@ -46,6 +46,7 @@ import { Observable } from 'rxjs';

*/
publish<TData = unknown, TName extends string = string>(data: PublishData<TData, TName>, multicast?: boolean): Promise<void>;
publish<TData = unknown, TName extends string = string>(data: PublishData<TData, TName>, multicast?: boolean): Promise<string>;
/**
* Schedules an event to be published at a specified future time. The event gets published if the
* difference between the current time and the scheduled time is less than 500ms.
* difference between the current time and the scheduled time is less than 500ms. The granularity
* of scheduled publish is 5 seconds. So it doesn't make sense to schedule anything below the 5-second granularity
*

@@ -124,2 +125,12 @@ * @param scheduledTime - The Date object representing the future time when the event should be published.

private cleanupAcknowledgedMessages;
/**
* This function should be added to Surf Signal to publish periodic diagnostic information
* on the health of the stream
*/
getUnacknowledgedMessagesForStream(eventName: string): Promise<{
count: number;
countOnThisConsumer?: number | undefined;
messageIds: string[];
messages?: unknown[] | undefined;
}>;
}

@@ -91,2 +91,3 @@ "use strict";

logger_1.PERFORMANCE_LOGGER.log(`PTIME;${key};${data.eventName};${Date.now()};${elapsedTime}`);
return key;
}

@@ -179,3 +180,10 @@ async scheduledPublish(scheduledTime, eventData, uniquePerInstance = false, repeatInterval = 0, multicast = false) {

const bs = new rxjs_1.BehaviorSubject(null);
const observable = bs.asObservable().pipe((0, rxjs_1.skip)(1));
const timer = (0, rxjs_1.interval)(10000).subscribe(async () => {
/** Clear earlier unprocessed messages. Runs every 10 seconds */
await processMessage(this.redisGroups, '0', false);
});
const observable = bs.asObservable().pipe((0, rxjs_1.skip)(1), (0, rxjs_1.finalize)(() => {
/** Cleanup timer */
timer.unsubscribe();
}));
const streamName = `${eventName}:${this.consumerGroupName}`;

@@ -186,3 +194,6 @@ const processMessage = async (redisClient, messageId, multicast = false, processPending = false) => {

try {
const pendingDetails = await redisClient.xpending(streamName, this.consumerGroupName, messageId, messageId, 1, this.instanceId);
/**
* Check if the message is already acquired by another client and is pending.
*/
const pendingDetails = await redisClient.xpending(streamName, this.consumerGroupName, messageId, messageId, 1);
if (pendingDetails[2] === 0 && multicast === false) {

@@ -198,5 +209,22 @@ logger_1.PUBLISHER_LOGGER.warn(`PUBLISHER: MACK ${messageId} for ${streamName}`);

}
const messages = await redisClient.xrange(streamName, messageId, messageId);
if (messages && messages.length) {
const eventData = JSON.parse(messages[0][1][1]);
let eventData;
/**
* Both multicast messages and pending messages cannot be read by xreadgroup
* Multicast messages should not be claimed by a single consumer. And pending messages
* are usually behind in the stream so XREADGROUP will not read them and hence
* they need to be read using XRANGE.
*/
if (multicast === true || processPending) {
const messages = await redisClient.xrange(streamName, messageId, messageId);
if (messages && messages.length) {
eventData = JSON.parse(messages[0][1][1]);
}
}
else {
const messages = (await redisClient.xreadgroup('GROUP', this.consumerGroupName, this.instanceId, 'COUNT', 1, 'STREAMS', streamName, '>'));
if (messages && messages.length) {
eventData = JSON.parse(messages[0][1][0][1][1]);
}
}
if (eventData) {
bs.next(eventData);

@@ -207,8 +235,8 @@ await redisClient.xack(streamName, this.consumerGroupName, messageId);

else {
logger_1.PUBLISHER_LOGGER.warn(`PUBLISHER: Message ${messageId} not found for ${streamName}`);
logger_1.PUBLISHER_LOGGER.log(`PUBLISHER: Message ${messageId} not found for ${streamName}`);
}
/** Process Unprocessed Message if this is a main tree, otherwise limit to processing 100 messages that are unacknowledged */
if (!processPending) {
const unprocessedMessageIds = await (0, utils_1.getUnacknowledgedMessages)(redisClient, this.consumerGroupName, streamName, 25);
if (unprocessedMessageIds.count > 25) {
const unprocessedMessageIds = await (0, utils_1.getUnacknowledgedMessages)(redisClient, this.consumerGroupName, streamName, this.instanceId);
if (unprocessedMessageIds.countOnThisConsumer && unprocessedMessageIds.countOnThisConsumer > 25) {
logger_1.PUBLISHER_LOGGER.error(`PUBLISHER: Too many unprocessed events for ${streamName}: count: ${unprocessedMessageIds.count}`);

@@ -297,3 +325,12 @@ }

}
/**
* This function should be added to Surf Signal to publish periodic diagnostic information
* on the health of the stream
*/
async getUnacknowledgedMessagesForStream(eventName) {
const streamName = `${eventName}:${this.consumerGroupName}`;
const unprocessedMessageIds = await (0, utils_1.getUnacknowledgedMessages)(this.redisGroups, this.consumerGroupName, streamName, this.instanceId);
return unprocessedMessageIds;
}
}
exports.Streams = Streams;
import { RedisType } from './registry';
import { EventData } from './types';
export declare function getAllConsumerGroups(eventName: string, redisConnection: RedisType): Promise<string[]>;
export declare function getUnacknowledgedMessages(redisClient: RedisType, consumerGroupName: string, streamName: string, count?: number): Promise<{
export declare function getUnacknowledgedMessages(redisClient: RedisType, consumerGroupName: string, streamName: string, consumerName: string, count?: number): Promise<{
count: number;
countOnThisConsumer?: number;
messageIds: string[];

@@ -7,0 +8,0 @@ messages?: unknown[];

@@ -10,7 +10,7 @@ "use strict";

exports.getAllConsumerGroups = getAllConsumerGroups;
async function getUnacknowledgedMessages(redisClient, consumerGroupName, streamName, count = 500) {
async function getUnacknowledgedMessages(redisClient, consumerGroupName, streamName, consumerName, count = 500) {
try {
// Get pending messages summary
const summary = await redisClient.xpending(streamName, consumerGroupName);
if (!summary || summary[1] === 0) {
if (!summary || summary[0] === 0) {
// If count is zero

@@ -20,7 +20,13 @@ return { count: 0, messageIds: [] };

// Use the smallest and largest IDs to get a detailed range
const pendingMessageCount = summary[1];
const pendingMessageCount = summary[0];
// Get detailed information in the range
const pendingMessages = (await redisClient.xpending(streamName, consumerGroupName, '-', '+', count));
let pendingMessages = (await redisClient.xpending(streamName, consumerGroupName, '-', '+', count, consumerName));
/** If no pending messages on consumer, fetch messages from other consumers that haven't been claimed for more than 10s */
if (count > pendingMessages.length && pendingMessages.length === 0) {
await redisClient.xautoclaim(streamName, consumerGroupName, consumerName, 10000, '0-0', 'COUNT', 100);
pendingMessages = (await redisClient.xpending(streamName, consumerGroupName, '-', '+', count, consumerName));
}
return {
count: pendingMessageCount,
countOnThisConsumer: pendingMessages.length,
messageIds: pendingMessages.map((message) => message[0]),

@@ -27,0 +33,0 @@ messages: pendingMessages,